Source code for rpylib.distribution.samplingfactory

"""Simple factory function to create a sampling method"""

from itertools import product
from typing import Union

import numpy as np

from rpylib.model.levymodel.levymodel import LevyMeasure, LevyModel
from .pairing import (
    PairingToZd,
    Szudzik,
    RosenbergStrong,
    StatesManager,
    Domain,
    Boundary,
    PairingToZ1d,
)
from .sampling import SamplingMethod
from .variate.alias import AliasMethod
from .variate.binarysearchtree import BinarySearchTree
from .variate.binarysearchtreeadapted import (
    BinarySearchTreeAdapted,
    BinarySearchTreeAdapted1D,
)
from .variate.huffmantree import HuffmanTree
from .variate.inversion import InversionMethod
from .variate.table import TableMethod
from ..grid.spatial import CTMCGrid
from ..model.levycopulamodel import LevyCopulaModel


[docs]def compute_intensity_of_jumps( model: Union[LevyModel, LevyCopulaModel], grid: CTMCGrid ): h_left = grid.middle(grid.left_point(grid.origin_coordinate), grid.origin) h_right = grid.middle(grid.origin, grid.right_point(grid.origin_coordinate)) if model.dimension_model() == 1: h_left = [h_left] h_right = [h_right] intervals = [ [[h_l, h_r], [grid.axes[k][0], h_l], [h_r, grid.axes[k][-1]]] for k, (h_l, h_r) in enumerate(zip(h_left, h_right)) ] cartesian_product = product(*intervals) # discard first set which is [h_l1, h_r1]x[h_l2, h_r2]x...x[h_ln, h_rn] # we are calculating the measure on the complement of this very set next(cartesian_product) intensity_of_jumps = 0 for c_set in cartesian_product: a, b = zip(*c_set) intensity_of_jumps += model.mass(a=a, b=b) return intensity_of_jumps
[docs]def create_q_vector(levy_measure: LevyMeasure, grid: CTMCGrid) -> np.array: int_lm = levy_measure.integrate m_middle = grid.origin_coordinate q = np.zeros(len(grid.axes[0]), dtype=np.float) for k, x in enumerate(grid.axes[0]): if k != m_middle: x_left = grid.middle(grid.left_point(k), x) x_right = grid.middle(x, grid.right_point(k)) q[k] = int_lm(x_left, x_right) return q
[docs]def create_vec_jump_matrix(q_vector: np.array, init_state, intensity_of_jumps: float): res = q_vector / intensity_of_jumps res[init_state.value] = 0.0 return res
[docs]def create_matrix_jump_matrix(q_matrix: np.array, init_state) -> np.array: qi = -q_matrix[init_state] res = q_matrix / qi res[init_state] = 0.0 return res
[docs]def create_sampling_method( model, levy_measure, method: SamplingMethod, grid: CTMCGrid, is_levy_copula: bool, intensity_of_jumps: float, ): if method == SamplingMethod.INVERSION: return create_sampling_inversion_method( grid, model, intensity_of_jumps, is_levy_copula ) if method == SamplingMethod.BINARYSEARCHTREEADAPTED1D: return BinarySearchTreeAdapted1D( model=model, grid=grid, intensity_of_jumps=intensity_of_jumps ) if not is_levy_copula: init_state = grid.origin_coordinate q_vector = create_q_vector(levy_measure, grid) jump_vector = create_vec_jump_matrix( q_vector=q_vector, init_state=init_state, intensity_of_jumps=intensity_of_jumps, ) left, right = levy_measure.support() pivot = grid.origin_coordinate if left < 0 < right: def states(k): return -pivot.value + np.array(k) elif left == 0: def states(k): return np.array(k) elif right == 0: def states(k): return -pivot.value + np.array(k) elif is_levy_copula: if method == SamplingMethod.BINARYSEARCHTREEADAPTED: return BinarySearchTreeAdapted(model=model, grid=grid) raise ValueError( "create_sampling_method not yet implemented for levy copula, use SamplingMethod.INVERSION" ) else: raise ValueError("Unexpected error when creating the sampling method") methods = { SamplingMethod.ALIAS: AliasMethod, SamplingMethod.TABLE: TableMethod, SamplingMethod.BINARYSEARCHTREE: BinarySearchTree, SamplingMethod.HUFFMANNTREE: HuffmanTree, } return methods[method](jump_vector, states)
[docs]def create_sampling_inversion_method(grid, model, intensity_of_jumps, is_levy_copula): if is_levy_copula: if model.dimension() == 2: pairing = PairingToZd(pairing=Szudzik(), dimension=model.dimension()) else: # don't use Szudzik which is not a general n-dimensional pairing function pairing = PairingToZd( pairing=RosenbergStrong(), dimension=model.dimension() ) else: left_grid_size = grid.origin_coordinate.value right_grid_size = len(grid.axes[0]) - left_grid_size - 1 pairing = PairingToZ1d((-left_grid_size, right_grid_size), omit_zero=True) def probability_to_jump_to_state(state_increment): state = grid.origin_coordinate + state_increment value = grid[state] mid_point_left = grid.middle(grid.left_point(state), value) mid_point_right = grid.middle(value, grid.right_point(state)) state_mass = model.mass(mid_point_left, mid_point_right) state_mass = max(state_mass, 0) # avoid side effects for very small state_mass prob = state_mass / intensity_of_jumps return prob boundary = Boundary() domain = Domain(boundary=boundary, grid=grid, pairing=pairing) state_manager = StatesManager(pairing=pairing, domain=domain, grid=grid) return InversionMethod( probability_to_jump_to_state=probability_to_jump_to_state, state_manager=state_manager, )