"""Description of a coupled process for a Lévy-driven SDE
"""
import copy
import numpy as np
from .couplinglevycopula import CouplingProcessLevyCopula
from .couplingmarkovchain import CouplingMarkovChain
from ..markovchain.markovchainsde import MarkovChainSDE, MarkovChainLevyLiborModel
from ...distribution.sampling import SamplingMethod
from ...grid.spatial import CTMCGrid
from ...model.levydrivensde.levydrivensde import LevyDrivenSDEModel
from ...model.levydrivensde.levylibormodel import LevyLiborModel
from ...montecarlo.path import StochasticSDEPath
from ...product.product import Product
[docs]class CouplingSDE:
[docs] def __init__(
self, model: LevyDrivenSDEModel, grid: CTMCGrid, method: SamplingMethod
):
self.level = 0
self.model = model
self.method = method
self.epsilon = grid.h ** model.driver.blumenthal_getoor_index()
# only for level 0 - note that, by design, it must be named self.fine_process
if isinstance(model, LevyLiborModel): # FIXME: not elegant
self.fine_process = MarkovChainLevyLiborModel(
model=model, method=method, grid=grid
)
else:
self.fine_process = MarkovChainSDE(model=model, method=method, grid=grid)
self._process_representation = self.fine_process.process_representation
self._spots = self.fine_process.deterministic_path(np.zeros(shape=1))
if model.dimension_model() == 1:
self.driver_coupling_process = CouplingMarkovChain(
model=model.driver, method=method, grid=grid
)
else:
self.driver_coupling_process = CouplingProcessLevyCopula(
levy_copula_model=model.driver, grid=grid, method=method
)
self.mc_drift_h = None
self.mc_drift_2h = None
[docs] def one_simulation_cost(self, product) -> float:
cost_fine_process = self.fine_process.one_simulation_cost(product=product)
cost_coupling = 0
if self.level > 0:
# for each path, the cost is the cost of simulating the process + the cost of the SDE algorithm
# which is in big O of intensity of jumps and which term is already present in the former one.
cost_coupling = self.driver_coupling_process.one_simulation_cost(
product=product
)
return cost_fine_process + cost_coupling
[docs] def reset_one_simulation_cost(self) -> None:
self.fine_process.reset_one_simulation_cost()
self.driver_coupling_process.reset_one_simulation_cost()
[docs] def initialisation(self, product: Product) -> None:
if self.level == 0:
self.fine_process.initialisation(product=product)
self.mc_drift_h = self.fine_process.markov_chain.process_drift()
else:
self.driver_coupling_process.initialisation(
product=product, max_step_epsilon=self.epsilon
)
[docs] def pre_computation(self, mc_paths: int, product: Product) -> None:
if self.level == 0:
self.fine_process.pre_computation(mc_paths=mc_paths, product=product)
else:
self.driver_coupling_process.pre_computation(
mc_paths=mc_paths, product=product
)
[docs] def simulate_one_path(self) -> object:
return self.fine_process.simulate_one_path()
[docs] def simulate_one_path_with_coupling(self):
mc_path = self.driver_coupling_process.simulate_one_path_with_coupling()
nb_of_jumps = mc_path.jump_times.size
dimension = self.model.dimension()
zi = np.stack((np.array([self.model.x0]).T, np.array([self.model.x0]).T))
z_drift = np.zeros(shape=(2, dimension, nb_of_jumps))
z_diffusion = np.zeros(shape=(2, dimension, nb_of_jumps))
z_jump = np.zeros(shape=(2, dimension, nb_of_jumps))
a = self.model.a
sde_drift = self.fine_process.sde_drift
mc_drift = np.stack(
(np.atleast_2d(self.mc_drift_h), np.atleast_2d(self.mc_drift_2h))
)
for i, (t, dt, dL, dW) in enumerate(
zip(
mc_path.jump_times[:-1],
np.diff(mc_path.jump_times),
np.diff(mc_path.jump_path).T,
np.diff(mc_path.diffusion_path).T,
),
start=1,
):
sde_drift_val = np.stack((sde_drift(t, zi[0]), sde_drift(t, zi[1])))
a_zi = a(t, zi)
d_mu = a_zi @ mc_drift
d_diffusion = a_zi @ np.atleast_2d(dW).T[..., np.newaxis]
d_jump = a_zi @ np.atleast_2d(dL).T[..., np.newaxis]
drift_dt = (sde_drift_val + d_mu) * dt
zi += drift_dt + d_jump + d_diffusion
z_drift[..., i] = drift_dt.reshape((2, dimension))
z_diffusion[..., i] = d_diffusion.reshape((2, dimension))
z_jump[..., i] = d_jump.reshape((2, dimension))
drift = np.cumsum(z_drift, axis=-1)
diffusion = np.cumsum(z_diffusion, axis=-1)
jump = np.cumsum(z_jump, axis=-1)
return StochasticSDEPath(drift, mc_path.jump_times, diffusion, jump)
[docs] def next_level(self, mc_paths, path_managers, product: Product, _: float = None):
self.level += 1
self.epsilon = (
self.driver_coupling_process.grid.h / 2
) ** self.model.driver.blumenthal_getoor_index()
self.mc_drift_2h = copy.deepcopy(self.mc_drift_h)
self.initialisation(product=product)
# note that the grid is refined in the next call:
self.driver_coupling_process.next_level(
mc_paths=mc_paths,
path_managers=None,
product=product,
max_step_epsilon=self.epsilon,
)
self.mc_drift_h = self.driver_coupling_process.fine_process.process_drift()
path_manager_level_l = copy.deepcopy(path_managers[-1])
path_manager_level_l.update(self._process_representation)
# the drift is directly dealt with in the simulation function, hence the following function for both the coarse
# and fine processes:
def _deterministic_path(_: np.array) -> np.array:
return self._spots
def coupling_deterministic_path(times):
return np.array([_deterministic_path(times), _deterministic_path(times)])
path_manager_level_l.deterministic_path = coupling_deterministic_path
path_managers.append(path_manager_level_l)