Source code for chainopy.markov_chain

import math
from typing import List, Union

import numpy as np

from ._exceptions import _handle_exceptions
from ._visualizations import _visualize_tpm, _visualize_chain
from ._fileio import _save_model_markovchain, _load_model_markovchain, _load_text
from ._caching import _cache
from ._backend import (
    _simulate,
    _absorbing,
    _stationary_dist,
    _learn_matrix,
    _is_communicating,
)


[docs] class MarkovChain: """A class containing Fundamental Functions for Discrete Time Markov Chains. This class provides a comprehensive suite of methods for working with discrete-time Markov chains (DTMCs), including validation, simulation, and analysis functionalities. It supports learning transition probability matrices (TPMs) from data, checking various properties of the Markov chain (e.g., ergodicity, aperiodicity, symmetry), and computing distributions and probabilities related to the chain's behavior over time. Attributes ---------- tpm : np.ndarray Transition probability matrix (TPM) representing the Markov chain. states : List[str] List of state names corresponding to the TPM. eigendecom : bool Flag indicating if the TPM is eigendecomposable. eigenvalues : np.ndarray Eigenvalues of the TPM. eigenvectors : np.ndarray Eigenvectors of the TPM. epsilon : float Small value to avoid numerical issues in calculations. """ __slots__ = "tpm", "states", "eigendecom", "eigenvalues", "eigenvectors", "epsilon"
[docs] def __init__( self, p: Union[np.ndarray, None] = None, states: Union[List[str], None] = None ) -> None: if p is not None: p = np.array(p, dtype=np.float64) if states is None: if p is not None: states = [str(i) for i in range(len(p))] self.tpm = p self.states = states self.eigendecom = None self.eigenvalues = None self.eigenvectors = None self.epsilon = 1e-16 self._validate_transition_matrix(p, states, epsilon=1e-16)
def __repr__(self) -> str: if self.tpm is None: return ( f"<Object of type MarkovChain with uninitialized transition matrix " f"\n" f"and unknown states>" ) return ( f"<Object of type MarkovChain with {self.tpm.shape[0]} x {self.tpm.shape[1]} sized transition matrix " f"\n" f"and {len(self.states)} states>" ) def __str__(self) -> str: return self.__repr__() def _validate_transition_matrix( self, p: np.ndarray, states: List[str], epsilon ) -> None: def elements_range(arr): if np.any((arr < 0) | (arr > 1)): raise ValueError("All elements in the TPM must be between 0 and 1.") if p is not None: elements_range(p) self.eigendecom = self._is_eigendecomposable() if ( self.eigendecom and (self.eigenvectors is None) and (self.eigenvalues is None) ): self.eigenvalues, self.eigenvectors = np.linalg.eig(p) else: self.eigenvalues = None self.eigenvectors = None if states is not None: if len(states) != len(set(states)): raise ValueError( "Names of the states \ should be Unique" ) if not (p.shape[0] == len(states) and p.shape[0] == p.shape[1]): raise ValueError(f"Invalid TPM {p}") if not np.allclose(np.sum(p, axis=1), 1, atol=epsilon * len(states)): raise ValueError( f"Rows of the Transition Probability Matrix \ (TPM) {p} must sum to 1." )
[docs] @_handle_exceptions @_cache(class_method=True) def fit(self, data: Union[str, list], epsilon: float = 1e-16) -> np.ndarray: """ Learn Transition Matrix from Sequence (list or str) of Data. Each Unique Word is considered a State. It will override the current transition-matrix. Args ---- data: Union[str, list] Data on which the MarkovChain model must be fitted. epsilon: float Small dummy value to avoid zeros in the Transition-Matrix Returns ------- ndarray: Transition - Matrix based on `data` Usage ----- >>> chainopy.MarkovChain().fit("My name is John.") """ return self._learn_matrix(data=data, epsilon=epsilon)
def _learn_matrix(self, data: Union[str, list], epsilon: float) -> np.ndarray: _tpm, _states = _learn_matrix.learn_matrix_cython(data, epsilon=epsilon) self.tpm = _tpm self.epsilon = epsilon self.states = _states self._validate_transition_matrix(self.tpm, self.states, self.epsilon) return _tpm
[docs] @_handle_exceptions def simulate(self, initial_state: str, n_steps: int) -> List[str]: """ Simulate the Markov Chain for `n_steps` steps. Args ---- initial_state: str State from which the simulation starts n_steps: int Number of steps to simulate the chain for Returns ------- list: Contains states attained during simulation """ return _simulate._simulate_cython(self.states, self.tpm, initial_state, n_steps)
@staticmethod def _vectorize(states: List[str], initial_state: str) -> np.ndarray: if initial_state in states: init = states.index(initial_state) initial_vect = np.zeros((1, len(states))) initial_vect[0, init] = 1 else: raise ValueError("Initial state not found in the list of states.") return initial_vect
[docs] def adjacency_matrix(self) -> np.ndarray: """ Returns ------- ndarray: Adjacency matrix of the chain. """ return (self.tpm > self.epsilon).astype(int)
[docs] def predict(self, initial_state: str) -> str: """ Return the next most likely states. Args ---- initial_state : str Initial state. Returns ------- str: Next most likely state. """ initial_vect = self._vectorize(self.states, initial_state) return self.states[np.argmax(initial_vect @ self.tpm)]
[docs] @_handle_exceptions def nstep_distribution(self, n_steps: int) -> np.ndarray: """ Calculates the distribution of the Markov Chain after n-steps. Args ---- n_steps : int Number of steps. Returns ------- ndarray: Distribution of the Markov Chain. """ is_eigendecom = self.eigendecom eigvals = self.eigenvalues eigvecs = self.eigenvectors def diag_power_matmul(): if is_eigendecom: if (eigvals is not None) and (eigvecs is not None): D = np.diag(eigvals) return np.real(eigvecs @ D**n_steps @ np.linalg.inv(eigvecs)) result = diag_power_matmul() return result if is_eigendecom else np.linalg.matrix_power(self.tpm, n_steps)
[docs] @_cache(class_method=True) def is_ergodic(self) -> bool: """ Checks if the Markov chain is ergodic. Returns ------- bool: True if the Markov chain is ergodic, False otherwise. """ return self.is_irreducible() and self.is_aperiodic()
[docs] @_cache(class_method=True) def is_symmetric(self) -> bool: """ Checks if the Markov chain is symmetric. Returns ------- bool: True if the Markov chain is symmetric, False otherwise. """ return np.allclose(self.tpm, self.tpm.transpose())
[docs] @_cache(class_method=True) def stationary_dist(self) -> np.ndarray: """ Returns the stationary distribution of the Markov chain. Returns ------- ndarray: Stationary distribution. """ tpm_T = self.tpm.transpose() return _stationary_dist.cython_stationary_dist(tpm_T)
[docs] @_handle_exceptions @_cache(class_method=True) def is_communicating(self, state1: str, state2: str, threshold: int = 1000) -> bool: """ Checks if two states are communicating. Args ---- state1 : str First state. state2 : str Second state. threshold : int, optional Threshold for convergence. Defaults to 1000. Returns ------- bool: True if the states are communicating, False otherwise. """ return _is_communicating.is_communicating_cython( self.tpm, self.states, state1, state2, threshold )
[docs] @_cache(class_method=True) def is_irreducible(self) -> bool: """ Checks if the Markov chain is irreducible. Returns ------- bool: True if the Markov chain is irreducible, False otherwise. """ return all( any(self.is_communicating(state1, state2) for state2 in self.states) for state1 in self.states )
def _absorbing_state_indices(self) -> List[int]: return _absorbing._absorbing_states_indices(self.tpm)
[docs] @_cache(class_method=True) def absorbing_states(self) -> List[str]: """ Returns all absorbing states. Returns ------- List[str]: Absorbing states. """ indices = self._absorbing_state_indices() return [self.states[i] for i in indices]
[docs] @_cache(class_method=True) def is_absorbing(self) -> bool: """ Checks if the Markov chain is absorbing. Returns ------- bool: True if the Markov chain is absorbing, False otherwise. """ absorbing_states_ = self.absorbing_states() if len(absorbing_states_) == 0: return False transient_states = set(self.states) - set(absorbing_states_) for i in absorbing_states_: if all( _is_communicating._is_partially_communicating( self.tpm, self.states, state, i, threshold=1000 ) for state in transient_states ): return True return False
[docs] @_cache(class_method=True) def is_aperiodic(self) -> bool: """ Checks if the Markov chain is aperiodic. Returns ------- bool: True if the Markov chain is aperiodic, False otherwise. """ if self.period() == 1: return True return False
[docs] @_cache(class_method=True) def period(self) -> int: """ Returns the period of the Markov chain. Returns ------- int: Period of the Markov chain. """ if np.any(np.diag(self.tpm) > self.epsilon): return 1 communicating_states = [] if self.is_irreducible(): communicating_states = self.states else: for i in self.states: if all(self.is_communicating(i, s) for s in self.states): communicating_states.append(i) else: raise ValueError( "Chain should be Irreducible to \ calculate Period" ) def return_time(state): n = 1 idx = self.states.index(state) while True: _dist = self.nstep_distribution(n)[idx, idx] if not (np.isclose(_dist, 0)): return n n += 1 return_times = [return_time(cstate) for cstate in communicating_states] return math.gcd(*return_times)
@_cache(class_method=True) def _is_eigendecomposable(self) -> bool: eigenvalues, _ = np.linalg.eig(self.tpm) unique_eigenvalues = np.unique(eigenvalues) return len(unique_eigenvalues) == self.tpm.shape[0]
[docs] @_handle_exceptions @_cache(class_method=True) def is_transient(self, state: str) -> bool: """ Checks if a state is transient. Args ---- state : str State to check. Returns ------- bool: True if the state is transient, False otherwise. """ state_idx = self.states.index(state) _fundamental_matrix = self.fundamental_matrix() if _fundamental_matrix is not None: absorbing_idx = self._absorbing_state_indices() _fm_state_indices = [ x for x in list(range(len(self.states))) if x not in absorbing_idx ] if state_idx in _fm_state_indices: if np.isclose(_fundamental_matrix[state_idx, state_idx], np.inf): return False else: n_step_tpm = self.tpm _truth_vals = [] while True: if n_step_tpm[state_idx][state_idx] < 1: _truth_vals.append(True) else: return False j = n_step_tpm n_step_tpm = n_step_tpm @ self.tpm if np.allclose(n_step_tpm, j): break if not np.all(np.array(_truth_vals)): return False return True
[docs] @_handle_exceptions def is_recurrent(self, state: str) -> bool: """ Checks if a state is recurrent. Args ---- state : str State to check. Returns ------- bool: True if the state is recurrent, False otherwise. """ return not self.is_transient(state)
[docs] @_cache(class_method=True) def fundamental_matrix(self) -> Union[np.ndarray, None]: """ Returns the fundamental matrix. Returns ------- Union[ndarray, None]: Fundamental matrix. """ absorbing_indices = self._absorbing_state_indices() k = len(self.states) - len(absorbing_indices) if not self.is_absorbing(): return None I = np.identity(k) Q = np.delete(self.tpm, absorbing_indices, axis=0) Q = np.delete(Q, absorbing_indices, axis=1) return np.linalg.inv(I - Q)
[docs] @_cache(class_method=True) def absorption_probabilities(self) -> np.ndarray: """ Returns the absorption probabilities matrix for each state. Returns ------- ndarray: Absorption probabilities matrix. """ fundamental_matrix = self.fundamental_matrix() if fundamental_matrix is not None: absorbing_indices = self._absorbing_state_indices() return fundamental_matrix[:, absorbing_indices] else: raise ValueError( "Cannot compute absorption probabilities \ for non-absorbing Markov chains." )
[docs] @_cache(class_method=True) def expected_time_to_absorption(self) -> np.ndarray: """ Returns the expected time to absorption for each state. Returns ------- ndarray: Expected time to absorption. """ absorption_probs = self.absorption_probabilities() return np.sum(absorption_probs, axis=1)
[docs] @_cache(class_method=True) def expected_number_of_visits(self) -> np.ndarray: """ Returns the expected number of visits to each state before absorption. Returns ------- ndarray: Expected number of visits. """ absorption_probs = self.absorption_probabilities() return np.reciprocal(1 - absorption_probs)
[docs] @_cache(class_method=True) def expected_hitting_time(self, state: str) -> Union[float, None]: """ Returns the expected hitting time to reach the given absorbing state. Args ---- state : str Absorbing state. Returns ------- Union[float, None]: Expected hitting time. """ fundamental_matrix = self.fundamental_matrix() if fundamental_matrix is not None: absorbing_indices = self._absorbing_state_indices() state_index = self.states.index(state) return fundamental_matrix[state_index, absorbing_indices].sum() else: raise ValueError( "Cannot compute expected hitting time for non-absorbing Markov chains." )
[docs] def visualize_transition_matrix(self): """ Visualize the Transition Matrix """ _visualize_tpm(self.tpm, self.states)
[docs] def visualize_chain(self): """ Visualize the Markov Chain """ _visualize_chain(self.tpm, self.states, self.epsilon)
[docs] @_handle_exceptions def save_model(self, filename: str, epsilon: float = 1e-16): """ Save Model as a JSON Object. If tpm is sparsifyable, it stores tpm as a sparse matrix. Args ---- filename : str Name of the file to save. epsilon: float Small dummy value to avoid zeros in the Transition-Matrix """ _save_model_markovchain(self, filename, epsilon=epsilon)
[docs] @_handle_exceptions def load_model(self, path: str): """ Load a ChainoPy Model stored as a JSON Object and return as a `MarkovChain` object. Args ---- path : str Path to the file. Raises: ------ ValueError: If the file cannot be loaded. """ result = _load_model_markovchain(path) if len(result) == 3: self.tpm, self.states, self.eigendecom, self.epsilon = result elif len(result) == 4: self.tpm, self.states, self.eigendecom, self.epsilon = result elif len(result) == 5: ( self.tpm, self.states, self.eigendecom, self.eigenvalues, self.eigenvectors, ) = result elif len(result) == 6: ( self.tpm, self.states, self.eigendecom, self.eigenvalues, self.eigenvectors, self.epsilon, ) = result self.tpm = np.where(self.tpm == 0, self.epsilon, self.tpm)
[docs] @_cache(class_method=True) def marginal_dist(self, state: str): """ Args ---- state: str State for which to calculate the marginal distribution Returns ------- float: marginal distribution of a state """ _idx = self.states.index(state) return np.sum(self.tpm[:, _idx])
[docs] def fit_from_file(self, path: str, epsilon: float = 1e-16): """ Args ---- path: str path to the text file epsilon: float small value to avoid zero division Returns ------- ndarray: Transition Matrix trained from the text file. If `self.tpm` is None. Then this sets `self.tpm` to the new transition-matrix. """ _data_list = _load_text(path) if (_data_list is None) or (len(_data_list) == 0): raise ValueError("Invalid contents of the text file.") return _learn_matrix.learn_matrix_cython(_data_list, epsilon)