"""One-dimensional CTMC implementation"""
import copy
import numpy as np
from ...distribution.sampling import SamplingMethod
from ...distribution.samplingfactory import (
create_sampling_method,
compute_intensity_of_jumps,
)
from ...grid.spatial import CTMCGrid
from ...model.levycopulamodel import LevyCopulaModel
from ...model.levymodel.levymodel import LevyModel, LevyRepresentation
from ...process.levyprocess import (
LevyProcess,
SimulationFixedTimes,
SimulationWithJumpTimes,
SimulationMaximumStep,
simulate_diffusion_with_brownian_increments,
)
from ...product.payoff import PayoffDates
from ...product.product import Product
[docs]def vol_adjustment(model: LevyModel, h: float):
"""For processes with infinite variation, the small jumps of size less than h are approximated by a Brownian motion.
This function computes the corresponding coefficient.
:param model: Lévy model
:param h: spatial step h
"""
value = 0
if not model.levy_triplet.nu.jump_of_finite_variation():
a = max(-h / 2, -1)
b = min(h / 2, 1)
c_h = model.levy_triplet.nu.integrate_against_xx(a, b)
value = np.sqrt(c_h)
return value
[docs]def compute_mu_h(levy_measure, grid: CTMCGrid, axis: np.array, origin: int) -> float:
"""Computation of the drift mu_h of the CTMC
:param levy_measure: Lévy measure of the process
:param grid: states grid
:param axis: axis
:param origin: origin coordinate
:return: the drift mu_h
"""
integral = levy_measure.integrate
middle = grid.middle
right_point = grid.right_point
mu_h = 0
mid_point_left = middle(grid.left_point(0), axis[0])
for position, xi in enumerate(axis):
if position != origin:
mid_point_right = middle(xi, right_point(position))
mu_h += np.array(xi) * integral(mid_point_left, mid_point_right)
mid_point_left = mid_point_right
else:
mid_point_left = middle(grid.left_point(origin + 1), axis[origin + 1])
return mu_h
[docs]class MarkovChain:
"""Markov Chain object storing the simulation times, its values and state increments."""
__slots__ = ("times", "values", "states_increments")
[docs] def __init__(
self, times: np.array, values: list[np.array], states_increments: list[np.array]
):
"""
:param times: simulation times
:param values: Markov chain values
:param states_increments: Markov chain increments
"""
self.times = times
self.values = values
self.states_increments = states_increments
[docs]class MarkovChainProcess(LevyProcess):
"""Description of a Markov-Chain process approximating a Lévy model over spatial states axes
:param model: Lévy model
:param grid: states grid
:param method: sampling method to simulate the discrete states
"""
[docs] def __init__(self, model: LevyModel, method: SamplingMethod, grid: CTMCGrid):
model_tilde = copy.deepcopy(model)
model_tilde.truncate_levy_measure(truncations=grid.truncations[0])
model_tilde.levy_triplet.set_representation(LevyRepresentation.TILDE)
super().__init__(model_tilde)
self.grid = grid
is_levy_copula = isinstance(model_tilde, LevyCopulaModel)
self.intensity_of_jumps = compute_intensity_of_jumps(
model=model_tilde, grid=grid
)
self.sampling = create_sampling_method(
model=model_tilde,
levy_measure=model_tilde.levy_triplet.nu,
method=method,
grid=grid,
is_levy_copula=is_levy_copula,
intensity_of_jumps=self.intensity_of_jumps,
)
vol_adj = vol_adjustment(model_tilde, grid.h)
self.equivalent_diffusion_coefficient = np.sqrt(
model_tilde.diffusion_coefficient() ** 2 + vol_adj**2
)
self._process_drift = None
self._path_simulation: MCSimulation = MCSimulation()
[docs] def process_drift(self) -> np.array:
return self._process_drift
[docs] def intensity(self) -> float:
return self.intensity_of_jumps
[docs] def one_simulation_cost(self, product) -> float:
cost_markov_chain = np.log(self.grid.number_of_points())
cost_simulation = self.intensity_of_jumps
return cost_simulation * (self.model.dimension() + cost_markov_chain)
[docs] def reset_one_simulation_cost(self) -> None:
pass
[docs] def initialisation(self, product: Product, max_step_epsilon: float = None) -> None:
if max_step_epsilon is not None:
self._path_simulation = MCSimulationMaximumStep(
self, epsilon=max_step_epsilon
)
elif product.payoff.payoff_dates_type == PayoffDates.DETERMINISTIC:
self._path_simulation = MCSimulationFixedTimes(self)
else:
self._path_simulation = MCSimulationWithJumpTimes(self)
levy_measure = self.model.levy_triplet.nu
mu_h = compute_mu_h(
levy_measure=self.model.levy_triplet.nu,
grid=self.grid,
axis=self.grid.axes[0],
origin=self.grid.origin_coordinate.value,
)
v = 0.0 if self.model.jump_of_finite_variation() else 1.0
mu_tilde = levy_measure.integrate_against_x(
-np.inf, -v
) + levy_measure.integrate_against_x(v, np.inf)
self._process_drift = (
self.model.drift() + self.model.levy_triplet.a + mu_tilde - mu_h
)
[docs]class MCSimulation:
"""Method to simulation the Markov Chain via Monte-Carlo"""
[docs] def simulate_markov_chain(self) -> MarkovChain:
raise NotImplementedError
[docs] def simulate_jumps(self):
raise NotImplementedError
[docs] @staticmethod
def helper_simulate_markov_chain(
grid, sampling, all_nb_of_jumps
) -> tuple[list[np.array], list[np.array]]:
"""simulate one path of the non-deterministic part of the underlying
:param grid: states grid
:param sampling: sampling method
:param all_nb_of_jumps: number of jumps for each time interval
"""
values = [np.empty(shape=nb_of_jumps) for nb_of_jumps in all_nb_of_jumps]
all_states_increments = [
np.empty(shape=nb_of_jumps) for nb_of_jumps in all_nb_of_jumps
]
pivot_position = grid.origin_coordinate
for k, nb_of_jumps in enumerate(all_nb_of_jumps):
states_increments = sampling(size=nb_of_jumps)
values[k] = np.cumsum(
[grid[pivot_position + increment] for increment in states_increments]
)
all_states_increments[k] = states_increments
return values, all_states_increments
[docs]class MCSimulationFixedTimes(MCSimulation, SimulationFixedTimes):
"""Simulation of the Markov Chain at (pre-defined) fixed times"""
[docs] def __init__(self, process: MarkovChainProcess):
SimulationFixedTimes.__init__(self, process)
self._grid = process.grid
self._sampling = process.sampling.sample
[docs] def simulate_markov_chain(self) -> MarkovChain:
"""simulate one path of the non-deterministic part of the underlying"""
all_nb_of_jumps = self._poisson_rv.popleft()
values, all_states_increments = self.helper_simulate_markov_chain(
self._grid, self._sampling, all_nb_of_jumps
)
return MarkovChain(self._times[1:], values, all_states_increments)
[docs] @staticmethod
def project(values):
definitive_values = np.zeros(shape=len(values), dtype=float)
for k, sliceStates in enumerate(values):
if sliceStates.shape[0]:
definitive_values[k] = sliceStates[-1]
return definitive_values
[docs] def simulate_jumps(self):
mc = self.simulate_markov_chain()
return self.project(mc.values)
[docs] def simulate_diffusion(self, sqrt_dts):
stddev = sqrt_dts * self.process.equivalent_diffusion_coefficient
return simulate_diffusion_with_brownian_increments(
stddev, self._brownian_increments.popleft()
)
[docs]class MCSimulationWithJumpTimes(MCSimulation, SimulationWithJumpTimes):
"""Simulation of the Markov Chain at stochastic jump times"""
[docs] def __init__(self, process: MarkovChainProcess):
SimulationWithJumpTimes.__init__(self, process)
self._grid = process.grid
self._sampling = process.sampling.sample
[docs] def simulate_markov_chain(self) -> MarkovChain:
"""simulate one path of the non-deterministic part of the underlying"""
jump_times = np.array([])
all_nb_of_jumps = []
jump_times_simulation = self.process.jump_times_from_nb_of_jumps
nb_jump_dt = self.process.nb_jump_dt
for k, (tp, tm) in enumerate(zip(self._times[1:], self._times)):
dt = tp - tm
new_jumps = tm + jump_times_simulation(dt, nb_jump_dt(dt))
jump_times = np.append(jump_times, new_jumps)
all_nb_of_jumps.append(new_jumps.size)
values, all_states_increments = self.helper_simulate_markov_chain(
self._grid, self._sampling, all_nb_of_jumps
)
return MarkovChain(jump_times, values, all_states_increments)
[docs] def simulate_jumps(self):
mc = self.simulate_markov_chain()
jump_values = np.concatenate(mc.values).ravel().astype(float)
jump_times = mc.times
return jump_times, jump_values
[docs] def simulate_diffusion(self, sqrt_dts):
stddev = sqrt_dts * self.process.equivalent_diffusion_coefficient
brownian_increments = np.random.normal(size=stddev.size)
return simulate_diffusion_with_brownian_increments(stddev, brownian_increments)
[docs]class MCSimulationMaximumStep(MCSimulationWithJumpTimes, SimulationMaximumStep):
"""Simulation of the Markov Chain at stochastic jump times with maximum time step of size epsilon"""
[docs] def __init__(self, process: MarkovChainProcess, epsilon: float):
MCSimulationWithJumpTimes.__init__(self, process)
SimulationMaximumStep.__init__(self, process, epsilon)
[docs] def simulate_jumps(self):
jump_times, jump_values = super().simulate_jumps()
if jump_times.size == 0:
aug_jump_times = jump_times
aug_jump_values = jump_values
else:
aug_jump_times, aug_jump_values = self.build_finer_grid(
jump_times, jump_values
)
return aug_jump_times, aug_jump_values