"""Description of a coupled process for a Lévy copula
The Multi-level Monte-Carlo uses a coupled process defined as a coarse process and a fine process
"""
import copy
from itertools import product
from types import MethodType
import numpy as np
from .helper import create_build_finer_grid_fun
from ..markovchain.markovchainlevycopula import MarkovChainLevyCopula
from ...distribution.sampling import SamplingMethod
from ...distribution.univariate.uniform import Uniform
from ...grid.grid import CoordinateND
from ...grid.spatial import CTMCGrid
from ...model.levycopulamodel import LevyCopulaModel
from ...montecarlo.path import StochasticJumpPath
from ...montecarlo.statistic.statistic import PT
from ...product.payoff import PayoffDates
from ...product.product import Product
[docs]class CouplingProcessLevyCopula:
"""Coupling of a Lévy Copula Markov Chain process"""
[docs] def __init__(
self, levy_copula_model: LevyCopulaModel, grid: CTMCGrid, method: SamplingMethod
):
"""
:param levy_copula_model: Lévy copula to be approximated
:param grid: states grid
:param method: simulation algorithm method
"""
self.level = 0
self.model = levy_copula_model
self.grid = grid
self.method = method
self.fine_process = MarkovChainLevyCopula(
levy_copula_model=levy_copula_model, grid=grid, method=method
)
self._path_coupling_simulation: CouplingLevyCopulaSimulation = None
self._uniform = Uniform()
self._diffusion_matrix_h = None
self._diffusion_matrix_2h = None
[docs] def one_simulation_cost(self, product: Product) -> float:
cost_fine_process = self.fine_process.one_simulation_cost(product=product)
cost_coupling = 0
if self.level > 0:
# at each jump, we need to simulate a uniform for the coupling, and we need to compute the payoff function
# for the coarse process (the latter cost is neglected with regard to the former)
cost_coupling = self.fine_process.intensity_of_jumps
return cost_fine_process + cost_coupling
[docs] def reset_one_simulation_cost(self) -> None:
self._uniform.reset_sampling_cost()
self.fine_process.reset_one_simulation_cost()
[docs] def initialisation(self, product: Product, max_step_epsilon: float = None) -> None:
self.fine_process.initialisation(
product=product, max_step_epsilon=max_step_epsilon
)
self._diffusion_matrix_h = self.fine_process._path_simulation.diffusion_matrix
if max_step_epsilon is not None:
self._path_coupling_simulation = CouplingLevyCopulaSimulationMaximumStep(
self, epsilon=max_step_epsilon
)
elif product.payoff.payoff_dates_type == PayoffDates.DETERMINISTIC:
self._path_coupling_simulation = CouplingLevyCopulaSimulationFixedTimes(
self
)
else:
self._path_coupling_simulation = CouplingLevyCopulaSimulationWithJumpTimes(
self
)
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
self.fine_process.pre_computation(mc_paths, product)
self._path_coupling_simulation.pre_computation(mc_paths, product)
[docs] def simulate_one_path(self) -> object:
return self.fine_process.simulate_one_path()
[docs] def simulate_one_path_with_coupling(self):
return self._path_coupling_simulation.simulate_one_path_with_coupling()
[docs] def next_level(
self, mc_paths, path_managers, product: Product, max_step_epsilon: float = None
):
if path_managers is not None:
freeze_spots = copy.deepcopy(
self.fine_process.deterministic_path(np.zeros(shape=1))
)
freeze_process_drift = (
copy.deepcopy(self.fine_process.deterministic_path(np.ones(shape=1)))
- freeze_spots
)
def coarse_deterministic_path(times: np.array) -> np.array:
return freeze_spots + freeze_process_drift * times
self.level += 1
self.grid.refine()
self._diffusion_matrix_2h = self.fine_process._path_simulation.diffusion_matrix
self.fine_process = MarkovChainLevyCopula(
levy_copula_model=self.model, grid=self.grid, method=self.method
)
self.initialisation(product=product, max_step_epsilon=max_step_epsilon)
self.pre_computation(mc_paths=mc_paths, product=product)
self._diffusion_matrix_h = self.fine_process._path_simulation.diffusion_matrix
if path_managers is not None:
fine_deterministic_path = self.fine_process.deterministic_path
path_manager_levell = copy.deepcopy(path_managers[-1])
def coupling_deterministic_path(times):
return np.array(
[fine_deterministic_path(times), coarse_deterministic_path(times)]
)
path_manager_levell.deterministic_path = coupling_deterministic_path
path_managers.append(path_manager_levell)
[docs]class CouplingLevyCopulaSimulation:
[docs] def __init__(self, coupling_process: CouplingProcessLevyCopula):
self.coupling_process = coupling_process
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
raise NotImplementedError
[docs] def simulate_diffusion_with_coupling(self, sqrt_dts):
brownian_increments = (
self.coupling_process.fine_process._path_simulation._brownian_increments.popleft()
)
diff_coefficients_fine = (
self.coupling_process._diffusion_matrix_h @ brownian_increments
)
diff_coefficients_coarse = (
self.coupling_process._diffusion_matrix_2h @ brownian_increments
)
scaled_diff_coefficients_fine = sqrt_dts * diff_coefficients_fine
scaled_diff_coefficients_coarse = sqrt_dts * diff_coefficients_coarse
return np.cumsum(scaled_diff_coefficients_fine, axis=1), np.cumsum(
scaled_diff_coefficients_coarse, axis=1
)
[docs] def simulate_jumps_with_coupling(self):
raise NotImplementedError
[docs] def simulate_one_path_with_coupling(self):
raise NotImplementedError
def __coupling_state(
self, increment: tuple[int], axis_coordinates: list[int] = None
):
dim = len(increment)
if axis_coordinates is None:
axis_coordinates = list(range(dim))
grid = self.coupling_process.grid
if len(axis_coordinates) == 0:
return grid[grid.origin_coordinate + increment]
if any(not increment[(j := i)] % 2 for i in axis_coordinates):
# no projection on this axis
axis_coordinates.remove(j)
return self.__coupling_state(increment, axis_coordinates)
else:
# projection on the axis defined by the indices in axis_coordinates
position = grid.origin_coordinate + increment
mass = self.coupling_process.model.mass
value = grid[position]
u = self.coupling_process._uniform.sample()
projected_position = CoordinateND(position[k] for k in axis_coordinates)
projected_value = tuple(value[k] for k in axis_coordinates)
projected_mid_left_value = grid.middle(
grid.left_point(projected_position), projected_value
)
projected_mid_right_value = grid.middle(
projected_value, grid.right_point(projected_position)
)
total_mass = mass(
projected_mid_left_value, projected_mid_right_value, axis_coordinates
)
probability = 0
for p in product([-1, 1], repeat=len(axis_coordinates)):
p_value = grid[projected_position + p]
p_middle_value = grid.middle(p_value, projected_value)
min_max = tuple(
(min(p1, p2), max(p1, p2))
for p1, p2 in zip(projected_value, p_middle_value)
)
p_left_value, p_right_value = zip(*min_max)
p_mass = mass(p_left_value, p_right_value, axis_coordinates)
probability += p_mass / total_mass
if u <= probability:
res = tuple(
p_value[axis_coordinates.index(k)]
if k in axis_coordinates
else value[k]
for k in range(dim)
)
return np.array(res)
raise ValueError(
"couplinglevycopula::__coupling_state -> Numerical error? probability={:6f}, u={:6f}".format(
probability, u
)
)
def _coupling_states_for_a_slice(self, slice_fine_states: np.array):
if len(slice_fine_states):
slice_coupling_values = [None] * len(slice_fine_states)
current_value = np.array(self.coupling_process.grid.origin)
for k, deltaFineState in enumerate(slice_fine_states):
current_value += self.__coupling_state(deltaFineState)
slice_coupling_values[k] = current_value.copy()
else:
slice_coupling_values = [np.array([0.0, 0.0])] * len(slice_fine_states)
return slice_coupling_values
[docs]class CouplingLevyCopulaSimulationFixedTimes(CouplingLevyCopulaSimulation):
[docs] def __init__(self, coupling_process: CouplingProcessLevyCopula):
super().__init__(coupling_process=coupling_process)
self._sqrt_dts = None
self._times = None
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
times = product.times_grid()
self._times = times
self._sqrt_dts = np.sqrt(np.diff(times))
[docs] def simulate_jumps_with_coupling(self):
# simulate jump times and jump values of the finer process
fine_mc = (
self.coupling_process.fine_process._path_simulation.simulate_markov_chain()
)
fine_states_increments = fine_mc.states_increments
fines_states_allvalues = fine_mc.values
dim = self.coupling_process.fine_process._dimension
fines_states_values = np.zeros(shape=(dim, len(fine_states_increments) + 1))
coarse_states_values = np.zeros(shape=(dim, len(fine_states_increments) + 1))
for k, (slice_fine_states, slice_fine_values) in enumerate(
zip(fine_states_increments, fines_states_allvalues)
):
if slice_fine_states:
slice_coarse_values = self._coupling_states_for_a_slice(
slice_fine_states
)
fines_states_values[:, k + 1] = slice_fine_values[-1]
coarse_states_values[:, k + 1] = slice_coarse_values[-1]
return fines_states_values, coarse_states_values
[docs] def simulate_one_path_with_coupling(self):
# simulate the jump part first
jumps_h, jumps_2h = self.simulate_jumps_with_coupling()
# simulate the diffusion part
diff_h, diff_2h = self.simulate_diffusion_with_coupling(self._sqrt_dts)
diff = np.zeros(shape=(2, diff_h.shape[0], 1 + diff_h.shape[1]))
diff[PT.FP, :, 1:] = diff_h
diff[PT.CP, :, 1:] = diff_2h
# jumps already have the 0 at t=0
jumps = np.zeros(shape=(2, jumps_h.shape[0], jumps_h.shape[1]))
jumps[PT.FP, ...] = jumps_h
jumps[PT.CP, ...] = jumps_2h
return StochasticJumpPath(self._times, diff, jumps)
[docs]class CouplingLevyCopulaSimulationWithJumpTimes(CouplingLevyCopulaSimulation):
[docs] def __init__(self, coupling_process: CouplingProcessLevyCopula):
super().__init__(coupling_process=coupling_process)
self._maturity = None
self._dimension = self.coupling_process.model.dimension_model()
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
self._maturity = product.maturity
[docs] def simulate_diffusion_with_coupling(self, sqrt_dts):
nb = self._dimension * sqrt_dts.size
brownian_increments = np.random.normal(size=nb).reshape(
(self._dimension, sqrt_dts.size)
)
diff_coeff_fine = (
self.coupling_process._diffusion_matrix_h @ brownian_increments
)
diff_coeff_coarse = (
self.coupling_process._diffusion_matrix_2h @ brownian_increments
)
scaled_diff_coeff_fine = sqrt_dts * diff_coeff_fine
scaled_diff_coeff_coarse = sqrt_dts * diff_coeff_coarse
return np.cumsum(scaled_diff_coeff_fine, axis=1), np.cumsum(
scaled_diff_coeff_coarse, axis=1
)
[docs] def simulate_jumps_with_coupling(self):
# simulate jump times and jump values of the finer process
fine_mc = (
self.coupling_process.fine_process._path_simulation.simulate_markov_chain()
)
fine_states_increments = fine_mc.states_increments
fines_states_allvalues = fine_mc.values
jump_times = fine_mc.times
coarse_states_values = np.empty_like(fines_states_allvalues)
for k, (slice_fine_states, slice_fine_values) in enumerate(
zip(fine_states_increments, fines_states_allvalues)
):
if slice_fine_states:
slice_coarse_values = self._coupling_states_for_a_slice(
slice_fine_states
)
coarse_states_values[k] = slice_coarse_values
fines_states_values = np.concatenate(fines_states_allvalues).T
coarse_states_values = np.concatenate(coarse_states_values).T
return jump_times, fines_states_values, coarse_states_values
[docs] def simulate_one_path_with_coupling(self):
# simulate the jump part first
jump_times, jumps_h, jumps_2h = self.simulate_jumps_with_coupling()
jump_times = np.concatenate(([0.0], +jump_times, [self._maturity]))
final_fine_jump = (
np.zeros(self._dimension)
if jumps_h.size == 0
else np.array([jp[-1] for jp in jumps_h])
)
final_coarse_jump = (
np.zeros(self._dimension)
if jumps_2h.size == 0
else np.array([jp[-1] for jp in jumps_2h])
)
if jumps_h.size > 0:
jumps_h = np.concatenate(
(
np.zeros(self._dimension)[:, np.newaxis],
jumps_h,
final_fine_jump[:, np.newaxis],
),
axis=1,
)
else:
jumps_h = np.zeros(shape=(self._dimension, 2))
if jumps_2h.size > 0:
jumps_2h = np.concatenate(
(
np.zeros(self._dimension)[:, np.newaxis],
jumps_2h,
final_coarse_jump[:, np.newaxis],
),
axis=1,
)
else:
jumps_2h = np.zeros(shape=(self._dimension, 2))
diff_h, diff_2h = self.simulate_diffusion_with_coupling(
np.sqrt(np.diff(jump_times))
)
diff = np.zeros(shape=(2, diff_h.shape[0], 1 + diff_h.shape[1]))
diff[PT.FP, :, 1:] = diff_h
diff[PT.CP, :, 1:] = diff_2h
# jumps already have the 0 at t=0
jumps = np.zeros(shape=(2, jumps_h.shape[0], jumps_h.shape[1]))
jumps[PT.FP, ...] = jumps_h
jumps[PT.CP, ...] = jumps_2h
return StochasticJumpPath(jump_times, diff, jumps)
[docs]class CouplingLevyCopulaSimulationMaximumStep(
CouplingLevyCopulaSimulationWithJumpTimes
):
[docs] def __init__(self, coupling_process, epsilon: float):
super().__init__(coupling_process=coupling_process)
self.epsilon = epsilon
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
super().pre_computation(mc_paths=mc_paths, product=product)
build_finer_grid = create_build_finer_grid_fun(
epsilon=self.epsilon, maturity=product.maturity
)
self.build_finer_grid = MethodType(build_finer_grid, self)
[docs] def simulate_jumps_with_coupling(self):
(
jump_times,
fine_all_values,
coarse_all_values,
) = super().simulate_jumps_with_coupling()
if jump_times.size == 0:
return jump_times, fine_all_values, coarse_all_values
else:
return self.build_finer_grid(jump_times, fine_all_values, coarse_all_values)