Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/msmhelper/msm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

"""
__all__ = [
'bh_test',
'buchete_hummer_test',
'chapman_kolmogorov_test',
'ck_test',
'estimate_markov_model',
Expand All @@ -27,6 +29,8 @@
]

from .tests import (
bh_test,
buchete_hummer_test,
chapman_kolmogorov_test,
ck_test,
)
Expand Down
173 changes: 172 additions & 1 deletion src/msmhelper/msm/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
# Copyright (c) 2019-2023, Daniel Nagel
# All rights reserved.
import decorit
import numba
import numpy as np

from msmhelper import utils
from msmhelper import msm, utils
from msmhelper.statetraj import LumpedStateTraj, StateTraj


Expand Down Expand Up @@ -152,6 +153,176 @@
}


@decorit.alias('bh_test')
def buchete_hummer_test(trajs, lagtime, tmax):
r"""Calculate the Buchete Hummer test.

This method estimates the Buchete Hummer autocorrelation test. Projecting
the state trajectory onto the right eigenvectors of the row normalized
transition matrix

$$C_{lm} (t) = \langle \phi_l[s(\tau +t)] \phi_m[S(\tau)]\rangle$$

where \(\phi_i\) is the \(i\)-th right eigenvector. Buchete and Hummer[^2]
showed that for a Markovian system it obeys an exponentil decay,
corresponds to

$$C_{lm} (t) = \delta_{lm} \exp(-t / t_k)$$

with the implied timescale \(t_k = - \tau_\text{lag} / \ln \lambda_k\).

[^2]: Buchete and Hummer, **Coarse master equations for peptide folding
dynamics**, *J. Phys. Chem.*, 112, 6057-6069 (2008),
doi:[10.1021/jp0761665](https://doi.org/10.1021/jp0761665)

Parameters
----------
trajs : StateTraj or list or ndarray or list of ndarray
State trajectory/trajectories. The states should start from zero and
need to be integers.
lagtime : int
Lagtimes for estimating the markov model given in [frames].
tmax : int
Longest time to evaluate the CK equation given in [frames].

Returns
-------
bhtest : dict
Dictionary holding for each lagtime the ckequation and with 'md' the
reference.

"""
# format input
trajs = StateTraj(trajs)

# check that lag times are array of integers
if not isinstance(lagtime, int) or lagtime < 0:
raise TypeError(
'Lagtimes needs to be positive integers, but {0}.'.format(lagtime),
)

if not isinstance(tmax, int) or tmax < 0:
raise TypeError(
'tmax needs to be a positive integer, but {0}'.format(tmax),
)

return _buchete_hummer_test(trajs, lagtime, tmax)


def _project_states_onto_vector(trajs, vector):
"""Shift states to corresponding vector values.

Parameters
----------
trajs : StateTraj or ndarray or list or list of ndarrays
1D data or a list of data.
vector : ndarray or list
Values which will be used instead of states.

Returns
-------
array : ndarray
Shifted data in same shape as input.

"""
trajs = StateTraj(trajs)
vector = np.atleast_1d(vector)

# check data-type
if len(vector) != trajs.nstates:
raise TypeError(

Check warning on line 233 in src/msmhelper/msm/tests.py

View check run for this annotation

Codecov / codecov/patch

src/msmhelper/msm/tests.py#L233

Added line #L233 was not covered by tests
'Vector of wrong length. {0} '.format(len(vector)) +
'vs. {0}'.format(trajs.nstates),
)

# flatten data
limits = np.cumsum([len(traj) for traj in trajs.trajs])
array = trajs.trajs_flatten

# convert to np.array
states = np.arange(trajs.nstates)

# shift data
conv = np.arange(trajs.nstates, dtype=np.float64)
conv[states] = vector
array = conv[array]

# reshape and return
return np.split(array, limits)[:-1]


def _buchete_hummer_test(trajs, lagtime, tmax):
r"""Calculate the Buchete Hummer equation."""
times = _calc_times(lagtime=lagtime, tmax=tmax)
ntimes = len(times)
bheq = np.empty((trajs.nstates - 1, ntimes))

# estimate Markov model
tmat, _ = msm.estimate_markov_model(trajs, lagtime=lagtime)

# symmetrize matrix
tmat = np.sqrt(tmat * tmat.T)

# get orthogonal eigenvectors of symmetric matrix
evals, evecs = msm.utils.linalg.right_eigenvectors(tmat)
pi, *evecs = evecs # first eigenvector is equilibrium population

# stationary distribution
pi = pi**2 / np.sum(pi**2)

# normalize eigenvectors to be eigenvectors of the non symmetric transition
# matrix
evecs = np.array(evecs) / np.sqrt(pi[np.newaxis, :])

for idx, evec in enumerate(evecs):
trajs_proj = _project_states_onto_vector(trajs, evec)

if not numba.config.DISABLE_JIT: # pragma: no cover
trajs_proj = numba.typed.List(trajs_proj)

bheq[idx] = _autocorrelation(trajs_proj, times)

# for idx_col, time in enumerate(times):
# # autocorrelation function
# c_nn, norm = 0, 0
# for traj_proj in trajs_proj:
# c_nn += np.sum(traj_proj[: -time] * traj_proj[time:])
# norm += len(traj_proj[time:])
#
# bheq[idx, idx_col] = c_nn / norm
Comment on lines +285 to +292

Check notice

Code scanning / CodeQL

Commented-out code

This comment appears to contain commented-out code.

# calculate reference (MD)
bheq_ref = np.empty((trajs.nstates - 1, ntimes))
for idx, eigenval in enumerate(evals[1:]):
bheq_ref[idx] = np.exp(times * np.log(eigenval) / lagtime)

return {
lagtime: {'bh': bheq, 'time': times},
'md': {'bh': bheq_ref, 'time': times},
}


@numba.njit
def _autocorrelation(trajs, times):
"""Generate a simple transition count matrix from multiple trajectories."""
# initialize matrix
bheq = np.zeros(len(times), dtype=np.float64)

for idx, time in enumerate(times):
# autocorrelation function
c_nn, norm = 0, 0
for traj in trajs:
for val_from, val_to in zip( # noqa: WPS519
traj[: -time], traj[time:],
):
c_nn += val_from * val_to
norm += len(traj[time:])

bheq[idx] = c_nn / norm

return bheq


def _calc_times(lagtime, tmax):
"""Return time array."""
steps = int(np.floor(tmax / lagtime))
Expand Down
27 changes: 27 additions & 0 deletions test/test_msm_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,30 @@ def test__chapman_kolmogorov_test(trajs, lagtime, tmax, result):
)
else:
np.testing.assert_array_almost_equal(cktest[key], result[key])


@pytest.mark.parametrize('trajs, lagtime, tmax', [
([1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1], 2, 5),
])
def test_buchete_hummer_test(trajs, lagtime, tmax):
"""Test Buchete Hummer test."""
# check float as lag times
with pytest.raises(TypeError):
_ = tests.bh_test(trajs, lagtime=[1.2], tmax=tmax)

# check negative lag times
with pytest.raises(TypeError):
_ = tests.bh_test(trajs, lagtime=-1, tmax=tmax)

# check maximal time negative
with pytest.raises(TypeError):
_ = tests.bh_test(trajs, lagtime=lagtime, tmax=-1)

# check maximal time float
with pytest.raises(TypeError):
_ = tests.bh_test(trajs, lagtime=lagtime, tmax=5.7)

# check if all keys exists
bhtest = tests.bh_test(trajs, lagtime=lagtime, tmax=tmax)
assert 'md' in bhtest
assert lagtime in bhtest