from __future__ import print_function, absolute_import, division
from mdtraj.utils.six.moves import xrange
import os
import mdtraj as md
import numpy as np
import tables
import warnings
from mdtraj import io
import logging
logger = logging.getLogger(__name__)
def _setup_containers(project, assignments_fn, distances_fn):
"""
Setup the files on disk (Assignments.h5 and Assignments.h5.distances) that
results will be sent to.
Check to ensure that if they exist (and contain partial results), the
containers are not corrupted
Parameters
----------
project : msmbuilder.Project
The msmbuilder project file. Only the n_trajs and traj_lengths are
actully used.
assignments_fn : string
distances_fn : string
Returns
-------
f_assignments : tables.File
pytables handle to the assignments file, open in 'append' mode
f_distances : tables.File
pytables handle to the assignments file, open in 'append' mode
"""
def save_container(filename, dtype):
io.saveh(filename, arr_0=-1 * np.ones((project.n_trajs, np.max(project.traj_lengths)), dtype=dtype),
completed_trajs=np.zeros((project.n_trajs), dtype=np.bool))
def check_container(filename):
with warnings.catch_warnings():
warnings.simplefilter('ignore')
fh = tables.openFile(filename, 'r')
if fh.root.arr_0.shape != (project.n_trajs, np.max(project.traj_lengths)):
raise ValueError('Shape error 1')
if fh.root.completed_trajs.shape != (project.n_trajs,):
raise ValueError('Shape error 2')
fh.close()
# save assignments container
if (not os.path.exists(assignments_fn)) \
and (not os.path.exists(distances_fn)):
save_container(assignments_fn, np.int)
save_container(distances_fn, np.float32)
elif os.path.exists(assignments_fn) and os.path.exists(distances_fn):
check_container(assignments_fn)
check_container(distances_fn)
else:
raise ValueError("You're missing one of the containers")
# append mode is read and write
with warnings.catch_warnings():
warnings.simplefilter('ignore')
f_assignments = tables.openFile(assignments_fn, mode='a')
f_distances = tables.openFile(distances_fn, mode='a')
return f_assignments, f_distances
[docs]def assign_in_memory(metric, generators, project, atom_indices_to_load=None):
"""
Assign every frame to its closest generator
This code does everything in memory, and does not checkpoint. It also does
not save any results to disk.
Parameters
----------
metric : msmbuilder.metrics.AbstractDistanceMetric
A distance metric used to define "closest"
project : msmbuilder.Project
Used to load the trajectories
generators : msmbuilder.Trajectory
A trajectory containing the structures of all of the cluster centers
atom_indices_to_load : {None, list}
The indices of the atoms to load for each trajectory chunk. Note that
this method is responsible for loading up atoms from the project, but
does NOT load up the generators. Those are passed in as a trajectory
object (above). So if the generators are already subsampled to a restricted
set of atom indices, but the trajectories on disk are NOT, you'll
need to pass in a set of indices here to resolve the difference.
See Also
--------
assign_with_checkpoint
"""
n_trajs, max_traj_length = project.n_trajs, np.max(project.traj_lengths)
assignments = -1 * np.ones((n_trajs, max_traj_length), dtype='int')
distances = -1 * np.ones((n_trajs, max_traj_length), dtype='float32')
pgens = metric.prepare_trajectory(generators)
for i in xrange(n_trajs):
traj = project.load_traj(i, atom_indices=atom_indices_to_load)
if traj['XYZList'].shape[1] != generators['XYZList'].shape[1]:
raise ValueError('Number of atoms in generators does not match '
'traj we\'re trying to assign! Maybe check atom indices?')
ptraj = metric.prepare_trajectory(traj)
for j in xrange(len(traj)):
d = metric.one_to_all(ptraj, pgens, j)
assignments[i, j] = np.argmin(d)
distances[i, j] = d[assignments[i, j]]
return assignments, distances
[docs]def assign_with_checkpoint(metric, project, generators, assignments_path,
distances_path, chunk_size=10000, atom_indices_to_load=None):
"""
Assign every frame to its closest generator
The results will be checkpointed along the way, trajectory by trajectory.
If the process is killed, it should be able to roughly pick up where it
left off.
Parameters
----------
metric : msmbuilder.metrics.AbstractDistanceMetric
A distance metric used to define "closest"
project : msmbuilder.Project
Used to load the trajectories
generators : msmbuilder.Trajectory
A trajectory containing the structures of all of the cluster centers
assignments_path : str
Path to a file that contains/will contain the assignments, as a 2D array
of integers in hdf5 format
distances_path : str
Path to a file that contains/will contain the assignments, as a 2D array
of integers in hdf5 format
chunk_size : int
The number of frames to load and process per step. The optimal number
here depends on your system memory -- it should probably be roughly
the number of frames you can fit in memory at any one time. Note, this
is only important if your trajectories are long, as the effective chunk_size
is really `min(traj_length, chunk_size)`
atom_indices_to_load : {None, list}
The indices of the atoms to load for each trajectory chunk. Note that
this method is responsible for loading up atoms from the project, but
does NOT load up the generators. Those are passed in as a trajectory
object (above). So if the generators are already subsampled to a restricted
set of atom indices, but the trajectories on disk are NOT, you'll
need to pass in a set of indices here to resolve the difference.
See Also
--------
assign_in_memory
"""
pgens = metric.prepare_trajectory(generators)
# setup the file handles
fh_a, fh_d = _setup_containers(project, assignments_path, distances_path)
for i in xrange(project.n_trajs):
if fh_a.root.completed_trajs[i] and fh_d.root.completed_trajs[i]:
logger.info('Skipping trajectory %s -- already assigned', i)
continue
if fh_a.root.completed_trajs[i] or fh_d.root.completed_trajs[i]:
logger.warn("Re-assigning trajectory even though some data is"
" available...")
fh_a.root.completed_trajs[i] = False
fh_d.root.completed_trajs[i] = False
logger.info('Assigning trajectory %s', i)
# pointer to the position in the total trajectory where
# the current chunk starts, so we know where in the Assignments
# array to put each batch of data
start_index = 0
filename = project.traj_filename(i)
chunkiter = md.iterload(filename, chunk=chunk_size, atom_indices=atom_indices_to_load)
for tchunk in chunkiter:
if tchunk.n_atoms != generators.n_atoms:
msg = ("Number of atoms in generators does not match "
"traj we're trying to assign! Maybe check atom indices?")
raise ValueError(msg)
ptchunk = metric.prepare_trajectory(tchunk)
this_length = len(ptchunk)
distances = np.empty(this_length, dtype=np.float32)
assignments = np.empty(this_length, dtype=np.int)
for j in xrange(this_length):
d = metric.one_to_all(ptchunk, pgens, j)
ind = np.argmin(d)
assignments[j] = ind
distances[j] = d[ind]
end_index = start_index + this_length
fh_a.root.arr_0[i, start_index:end_index] = assignments
fh_d.root.arr_0[i, start_index:end_index] = distances
# i'm not sure exactly what the optimal flush frequency is
fh_a.flush()
fh_d.flush()
start_index = end_index
# we're going to keep duplicates of this record -- i.e. writing
# it to both files
# completed chunks are not checkpointed -- only completed trajectories
# this means that if the process dies after completing 10/20 of the
# chunks in trajectory i -- those chunks are going to have to be recomputed
# (put trajectory i-1 is saved)
# this could be changed, but the implementation is a little tricky -- you
# have to watch out for the fact that the person might call this function
# with chunk_size=N, let it run for a while, kill it, and then call it
# again with chunk_size != N. Dealing with that appropriately is tricky
# since the chunks wont line up in the two cases
fh_a.root.completed_trajs[i] = True
fh_d.root.completed_trajs[i] = True
fh_a.close()
fh_d.close()
def streaming_assign_with_checkpoint(metric, project, generators, assignments_path,
distances_path, checkpoint=1, chunk_size=10000, atom_indices_to_load=None):
warnings.warn(("assign_with_checkpoint now uses the steaming engine "
"-- this function is deprecated"), DeprecationWarning)
assign_with_checkpoint(metric, project, generators, assignments_path,
distances_path, chunk_size, atom_indices_to_load)