Source code for rpylib.montecarlo.statistic.statistic

"""Store the simulations of an estimator (underlying, payoff) and add tools to describe its statistics:
moments, density, etc.

"""

import copy
from collections.abc import Callable
from enum import IntEnum

import matplotlib.pyplot as plt

from .tools import *
from ...process.process import ProcessRepresentation


[docs]class NoStatistic: """NoStatistic object: statistics for this variable are not needed"""
[docs] def __init__(self): pass
[docs] def add(self, simulation: int, variable: np.array) -> None: pass
[docs] def extend(self, mc_path) -> None: pass
def __repr__(self) -> str: return ""
[docs] def plot_density(self): pass
[docs]class Density: """Define and plot the density function of a stochastic underlying modelled by a Lévy model"""
[docs] def __init__(self, label: str, process_representation: ProcessRepresentation): """ :param label: label of the density plot :param process_representation: how the process is represented by the model (identity or log) """ self.label = label self.density = None self.process_representation = process_representation
def __call__(self, stats: np.array, title: str = "") -> None: """Plot the histogram of the data :param stats: statistics to be plotted :param title: title of the figure :return: a plot """ dim = stats.shape[1] stats_to_use = [stats] if len(stats.shape) == 3: # that's mlmc stats with fine and coarse values stats_to_use = [stats[..., PT.FP], stats[..., PT.CP]] dim *= 2 fig, axs = plt.subplots(1, dim) for index, this_stats in enumerate(stats_to_use): for k, simulations in enumerate(this_stats.T): if dim == 1: ax = axs else: ax = axs[index * this_stats.shape[1] + k] n, bins, patches = ax.hist( simulations, bins="auto", density=True, facecolor="green", alpha=0.40, ) mean_val, stddev_val = mean(simulations), stddev(simulations) if stddev_val <= 1e-8: # if the stddev is very small, we just take a fraction of the mean so that # the min/max values are not equal to the mean stddev_val = 0.1 * mean_val axes = plt.gca() min_val = mean_val - 5 * stddev_val max_val = mean_val + 5 * stddev_val if self.process_representation == ProcessRepresentation.LOG: min_val = max(0, min_val) axes.set_xlim([min_val, max_val]) if self.density: y = self.density[k](bins) ax.plot(bins, y, "r--", alpha=0.60) ax.set_xlabel(self.label + "_" + str(k), fontsize="xx-small") # Tweak spacing to prevent clipping of the y-label plt.subplots_adjust(left=0.15) if title: plt.title(label=title) plt.show()
[docs] def set_density(self, density): self.density = density
[docs]class Statistic: """Statistics object to manage the simulated variables and handle requests such as plotting their density"""
[docs] def __init__( self, label: str, shape: tuple, mc_paths: int, process_representation: ProcessRepresentation, ): """ :param label: label of the simulated variables :param shape: shape of the input data :param mc_paths: number of simulations in the Monte-Carlo engine """ self.density_plot = Density( label=label, process_representation=process_representation ) self.stats = np.empty(shape=(mc_paths, *shape))
[docs] def plot_density(self): return self.density_plot(stats=self.stats)
[docs] def add(self, simulation: int, variable: np.array) -> None: self.stats[simulation] = variable
[docs] def extend(self, mc_path) -> None: """ Extend the data to support extra number of Monte-Carlo paths :param mc_path: extra number of paths """ if self.stats.shape[0] < mc_path: delta = mc_path - self.stats.shape[0] zero_padding = tuple((0, 0) for _ in range(len(self.stats.shape) - 1)) self.stats = np.pad(self.stats, ((0, delta), *zero_padding), "constant")
# we just pad the first dimension which corresponds to the number of Monte-Carlo paths def __repr__(self) -> str: return "Statistic"
[docs]class MCStatistics: """Statistics object when used with a standard Monte-Carlo engine"""
[docs] def __init__( self, *, payoff_statistics: Statistic, spot_underlying_statistics: Statistic = NoStatistic(), control_variates_statistics: Statistic = NoStatistic() ): """ :param payoff_statistics: statistics object handling the simulations of the payoff :param spot_underlying_statistics: statistics object handling the simulations of the spot underlying :param control_variates_statistics: statistics object handling the simulations of the control variates """ self._payoff_statistics = payoff_statistics self._spot_underlying_statistics = spot_underlying_statistics self._control_variates_statistics = ( control_variates_statistics # ->statistics of the control variates ) self._payoff_statistics_with_cv = copy.deepcopy( payoff_statistics ) # -> statistics of the adjusted payoff
# with the control variates
[docs] def add(self, simulation: int, path_manager: "MCPath") -> None: self._spot_underlying_statistics.add(simulation, path_manager.spot_underlying) self._payoff_statistics.add(simulation, path_manager.payoff) self._control_variates_statistics.add( simulation, path_manager.payoff_control_variates )
[docs] def extend(self, mc_path) -> None: self._spot_underlying_statistics.extend(mc_path) self._payoff_statistics.extend(mc_path) self._control_variates_statistics.extend(mc_path) self._payoff_statistics_with_cv.extend(mc_path)
def __repr__(self) -> str: return self._payoff_statistics.__repr__() def _get_payoff_statistics(self, no_control_variates: bool = False) -> Statistic: if no_control_variates or isinstance( self._control_variates_statistics, NoStatistic ): return self._payoff_statistics else: return self._payoff_statistics_with_cv
[docs] def plot_spot_density(self): return self._spot_underlying_statistics.plot_density()
[docs] def plot_payoff_density(self): return self._payoff_statistics.plot_density()
[docs] def price(self, no_control_variates: bool = False) -> np.array: return self.get_mean(no_control_variates=no_control_variates)
[docs] def mc_stddev(self, no_control_variates: bool = False) -> np.array: stats_obj = self._get_payoff_statistics(no_control_variates) res = mc_stddev(stats_obj.stats) return res[0] if res.size == 1 else res
[docs] def get_mean(self, no_control_variates: bool = False) -> np.array: stats_obj = self._get_payoff_statistics(no_control_variates) res = mean(stats_obj.stats) return res[0] if res.size == 1 else res
[docs] def get_variance(self, no_control_variates: bool = False) -> np.array: stats_obj = self._get_payoff_statistics(no_control_variates) res = mean(stats_obj.stats) return res[0] if res.size == 1 else res
[docs]class MLMCResults: """Handle the output of the Multilevel Monte-Carlo"""
[docs] def __init__( self, Nl: np.array, sum_cost: np.array, all_pl_fine: list, all_pl_coarse: list ): """ :param Nl: number of Monte-Carlo paths for each level l :param sum_cost: array of the cost for each level l :param all_pl_fine: correction terms for the `fine` payoff for each level l :param all_pl_coarse: correction terms for the `coarse` payoff for each level l """ dp = [ pl_fine - pl_coarse for pl_fine, pl_coarse in zip(all_pl_fine, all_pl_coarse) ] self._ncms_dp = NonCenteredMoments(dp) self._ncms_fine = NonCenteredMoments(all_pl_fine) self._sum_cost = sum_cost self.Nl = Nl
@property def mean_level_l(self): val = self._ncms_fine.ncm_first return val @property def var_level_l(self): val = self._ncms_fine.ncm_second - self.mean_level_l**2 return val @property def kurtosis(self): nsum1 = self._ncms_dp.ncm_first nsum2 = self._ncms_dp.ncm_second nsum3 = self._ncms_dp.ncm_third nsum4 = self._ncms_dp.ncm_fourth val = (nsum4 - 4 * nsum3 * nsum1 + 6 * nsum2 * nsum1**2 - 3 * nsum1**4) / ( np.maximum(1, nsum2 - nsum1**2) ) ** 2 return val @property def ml(self): """Mean of the correction terms :math:`|P_l - P_{l-1}|` for the level l""" val = np.absolute(self._ncms_dp.ncm_first) return val @property def vl(self): """Variance of the correction terms :math:`|P_l - P_{l-1}|` for the level l""" val = np.maximum(0.0, self._ncms_dp.ncm_second - self._ncms_dp.ncm_first**2) return val @property def cl(self): """Algorithm cost associated to the level l""" val = self._sum_cost / self.Nl return val @property def cost(self): """total cost of the Multilevel Monte-Carlo algorithm""" val = np.sum(self._sum_cost) return val @property def consistency_check(self): val = (self.ml[1:] - self.mean_level_l[1:] + self.mean_level_l[:1]) / ( 3 * ( np.sqrt(self.vl[1:]) + np.sqrt(self.var_level_l[1:]) + np.sqrt(self.var_level_l[:1]) ) ) return val
[docs]class PT(IntEnum): FP = 0 # fine process position CP = 1 # coarse process position
[docs]class MLMCStatistics: """Statistics object when used with a Multilevel Monte-Carlo engine"""
[docs] def __init__(self, mc_statistics: list[MCStatistics], create_statistic): """ :param mc_statistics: list of the statistics object for each level l :param create_statistic: helper function to create individual statistics """ self.mc_statistics = mc_statistics self.create_statistic = create_statistic self.mlmc_results: MLMCResults = None
[docs] def add(self, simulation: int, level: int, path_manager: "MCPath") -> None: self.mc_statistics[level].add(simulation, path_manager)
[docs] def set_mlmc_results(self, Nl: np.array, sum_cost: np.array): all_pl_fine = [ self.simulation_payoff_with_fine_process(level=level) for level in range(len(Nl)) ] all_pl_coarse = [ self.simulation_payoff_with_coarse_process(level=level) for level in range(len(Nl)) ] self.mlmc_results = MLMCResults( Nl=Nl, sum_cost=sum_cost, all_pl_fine=all_pl_fine, all_pl_coarse=all_pl_coarse, )
[docs] def add_mc_statistic(self): self.mc_statistics.append(self.create_statistic())
[docs] def extend(self, mc_paths): while len(self.mc_statistics) < len(mc_paths): self.add_mc_statistic() for level, mc_stat in enumerate(self.mc_statistics): mc_stat.extend(mc_paths[level])
[docs] def plot_spot_density(self, level: int = None): if level is None: # then plot for all the levels for stats in self.mc_statistics: stats._spot_underlying_statistics.plot_density() else: return self.mc_statistics[level]._spot_underlying_statistics.plot_density()
def _get_payoff_statistics( self, level: int, no_control_variates: bool = False ) -> Statistic: stats_level_l = self.mc_statistics[level] if no_control_variates or isinstance( stats_level_l._control_variates_statistics, NoStatistic ): return stats_level_l._payoff_statistics else: return stats_level_l._payoff_statistics_with_cv
[docs] def price(self, no_control_variates: bool = False) -> float: res = 0 for level, mc_stat in enumerate(self.mc_statistics): stats_obj = self._get_payoff_statistics( level=level, no_control_variates=no_control_variates ) mean_val = mean(stats_obj.stats) res += mean_val[0, PT.FP] - mean_val[0, PT.CP] return res
[docs] def mc_stddev(self, no_control_variates: bool = False) -> float: res = 0 for level, mc_stat in enumerate(self.mc_statistics): pl_fine = self.simulation_payoff_with_fine_process( level=level, no_control_variates=no_control_variates ) pl_coarse = self.simulation_payoff_with_coarse_process( level=level, no_control_variates=no_control_variates ) stddev_val = mc_stddev(pl_fine - pl_coarse) res += stddev_val return res
[docs] def simulation_payoff_with_fine_process( self, level, start: int = None, end: int = None, no_control_variates: bool = False, ) -> np.array: stats = self._get_payoff_statistics( level=level, no_control_variates=no_control_variates ).stats return stats[(slice(start, end), 0, PT.FP)]
[docs] def simulation_payoff_with_coarse_process( self, level, start: int = None, end: int = None, no_control_variates: bool = False, ) -> np.array: stats = self._get_payoff_statistics( level=level, no_control_variates=no_control_variates ).stats return stats[(slice(start, end), 0, PT.CP)]
[docs]def create_mc_statistics( mc_paths: int, underlying_density: [Callable[[float], float]], control_variates: "ControlVariates", payoff_dimension: int, process_representation: ProcessRepresentation, activate_spot_statistics: bool, spot_dimension: int, ) -> MCStatistics: """Factory method which returns the MCStatistics object :param mc_paths: number of Monte-Carlo paths :param underlying_density: theoretical underlying density :param control_variates: control variates object :param payoff_dimension: dimension of the payoff (that is the number of underlyings passed to the payoff function) :param process_representation: process representation in the model :param activate_spot_statistics: if True, compute and save the simulated values of the modelled underlying process :param spot_dimension: dimension of the modelled underlying process """ from ...product.product import NoControlVariates # always create the payoff statistics payoff_statistics = Statistic( "payoff", shape=(payoff_dimension,), mc_paths=mc_paths, process_representation=process_representation, ) spot_underlying_statistics = NoStatistic() if activate_spot_statistics: spot_underlying_statistics = Statistic( "spot", shape=(spot_dimension,), mc_paths=mc_paths, process_representation=process_representation, ) if underlying_density is not None: spot_underlying_statistics.density_plot.set_density(underlying_density) if isinstance(control_variates, NoControlVariates): control_variates_statistics = NoStatistic() else: control_variates_statistics = Statistic( "CV", shape=(len(control_variates.products), payoff_dimension), mc_paths=mc_paths, process_representation=process_representation, ) return MCStatistics( payoff_statistics=payoff_statistics, spot_underlying_statistics=spot_underlying_statistics, control_variates_statistics=control_variates_statistics, )
[docs]def create_mlmc_statistics( mc_paths: int, underlying_density: [Callable[[float], float]], initial_level: int, control_variates: "ControlVariates", payoff_dimension: int, process_representation: ProcessRepresentation, ) -> MLMCStatistics: """Factory method which returns the MLMCStatistics object :param mc_paths: number of Monte-Carlo paths :param underlying_density: theoretical underlying density :param initial_level: initial number of levels :param control_variates: control variates object :param payoff_dimension: dimension of the payoff (that is the number of underlyings passed to the payoff function) :param process_representation: process representation in the model """ from ...product.product import NoControlVariates payoff_stat = Statistic( "payoff", shape=(payoff_dimension, 2), mc_paths=mc_paths, process_representation=process_representation, ) # coarse payoff is set to 0 for convenience spot_underlying_stat = NoStatistic() if underlying_density is not None: spot_dimension = len(underlying_density) spot_underlying_stat = Statistic( "spot", shape=(spot_dimension,), mc_paths=mc_paths, process_representation=process_representation, ) spot_underlying_stat.density_plot.set_density(underlying_density) if isinstance(control_variates, NoControlVariates): control_variate_statistics0 = NoStatistic() else: control_variate_statistics0 = Statistic( "CV", shape=(len(control_variates.products), payoff_dimension), mc_paths=mc_paths, process_representation=process_representation, ) mc_stat0 = MCStatistics( payoff_statistics=payoff_stat, spot_underlying_statistics=spot_underlying_stat, control_variates_statistics=control_variate_statistics0, ) def create_statistic(nb_of_paths=0): # coarse and fine simulations for each spot and control variates underlying # -> for each simulation (row), each underlying (col) has a fine value and a coarse value # that is to say the last dimension always corresponds to the fine/coarse values c_payoff_stat = Statistic( "payoff", shape=(payoff_dimension, 2), mc_paths=nb_of_paths, process_representation=process_representation, ) c_spot_underlying_stat = NoStatistic() if underlying_density is not None: c_spot_underlying_stat = Statistic( "spot", shape=(spot_dimension, 2), mc_paths=nb_of_paths, process_representation=process_representation, ) # one needs two functions for both the coarse and fine process c_spot_underlying_stat.density_plot.set_density( underlying_density + underlying_density ) if isinstance(control_variates, NoControlVariates): control_variate_statistics = NoStatistic() else: control_variate_statistics = Statistic( "CV", shape=(len(control_variates.products), payoff_dimension, 2), mc_paths=nb_of_paths, process_representation=process_representation, ) return MCStatistics( payoff_statistics=c_payoff_stat, spot_underlying_statistics=c_spot_underlying_stat, control_variates_statistics=copy.deepcopy(control_variate_statistics), ) mc_stats = [mc_stat0] + [ create_statistic(mc_paths) for _ in range(1, initial_level + 1) ] return MLMCStatistics(mc_stats, create_statistic)