Source code for MDAnalysis.coordinates.TRZ

# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- https://www.mdanalysis.org
# Copyright (c) 2006-2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the Lesser GNU Public Licence, v2.1 or any higher version
#
# Please cite your use of MDAnalysis in published work:
#
# R. J. Gowers, M. Linke, J. Barnoud, T. J. E. Reddy, M. N. Melo, S. L. Seyler,
# D. L. Dotson, J. Domanski, S. Buchoux, I. M. Kenney, and O. Beckstein.
# MDAnalysis: A Python package for the rapid analysis of molecular dynamics
# simulations. In S. Benthall and S. Rostrup editors, Proceedings of the 15th
# Python in Science Conference, pages 102-109, Austin, TX, 2016. SciPy.
# doi: 10.25080/majora-629e541a-00e
#
# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
#
# TRZ Reader written by Richard J. Gowers (2013)

"""TRZ trajectory I/O  --- :mod:`MDAnalysis.coordinates.TRZ`
============================================================

Classes to read `IBIsCO`_ / `YASP`_ TRZ binary trajectories, including
coordinates, velocities and more (see attributes of the :class:`Timestep`).

Data are read and written in binary representation but because this depends on
the machine hardware architecture, MDAnalysis *always* reads and writes TRZ
trajectories in *little-endian* byte order.

.. _IBIsCO: https://onlinelibrary.wiley.com/doi/full/10.1002/jcc.21717
.. _YASP: https://www.sciencedirect.com/science/article/abs/pii/0010465593901442

Classes
-------

.. autoclass:: TRZReader
   :members:

.. autoclass:: TRZWriter
   :members:

"""
import warnings
import numpy as np
import os
import errno

from . import base
from ..exceptions import NoDataError
from .timestep import Timestep
from ..lib import util
from ..lib.util import cached, store_init_arguments
from .core import triclinic_box, triclinic_vectors


[docs]class TRZReader(base.ReaderBase): """Reads an IBIsCO or YASP trajectory file Attributes ---------- ts : timestep.Timestep :class:`~MDAnalysis.coordinates.timestep.Timestep` object containing coordinates of current frame Note ---- Binary TRZ trajectories are *always* assumed to be written in *little-endian* byte order and are read as such. .. versionchanged:: 0.11.0 Frames now 0-based instead of 1-based. Extra data (Temperature, Energies, Pressures, etc) now read into ts.data dictionary. Now passes a weakref of self to ts (ts._reader). .. versionchanged:: 1.0.1 Now checks for the correct `n_atoms` on reading and can raise :exc:`ValueError`. .. versionchanged:: 2.1.0 TRZReader now returns a default :attr:`dt` of 1.0 when it cannot be obtained from the difference between two frames. .. versionchanged:: 2.3.0 _frame attribute moved to `ts.data` dictionary. .. deprecated:: 2.7.0 The TRZ Reader and Writer are deprecated as of version 2.7.0 and will be removed in version 3.0.0. """ format = "TRZ" units = {"time": "ps", "length": "nm", "velocity": "nm/ps"} @store_init_arguments def __init__(self, trzfilename, n_atoms=None, **kwargs): """Creates a TRZ Reader Parameters ---------- trzfilename : str name of input file n_atoms : int number of atoms in trajectory, must be taken from topology file! convert_units : bool (optional) converts units to MDAnalysis defaults Raises ------ ValueError If `n_atoms` or the number of atoms in the topology file do not match the number of atoms in the trajectory. """ wmsg = ( "The TRZ reader is deprecated and will be removed in " "MDAnalysis version 3.0.0" ) warnings.warn(wmsg, DeprecationWarning) super(TRZReader, self).__init__(trzfilename, **kwargs) if n_atoms is None: raise ValueError("TRZReader requires the n_atoms keyword") self.trzfile = util.anyopen(self.filename, "rb") self._cache = dict() self._n_atoms = n_atoms self._read_trz_header() self.ts = Timestep( self.n_atoms, velocities=True, forces=self.has_force, reader=self, **self._ts_kwargs ) # structured dtype of a single trajectory frame readarg = str(n_atoms) + "<f4" frame_contents = [ ("p1", "<i4"), ("nframe", "<i4"), ("ntrj", "<i4"), ("natoms", "<i4"), ("treal", "<f8"), ("p2", "<2i4"), ("box", "<9f8"), ("p3", "<2i4"), ("pressure", "<f8"), ("ptensor", "<6f8"), ("p4", "<3i4"), ("etot", "<f8"), ("ptot", "<f8"), ("ek", "<f8"), ("T", "<f8"), ("p5", "<6i4"), ("rx", readarg), ("pad2", "<2i4"), ("ry", readarg), ("pad3", "<2i4"), ("rz", readarg), ("pad4", "<2i4"), ("vx", readarg), ("pad5", "<2i4"), ("vy", readarg), ("pad6", "<2i4"), ("vz", readarg), ] if not self.has_force: frame_contents += [("pad7", "<i4")] else: frame_contents += [ ("pad7", "<2i4"), ("fx", readarg), ("pad8", "<2i4"), ("fy", readarg), ("pad9", "<2i4"), ("fz", readarg), ("pad10", "<i4"), ] self._dtype = np.dtype(frame_contents) self._read_next_timestep() def _read_trz_header(self): """Reads the header of the trz trajectory""" self._headerdtype = np.dtype( [ ("p1", "<i4"), ("title", "80c"), ("p2", "<2i4"), ("force", "<i4"), ("p3", "<i4"), ] ) data = np.fromfile(self.trzfile, dtype=self._headerdtype, count=1) self.title = "".join(c.decode("utf-8") for c in data["title"][0]).strip() if data["force"] == 10: self.has_force = False elif data["force"] == 20: self.has_force = True else: raise IOError def _read_next_timestep(self, ts=None): if ts is None: ts = self.ts try: data = np.fromfile(self.trzfile, dtype=self._dtype, count=1) if data["natoms"][0] != self.n_atoms: raise ValueError( "Supplied n_atoms {} is incompatible " "with provided trajectory file. " "Maybe `topology` is wrong?".format(self.n_atoms) ) ts.frame = data["nframe"][0] - 1 # 0 based for MDA ts.data["frame"] = data["ntrj"][0] # moved from attr to data ts.time = data["treal"][0] ts.dimensions = triclinic_box(*(data["box"].reshape(3, 3))) ts.data["pressure"] = data["pressure"] ts.data["pressure_tensor"] = data["ptensor"] ts.data["total_energy"] = data["etot"] ts.data["potential_energy"] = data["ptot"] ts.data["kinetic_energy"] = data["ek"] ts.data["temperature"] = data["T"] ts._x[:] = data["rx"] ts._y[:] = data["ry"] ts._z[:] = data["rz"] ts._velocities[:, 0] = data["vx"] ts._velocities[:, 1] = data["vy"] ts._velocities[:, 2] = data["vz"] if self.has_force: ts._forces[:, 0] = data["fx"] ts._forces[:, 1] = data["fy"] ts._forces[:, 2] = data["fz"] except IndexError: # Raises indexerror if data has no data (EOF) raise IOError from None else: # Convert things read into MDAnalysis' native formats (nm -> angstroms) if self.convert_units: self.convert_pos_from_native(self.ts._pos) if self.ts.dimensions is not None: self.convert_pos_from_native(self.ts.dimensions[:3]) self.convert_velocities_from_native(self.ts._velocities) return ts @property def n_atoms(self): """Number of atoms in a frame""" return self._n_atoms @property @cached("n_frames") def n_frames(self): """Total number of frames in a trajectory""" try: return self._read_trz_n_frames(self.trzfile) except IOError: return 0 def _read_trz_n_frames(self, trzfile): """Uses size of file and dtype information to determine how many frames exist .. versionchanged:: 0.9.0 Now is based on filesize rather than reading entire file """ fsize = os.fstat(trzfile.fileno()).st_size # size of file in bytes if not (fsize - self._headerdtype.itemsize) % self._dtype.itemsize == 0: raise IOError( "Trajectory has incomplete frames" ) # check that division is sane nframes = int( (fsize - self._headerdtype.itemsize) / self._dtype.itemsize ) # returns long int otherwise return nframes def _get_dt(self): """The amount of time between frames in ps Assumes that this step is constant (ie. 2 trajectories with different steps haven't been stitched together). Returns ``AttributeError`` in case of ``StopIteration`` (which makes :attr:`dt` return 1.0). .. versionchanged:: 2.1.0 Now returns an ``AttributeError`` if dt can't be obtained from the time difference between two frames. """ curr_frame = self.ts.frame try: t0 = self.ts.time self.next() t1 = self.ts.time dt = t1 - t0 except StopIteration: raise AttributeError else: return dt finally: self._read_frame(curr_frame) @property @cached("delta") def delta(self): """MD integration timestep""" return self.dt / self.skip_timestep @property @cached("skip_timestep") def skip_timestep(self): """Timesteps between trajectory frames""" curr_frame = self.ts.frame try: t0 = self.ts.data["frame"] self.next() t1 = self.ts.data["frame"] skip_timestep = t1 - t0 except StopIteration: return 0 else: return skip_timestep finally: self._read_frame(curr_frame) def _read_frame(self, frame): """Move to *frame* and fill timestep with data. .. versionchanged:: 0.11.0 Frames now 0-based instead of 1-based """ move = frame - self.ts.frame self._seek(move - 1) self._read_next_timestep() return self.ts def _seek(self, nframes): """Move *nframes* in the trajectory Note that this doens't read the trajectory (ts remains unchanged) .. versionadded:: 0.9.0 """ framesize = self._dtype.itemsize seeksize = framesize * nframes maxi_l = seeksize + 1 if seeksize > maxi_l: # Workaround for seek not liking long ints # On python 3 this branch will never be used as we defined maxi_l # greater than seeksize. framesize = long(framesize) seeksize = framesize * nframes nreps = int(seeksize / maxi_l) # number of max seeks we'll have to do rem = int(seeksize % maxi_l) # amount leftover to do once max seeks done for _ in range(nreps): self.trzfile.seek(maxint, 1) self.trzfile.seek(rem, 1) else: seeksize = int(seeksize) self.trzfile.seek(seeksize, 1) def _reopen(self): self.close() self.open_trajectory() self._read_trz_header() # Moves to start of first frame
[docs] def open_trajectory(self): """Open the trajectory file""" if self.trzfile is not None: raise IOError(errno.EALREADY, "TRZ file already opened", self.filename) if not os.path.exists(self.filename): raise IOError(errno.ENOENT, "TRZ file not found", self.filename) self.trzfile = util.anyopen(self.filename, "rb") # Reset ts ts = self.ts ts.frame = -1 return self.trzfile
[docs] def Writer(self, filename, n_atoms=None): if n_atoms is None: # guess that they want to write the whole timestep unless told otherwise? n_atoms = self.ts.n_atoms return TRZWriter(filename, n_atoms)
[docs] def close(self): """Close trz file if it was open""" if self.trzfile is not None: self.trzfile.close() self.trzfile = None
[docs]class TRZWriter(base.WriterBase): """Writes a TRZ format trajectory. Note ---- Binary TRZ trajectories are *always* written in *little-endian* byte order. .. deprecated:: 2.7.0 The TRZ Reader and Writer are deprecated as of version 2.7.0 and will be removed in version 3.0.0. """ format = "TRZ" multiframe = True units = {"time": "ps", "length": "nm", "velocity": "nm/ps"} def __init__(self, filename, n_atoms, title="TRZ", convert_units=True): """Create a TRZWriter Parameters ---------- filename : str name of output file n_atoms : int number of atoms in trajectory title : str (optional) title of the trajectory; the title must be 80 characters or shorter, a longer title raises a ValueError exception. convert_units : bool (optional) units are converted to the MDAnalysis base format; [``True``] """ wmsg = ( "The TRZ writer is deprecated and will be removed in " "MDAnalysis version 3.0.0" ) warnings.warn(wmsg, DeprecationWarning) self.filename = filename if n_atoms is None: raise ValueError("TRZWriter requires the n_atoms keyword") if n_atoms == 0: raise ValueError("TRZWriter: no atoms in output trajectory") self.n_atoms = n_atoms if len(title) > 80: raise ValueError("TRZWriter: 'title' must be 80 characters of shorter") self.convert_units = convert_units self.trzfile = util.anyopen(self.filename, "wb") self._writeheader(title) floatsize = str(n_atoms) + "<f4" self.frameDtype = np.dtype( [ ("p1a", "<i4"), ("nframe", "<i4"), ("ntrj", "<i4"), ("natoms", "<i4"), ("treal", "<f8"), ("p1b", "<i4"), ("p2a", "<i4"), ("box", "<9f8"), ("p2b", "<i4"), ("p3a", "<i4"), ("pressure", "<f8"), ("ptensor", "<6f8"), ("p3b", "<i4"), ("p4a", "<i4"), ("six", "<i4"), ("etot", "<f8"), ("ptot", "<f8"), ("ek", "<f8"), ("T", "<f8"), ("blanks", "<2f8"), ("p4b", "<i4"), ("p5a", "<i4"), ("rx", floatsize), ("p5b", "<i4"), ("p6a", "<i4"), ("ry", floatsize), ("p6b", "<i4"), ("p7a", "<i4"), ("rz", floatsize), ("p7b", "<i4"), ("p8a", "<i4"), ("vx", floatsize), ("p8b", "<i4"), ("p9a", "<i4"), ("vy", floatsize), ("p9b", "<i4"), ("p10a", "<i4"), ("vz", floatsize), ("p10b", "<i4"), ] ) def _writeheader(self, title): hdt = np.dtype( [ ("pad1", "<i4"), ("title", "80c"), ("pad2", "<i4"), ("pad3", "<i4"), ("nrec", "<i4"), ("pad4", "<i4"), ] ) out = np.zeros((), dtype=hdt) out["pad1"], out["pad2"] = 80, 80 out["title"] = title + " " * (80 - len(title)) out["pad3"], out["pad4"] = 4, 4 out["nrec"] = 10 out.tofile(self.trzfile) def _write_next_frame(self, obj): """Write information associated with ``obj`` at current frame into trajectory Parameters ---------- ag : AtomGroup or Universe .. versionchanged:: 1.0.0 Renamed from `write_next_timestep` to `_write_next_frame`. .. versionchanged:: 2.0.0 Deprecated support for Timestep argument has now been removed. Use AtomGroup or Universe as an input instead. """ # Check size of ts is same as initial try: # atomgroup? ts = obj.ts except AttributeError: # universe? try: ts = obj.trajectory.ts except AttributeError: errmsg = "Input obj is neither an AtomGroup or Universe" raise TypeError(errmsg) from None if not obj.atoms.n_atoms == self.n_atoms: errmsg = "Number of atoms in ts different to initialisation" raise ValueError(errmsg) # Gather data, faking it when unavailable data = {} faked_attrs = [] for att in [ "pressure", "pressure_tensor", "total_energy", "potential_energy", "kinetic_energy", "temperature", ]: try: data[att] = ts.data[att] except KeyError: if att == "pressure_tensor": data[att] = np.zeros(6, dtype=np.float64) else: data[att] = 0.0 faked_attrs.append(att) try: data["step"] = ts.data["frame"] except KeyError: data["step"] = ts.frame faked_attrs.append("step") try: data["time"] = ts.time except AttributeError: data["time"] = float(ts.frame) faked_attrs.append("time") if faked_attrs: warnings.warn( "Timestep didn't have the following attributes: '{0}', " "these will be set to 0 in the output trajectory" "".format(", ".join(faked_attrs)) ) # Convert other stuff into our format if ts.dimensions is not None: unitcell = triclinic_vectors(ts.dimensions).reshape(9) else: warnings.warn( "Timestep didn't have dimensions information, " "box will be written as all zero values" ) unitcell = np.zeros(9, dtype=np.float32) try: vels = ts.velocities except NoDataError: vels = np.zeros((self.n_atoms, 3), dtype=np.float32, order="F") warnings.warn( "Timestep didn't have velocity information, " "this will be set to zero in output trajectory. " ) out = np.zeros((), dtype=self.frameDtype) out["p1a"], out["p1b"] = 20, 20 out["nframe"] = ts.frame + 1 # TRZ wants 1 based out["ntrj"] = data["step"] out["natoms"] = self.n_atoms out["treal"] = data["time"] out["p2a"], out["p2b"] = 72, 72 out["box"] = self.convert_pos_to_native(unitcell, inplace=False) out["p3a"], out["p3b"] = 56, 56 out["pressure"] = data["pressure"] out["ptensor"] = data["pressure_tensor"] out["p4a"], out["p4b"] = 60, 60 out["six"] = 6 out["etot"] = data["total_energy"] out["ptot"] = data["potential_energy"] out["ek"] = data["kinetic_energy"] out["T"] = data["temperature"] out["blanks"] = 0.0, 0.0 size = ts.n_atoms * 4 # size of float for vels & coords out["p5a"], out["p5b"] = size, size out["rx"] = self.convert_pos_to_native(ts._x, inplace=False) out["p6a"], out["p6b"] = size, size out["ry"] = self.convert_pos_to_native(ts._y, inplace=False) out["p7a"], out["p7b"] = size, size out["rz"] = self.convert_pos_to_native(ts._z, inplace=False) out["p8a"], out["p8b"] = size, size out["vx"] = self.convert_velocities_to_native(vels[:, 0], inplace=False) out["p9a"], out["p9b"] = size, size out["vy"] = self.convert_velocities_to_native(vels[:, 1], inplace=False) out["p10a"], out["p10b"] = size, size out["vz"] = self.convert_velocities_to_native(vels[:, 2], inplace=False) out.tofile(self.trzfile)
[docs] def close(self): """Close if it was open""" if self.trzfile is None: return self.trzfile.close() self.trzfile = None