"""Binary Search Tree method. The term `adapted` means two things:
- the probabilities of the tree are computed on the fly, contrary to the `usual` binary search tree where the
probabilities are pre-computed.
- it is adapted to our purpose, that is to the simulation CTMC processes, but the code could be made generic to be
`adapted` to other simulation algorithms
"""
from functools import lru_cache
from itertools import product
import numpy as np
from rpylib.model.levymodel.levymodel import LevyModel
from ..sampling import Sampling
from ..univariate.uniform import Uniform
from ...grid.grid import Coordinates
from ...grid.spatial import CTMCGrid
[docs]class Point:
"""Representation of a point in the state grid"""
__slots__ = ("coordinates", "value")
[docs] def __init__(self, coordinates, value):
"""
:param coordinates: coordinates (x1, x2, x3,...)
:param value: values (y1, y2, y3, ...)
"""
self.coordinates = coordinates
self.value = value
def __repr__(self):
return f"Point({self.coordinates}, {self.value})"
[docs]class BinarySearchTreeAdapted1D(Sampling):
[docs] def __init__(self, model: LevyModel, grid: CTMCGrid, intensity_of_jumps: float):
super().__init__()
self.model = model
self.axis = grid.axes[0]
self.uniform = Uniform()
self.intensity_of_jumps = intensity_of_jumps
self.origin_coordinate = grid.origin_coordinate.value
self._proba_left_axis = 0
if self.intensity_of_jumps > 0:
self._proba_left_axis = (
model.mass(-np.inf, -grid.h / 2) / self.intensity_of_jumps
)
self._coordinates_left_axis = 0, self.origin_coordinate - 1
self._coordinates_right_axis = self.origin_coordinate + 1, len(self.axis) - 1
@lru_cache(maxsize=2**18)
def _compute_probability(self, a, b):
return self.model.mass(a, b) / self.intensity_of_jumps
[docs] def sample(self, size: int = 1) -> np.array:
res = [self.sample_with_u(u) for u in self.uniform.sample(size=size)]
return res
[docs] def sample_with_u(self, u: float):
axis = self.axis
left, right = self._coordinates_left_axis
current_p = u
if u > self._proba_left_axis:
left, right = self._coordinates_right_axis
current_p -= self._proba_left_axis
while left != right:
middle = (left + right) // 2
l, r = left, middle # choose left interval by default
a, b = 0.5 * (axis[max(0, l - 1)] + axis[l]), 0.5 * (
axis[r] + axis[min(len(axis) - 1, r + 1)]
)
p = self._compute_probability(a, b)
if current_p > p:
left = min(right, middle + 1)
current_p -= p
else:
right = middle
state_increment = left - self.origin_coordinate
return state_increment
[docs]class BinarySearchTreeAdapted(Sampling):
[docs] def __init__(self, model: LevyModel, grid: CTMCGrid):
super().__init__()
(
ps,
cs,
is_axis,
precomputed_cum_p_for_axes,
intensity_of_jumps,
) = self._pre_computation(model=model, grid=grid)
self._buckets_probabilities = ps
self._buckets_coordinates = cs
self._cum_ps = np.cumsum(ps)
self._is_axis = is_axis
self._precomputed_cum_p_for_axes = precomputed_cum_p_for_axes
self.intensity_of_jumps = intensity_of_jumps
self.grid = grid
self.model = model
self.uniform = Uniform(high=sum(ps))
@staticmethod
def _pre_computation(model: LevyModel, grid: CTMCGrid):
origin_coordinate = grid.origin_coordinate
h_left = grid.middle(grid.left_point(grid.origin_coordinate), grid.origin)
h_right = grid.middle(grid.origin, grid.right_point(grid.origin_coordinate))
# break each axis into [-R, -h/2] x ]-h/2, h/2[ x [h/2, R]
# and compute the coordinates for these 3 pieces for each axis
# we sort them as ]-h/2, h/2[, [-R, -h/2], [h/2, R] see the next comment
intervals = []
for k, axis in enumerate(grid.axes):
axis_origin_c = origin_coordinate[k]
hl, hr = h_left[k], h_right[k]
cs = (
[axis_origin_c, axis_origin_c],
[0, axis_origin_c - 1],
[axis_origin_c + 1, len(axis) - 1],
)
vs = (hl, hr), (axis[0], hl), (hr, axis[-1])
pts = tuple(
(Point(cl, vl), Point(cr, vr)) for (cl, cr), (vl, vr) in zip(cs, vs)
)
intervals.append(pts)
cartesian_product = product(*intervals)
# discard first set which is [h_l1, h_r1]x[h_l2, h_r2]x...x[h_ln, h_rn]
# we are calculating the measure of all the sets on the complement of this very set
next(cartesian_product)
intensity_of_jumps = 0
masses = []
buckets_coordinates = []
is_cached_axis = []
low_nb_of_pts = len(grid.axes) * len(grid.axes[0]) < 10_001
for c_set in cartesian_product:
a_pts, b_pts = zip(*c_set)
a = [a_pt.value for a_pt in a_pts]
b = [b_pt.value for b_pt in b_pts]
a_c = [a_pt.coordinates for a_pt in a_pts]
b_c = [b_pt.coordinates for b_pt in b_pts]
p = model.mass(a, b)
intensity_of_jumps += p
is_an_axis_and_low_nb_pts = (
low_nb_of_pts and sum(l != r for l, r in zip(a_c, b_c)) == 1
)
masses.append(p)
buckets_coordinates.append(list(zip(a_c, b_c)))
is_cached_axis.append(is_an_axis_and_low_nb_pts)
precomputed_cum_p_for_axes = {}
for k, is_an_axis in enumerate(is_cached_axis):
if is_an_axis:
a_c, b_c = list(zip(*buckets_coordinates[k]))
axis_nb = next(k for k, (l, r) in enumerate(zip(a_c, b_c)) if l != r)
ps = []
pt_list = list(a_c)
for c in range(a_c[axis_nb], b_c[axis_nb] + 1):
pt_list[axis_nb] = c
pt = Coordinates(pt_list)
left_middle_point = grid.middle(grid.left_point(pt), grid[pt])
right_middle_point = grid.middle(grid[pt], grid.right_point(pt))
p = (
model.mass(a=left_middle_point, b=right_middle_point)
/ intensity_of_jumps
)
ps.append(p)
precomputed_cum_p_for_axes[k] = np.cumsum(ps)
probabilities = [mass / intensity_of_jumps for mass in masses]
return (
probabilities,
buckets_coordinates,
is_cached_axis,
precomputed_cum_p_for_axes,
intensity_of_jumps,
)
@lru_cache(maxsize=2**18)
def _compute_probability(self, a, b):
return self.model.mass(a, b) / self.intensity_of_jumps
[docs] def cost(self) -> int:
return self.uniform.cost()
[docs] def reset_sampling_cost(self) -> None:
self.uniform.reset_sampling_cost()
[docs] def sample(self, size: int = 1) -> np.array:
res = self.sample_with_us(self.uniform.sample(size=size))
return res
[docs] def sample_with_us(self, us: np.array):
# find the bucket where to sample the state
bucket_positions = np.searchsorted(self._cum_ps, us)
bucket_coordinates = [
self._buckets_coordinates[bucket_position]
for bucket_position in bucket_positions
]
positions = np.where(bucket_positions > 0)
us[positions] -= np.array(
[self._cum_ps[bucket_positions[position] - 1] for position in positions]
).ravel()
state_increments = []
for these_bucket_coordinates, bucket_position, prob in zip(
bucket_coordinates, bucket_positions, us
):
if self._is_axis[bucket_position]:
# find the position of the state
state_ith_pos = np.searchsorted(
self._precomputed_cum_p_for_axes[bucket_position], prob
)
a_c, b_c = list(zip(*these_bucket_coordinates))
state = tuple(
l if l == r else l + state_ith_pos for l, r in zip(a_c, b_c)
)
state_increment = tuple(
v - o for v, o in zip(state, self.grid.origin_coordinate)
)
else:
state_increment = self.sample_one_bucket(
coordinates=these_bucket_coordinates, probability=prob
)
state_increments.append(state_increment)
return state_increments
[docs] def sample_one_bucket(self, coordinates, probability: float):
result = list(coordinates)
grid = self.grid
axes = grid.axes
current_probability = probability
while any(l != r for l, r in result):
# iterate the binary search successively on each axis
for k, ((left, right), axis) in enumerate(zip(result, axes)):
if left != right:
middle = (right + left) // 2
result[k] = left, middle
a_c, b_c = zip(*result)
a_cc = Coordinates(a_c)
b_cc = Coordinates(b_c)
a = grid.middle(grid.left_point(a_cc), grid[a_cc])
b = grid.middle(grid[b_cc], grid.right_point(b_cc))
p = self._compute_probability(a, b)
if current_probability > p:
result[k] = min(right, middle + 1), right
current_probability -= p
state_increment = tuple(
c[0] - o for c, o in zip(result, grid.origin_coordinate)
)
return state_increment