Source code for rpylib.process.markovchain.markovchainlevycopula

"""
Lévy copula simulated through a Markov Chain process

"""

import copy

import numpy as np
import pathos.multiprocessing as mp
import scipy.integrate
import scipy.linalg

from rpylib.distribution.sampling import SamplingMethod
from rpylib.distribution.samplingfactory import (
    create_sampling_method,
    compute_intensity_of_jumps,
)
from rpylib.grid.spatial import CTMCGrid
from rpylib.model.levycopulamodel import LevyCopulaModel
from rpylib.model.levymodel.levymodel import LevyRepresentation
from rpylib.montecarlo.path import StochasticJumpPath, StochasticPath
from rpylib.numerical.tools import sign
from rpylib.process.levyprocess import LevyProcess
from rpylib.process.levyprocess import (
    SimulationFixedTimes,
    SimulationWithJumpTimes,
    SimulationMaximumStep,
)
from rpylib.process.markovchain.markovchain import MarkovChain, compute_mu_h
from rpylib.product.payoff import PayoffDates
from rpylib.product.product import Product


[docs]def vol_adjustment_ij(i: int, j: int, h: float, levy_model: LevyCopulaModel): """For processes with infinite variation, the small jumps of size less than h are approximated by a Brownian motion. This function computes the corresponding diffusion matrix. :param i: i-th coordinate :param j: j-th coordinate :param levy_model: Lévy copula model :param h: spatial step h .. seealso:: function :func:`vol_adjustment` """ V = 0.0 if levy_model.jump_of_finite_variation() else 1.0 a = [max(-V, -h / 2)] * levy_model.dimension() b = [min(+V, +h / 2)] * levy_model.dimension() Ah = list(zip(a, b)) mass = levy_model.mass def to_integrate_ii(*s): if (s_i := s[i]) == 0: return 0 else: a[i], b[i] = (s_i, h / 2) if s_i > 0 else (-h / 2, s_i) val = sign(s_i) * s_i * mass(a=a, b=b) return val def to_integrate_ij(*s): s_i, s_j = s[i], s[j] if s_i == 0 or s_j == 0: return 0 else: a[i], b[i] = (s_i, h / 2) if s_i > 0 else (-h / 2, s_i) a[j], b[j] = (s_j, h / 2) if s_j > 0 else (-h / 2, s_j) val = sign(s_i) * sign(s_j) * mass(a=a, b=b) return val options = {"epsabs": 1e-3} if i == j: root = scipy.integrate.nquad(func=to_integrate_ii, ranges=Ah, opts=options) res = root[0] * 2 / h ** (levy_model.dimension() - 1) else: root = scipy.integrate.nquad(func=to_integrate_ij, ranges=Ah, opts=options) res = root[0] / h ** (levy_model.dimension() - 2) return res
[docs]class MarkovChainLevyCopula(LevyProcess): """Lévy copula simulated via a Markov Chain process"""
[docs] def __init__( self, levy_copula_model: LevyCopulaModel, grid: CTMCGrid, method: SamplingMethod ): """ :param levy_copula_model: Lévy copula model :param grid: states grid :param method: sampling method """ model_tilde = copy.deepcopy(levy_copula_model) model_tilde.truncate_levy_measure(truncations=grid.truncations) for model in model_tilde.models: model.levy_triplet.set_representation(LevyRepresentation.TILDE) super().__init__(model=model_tilde) self.grid = grid self.method = method self.intensity_of_jumps = compute_intensity_of_jumps( model=model_tilde, grid=grid ) self.sampling = create_sampling_method( model=model_tilde, levy_measure=None, method=method, grid=grid, is_levy_copula=True, intensity_of_jumps=self.intensity_of_jumps, ) self._dimension = model_tilde.dimension() self._h = grid.h self._process_drift = None self._path_simulation: MCLevyCopulaSimulation = MCLevyCopulaSimulation( process=self )
[docs] def process_drift(self) -> np.array: return self._process_drift
[docs] def intensity(self) -> float: return self.intensity_of_jumps
[docs] def one_simulation_cost(self, product) -> float: cost_markov_chain = np.log(self.grid.number_of_points()) cost_simulation = self.intensity_of_jumps return cost_simulation * (self.model.dimension() + cost_markov_chain)
[docs] def reset_one_simulation_cost(self) -> None: super().reset_one_simulation_cost() self.sampling.reset_sampling_cost()
[docs] def initialisation(self, product: Product, max_step_epsilon: float = None) -> None: if max_step_epsilon is not None: self._path_simulation = MCLevyCopulaSimulationMaximumStep( self, max_step_epsilon ) elif product.payoff.payoff_dates_type == PayoffDates.DETERMINISTIC: self._path_simulation = MCLevyCopulaSimulationFixedTimes(self) else: self._path_simulation = MCLevyCopulaSimulationWithJumpTimes(self) models = self.model.models mu_h = np.array( [ [ compute_mu_h( levy_measure=model.levy_triplet.nu, grid=self.grid, axis=self.grid.axes[k], origin=self.grid.origin_coordinate.value[k], ) for k, model in enumerate(models) ] ] ).T V = 0.0 if self.model.jump_of_finite_variation() else 1.0 mu_tilde = np.array( [ [ model.levy_triplet.nu.integrate_against_x(-np.inf, -V) + model.levy_triplet.nu.integrate_against_x(V, np.inf) for model in models ] ] ).T a_drift = np.array([[model.levy_triplet.a for model in models]]).T self._process_drift = self.model.drift() + a_drift + mu_tilde - mu_h
[docs]class MCLevyCopulaSimulation: """Simulation method of Markov Chain process for Lévy copulas"""
[docs] def __init__(self, process: MarkovChainLevyCopula): self.process = process models = process.model.models model = process.model dimension = model.dimension() h = process.grid.h model_variance = np.diag([m.diffusion_coefficient() ** 2 for m in models]) adj_matrix = np.zeros_like(model_variance) if not model.jump_of_finite_variation(): nb_of_processes = ((dimension + 1) * dimension) // 2 with mp.Pool(processes=nb_of_processes) as pool: res = [ pool.apply_async(vol_adjustment_ij, args=(i, j, h, model)) for i in range(dimension) for j in range(i, dimension) ] outputs = iter([p.get() for p in res]) for i in range(dimension): adj_matrix[i, i] = next(outputs) for j in range(i + 1, dimension): adj_matrix[i, j] = adj_matrix[j, i] = next(outputs) variance_matrix = np.dot(adj_matrix, adj_matrix.T) + model_variance diffusion_matrix = scipy.linalg.sqrtm(variance_matrix) self.diffusion_matrix = diffusion_matrix
[docs] def simulate_markov_chain(self) -> MarkovChain: raise NotImplementedError
[docs] def simulate_jumps(self): raise NotImplementedError
[docs] def helper_simulate_levy_copula_markov_chain( self, all_nb_of_jumps ) -> tuple[list[np.array], list[np.array]]: """simulate one path of the non-deterministic part of the underlying""" all_values = [np.empty(shape=nb_of_jumps) for nb_of_jumps in all_nb_of_jumps] all_states_increments = [ np.empty(shape=nb_of_jumps) for nb_of_jumps in all_nb_of_jumps ] grid = self.process.grid sample = self.process.sampling.sample pivot_position = grid.origin_coordinate for k, nb_of_jumps in enumerate(all_nb_of_jumps): states_increments = sample(size=nb_of_jumps) xy = ( grid[pivot_position + state_increment] for state_increment in states_increments ) all_values[k] = np.cumsum(np.array(list(xy)), axis=0) all_states_increments[k] = states_increments return all_values, all_states_increments
[docs] def helper_simulate_diffusion_part(self, sqrt_time_increments, brownian_increments): diff_coefficient = self.diffusion_matrix @ brownian_increments scaled_diff_coefficient = sqrt_time_increments * diff_coefficient return np.cumsum(scaled_diff_coefficient, axis=1)
[docs]class MCLevyCopulaSimulationFixedTimes(MCLevyCopulaSimulation, SimulationFixedTimes): """Simulation of a Markov Chain for Lévy copula at (pre-defined) fixed times"""
[docs] def __init__(self, process: MarkovChainLevyCopula): MCLevyCopulaSimulation.__init__(self, process) SimulationFixedTimes.__init__(self, process) self._dimension = self.process.model.dimension()
[docs] def simulate_one_path(self) -> StochasticPath: # simulate the jump values simulated_jumps = self.simulate_jumps() jumps = np.hstack( (np.zeros(self._dimension)[:, np.newaxis], simulated_jumps[:, np.newaxis]) ) # simulate the diffusion part simulated_diffusion = self.simulate_diffusion_part() diffusion = np.hstack( (np.zeros(self._dimension)[:, np.newaxis], simulated_diffusion) ) return StochasticJumpPath(self._times, diffusion, jumps)
[docs] def simulate_markov_chain(self) -> MarkovChain: """simulate one path of the non-deterministic part of the underlying""" all_nb_of_jumps = self._poisson_rv.popleft() values, all_states_increments = self.helper_simulate_levy_copula_markov_chain( all_nb_of_jumps ) return MarkovChain(self._times[1:], values, all_states_increments)
[docs] @staticmethod def project(values, dim): zero = (0.0,) * dim definitive_values = ( sliceStates[-1] if sliceStates.size else zero for sliceStates in values ) return np.array(*definitive_values)
[docs] def simulate_jumps(self): mc = self.simulate_markov_chain() return self.project(mc.values, self._dimension)
[docs] def simulate_diffusion_part(self): brownian_increments = self._brownian_increments.popleft() return self.helper_simulate_diffusion_part(self._sqrt_dts, brownian_increments)
[docs]class MCLevyCopulaSimulationWithJumpTimes( MCLevyCopulaSimulation, SimulationWithJumpTimes ): """Simulation of a Markov Chain for Lévy copula at the stochastic jump times"""
[docs] def __init__(self, process: MarkovChainLevyCopula): MCLevyCopulaSimulation.__init__(self, process) SimulationWithJumpTimes.__init__(self, process) self._dimension = self.process.model.dimension()
[docs] def simulate_one_path(self) -> StochasticPath: # simulate the jump values jump_times, jump_values = self.simulate_jumps() # add values for t=0 and t=maturity jump_times = np.concatenate(([0.0], +jump_times, [self._maturity])) final_jumps = ( np.zeros(self._dimension) if jump_values.size == 0 else np.array([jp[-1] for jp in jump_values]) ) if jump_values.size > 0: jumps = np.concatenate( ( np.zeros(self._dimension)[:, np.newaxis], jump_values, final_jumps[:, np.newaxis], ), axis=1, ) else: jumps = np.zeros(shape=(self._dimension, 2)) simulated_diffusion = self.simulate_diffusion_part(np.sqrt(np.diff(jump_times))) diffusion = np.concatenate( (np.zeros(self._dimension)[:, np.newaxis], simulated_diffusion), axis=1 ) return StochasticJumpPath(jump_times, diffusion, jumps)
[docs] def simulate_markov_chain(self) -> MarkovChain: """simulate one path of the non-deterministic part of the underlying""" jump_times = np.array([]) all_nb_of_jumps = [] jump_times_simulation = self.process.jump_times_from_nb_of_jumps nb_jump_dt = self.process.nb_jump_dt for k, (tp, tm) in enumerate(zip(self._times[1:], self._times)): dt = tp - tm new_jumps = jump_times_simulation(dt, nb_jump_dt(dt)) + tm jump_times = np.append(jump_times, new_jumps) all_nb_of_jumps.append(new_jumps.size) values, all_states_increments = self.helper_simulate_levy_copula_markov_chain( all_nb_of_jumps ) return MarkovChain(jump_times, values, all_states_increments)
[docs] def simulate_jumps(self): mc = self.simulate_markov_chain() jump_values = np.concatenate(mc.values, axis=-1).T jump_times = mc.times return jump_times, jump_values
[docs] def simulate_diffusion_part(self, sqrt_time_increments): nb = self._dimension * sqrt_time_increments.size brownian_increments = np.random.normal(size=nb).reshape( (self._dimension, sqrt_time_increments.size) ) return self.helper_simulate_diffusion_part( sqrt_time_increments, brownian_increments )
[docs]class MCLevyCopulaSimulationMaximumStep( MCLevyCopulaSimulationWithJumpTimes, SimulationMaximumStep ): """Simulation of a Markov Chain for Lévy copula at the stochastic jump times with maximum time increment of size epsilon"""
[docs] def __init__(self, process: MarkovChainLevyCopula, epsilon: float): MCLevyCopulaSimulationWithJumpTimes.__init__(self, process) SimulationMaximumStep.__init__(self, process, epsilon)
[docs] def simulate_jumps(self): jump_times, jump_values = MCLevyCopulaSimulationWithJumpTimes.simulate_jumps( self ) if jump_times.size == 0: return jump_times, jump_values else: return self.build_finer_grid(jump_times, jump_values)