Source code for rpylib.montecarlo.multilevel.engine

"""Multilevel Monte-Carlo engine

     .. note::
        - the discounting is deterministic by design
        - the implementation supports multiprocessing
"""

import copy
import logging

import numpy as np
import pathos.multiprocessing as mp
from tqdm import tqdm

from ..configuration import ConfigurationMultiLevel
from ..path import create_path, MLMCPath
from ..statistic.statistic import create_mlmc_statistics, MLMCStatistics
from ...numerical.cosmethod import COSPricer
from ...process.coupling.couplingprocess import CouplingProcess
from ...process.markovchain.markovchainlevycopula import MarkovChainLevyCopula
from ...process.markovchain.markovchainsde import MarkovChainSDE
from ...product.product import Product


[docs]def helper_create_fun(this_cos_pricer, maturity): def fun(s): return this_cos_pricer.density(time=maturity, s=s) return fun
[docs]class Engine: """Multilevel Monte-Carlo engine"""
[docs] def __init__( self, configuration: ConfigurationMultiLevel, coupling_process: CouplingProcess ): """The Multilevel Monte-Carlo relies on the simulation of the fine and coarse processes linked by a coupling. :param configuration: Multilevel Monte-Carlo configuration :param coupling_process: stochastic coupling process of the fine and coarse processes """ self.configuration = configuration self.coupling_process = coupling_process self.path_managers: [MLMCPath] = [] self.statistics: MLMCStatistics = None
[docs] def initialisation(self, product: Product) -> None: """ :param product: financial product to price """ product.payoff_underlying.check_consistency( process_dimension=self.coupling_process.model.dimension_model() ) maturity = product.maturity product.update(self.coupling_process.fine_process.process_representation) self.coupling_process.initialisation(product) self.configuration.initialisation(product) path_manager = create_path( self.configuration, self.coupling_process.fine_process.deterministic_path ) self.path_managers.append(path_manager) self.coupling_process.pre_computation( mc_paths=self.configuration.initial_mc_paths, product=product ) underlying_density = None if not ( isinstance(self.coupling_process.fine_process, MarkovChainLevyCopula) or isinstance(self.coupling_process.fine_process, MarkovChainSDE) ): cos_pricer = COSPricer(self.coupling_process.model) underlying_density = [helper_create_fun(cos_pricer, maturity)] if isinstance(self.coupling_process.fine_process, MarkovChainLevyCopula): lcm = self.coupling_process.model cos_pricers = [COSPricer(model) for model in lcm.models] underlying_density = [ helper_create_fun(cos_pricer, maturity) for cos_pricer in cos_pricers ] self.statistics = create_mlmc_statistics( mc_paths=self.configuration.initial_mc_paths, underlying_density=underlying_density, initial_level=self.configuration.initial_level, control_variates=self.configuration.control_variates, payoff_dimension=product.payoff.dimension(), process_representation=self.coupling_process.model.process_representation, )
[docs] def compute_level_l( self, level: int, current_mc_paths: int, extra_mc_paths: int, coupling_process: CouplingProcess, product: Product, df: float, statistics: MLMCStatistics, ) -> None: """Run the Multilevel Monte-Carlo for the level l :param level: current level :param current_mc_paths: number of Monte-Carlo paths applied to the level l :param extra_mc_paths: extra number of paths to be applied to the level l :param coupling_process: coupling process defining the coupling between the fine and coarse processes :param product: product to price :param df: discount factor function :param statistics: statistics object """ path_manager = self.path_managers[level] cv = self.configuration.control_variates if level == 0: simulation_path = coupling_process.simulate_one_path path_manager_process = path_manager.process_l0 else: simulation_path = coupling_process.simulate_one_path_with_coupling path_manager_process = path_manager.process nb_of_processes = self.configuration.nb_of_processes if nb_of_processes == 1: # single process version self.configuration.initialisation_seed() for iteration in range(extra_mc_paths): simulated_path = simulation_path() path_manager.set_to_path(simulated_path) path_manager_process(product, cv) path_manager.discount(df) statistics.add(current_mc_paths + iteration, level, path_manager) # payoff -> [pl, plm], dp = pl - plm // pl: fine process, plm: coarse process else: description = "Computing level=" + str(level) def callback(res): for it, mp_simulated_path in res: path_manager.set_to_path(mp_simulated_path) path_manager_process(product, cv) path_manager.discount(df) statistics.add(current_mc_paths + it, level, path_manager) # payoff -> [pl, plm], dp = pl - plm // pl: fine process, plm: coarse process def simulating_one_path(it): return it, simulation_path() def initializer(): return self.configuration.initialisation_seed( nb_of_processes is None or nb_of_processes > 1 ) with mp.Pool(processes=nb_of_processes, initializer=initializer) as pool: pool.map_async( simulating_one_path, tqdm(range(extra_mc_paths), desc=description, leave=True), callback=callback, ).get() cv.compute_coefficients_mlmc(statistics.mc_statistics[level], level=level)
[docs] def price(self, product: Product, rmse: float) -> MLMCStatistics: """Pricing of the product by the Multilevel Monte-Carlo engine :param product: product to price :param rmse: root-mean square error """ self.initialisation(product) for path_manager in self.path_managers: path_manager.update( self.coupling_process.fine_process.process_representation ) df = self.coupling_process.fine_process.df(product.maturity) N0 = self.configuration.initial_mc_paths L = self.configuration.initial_level level_max = self.configuration.maximum_level cr = self.configuration.convergence_rates alpha_0, beta_0, gamma_0 = cr.alpha, cr.beta, cr.gamma alpha = 0 if alpha_0 is None else alpha_0 beta = 0 if beta_0 is None else beta_0 gamma = 0 if gamma_0 is None else gamma_0 ml_processes = [copy.deepcopy(self.coupling_process)] Nl = np.zeros(shape=L + 1, dtype=int) dNl = N0 * np.ones(shape=L + 1, dtype=int) sum_cost = np.zeros(shape=L + 1) statistics = self.statistics # use linear regression to estimate alpha, beta and gamma if not given def log2_regression(regress_to, max_val=0.5): mat = np.ones((L, 2)) mat[:, 0] = range(1, L + 1) with np.errstate( divide="ignore" ): # ignore 'divide by zero' warning as this is managed in 'res' x = np.linalg.lstsq(mat, np.log2(regress_to[1:]), rcond=None)[0] res = max(max_val, -x[0]) return res while np.sum(dNl) > 0: for level in range(L + 1): if level > 0: if ( len(ml_processes) < level + 1 ): # create it by copying the process at level l-1 as starting point logging.info("mlmc: adding level=" + str(level)) ml_process_level_l = copy.deepcopy(ml_processes[-1]) ml_process_level_l.next_level( Nl[level], self.path_managers, product ) ml_processes.append(ml_process_level_l) ml_processes[level].reset_one_simulation_cost() ml_processes[level].pre_computation( mc_paths=dNl[level], product=product ) self.compute_level_l( level, Nl[level], dNl[level], ml_processes[level], product, df, statistics, ) Nl[level] += dNl[level] sum_cost[level] += ( ml_processes[level].one_simulation_cost(product=product) * dNl[level] ) # compute absolute average and variance self.statistics.set_mlmc_results(Nl, sum_cost) ml = self.statistics.mlmc_results.ml vl = self.statistics.mlmc_results.vl cl = self.statistics.mlmc_results.cl # work-around for possible zero values for level in range(3, L + 1): ml[level] = np.maximum(ml[level], 0.5 * ml[level - 1] / 2**alpha) vl[level] = np.maximum(vl[level], 0.5 * vl[level - 1] / 2**beta) # if the convergence rates are not passed by the user, then they are estimated by linear regression if alpha_0 is None: alpha = log2_regression(ml) if beta_0 is None: beta = log2_regression(vl) if gamma_0 is None: gamma = log2_regression(cl) # set optimal number of additional paths Ns = self.configuration.convergence_criteria.compute_mc_paths(rmse, vl, cl) if Ns[0] > 10_000_000: logging.warning( "Be patient, more than 10M of paths are used for the first level in the MLMC" ) if Ns[0] > 50_000_000: logging.error( "The number of paths for the first level in the MLMC is more than 50M," "the rmse is probably to small in this case" ) dNl = np.maximum(0, Ns - Nl) if np.sum(dNl[dNl > 0.01 * Nl]) == 0: # test for convergence has_converged = self.configuration.convergence_criteria.criteria( alpha, ml, rmse ) if has_converged or L == level_max: self.statistics.set_mlmc_results(Nl=Nl, sum_cost=sum_cost) return self.statistics L += 1 Nl = np.append(Nl, 1) vl = np.append(vl, vl[-1] / (2**beta)) cl = np.append(cl, cl[-1] * (2**gamma)) # optimal Nl Ns = self.configuration.convergence_criteria.compute_mc_paths( rmse, vl, cl ) dNl = np.maximum(0, Ns - Nl) sum_cost = np.append(sum_cost, 0.0) next_process = copy.deepcopy(ml_processes[-1]) next_process.reset_one_simulation_cost() next_process.next_level(dNl[-1], self.path_managers, product=product) ml_processes.append(next_process) self.statistics.extend(Nl + dNl) logging.warning("Initial number of Monte-Carlo paths is probably too low") self.statistics.set_mlmc_results(Nl=Nl, sum_cost=sum_cost) return self.statistics
[docs] def price_with_constant_mc_paths_and_level(self, product) -> MLMCStatistics: """Same as the :func:`price` function but with constant number of paths and a fixed number of levels :param product: product to price """ mc_paths = self.configuration.initial_mc_paths max_level = self.configuration.maximum_level self.initialisation(product) for path_manager in self.path_managers: path_manager.update( self.coupling_process.fine_process.process_representation ) df = self.coupling_process.fine_process.df(product.maturity) ml_process_level_l = copy.deepcopy(self.coupling_process) sum_cost = np.zeros(shape=max_level + 1) self.statistics.extend([mc_paths] * (max_level + 1)) statistics = self.statistics for level in range(max_level + 1): if level > 0: ml_process_level_l.next_level( mc_paths=mc_paths, path_managers=self.path_managers, product=product ) self.compute_level_l( level=level, current_mc_paths=0, extra_mc_paths=mc_paths, coupling_process=ml_process_level_l, product=product, df=df, statistics=statistics, ) sum_cost[level] = ( ml_process_level_l.one_simulation_cost(product=product) * mc_paths ) Nl = mc_paths * np.ones(max_level + 1) statistics.set_mlmc_results(Nl=Nl, sum_cost=sum_cost) return statistics