Source code for msmbuilder.drift

"""
Compute the drift in trajectories under different distance metrics
using new distance_metrics.py code
"""
from __future__ import print_function, division, absolute_import
import numpy as np


def _drift_single_trajectory(metric, trajectory, tau):
    """
    Compute the drift in your desired metric between all pairs of
    conformations in the supplied trajectory which are seperated
    by tau frames.

    Parameters
    ----------
    metric : msmbuilder.metrics.AbstractDistanceMetric
        The distance metric to compute distances with
    trajectory : msmbuilder.Trajectory
        The trajectory to compute the distances
    tau : {int, np.ndarray}
        tau can be either a positive integer or an array of positive
        integers. If tau is an integer, the return value is a 1D array
        of length equal to the length of the trajectory minus tau
        containing the pairwise distance of all conformations in
        trajectory seperated by tau frames. If tau is an array of
        integers of length n, the return value is a 2D array with n
        rows and a number of columns equal to the length of the
        trajectory minus min(tau). The ith row of the returned array
        contains the pairwise distance of all of the conformations in
        the supplied trajectory separated by tau[i] frames. The final
        i entries in row i will be padded with -1s to ensure that the
        output 2D arrau is rectangular.

    Returns
    -------
    distances : np.ndarray
        1D or 2D array of the drifts, depending on whether tau is a single
        number or an array
    """
    # make sure tau is a 1D numpy array of positive ints, or make it into one
    tau = __typecheck_tau(tau)
    # if not isinstance(metric, AbstractDistanceMetric):
    #   raise TypeError('metric must be an instance of AbstractDistanceMetric. you supplied a %s' % metric)

    traj_length = trajectory['XYZList'].shape[0]
    ptraj = metric.prepare_trajectory(trajectory)
    distances = -1 * np.ones((len(tau), traj_length - np.min(tau)))

    for i in xrange(traj_length - np.min(tau)):
        comp_indices = [elem for elem in tau + i if elem < traj_length]
        d = metric.one_to_many(ptraj, ptraj, i, comp_indices)
        # these distances are the ith column
        distances[0:len(comp_indices), i] = d

    # if there was only 1 element in tau, reshape output so its 1D
    # if distances.shape == (1, traj_length - np.min(tau)):
    #    distances = np.reshape(distances, traj_length - np.min(tau))
    return distances


def drift(metric, trajectories, taus):
    if 'XYZList' in trajectories:
        trajectories = [trajectories]
    if isinstance(taus, int):
        taus = [taus]

    output = [np.array([])] * len(taus)
    for trajectory in trajectories:
        d = _drift_single_trajectory(metric, trajectory, taus)
        for i in range(len(taus)):
            #length_i = d.shape[1] - (taus[i] - np.min(taus))
            selected_d = np.ma.masked_less(d[i, :], 0)
            selected_d = np.ma.compressed(selected_d)
            output[i] = np.hstack((output[i], selected_d))

    return output


[docs]def square_drift(metric, trajectories, tau): output = drift(metric, trajectories, tau) for i, row in enumerate(output): output[i] = row ** 2 return output
[docs]def get_epsilon_neighborhoods(metric, ptraj, tau): output = [] N = len(ptraj) for i in xrange(N): if i < tau: output.append(metric.one_to_many(ptraj, ptraj, i, [i + tau])[0]) elif i >= N - tau: output.append(metric.one_to_many(ptraj, ptraj, i, [i - tau])[0]) else: output.append(metric.one_to_many(ptraj, ptraj, i, [i - tau, i + tau]).max()) return output
[docs]def hitting_time(metric, trajectory, epsilon): """ For each frame in trajectories, determine the time it takes to go more than epsilon distance from where it started. Returns: a masked array of integers of length equal to the length of the trajectory. The masked values correspond to frames for which the hitting time could not be determined (maybe the trajectory wasn't long enough so it never left the epsilon-ball) """ traj_length = len(trajectory) ptraj = metric.prepare_trajectory(trajectory) window_length = 8 output = -1 * np.ones(len(trajectory), dtype=np.int) for i in xrange(traj_length): found = False mult = 1 start = i + 1 while not found: forward_window = np.arange(start, start + window_length * mult) forward_window = forward_window[np.where(forward_window < traj_length)] if len(forward_window) == 0: break # print forward_window d = metric.one_to_many(ptraj, ptraj, i, forward_window) where = np.where(d > epsilon)[0] found = len(where) != 0 if not found: start += window_length * mult mult *= 2 if found: first = np.min(where) output[i] = forward_window[first] - i # print d[first - 1], d[first] # xprint i, first, forward_window # print i # print forward_window[np.min(where)] # print d[np.min(where)] # return np.ma.masked_equal(output, -1) return output
def __typecheck_tau(tau): """make sure tau is a 1D numpy array of positive ints, or make it into one if possible unambiguously""" if isinstance(tau, int): if tau < 0: raise TypeError('Tau cannot be negative') tau = np.array([tau]) else: tau = np.array(tau) if not len(tau.shape) == 1: raise TypeError('Tau must be a 1D array or an int. You supplied %s' % tau) # ensure positive if not np.all(tau == np.abs(tau)): raise TypeError('Taus must be all positive.') # ensure ints if not np.all(tau == np.array(tau, dtype='int')): raise TypeError('Taus must be all integers.') return tau
Versions