Source code for msmbuilder.MSMLib

# This file is part of MSMBuilder.
#
# Copyright 2011 Stanford University
#
# MSMBuilder is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""Classes and functions for working with Transition and Count Matrices.

Notes

* Assignments typically refer to a numpy array of integers such that Assignments[i,j] gives the state of trajectory i, frame j.
* Transition and Count matrices are typically stored in scipy.sparse.csr_matrix format.

MSMLib functions generally relate to one of the following

* Counting the number of transitions observed in Assignment data--e.g., constructing a Count matrix.
* Constructing a transition matrix from a count matrix.
* Performing calculations with Assignments, Counts matrices, or transition matrices.

.. currentmodule:: msmbuilder.MSMLib
.. autosummary::
    :toctree: generated/

    apply_mapping_to_assignments
    apply_mapping_to_vector
    build_msm
    ergodic_trim
    ergodic_trim_indices
    estimate_rate_matrix
    estimate_transition_matrix
    get_count_matrix_from_assignments
    get_counts_from_traj
    invert_assignments
    log_likelihood
    mle_reversible_count_matrix
    permute_mat
    renumber_states
    tarjan
    trim_states
"""
from __future__ import print_function, absolute_import, division
from mdtraj.utils.six import iteritems

import scipy.sparse
import scipy.linalg
import scipy
import numpy as np
import scipy.optimize
from collections import defaultdict
from msmbuilder.utils import deprecated, check_assignment_array_input
from msmbuilder import msm_analysis
import logging
logger = logging.getLogger(__name__)
__all__ = ['apply_mapping_to_assignments', 'apply_mapping_to_vector', 'build_msm',
           'ergodic_trim', 'ergodic_trim_indices',  'estimate_rate_matrix',
           'estimate_transition_matrix',  'get_count_matrix_from_assignments',
           'get_counts_from_traj',  'invert_assignments',  'log_likelihood',
           'mle_reversible_count_matrix', 'permute_mat',
           'renumber_states', 'tarjan', 'trim_states']


[docs]def estimate_rate_matrix(count_matrix, assignments): """MLE Rate Matrix given transition counts and *dwell times* Parameters ---------- count_matrix : sparse or dense matrix transition counts assignments : ndarray 2D assignments array used to compute average dwell times Returns ------- K : csr_matrix Rate matrix Notes ----- The *correct* likelihood function to use for estimating the rate matrix when the data is sampled at a discrete frequency is open for debate. This likelihood function doesn't take into account the error in the lifetime estimates from the discrete sampling. Other methods are currently under development (RTM 6/27) See Also -------- estimate_transition_matrix References ---------- .. [1] Buchete NV, Hummer G. "Coarse master equaions for peptide folding dynamics." J Phys Chem B 112:6057-6069. """ # Find the estimated dwell times (need to deal w negative ones) neg_ind = np.where(assignments == -1) n = np.max(assignments.flatten()) + 1 assignments[neg_ind] = n R = np.bincount(assignments.flatten()) R = R[:n] assert count_matrix.shape[0] == R.shape[0] # Most Likely Estimator ( Kij(hat) = Nij / Ri ) if scipy.sparse.isspmatrix(count_matrix): C = scipy.sparse.csr_matrix(count_matrix).asfptype() D = scipy.sparse.dia_matrix((1.0 / R, 0), C.shape).tocsr() K = D * C # if all is sparse is matrix multiply, formerly: D.dot( C ) else: # deprecated due to laziness --TJL raise ValueError("ERROR! Pass sparse matrix to me") # Now get the diagonals right. They should be negative row sums row_sums = np.asarray(C.sum(axis=1)).flatten() current = K.diagonal() S = scipy.sparse.dia_matrix(((row_sums + (2.0 * current)), 0), C.shape).tocsr() K = K - S if not K.shape == count_matrix.shape: raise RuntimeError('Bad news bears') # assert K.sum(0).all() == np.zeros(K.shape[0]).all(), K.sum(0).all() return K
[docs]def estimate_transition_matrix(count_matrix): """ Simple Maximum Likelihood estimator of transition matrix. Parameters ---------- count_matrix : array or sparse matrix A square matrix of transition counts Returns ------- tProb : array or sparse matrix Most likely transition matrix given `tCount` """ # 1. Make sure you don't modify tCounts. # 2. Make sure you handle both floats and ints eps = np.finfo(np.float32).tiny if scipy.sparse.isspmatrix(count_matrix): C = scipy.sparse.csr_matrix(count_matrix).asfptype() weights = np.asarray(C.sum(axis=1)).flatten() inv_weights = np.zeros(len(weights)) inv_weights[weights > eps] = 1.0 / weights[weights > eps] D = scipy.sparse.dia_matrix((inv_weights, 0), C.shape).tocsr() tProb = D.dot(C) else: tProb = np.asarray(count_matrix.astype(float)) # astype creates a copy weights = tProb.sum(axis=1) inv_weights = np.zeros(len(weights)) inv_weights[weights > eps] = 1.0 / weights[weights > eps] tProb = tProb * inv_weights.reshape((weights.shape[0], 1)) return tProb
[docs]def build_msm(counts, symmetrize='MLE', ergodic_trimming=True): """ Estimates the transition probability matrix from the counts matrix. Parameters ---------- counts : scipy.sparse.csr_matrix the MSM counts matrix symmetrize : {'MLE', 'Transpose', None} symmetrization scheme so that we have reversible counts ergodic_trim : bool (optional) whether or not to trim states to achieve an ergodic model Returns ------- rev_counts : matrix the estimate of the reversible counts t_matrix : matrix the transition probability matrix populations : ndarray, float the equilibrium populations of each state mapping : ndarray, int a mapping from the passed counts matrix to the new counts and transition matrices """ symmetrize = str(symmetrize).lower() symmetrization_error = ValueError("Invalid symmetrization scheme " "requested: %s. Exiting." % symmetrize) if symmetrize not in ['mle', 'transpose', 'none']: raise symmetrization_error if ergodic_trimming: counts, mapping = ergodic_trim(counts) else: mapping = np.arange(counts.shape[0]) # Apply a symmetrization scheme if symmetrize == 'mle': if not ergodic_trimming: raise ValueError("MLE symmetrization requires ergodic trimming.") rev_counts = mle_reversible_count_matrix(counts) elif symmetrize == 'transpose': rev_counts = 0.5 * (counts + counts.transpose()) elif symmetrize == 'none': rev_counts = counts else: raise symmetrization_error t_matrix = estimate_transition_matrix(rev_counts) if symmetrize in ['mle', 'transpose']: populations = np.array(rev_counts.sum(0)).flatten() elif symmetrize == 'none': vectors = msm_analysis.get_eigenvectors(t_matrix, 5)[1] populations = vectors[:, 0] else: raise symmetrization_error populations /= populations.sum() # ensure normalization return rev_counts, t_matrix, populations, mapping
[docs]def get_count_matrix_from_assignments(assignments, n_states=None, lag_time=1, sliding_window=True): """ Calculate counts matrix from `assignments`. Parameters ---------- assignments : ndarray A 2d ndarray containing the state assignments. n_states : int, optional Can be automatically determined, unless you want a model with more states than are observed lag_time: int, optional the LagTime with which to estimate the count matrix. Default: 1 sliding_window: bool, optional Use a sliding window. Default: True Returns ------- counts : sparse matrix `Counts[i,j]` stores the number of times in the assignments that a trajectory went from state i to state j in `LagTime` frames Notes ----- assignments are input as iterables over numpy 1-d arrays of integers. For example a 2-d array where assignments[i,j] gives the ith trajectory, jth frame. The beginning and end of each trajectory may be padded with negative ones, which will be ignored. If the number of states is not given explitly, it will be determined as one plus the largest state index of the Assignments. Sliding window yields non-independent samples, but wastes less data. """ check_assignment_array_input(assignments) if not n_states: # Lutz: a single np.max is not enough, b/c it can't handle a list of 1-d arrays of different lengths n_states = 1 + int(np.max([np.max(a) for a in assignments])) if n_states < 1: raise ValueError() # Lutz: why are we using float for count matrices? C = scipy.sparse.lil_matrix((int(n_states), int(n_states)), dtype='float32') for A in assignments: FirstEntry = np.where(A != -1)[0] # New Code by KAB to skip pre-padded negative ones. # This should solve issues with Tarjan trimming results. if len(FirstEntry) >= 1: FirstEntry = FirstEntry[0] A = A[FirstEntry:] # .tolil() C = C + get_counts_from_traj(A, n_states, lag_time=lag_time, sliding_window=sliding_window) return C
[docs]def get_counts_from_traj(states, n_states=None, lag_time=1, sliding_window=True): """Computes the transition count matrix for a sequence of states (single trajectory). Parameters ---------- states : array A one-dimensional array of integers representing the sequence of states. These integers must be in the range [0, n_states] n_states : int The total number of states. If not specified, the largest integer in the states array plus one will be used. lag_time : int, optional The time delay over which transitions are counted sliding_window : bool, optional Use sliding window Returns ------- C : sparse matrix of integers The computed transition count matrix """ check_assignment_array_input(states, ndim=1) if not n_states: n_states = np.max(states) + 1 if sliding_window: from_states = states[: -lag_time: 1] to_states = states[lag_time:: 1] else: from_states = states[: -lag_time: lag_time] to_states = states[lag_time:: lag_time] assert from_states.shape == to_states.shape transitions = np.row_stack((from_states, to_states)) counts = np.ones(transitions.shape[1], dtype=int) try: C = scipy.sparse.coo_matrix((counts, transitions), shape=(n_states, n_states)) except ValueError: # Lutz: if we arrive here, there was probably a state with index -1 # we try to fix it by ignoring transitions in and out of those states # (we set both the count and the indices for those transitions to 0) mask = transitions < 0 counts[mask[0, :] | mask[1, :]] = 0 transitions[mask] = 0 C = scipy.sparse.coo_matrix((counts, transitions), shape=(n_states, n_states)) return C
[docs]def apply_mapping_to_assignments(assignments, mapping): """Remap the states in an assignments file according to a mapping. Parameters ---------- assignments : ndarray Standard 2D assignments array mapping : ndarray 1D numpy array of length equal to the number of states in Assignments. Mapping[a] = b means that the frames currently in state a will be mapped to state b Returns ------- NewAssignments : ndarray Notes ----- This function is useful after performing PCCA or Ergodic Trimming. Also, the state -1 is treated specially -- it always stays -1 and is not remapped. """ check_assignment_array_input(assignments) NewMapping = mapping.copy() # Make a special state for things that get deleted by Ergodic Trimming. NewMapping[np.where(mapping == -1)] = mapping.max() + 1 NegativeOneStates = np.where(assignments == -1) assignments[:] = NewMapping[assignments] WhereEliminatedStates = np.where(assignments == (mapping.max() + 1)) # These are the dangling 'tails' of trajectories (with no actual data) that we denote state -1. assignments[NegativeOneStates] = -1 # These states have typically been "deleted" by the ergodic trimming # algorithm. Can be at beginning or end of trajectory. assignments[WhereEliminatedStates] = -1
[docs]def invert_assignments(assignments): """Invert an assignments array -- that is, produce a mapping from state -> traj/frame Parameters ---------- assignments : np.ndarray 2D array of MSMBuilder assignments Returns ------- inverse_mapping : collections.defaultdict Mapping from state -> traj,frame, such that inverse_mapping[s] gives the conformations assigned to state s. Notes ----- The assignments array may have -1's, which are simply placeholders we do not add these to the inverted assignments. Therefore, doing the following will raise a KeyError: >>> inv_assignments = MSMLib.invert_assignments(assignments) >>> print inv_assignments[-1] KeyError: -1 """ check_assignment_array_input(assignments) inverse_mapping = defaultdict(lambda: ([], [])) non_neg_inds = np.array(np.where(assignments != -1)).T # we do not care about -1's for (i, j) in non_neg_inds: inverse_mapping[assignments[i, j]][0].append(i) inverse_mapping[assignments[i, j]][1].append(j) # convert from lists to numpy arrays for key, (trajs, frames) in iteritems(inverse_mapping): inverse_mapping[key] = (np.array(trajs), np.array(frames)) return inverse_mapping
[docs]def apply_mapping_to_vector(vector, mapping): """ Remap an observable vector after ergodic trimming RTM 6/27: I don't think this function is really doing what it should. It does a reordering, but when the mapping is a many->one, don't you really want to average things together or something? TJL 7/1: That's true. I wrote this with only the ergodic trimming in mind, it needs to be updated if it's going to work w/PCCA as well... Parameters ---------- vector : ndarray 1D. Some observable value associated with each states Mapping : ndarray 1D numpy array of length equal to the number of states in Assignments. Mapping[a] = b means that the frames currently in state a are now assigned to state b, and thus their observable should be too Returns ------- new_vector : ndarray mapped observable values Notes ----- The state -1 is treated specially -- it always stays -1 and is not remapped. """ new_vector = vector[np.where(mapping != -1)[0]] logger.info("Mapping %d elements --> %d", len(vector), len(new_vector)) return new_vector
[docs]def renumber_states(assignments): """Renumber states to be consecutive integers (0, 1, ... , n), performs this transformation in place. Parameters ---------- assignments : ndarray 2D array of msmbuilder assignments Returns ------- mapping : ndarray, int A mapping from the old numbering scheme to the new, such that mapping[new] = old Notes ----- Useful if some states have 0 counts. """ check_assignment_array_input(assignments) unique = list(np.unique(assignments)) if unique[0] == -1: minus_one = np.where(assignments == -1) unique.pop(0) else: minus_one = [] inverse_mapping = invert_assignments(assignments) for i, x in enumerate(unique): assignments[inverse_mapping[x]] = i assignments[minus_one] = -1 mapping = np.array(unique, dtype=int) return mapping
@deprecated(scipy.sparse.csgraph.connected_components, '2.8')
[docs]def tarjan(graph): """Find the strongly connected components in a graph using Tarjan's algorithm. Parameters ---------- graph : scipy.sparse.csr_matrix mapping from node names to lists of successor nodes. Returns ------- components : list list of the strongly connected components Notes ----- Code based on ActiveState code by Josiah Carlson (New BSD license). Most users will want to call the ErgodicTrim() function rather than directly calling Tarjan(). See Also -------- ErgodicTrim """ n_states = graph.shape[0] # Keeping track of recursion state info by node Nodes = np.arange(n_states) NodeNums = [None for i in range(n_states)] NodeRoots = np.arange(n_states) NodeVisited = [False for i in range(n_states)] NodeHidden = [False for i in range(n_states)] NodeInComponent = [None for i in range(n_states)] stack = [] components = [] nodes_visit_order = [] graph.next_visit_num = 0 def visit(v): "Mark a state as visited" call_stack = [(1, v, graph.getrow(v).nonzero()[1], None)] while call_stack: tovisit, v, iterator, w = call_stack.pop() if tovisit: NodeVisited[v] = True nodes_visit_order.append(v) NodeNums[v] = graph.next_visit_num graph.next_visit_num += 1 stack.append(v) if w and not NodeInComponent[v]: NodeRoots[v] = nodes_visit_order[min(NodeNums[NodeRoots[v]], NodeNums[NodeRoots[w]])] cont = 0 for w in iterator: if not NodeVisited[w]: cont = 1 call_stack.append((0, v, iterator, w)) call_stack.append((1, w, graph.getrow(w).nonzero()[1], None)) break if not NodeInComponent[w]: NodeRoots[v] = nodes_visit_order[min(NodeNums[NodeRoots[v]], NodeNums[NodeRoots[w]])] if cont: continue if NodeRoots[v] == v: c = [] while 1: w = stack.pop() NodeInComponent[w] = c c.append(w) if w == v: break components.append(c) # the "main" routine for v in Nodes: if not NodeVisited[v]: visit(v) # extract SCC info for n in Nodes: if NodeInComponent[n] and len(NodeInComponent[n]) > 1: # part of SCC NodeHidden[n] = False else: # either not in a component, or singleton case NodeHidden[n] = True return(components)
[docs]def ergodic_trim(counts, assignments=None): """Use Tarjan's Algorithm to find maximal strongly connected subgraph. Parameters ---------- counts : sparse matrix (csr is best) transition counts assignments : ndarray, optional Optionally map assignments to the new states, nulling out disconnected regions. Returns ---- trimmed_counts : lil sparse matrix transition counts after ergodic trimming mapping : ndarray mapping[i] = j maps from untrimmed state index i to trimmed state index j, or mapping[i] = -1 if state i was trimmed Notes ----- The component with maximum number of counts is selected See Also -------- Tarjan """ states_to_trim = ergodic_trim_indices(counts) trimmed_counts = trim_states(states_to_trim, counts, assignments=assignments) # Hacky way to calculate the mapping of saved / discarded states. mapping = np.array([np.arange(counts.shape[0])]) # Need 2D shape for renumber_states. mapping[:, states_to_trim] = -1 renumber_states(mapping) # renumbers into contiguous order, in-place mapping = mapping[0] # Unpack the 2D array into a 1D array. return trimmed_counts, mapping
[docs]def ergodic_trim_indices(counts): """ Finds the indices of the largest strongly connected subgraph implied by the transitions in `counts`. Parameters ---------- counts : matrix The MSM counts matrix Returns ------- states_to_trim : ndarray, int A list of the state indices that should be trimmed to obtain the See Also -------- trim_states : func """ n_components, component_assignments = scipy.sparse.csgraph.connected_components(counts, connection="strong") populations_symmetrized = np.array(counts.sum(0)).flatten() component_pops = np.array([populations_symmetrized[component_assignments == i].sum() for i in range(n_components)]) which_component = component_pops.argmax() logger.info("Selected component %d with population %f", which_component, component_pops[which_component] / component_pops.sum()) states_to_trim = np.arange(counts.shape[0])[component_assignments != which_component] return states_to_trim
[docs]def trim_states(states_to_trim, counts, assignments=None): """ Performs the necessary operations to reduce an MSM to a subset of the original states -- effectively trimming those states out. Parameters ---------- states_to_trim : ndarray, int OR list of ints A list of indices of the states to trim counts : matrix A counts matrix assignments : ndarray, int (optional) An assignments array Returns ------- trimmed_counts : matrix The trimmed counts matrix trimmed_assignments : ndarray (if assignments are provided) The assignements, with values for "trimmed" states set to "-1", which is read as an empty value by MSMBuilder """ # Trim the counts matrix by simply deleting the appropriate rows & columns. # Switching to lil format makes it easy to delete rows directly -- # maybe not most efficient, but easy to understand (and shouldn't be # a bottleneck) counts = counts.tolil() ndel = len(states_to_trim) # delete rows counts.rows = np.delete(counts.rows, states_to_trim) counts.data = np.delete(counts.data, states_to_trim) counts._shape = (counts._shape[0] - ndel, counts._shape[1]) # delete cols counts = counts.T counts.rows = np.delete(counts.rows, states_to_trim) counts.data = np.delete(counts.data, states_to_trim) counts._shape = (counts._shape[0] - ndel, counts._shape[1]) counts = counts.T if assignments is not None: # Use the ORIGINAL number of states! Re bug #300 mapping = np.arange(counts.shape[0] + ndel) mapping[states_to_trim] = -1 # renumber_states requires rank 2 input, not rank 1. Re bug #300 mapping = np.array([mapping]) renumber_states(mapping) # renumbers into contiguous order, in-place mapping = mapping[0] # Unpack the 2D array into a 1D array. Re bug #300 trimmed_assignments = assignments.copy() apply_mapping_to_assignments(trimmed_assignments, mapping) # in-place return counts, trimmed_assignments else: return counts
[docs]def log_likelihood(count_matrix, transition_matrix): """log of the likelihood of an observed count matrix given a transition matrix Parameters ---------- count_matrix : ndarray or sparse matrix Transition count matrix. transition_matrix : ndarray or sparse matrix Transition probability matrix. Returns ------- loglikelihood : float The natural log of the likelihood, computed as :math:`\sum_{ij} C_{ij} \log(P_{ij})` """ if isinstance(transition_matrix, np.ndarray) and isinstance(count_matrix, np.ndarray): # make sure that both count_matrix and transition_matrix are arrays count_matrix = np.asarray(count_matrix) # (not dense matrices), so we can use element-wise multiplication transition_matrix = np.asarray(transition_matrix) mask = count_matrix > 0 return np.sum(np.log(transition_matrix[mask]) * count_matrix[mask]) else: # make sure both count_matrix and transition_matrix are sparse CSR matrices if not scipy.sparse.isspmatrix(count_matrix): count_matrix = scipy.sparse.csr_matrix(count_matrix) else: count_matrix = count_matrix.tocsr() if not scipy.sparse.isspmatrix(transition_matrix): transition_matrix = scipy.sparse.csr_matrix(transition_matrix) else: transition_matrix = transition_matrix.tocsr() row, col = count_matrix.nonzero() return np.sum(np.log(np.asarray(transition_matrix[row, col])) * np.asarray(count_matrix[row, col])) # Lutz's MLE code works but has occasional convergence issues. We use # this code as a reference to unit test our more recent MLE code.
def __mle_reversible_count_matrix_lutz__(count_matrix, prior=0.0, initial_guess=None): """Calculates the maximum-likelihood symmetric count matrix for a givnen observed count matrix. This function uses a Newton conjugate-gradient algorithm to maximize the likelihood of a reversible transition probability matrix. Parameters ---------- count_matrix : array or sparse matrix Transition count matrix. prior : float If not zero, add this value to the count matrix for every transition that has occured in either direction. initial_guess : array or sparse matrix Initial guess for the symmetric count matrix uses as starting point for likelihood maximization. If None, the naive symmetrized guess 0.5*(C+C.T) is used. Returns ------- reversible_counts : array or sparse matrix Symmetric count matrix. If C is sparse then the returned matrix is also sparse, and dense otherwise. """ C = count_matrix def negativeLogLikelihoodFromCountEstimatesSparse(Xupdata, row, col, N, C): """Calculates the negative log likelihood that a symmetric count matrix X gave rise to an observed transition count matrix C, as well as the gradient d -log L / d X_ij.""" assert np.alltrue(Xupdata > 0) Xup = scipy.sparse.csr_matrix( (Xupdata, (row, col)), shape=(N, N)) # Xup is the upper triagonal (inluding the main diagonal) of the symmetric count matrix # X is the complete symmetric count matrix X = Xup + Xup.T - scipy.sparse.spdiags(Xup.diagonal(), 0, Xup.shape[0], Xup.shape[1]) # Xs is the array of row sums of X: Xs_i = sum_j X_ij Xs = np.array(X.sum(axis=1)).ravel() XsInv = scipy.sparse.spdiags(1.0 / Xs, 0, len(Xs), len(Xs)) # P is now the matrix P_ij = X_ij / sum_j X_ij P = (XsInv * X).tocsr() logP = scipy.sparse.csr_matrix((np.log(P.data), P.indices, P.indptr)) # logL is the log of the likelihood: sum_ij C_ij log(X_ij / Xs_i) logL = np.sum(C.multiply(logP).data) # Cs is the array of row sums of C: Cs_i = sum_j C_ij Cs = np.array(C.sum(axis=1)).ravel() # remember the postitions of the non-zero elements of X srow, scol = X.nonzero() Udata = np.array( (C[srow, scol] / X[srow, scol]) - (Cs[srow] / Xs[srow])).ravel() # calculate the derivative: d(log L)/dX_ij = C_ij/X_ij - Cs_i/Xs_i # U is the matrix U_ij = d(log L) / dX_ij U = scipy.sparse.csr_matrix((Udata, (srow, scol)), shape=(N, N)) # so far, we have assumed that all the partial derivatives wrt. X_ij are independent # however, the degrees of freedom are only X_ij for i <= j # for i != j, the total change in log L is d(log L)/dX_ij + d(log L)/dX_ji gradient = (U + U.T - scipy.sparse.spdiags(U.diagonal(), 0, U.shape[0], U.shape[1])).tocsr() # now we have to convert the non-zero elements of the upper triangle into the # same 1-d array structure that was used for Xupdata gradient = np.array(gradient[row, col]).reshape(-1) # print "max g:", np.max(gradient), "min g:", np.min(gradient), "|g|^2", # (gradient*gradient).sum(), "g * X", (gradient*Xupdata).sum() return -logL, -gradient # current implementation only for sparse matrices # if given a dense matrix, sparsify it, and turn the result back to a dense array if not scipy.sparse.isspmatrix(C): return __mle_reversible_count_matrix_lutz__(scipy.sparse.csr_matrix(C), prior=prior, initial_guess=initial_guess).toarray() N = C.shape[0] if not C.shape[1] == N: raise ValueError("Count matrix is not square, but has shape %s" % C.shape) C = C.tocsr() C.eliminate_zeros() # add prior if necessary if (prior is not None) and (prior != 0): PriorMatrix = (C + C.transpose()).tocsr() PriorMatrix.data *= 0. PriorMatrix.data += prior C = C + PriorMatrix logger.warning("Added prior value of %f to count matrix", prior) # initial guess for symmetric count matrix if initial_guess is None: X0 = 0.5 * (C + C.T) else: # this guarantees that the initial guess is indeed symmetric (and sparse) X0 = scipy.sparse.csr_matrix(0.5 * (initial_guess + initial_guess.T)) initialLogLikelihood = log_likelihood(C, estimate_transition_matrix(X0)) # due to symmetry, we degrees of freedom for minimization are only the # elments in the upper triangle of the matrix X (incl. main diagonal) X0up = scipy.sparse.triu(X0).tocoo() row = X0up.row col = X0up.col # the variables used during minimization are those X_ij (i <= j) for which either C_ij or C_ji is greater than zero # those X_ij can be arbitrariliy small, but they must be positive # the function minimizer requires an inclusive bound, so we use some very small number instead of zero # (without loss of generality, b/c we can always multiply all X_ij by some large number without changing the likelihood) lower_bound = 1.E-10 bounds = [[lower_bound, np.inf]] * len(X0up.data) # Here comes the main loop # In principle, we would have to run the function minimizer only once. But in practice, minimization may fail # if the gradient term becomes too large, or minimization is slow if the gradient is too small. # Every so often, we therefore rescale the parameters X_ij so that the gradient is of resonable magnitude # (which does not affect the likelihood). This empirical procedure includes two parameters: the rescaling # frequency and the target value. In principles, these choices should not # affect the outcome of the maximization. rescale_every = 500 rescale_target = 1. Xupdata = X0up.data maximizationrun = 1 totalnumberoffunctionevaluations = 0 negative_logL, negative_gradient = negativeLogLikelihoodFromCountEstimatesSparse( Xupdata, row, col, N, C) logger.info("Log-Likelihood of intial guess for reversible transition " "probability matrix: %s", -negative_logL) while maximizationrun <= 1000: # rescale the X_ij so that the magnitude of the gradient is 1 gtg = (negative_gradient * negative_gradient).sum() scalefactor = np.sqrt(gtg / rescale_target) Xupdata[:] *= scalefactor # now run the minimizer Xupdata, nfeval, rc = scipy.optimize.fmin_tnc(negativeLogLikelihoodFromCountEstimatesSparse, Xupdata, args=(row, col, N, C), bounds=bounds, approx_grad=False, maxfun=rescale_every, disp=0, xtol=1E-20) totalnumberoffunctionevaluations += nfeval negative_logL, negative_gradient = negativeLogLikelihoodFromCountEstimatesSparse( Xupdata, row, col, N, C) logger.info("Log-Likelihood after %s function evaluations; %s ", totalnumberoffunctionevaluations, -negative_logL) if rc in (0, 1, 2): break # Converged elif rc in (3, 4): pass # Not converged, keep going else: raise RuntimeError("Likelihood maximization caused internal error (code %s): %s" % ( rc, scipy.optimize.tnc.RCSTRINGS[rc])) maximizationrun += 1 else: logger.error("maximum could not be obtained.") logger.info("Result of last maximization run (run %s): %s", str( maximizationrun), scipy.optimize.tnc.RCSTRINGS[rc]) Xup = scipy.sparse.coo_matrix((Xupdata, (row, col)), shape=(N, N)) # reconstruct full symmetric matrix from upper triangle part X = Xup + Xup.T - scipy.sparse.spdiags(Xup.diagonal(), 0, Xup.shape[0], Xup.shape[1]) finalLogLikelihood = log_likelihood(C, estimate_transition_matrix(X)) logger.info( "Log-Likelihood of final reversible transition probability matrix: %s", finalLogLikelihood) logger.info("Likelihood ratio: %s", np.exp(finalLogLikelihood - initialLogLikelihood)) # some basic consistency checks if not np.alltrue(np.isfinite(X.data)): raise RuntimeError("The obtained symmetrized count matrix is not finite.") if not np.alltrue(X.data > 0): raise RuntimeError( "The obtained symmetrized count matrix is not strictly positive for all observed transitions, the smallest element is %s" % str(np.min(X.data))) # normalize X to have correct total number of counts X /= X.sum() X *= C.sum() return X
[docs]def permute_mat(A, permutation): """ Permutes the indices of a transition probability matrix. This functions simply switches the lables of `A` rows and columns from [0, 1, 2, ...] to `permutation`. Parameters ---------- tprob : matrix permutation: ndarray, int The permutation array, a list of unique indices that Returns ------- permuted_A : matrix The permuted matrix """ if scipy.sparse.issparse(A): sparse = True else: sparse = False if sparse: Pi = scipy.sparse.lil_matrix(A.shape) for i in range(A.shape[0]): Pi[i, permutation[i]] = 1.0 # send i -> correct place permuted_A = Pi * A * Pi.T else: Pi = np.zeros(A.shape) for i in range(A.shape[0]): Pi[i, permutation[i]] = 1.0 # send i -> correct place permuted_A = np.dot(Pi, np.dot(A, Pi.T)) return permuted_A
[docs]def mle_reversible_count_matrix(count_matrix): """Maximum likelihood estimate for a reversible count matrix Parameters ---------- counts : scipy.sparse.csr_matrix sparse matrix of transition counts (raw, not symmetrized) Returns ------- X : scipy.sparse.csr_matrix Returns the MLE reversible (symmetric) counts matrix Notes ----- This function can be used to find the maximum likelihood reversible count matrix. See Docs/notes/mle_notes.pdf for details on the math used during these calculations. """ mle = __Reversible_MLE_Estimator__(count_matrix) return mle.optimize() # This class is hidden: you should use the helper function instead.
class __Reversible_MLE_Estimator__(): def __init__(self, counts): """Construct class to maximize likelihood of reversible count matrix. Parameters ---------- counts : scipy.sparse.csr_matrix sparse matrix of transition counts (raw, not symmetrized) Notes ----- This object can be used to find the maximum likelihood reversible count matrix. See Docs/notes/mle_notes.pdf for details on the math used during these calculations. This class is hidden: you should use the helper function instead. """ c = counts.asformat("csr").asfptype() c.eliminate_zeros() self.counts = c self.row_sums = np.array(c.sum(1)).flatten() # sym_counts is a sparse matrix with the symmetrized counts--e.g. the # "full" symmetric matrix. sym_i and sym_j are the nonzero indices of # sym_counts self.sym_counts = c + c.transpose() self.sym_row_indices, self.sym_col_indices = self.sym_counts.nonzero() self.sym_upper_ind = np.where(self.sym_row_indices < self.sym_col_indices)[0] self.sym_lower_ind = np.where(self.sym_row_indices >= self.sym_col_indices)[0] # This will be used to calculate the log likelihood. Avoids repeated # copying of sparse matrices. self.temporary_sym_counts = self.sym_counts.copy() # partial_counts is a sparse matrix with the lower triangle (including # diagonal) of the symmetrized counts self.partial_counts = self.sym_counts.copy() self.partial_counts.data[self.sym_upper_ind] = 0. self.partial_counts.eliminate_zeros() self.partial_row_indices, self.partial_col_indices = self.partial_counts.nonzero() self.partial_diag_indices = np.where( self.partial_row_indices == self.partial_col_indices)[0] self.construct_upper_mapping() self.stencil = self.partial_counts.copy() self.stencil.data[:] = 1. self.sym_diag_indices = np.where(self.sym_row_indices == self.sym_col_indices)[0] def construct_upper_mapping(self): """Construct a mapping (self.partial_upper_mapping) that maps elements from the data vector to the full sparse symmetric matrix. Notes ----- Use the following mappings to go between the data vector (log_vector) and the full sparse symmetric matrix (X): X.data[self.sym_lower_ind] = np.exp(log_vector) X.data[self.sym_upper_ind] = np.exp(log_vector)[self.partial_upper_mapping] """ partial_ij_to_data = {} for k, i in enumerate(self.partial_row_indices): j = self.partial_col_indices[k] partial_ij_to_data[i, j] = k self.partial_upper_mapping = np.zeros(len(self.sym_upper_ind), 'int') for k0, k in enumerate(self.sym_upper_ind): i0, j0 = self.sym_row_indices[k], self.sym_col_indices[k] k1 = partial_ij_to_data[j0, i0] self.partial_upper_mapping[k0] = k1 def log_vector_to_matrix(self, log_vector): """Construct sparse matrix from vector of parameters. Parameters ---------- log_vector: np.ndarray log_vector contains the log of all nonzero matrix entries on the lower triangle (including the diagonal) """ x = self.temporary_sym_counts x.data[self.sym_lower_ind] = np.exp(log_vector) x.data[self.sym_upper_ind] = np.exp(log_vector)[self.partial_upper_mapping] return x def flatten_matrix(self, counts): """Extract lower triangle from an arbitrary sparse CSR matrix. Parameters ---------- counts : scipy.sparse.csr_matrix Returns ------- data : np.ndarray The nonzero entries on the lower triangle (including diagonal) """ counts = self.stencil.multiply(counts) counts = counts + self.stencil data = counts.data - 1. return data def matrix_to_log_vector(self, counts): """Construct vector of parameters from sparse matrix Parameters ---------- counts : scipy.sparse.csr_matrix C must be symmetric. Returns ------- data : np.ndarray The log of all nonzero entries on the lower triangle (including diagonal). """ data = self.flatten_matrix(counts) return np.log(data) def log_likelihood(self, log_vector): """Calculate log likelihood of log_vector given the observed counts Parameters ---------- log_vector : np.ndarray log_vector contains the log of all nonzero matrix entries Notes ----- The counts used are those input during construction of this class. This calculation is based on Eqn. 4 in Docs/notes/mle_notes.pdf """ f = (log_vector * self.partial_counts.data).sum() f -= (log_vector * self.partial_counts.data)[self.partial_diag_indices].sum() * 0.5 r = self.log_vector_to_matrix(log_vector) q = np.array(r.sum(0)).flatten() f -= np.log(q).dot(self.row_sums) return f def dlog_likelihood(self, log_vector): """Log likelihood gradient at log_vector given the observed counts Parameters ---------- log_vector : np.ndarray log_vector contains the log of all nonzero matrix entries Returns ------- grad : np.ndarray grad is the derivative of the log likelihood with respect to log_vector Notes ----- The counts used are those input during construction of this class. This calculation is based on Eqn. 5 in Docs/notes/mle_notes.pdf """ grad = 0.0 * log_vector grad += self.partial_counts.data grad[self.partial_diag_indices] -= 0.5 * self.partial_counts.data[self.partial_diag_indices] current_count_matrix = self.log_vector_to_matrix(log_vector) current_row_sums = np.array(current_count_matrix.sum(1)).flatten() v = self.row_sums / current_row_sums D = scipy.sparse.dia_matrix((v, 0), shape=current_count_matrix.shape) grad -= self.flatten_matrix(D.dot(current_count_matrix) + current_count_matrix.dot(D)) grad += self.flatten_matrix(D.multiply(current_count_matrix)) return grad def optimize(self): """Maximize the log_likelihood to find the MLE reversible counts. Returns ------- X : scipy.sparse.csr_matrix Returns the MLE reversible (symmetric) counts matrix Notes ----- This algorithm uses the symmetrized counts as an initial guess. """ log_vector = 1.0 * np.log(self.partial_counts.data) f = lambda x: -1 * self.log_likelihood(x) df = lambda x: -1 * self.dlog_likelihood(x) initial_log_likelihood = -1 * f(log_vector) parms, final_log_likelihood, info_dict = scipy.optimize.fmin_l_bfgs_b( f, log_vector, df, disp=0, factr=0.001, m=26) # m is the number of variable metric correcitons. m=26 seems to give ~15% speedup X = self.log_vector_to_matrix(parms) X *= (self.counts.sum() / X.sum()) final_log_likelihood *= -1 logger.info("BFGS likelihood maximization terminated after %d function calls. Initial and final log likelihoods: %f, %f." % (info_dict["funcalls"], initial_log_likelihood, final_log_likelihood)) if info_dict["warnflag"] != 0: logger.warning("Abnormal termination of BFGS likelihood maximization. Error code %d" % info_dict["warnflag"]) return X
Versions