Source code for msmbuilder.reduce.tICA

from __future__ import print_function, division, absolute_import

import numpy as np
import scipy.linalg
from time import time
import logging
from mdtraj import io
from mdtraj.utils.six.moves import cPickle

from msmbuilder.metrics import Vectorized
from msmbuilder.reduce import AbstractDimReduction

logger = logging.getLogger(__name__)


[docs]class tICA(AbstractDimReduction): """ tICA is a class for calculating the matrices required to do time-structure based independent component analysis (tICA). It can be used to calculate both the time-lag correlation matrix and covariance matrix. The advantage it has is that you can calculate the matrix for a large dataset by "training" smaller pieces of the dataset at a time. Notes ----- It can be shown that the time-lag correlation matrix is the same as: C = E[Outer(X[t], X[t+lag])] - Outer(E[X[t]], E[X[t+lag]]) Because of this it is possible to calculate running sums corresponding to variables A, B, D: A = E[X[t]] B = E[X[t+lag]] D = E[Outer(X[t], X[t+lag])] Then at the end we can calculate C: C = D - Outer(A, B) Finally we can get a symmetrized C' from our estimate of C, for example by adding the transpose: C' = (C + C^T) / 2 There is, in fact, an MLE estimator for ech matrix C, and S: S = E[Outer(X[t], X[t])] The MLE estimators are: \mu = 1 / (2(N - lag)) \sum_{t=1}^{N - lag} X[t] + X[t + lag] C = 1 / (2(N - lag)) * \sum_{t=1}^{N - lag} Outer(X[t] - \mu, X[t + lag] - \mu) + Outer(X[t + lag] - \mu, X[t] - \mu) S = 1 / (2(N - lag)) * \sum_{t=1}^{N - lag} Outer(X[t] - \mu, X[t] - \mu) + Outer(X[t + lag] - \mu, X[t + lag] - \mu) """
[docs] def __init__(self, lag, calc_cov_mat=True, prep_metric=None, size=None): """ Create an empty tICA object. To add data to the object, use the train method. Parameters ---------- lag: int The lag to use in calculating the time-lag correlation matrix. If zero, then only the covariance matrix is calculated calc_cov_mat: bool, optional if lag > 0, then will also calculate the covariance matrix prep_metric: msmbuilder.metrics.Vectorized subclass instance, optional metric to use to prepare trajectories. If not specified, then you must pass prepared trajectories to the train method, via the kwarg "prep_trajectory" size: int, optional the size is the number of coordinates for the vector representation of the protein. If None, then the first trained vector will be used to initialize it. Notes ----- To load an already constructed tICA object, use `tICA.load()`. """ self.corrs = None self.sum_t = None self.sum_t_dt = None # The above containers hold a running sum that is used to # calculate the time-lag correlation matrix as well as the # covariance matrix self.corrs_lag0 = None # needed for calculating the covariance # matrix self.sum_all = None self.trained_frames = 0 self.total_frames = 0 # Track how many frames we've trained self.lag = int(lag) if self.lag < 0: raise Exception("lag must be non-negative.") elif self.lag == 0: # If we have lag=0 then we don't need to # calculate the covariance matrix twice self.calc_cov_mat = False else: self.calc_cov_mat = calc_cov_mat if prep_metric is None: self.prep_metric = None logger.warn("no metric specified, you must pass prepared" " trajectories to the train and project methods") else: if not isinstance(prep_metric, Vectorized): raise Exception("prep_metric must be an instance of a " "subclass of msmbuilder.metrics.Vectorized") self.prep_metric = prep_metric self.size = size if not self.size is None: self.initialze(size) # containers for the solutions: self.timelag_corr_mat = None self.cov_mat = None self.vals = None self.vecs = None self._sorted = False
[docs] def initialize(self, size): """ initialize the containers for the calculation Parameters ---------- size : int The size of the square matrix will be (size, size) """ self.size = size self.corrs = np.zeros((size, size), dtype=float) self.sum_t = np.zeros(size, dtype=float) self.sum_t_dt = np.zeros(size, dtype=float) self.sum_all = np.zeros(size, dtype=float) if self.calc_cov_mat: self.corrs_lag0_t = np.zeros((size, size), dtype=float) self.corrs_lag0_t_dt = np.zeros((size, size), dtype=float)
[docs] def train(self, trajectory=None, prep_trajectory=None): """ add a trajectory to the calculation Parameters: ----------- trajectory: msmbuilder.Trajectory, optional trajectory object prep_trajectory: np.ndarray, optional prepared trajectory object Remarks: -------- must input one of trajectory or prep_trajectory (if both are given, then prep_trajectory is used.) """ if not prep_trajectory is None: data_vector = prep_trajectory elif not trajectory is None: data_vector = self.prep_metric.prepare_trajectory(trajectory) else: raise Exception("need to input one of trajectory or prep_trajectory") a = time() # For debugging we are tracking the time each step takes if self.size is None: # then we haven't started yet, so set up the containers self.initialize(size=data_vector.shape[1]) if data_vector.shape[1] != self.size: raise Exception("Input vector is not the right size. axis=1 should " "be length %d. Vector has shape %s" % (self.size, str(data_vector.shape))) if data_vector.shape[0] <= self.lag: logger.warn("Data vector is too short (%d) " "for this lag (%d)", data_vector.shape[0], self.lag) return b = time() if self.lag != 0: self.corrs += data_vector[:-self.lag].T.dot(data_vector[self.lag:]) self.sum_t += data_vector[:-self.lag].sum(axis=0) self.sum_t_dt += data_vector[self.lag:].sum(axis=0) else: self.corrs += data_vector.T.dot(data_vector) self.sum_t += data_vector.sum(axis=0) self.sum_t_dt += data_vector.sum(axis=0) if self.calc_cov_mat: self.corrs_lag0_t += data_vector[:-self.lag].T.dot(data_vector[:-self.lag]) self.corrs_lag0_t_dt += data_vector[self.lag:].T.dot(data_vector[self.lag:]) self.sum_all += data_vector.sum(axis=0) self.total_frames += data_vector.shape[0] self.trained_frames += data_vector.shape[0] - self.lag # this accounts for us having finite trajectories, so we really are # only calculating expectation values over N - \Delta t total samples c = time() logger.debug("Setup: %f, Corrs: %f" % (b - a, c - b)) # Probably should just get rid of this..
def get_current_estimate(self): """Calculate the current estimate of the time-lag correlation matrix and the covariance matrix (if asked for). These estimates come from an MLE argument assuming that the data {X_t, X_t+dt} are distributed as a multivariate normal. Of course, this assumption is not very true, but this is merely one way to enforce that the timelag correlation matrix is symmetric. The MLE has nice properties, as well, such as the eigenvalues that result from solving the tICA equation are always bounded between -1 and 1, which is not the case when one merely symmetrizes the timelag correlation matrix while estimating the covariance matrix and mean in the usual manner. See Shukla, D et. al. In Preparation for details, or email Christian Schwantes (schwancr@stanford.edu). """ two_N = 2. * float(self.trained_frames) # ^^ denominator in all of these expressions... mle_mean = (self.sum_t + self.sum_t_dt) / two_N outer_means = np.outer(mle_mean, mle_mean) time_lag_corr = (self.corrs + self.corrs.T) / two_N timelag_corr_mat = time_lag_corr - outer_means self.timelag_corr_mat = timelag_corr_mat if self.calc_cov_mat: cov_mat = (self.corrs_lag0_t + self.corrs_lag0_t_dt) / two_N cov_mat -= np.outer(mle_mean, mle_mean) self.cov_mat = cov_mat return timelag_corr_mat, cov_mat return timelag_corr_mat def _sort(self): """ sort the eigenvectors by their eigenvalues. """ if self.vals is None: self.solve() ind = np.argsort(self.vals)[::-1] # in order of decreasing value self.vals = self.vals[ind] self.vecs = self.vecs[:, ind] self._sorted = True
[docs] def solve(self, pca_cutoff=0): """ Solve the eigenvalue problem. We can translate into the PCA space and remove directions that have zero variance. If there are directions with zero variance, then the tICA eigenvalues will be complex or greater than one. Parameters: ----------- pca_cutoff : float, optional pca_cutoff to throw out PCs with variance less than this cutoff. Default is zero, but you should really check your covariance matrix to see if you need this. """ if self.timelag_corr_mat is None or self.cov_mat is None: self.get_current_estimate() # should really add check if we're just doing PCA, but I # don't know why anyone would use this class to do PCA... # maybe I should just remove that ability... if pca_cutoff <= 0: lhs = self.timelag_corr_mat rhs = self.cov_mat else: pca_vals, pca_vecs = np.linalg.eigh(self.cov_mat) good_ind = np.where(pca_vals > pca_cutoff)[0] pca_vals = pca_vals[good_ind] pca_vecs = pca_vecs[:, good_ind] lhs = pca_vecs.T.dot(self.timelag_corr_mat).dot(pca_vecs) rhs = pca_vecs.T.dot(self.cov_mat).dot(pca_vecs) vals, vecs = scipy.linalg.eig(lhs, b=rhs) if pca_cutoff <= 0: self.vals = vals self.vecs = vecs else: self.vals = vals self.vecs = pca_vecs.dot(vecs) if np.abs(self.vals.imag).max() > 1E-10: logger.warn("you have non-real eigenvalues. This usually means " "you need to throw out some coordinates by doing tICA " "in PCA space.") else: self.vals = self.vals.real if np.abs(self.vecs.imag).max() > 1E-10: logger.warn("you have non-real eigenvector entries...") else: self.vecs = self.vecs.real self._sort()
[docs] def project(self, trajectory=None, prep_trajectory=None, which=None): """ project a trajectory (or prepared trajectory) onto a subset of the tICA eigenvectors. Parameters: ----------- trajectory : mdtraj.Trajectory, optional trajectory object (can also pass a prepared trajectory instead) prep_trajectory : np.ndarray, optional prepared trajectory which : np.ndarray which eigenvectors to project onto Returns: -------- proj_trajectory : np.ndarray projected trajectory (n_points, n_tICs) """ if not self._sorted: self._sort() if prep_trajectory is None: if trajectory is None: raise Exception("must pass one of trajectory or prep_trajectory") prep_trajectory = self.prep_metric.prepare_trajectory(trajectory) if which is None: raise Exception("must pass 'which' to indicate which tICs to project onto") which = np.array(which).flatten().astype(int) proj_trajectory = prep_trajectory.dot(self.vecs[:, which]) return proj_trajectory
[docs] def save(self, output): """ save the results to file Parameters: ----------- output : str output filename (.h5) """ # Serialize metric used to calculate tICA input. metric_string = cPickle.dumps(self.prep_metric) io.saveh(output, timelag_corr_mat=self.timelag_corr_mat, cov_mat=self.cov_mat, lag=np.array([self.lag]), vals=self.vals, vecs=self.vecs, metric_string=np.array([metric_string]))
@classmethod def load(cls, tica_fn): """ load a tICA solution to use in projecting data. Parameters: ----------- tica_fn : str filename pointing to tICA solutions """ # the only variables we need to save are the two matrices # and the eigenvectors / values as well as the lag time logger.warn("NOTE: You can only use the tICA solution, you will " "not be able to continue adding data") f = io.loadh(tica_fn) metric = cPickle.loads(f["metric_string"][0]) tica_obj = cls(f['lag'][0], prep_metric=metric) # lag entry is an array... with a single item tica_obj.timelag_corr_mat = f['timelag_corr_mat'] tica_obj.cov_mat = f['cov_mat'] tica_obj.vals = f['vals'] tica_obj.vecs = f['vecs'] tica_obj._sort() return tica_obj
Versions