Source code for pywindow._internal.trajectory

"""Module intended for the analysis of molecular dynamics trajectories.

The trajectory file (DL_POLY_C:HISTORY, PDB or XYZ) should be loaded with
the one of the corresponding classes (DLPOLY, PDB or XYZ, respectively).

Example:
-------
In this example a DL_POLY_C HISTORY trajectory file is loaded.

.. code-block:: python

    pywindow.trajectory.DLPOLY('path/to/HISTORY')

Then, each of the trajectory frames can be extracted and returned as a
:class:`pywindow.MolecularSystem` object for analysis. See
:mod:`pywindow.molecular` docstring for more information.

Alternatively, the analysis can be performed on a whole or a chunk of
the trajectory with the :func:`analysis()` function. The benefit is
that the analysis can be performed in parallel and the results stored as a
single JSON dictionary in a straightforward way. Also, the deciphering of the
force field atom ids and the rebuilding of molecules can be applied to each
frame in a consitent and automated manner. The downfall is that at the
moment it is not possible to choose the set of parameters that are being
calculated in the :class:`pywindow.Molecule` as the
:func:`pywindow.Molecule.full_analysis()` is invoked by default.
However, the computational cost of calculating majority of the structural
properties is miniscule and it is usually the
:func:`pywindow.MolecularSystem.rebuild_system()` step that is the
bottleneck.

"""

from __future__ import annotations

import pathlib
from contextlib import closing
from copy import deepcopy
from mmap import ACCESS_READ, mmap
from multiprocessing import Pool
from typing import Any, Literal

import numpy as np

from pywindow._internal.io_tools import Output
from pywindow._internal.molecular import MolecularSystem
from pywindow._internal.utilities import (
    create_supercell,
    is_number,
    lattice_array_to_unit_cell,
    to_list,
)


class _ParallelAnalysisError(Exception):
    def __init__(self, message: str) -> None:
        self.message = message


class _TrajectoryError(Exception):
    def __init__(self, message: str) -> None:
        self.message = message


class _FormatError(Exception):
    def __init__(self, message: str) -> None:
        self.message = message


class _FunctionError(Exception):
    def __init__(self, message: str) -> None:
        self.message = message


[docs] def make_supercell( system: dict, # type: ignore[type-arg] supercell: list[float] | None = None, ) -> MolecularSystem: """Return a supercell. This functions takes the input unitcell and creates a supercell of it that is returned as a new :class:`pywindow.MolecularSystem`. Parameters: system: The unit cell for creation of the supercell supercell: A list that specifies the size of the supercell in the a, b and c direction. (default=[1, 1, 1]) Returns: Returns the created supercell as a new :class:`MolecularSystem`. """ if supercell is None: supercell = [1, 1, 1] user_supercell = [[1, supercell[0]], [1, supercell[1]], [1, supercell[1]]] system = create_supercell(system=system, supercell=user_supercell) return MolecularSystem.load_system(system)
class Trajectory: """A base class container for trajectories.""" def __init__(self) -> None: self.frames = {} # type: ignore[var-annotated] self.analysis_output = {} # type: ignore[var-annotated] msg = "This is an abstract class" raise NotImplementedError(msg) def get_frames( # noqa: C901 self, frames: int | list[int] | tuple[int, int] | Literal["all", "everything"] = "all", override: bool = False, # noqa: FBT001, FBT002 swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, extract_data: bool = True, # noqa: FBT001, FBT002 ) -> dict[int, MolecularSystem]: """Extract frames from the trajectory file. Depending on the passed parameters a frame, a list of particular frames, a range of frames (from, to), or all frames can be extracted with this function. Parameters: frames : :class:`int` or :class:`list` or :class:`tuple` or :class:`str` Specified frame (:class:`int`), or frames (:class:`list`), or range (:class:`tuple`), or `all`/`everything` (:class:`str`). (default=`all`) override : :class:`bool` If True, a frame already storred in :attr:`frames` can be override. (default=False) extract_data : :class:`bool`, optional If False, a frame is returned as a :class:`str` block as in the trajectory file. Ohterwise, it is extracted and returned as :class:`pywindow.MolecularSystem`. (default=True) swap_atoms : :class:`dict`, optional If this kwarg is passed with an appropriate dictionary a :func:`pywindow.MolecularSystem.swap_atom_keys()` will be applied to the extracted frame. forcefield : :class:`str`, optional If this kwarg is passed with appropriate forcefield keyword a :func:`pywindow.MolecularSystem.decipher_atom_keys()` will be applied to the extracted frame. Returns: Dictionary of frames. """ collected_frames = {} if override is True: self.frames = {} if isinstance(frames, int): frame_system = self._get_frame( frame_coordinates=self.trajectory_map[frames], # type: ignore[attr-defined] frame_no=frames, swap_atoms=swap_atoms, forcefield=forcefield, extract_data=extract_data, ) if frames not in self.frames: self.frames[frames] = frame_system collected_frames[frames] = frame_system return collected_frames if isinstance(frames, list): for frame in frames: if frame not in self.frames: self.frames[frame] = self._get_frame( frame_coordinates=self.trajectory_map[frame], # type: ignore[attr-defined] frame_no=frame, swap_atoms=swap_atoms, forcefield=forcefield, extract_data=extract_data, ) collected_frames[frame] = self.frames[frame] if isinstance(frames, tuple): for frame in range(frames[0], frames[1]): if frame not in self.frames: self.frames[frame] = self._get_frame( frame_coordinates=self.trajectory_map[frame], # type: ignore[attr-defined] frame_no=frame, swap_atoms=swap_atoms, forcefield=forcefield, extract_data=extract_data, ) collected_frames[frame] = self.frames[frame] if isinstance(frames, str) and frames in ["all", "everything"]: for frame in range(self.no_of_frames): # type: ignore[attr-defined] if frame not in self.frames: self.frames[frame] = self._get_frame( frame_coordinates=self.trajectory_map[frame], # type: ignore[attr-defined] frame_no=frame, swap_atoms=swap_atoms, forcefield=forcefield, extract_data=extract_data, ) collected_frames[frame] = self.frames[frame] return collected_frames def _decode_frame(self, frame: list) -> dict: # type:ignore[type-arg] raise NotImplementedError def _get_frame( self, frame_coordinates: list[int], frame_no: int, swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, extract_data: bool = True, # noqa: FBT001, FBT002 ) -> MolecularSystem: start, end = frame_coordinates with ( self.filepath.open() as trajectory_file, # type: ignore[attr-defined] closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as mapped_file, ): if extract_data is False: return mapped_file[start:end].decode("utf-8") # type: ignore[return-value] # [:-1] because the split results in last list empty. frame = [ i.split() for i in mapped_file[start:end].decode("utf-8").split("\n") ][:-1] decoded_frame = self._decode_frame(frame) molsys = MolecularSystem.load_system( decoded_frame, "_".join([self.system_id, str(frame_no)]), # type: ignore[attr-defined] ) if swap_atoms is not None: molsys.swap_atom_keys(swap_atoms) if forcefield is not None: molsys.decipher_atom_keys(forcefield) return molsys def save_analysis( self, filepath: str | pathlib.Path | None = None, override: bool = False, # noqa: FBT001, FBT002 ) -> None: """Dump the content of :attr:`analysis_output` as JSON dictionary.""" # We pass a copy of the analysis attribute dictionary. dict_obj = deepcopy(self.analysis_output) # type: ignore[attr-defined] # If no filepath is provided we create one. if filepath is None: filepath = f"{self.system_id}_pywindow_analysis" # type: ignore[attr-defined] filepath = pathlib.Path.cwd() / filepath filepath = pathlib.Path(filepath) # Dump the dictionary to json file. Output().dump2json( dict_obj, filepath, default=to_list, override=override, ) def save_frames( # noqa: C901, PLR0912 self, frames: int | list | tuple | Literal["all", "everything"] = "all", # type: ignore[type-arg] filepath: str | pathlib.Path | None = None, decipher: bool = True, # noqa: FBT001, FBT002 swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, ) -> None: # If no filepath is provided we create one. if filepath is None: filepath = pathlib.Path.cwd() / str(self.system_id) # type: ignore[attr-defined] filepath = pathlib.Path(filepath) frames_to_get = [] if isinstance(frames, int): frames_to_get.append(frames) if isinstance(frames, list): frames_to_get = frames if isinstance(frames, tuple): for frame in range(frames[0], frames[1]): frames_to_get.append(frame) if isinstance(frames, str) and frames in ["all", "everything"]: for frame in range(self.no_of_frames): # type: ignore[attr-defined] frames_to_get.append(frame) for frame in frames_to_get: if frame not in self.frames: _ = self.get_frames(frame) for frame in frames_to_get: frame_molsys = self.frames[frame] if decipher is True and forcefield is not None: if swap_atoms is not None: if isinstance(swap_atoms, dict): frame_molsys.swap_atom_keys(swap_atoms) else: msg = ( # type: ignore[unreachable] "The swap_atom_keys function only accepts " "'swap_atoms' argument in form of a dictionary." ) raise _FunctionError(msg) frame_molsys.decipher_atom_keys(forcefield) ffilepath = "_".join((str(filepath), str(frame))) if "elements" not in frame_molsys.system: msg = ( "The frame (MolecularSystem object) needs to have " "'elements' attribute within the system dictionary. " "It is, therefore, neccessary that you set a decipher " "keyword to True. (see manual)" ) raise _FunctionError(msg) if filepath.suffix == ".pdb": Output()._save_pdb( # noqa: SLF001 system=frame_molsys.system, filepath=ffilepath, atom_ids_key="elements" if "atom_ids" not in frame_molsys.system else "atom_ids", forcefield=forcefield, decipher=decipher, ) elif filepath.suffix == ".xyz": Output()._save_xyz( # noqa: SLF001 system=frame_molsys.system, filepath=ffilepath, forcefield=forcefield, decipher=decipher, ) else: msg = ( f"The {filepath.suffix} file extension is " "not supported for dumping a MolecularSystem or a" " Molecule. Please use XYZ or PDB." ) raise _FormatError(msg) def analysis( # noqa: C901, PLR0912, PLR0913 self, frames: int | list | tuple | str = "all", # type: ignore[type-arg] ncpus: int = 1, ncpus_analysis: int = 1, override: bool = False, # noqa: FBT001, FBT002 modular: bool = False, # noqa: FBT001, FBT002 rebuild: bool = False, # noqa: FBT001, FBT002 swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, ) -> None: """Perform structural analysis on a frame/ set of frames. Depending on the passed parameters a frame, a list of particular frames, a range of frames (from, to), or all frames can be analysed with this function. The analysis is performed on each frame and each discrete molecule in that frame separately. The steps are as follows: 1. A frame is extracted and returned as a :class:`MolecularSystem`. 2. If `swap_atoms` is set the atom ids are swapped. 3. If `forcefield` is set the atom ids are deciphered. 4. If `rebuild` is set the molecules in the system are rebuild. 5. Each discrete molecule is extracted as :class:`Molecule` 6. Each molecule is analysed with :func:`Molecule.full_analysis()` 7. Analysis output populates the :attr:`analysis_output` dictionary. As the analysis of trajectories often have to be unique, many options are conditional. A side effect of this function is that the analysed frames are also returned to the :attr:`frames` mimicking the behaviour of the :func:`get_frames()`. Parameters: frames : :class:`int` or :class:`list` or :class:`tuple` or :class:`str` Specified frame (:class:`int`), or frames (:class:`list`), or range (:class:`touple`), or `all`/`everything` (:class:`str`). (default='all') override : :class:`bool` If True, an output already storred in :attr:`analysis_output` can be override. (default=False) swap_atoms : :class:`dict`, optional If this kwarg is passed with an appropriate dictionary a :func:`pywindow.MolecularSystem.swap_atom_keys()` will be applied to the extracted frame. forcefield : :class:`str`, optional If this kwarg is passed with appropriate forcefield keyword a :func:`pywindow.MolecularSystem.decipher_atom_keys()` will be applied to the extracted frame. modular : :class:`bool`, optional If this kwarg is passed a :func:`pywindow.MolecularSystem.make_modular()` will be applied to the extracted frame. (default=False) rebuild : :class:`bool`, optional If this kwarg is passed a `rebuild=True` is passed to :func:`pywindow.MolecularSystem.make_modular()` that will be applied to the extracted frame. (default=False) ncpus : :class:`int`, optional If ncpus > 1, then the analysis is performed in parallel for the specified number of parallel jobs. Otherwise, it runs in serial. (default=1) ncpus_analysis : :class:`int`, optional If ncpus > 1, then the analysis is performed in parallel for the specified number of parallel jobs. Otherwise, it runs in serial. (default=1) Returns: None : :class:`NoneType` The function returns `None`, the analysis output is returned to :attr:`analysis_output` dictionary. """ frames_for_analysis = [] # First populate the frames_for_analysis list. if isinstance(frames, int): frames_for_analysis.append(frames) if isinstance(frames, list): for frame in frames: if isinstance(frame, int): frames_for_analysis.append(frame) else: msg = "The list should be populated with integers only." raise _FunctionError(msg) if ( isinstance(frames, tuple) and isinstance(frames[0], int) and isinstance(frames[1], int) ): for frame in range(frames[0], frames[1]): frames_for_analysis.append(frame) # noqa: PERF402 msg = ( "The tuple should contain only two integers " "for the begining and the end of the frames range." ) raise _FunctionError(msg) if isinstance(frames, str): if frames in ["all", "everything"]: for frame in range(self.no_of_frames): # type: ignore[attr-defined] frames_for_analysis.append(frame) # noqa: PERF402 else: msg = "Didn't recognise the keyword. (see manual)" raise _FunctionError(msg) # The override keyword by default is False. So we check if any of the # frames were already analysed and if so we delete them from the list. # However, if the override is set to True, then we just proceed. if override is False: frames_for_analysis_new = [] for frame in frames_for_analysis: if frame not in self.analysis_output: frames_for_analysis_new.append(frame) # noqa: PERF401 frames_for_analysis = frames_for_analysis_new if ncpus == 1: for frame in frames_for_analysis: analysed_frame = self._analysis_serial( frame=frame, ncpus_analysis=ncpus_analysis, rebuild=rebuild, modular=modular, forcefield=forcefield, swap_atoms=swap_atoms, ) self.analysis_output[frame] = analysed_frame if ncpus > 1: self._analysis_parallel( frames=frames_for_analysis, ncpus=ncpus, ncpus_analysis=ncpus_analysis, rebuild=rebuild, modular=modular, forcefield=forcefield, swap_atoms=swap_atoms, ) def _analysis_serial( # noqa: PLR0913 self, frame: int, ncpus_analysis: int = 1, rebuild: bool = False, # noqa: FBT001, FBT002 modular: bool = False, # noqa: FBT001, FBT002 swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, ) -> dict: # type: ignore[type-arg] molecular_system = self._get_frame( frame_coordinates=self.trajectory_map[frame], # type: ignore[attr-defined] frame_no=frame, extract_data=True, swap_atoms=swap_atoms, forcefield=forcefield, ) if modular is True: molecular_system.make_modular(rebuild=rebuild) molecules = molecular_system.molecules else: molecules = {"0": molecular_system.system_to_molecule()} results = {} for molecule in molecules: mol = molecules[molecule] results[molecule] = mol.full_analysis(ncpus=ncpus_analysis) return results def _analysis_parallel_execute( # type: ignore[no-untyped-def] self, frame: int, # Using kwargs here for parallel execution. **kwargs, # noqa: ANN003 ) -> tuple[int, dict]: # type: ignore[type-arg] settings = {"rebuild": False, "modular": False} settings.update(kwargs) molecular_system = self._get_frame( frame_coordinates=self.trajectory_map[frame], # type: ignore[attr-defined] frame_no=frame, # type: ignore[arg-type] extract_data=True, swap_atoms=settings["swap_atoms"], # type: ignore[arg-type] forcefield=settings["forcefield"], # type: ignore[arg-type] ) if settings["modular"] is True: molecular_system.make_modular(rebuild=settings["rebuild"]) molecules = molecular_system.molecules else: molecules = {"0": molecular_system.system_to_molecule()} results = {} for molecule in molecules: mol = molecules[molecule] results[molecule] = mol.full_analysis( ncpus=settings["ncpus_analysis"] ) return frame, results def _analysis_parallel( # noqa: PLR0913 self, frames: list, # type: ignore[type-arg] ncpus: int, ncpus_analysis: int, rebuild: bool = False, # noqa: FBT001, FBT002 modular: bool = False, # noqa: FBT001, FBT002 swap_atoms: dict | None = None, # type: ignore[type-arg] forcefield: str | None = None, ) -> None: try: pool = Pool(processes=ncpus) parallel = [ pool.apply_async( self._analysis_parallel_execute, args=(frame,), kwds={ "swap_atoms": swap_atoms, "rebuild": rebuild, "forcefield": forcefield, "modular": modular, "ncpus_analysis": ncpus_analysis, }, ) for frame in frames ] results = [p.get() for p in parallel if p.get()] pool.terminate() for i in results: self.analysis_output[i[0]] = i[1] # type: ignore[index, attr-defined] except TypeError: pool.terminate() msg = "Parallel analysis failed." raise _ParallelAnalysisError(msg) from None
[docs] class DLPOLY(Trajectory): """A container for a DL_POLY_C type trajectory (HISTORY). This function takes a DL_POLY_C trajectory file and maps it for the binary points in the file where each frame starts/ends. This way the process is fast, as it not require loading the trajectory into computer memory. When a frame is being extracted, it is only this frame that gets loaded to the memory. Frames can be accessed individually and loaded as an unmodified string, returned as a :class:`pywindow.MolecularSystem` (and analysed), dumped as PDB or XYZ or JSON (if dumped as a :attr:`pywindow.MolecularSystem.system`) Attributes: filepath : :class:`str` The filepath. system_id : :class:`str` The system id inherited from the filename. frames : :class:`dict` A dictionary that is populated, on the fly, with the extracted frames. analysis_output : :class:`dict` A dictionary that is populated, on the fly, with the analysis output. """ def __init__(self, filepath: pathlib.Path | str) -> None: # Image conventions - periodic boundary key. self._imcon = { 0: "nonperiodic", 1: "cubic", 2: "orthorhombic", 3: "parallelepiped", 4: "truncated octahedral", 5: "rhombic dodecahedral", 6: "x-y parallelogram", 7: "hexagonal prism", } # Trajectory key - content type. self._keytrj = { 0: "coordinates", 1: "coordinates and velocities", 2: "coordinates, velocities and forces", } self.filepath = pathlib.Path(filepath) self.system_id = self.filepath.name.split(".")[0] self.frames = {} # type: ignore[var-annotated] self.analysis_output = {} # type: ignore[var-annotated] # Check the history file at init, if no errors, proceed to mapping. self._check_history() # Map the trajectory file at init. self._map_history() def _map_history(self) -> None: """Map HISTORY file.""" self.trajectory_map: dict[str | int, Any] = {} with self.filepath.open() as trajectory_file: with closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as mapped_file: progress = 0 line = 0 frame = 0 # We need to first process trajectory file's header. header_flag = True while progress <= len(mapped_file): line = line + 1 # We read a binary data from a mapped file. bline = mapped_file.readline() # If the bline length equals zero we terminate. # We reached end of the file but still add the last frame! if len(bline) == 0: self.trajectory_map[frame] = [frame_start, progress] # noqa: F821 frame = frame + 1 break # We need to decode byte line into an utf-8 string. sline = bline.decode("utf-8").strip("\n").split() # We extract map's byte coordinates for each frame if header_flag is False and sline[0] == "timestep": self.trajectory_map[frame] = [ frame_start, # noqa: F821 progress, ] frame_start: int = progress frame = frame + 1 # Here we extract the map's byte coordinates for the header # And also the periodic system type needed for later. if header_flag is True and sline[0] == "timestep": self.trajectory_map["header"] = self._decode_head( [0, progress] ) frame_start = progress # noqa: F841 header_flag = False progress = progress + len(bline) self.no_of_frames = frame def _decode_head( self, header_coordinates: list[int], ) -> list[list[str]]: start, end = header_coordinates with ( self.filepath.open() as trajectory_file, closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as mapped_file, ): header = [ i.split() for i in mapped_file[start:end].decode("utf-8").split("\n") ] header = [int(i) for i in header[1]] # type:ignore[misc] self.periodic_boundary = self._imcon[header[1]] # type:ignore[index] self.content_type = self._keytrj[header[0]] # type:ignore[index] self.no_of_atoms = header[2] # type:ignore[index] return header def _decode_frame(self, frame: list) -> dict: # type:ignore[type-arg] # noqa: C901, PLR0912 frame_data = { "frame_info": { "nstep": int(frame[0][1]), "natms": int(frame[0][2]), "keytrj": int(frame[0][3]), "imcon": int(frame[0][4]), "tstep": float(frame[0][5]), } } start_line = 1 if frame_data["frame_info"]["imcon"] in [1, 2, 3]: frame_data["lattice"] = np.array(frame[1:4], dtype=float).T # type:ignore[assignment] frame_data["unit_cell"] = lattice_array_to_unit_cell( # type:ignore[assignment] frame_data["lattice"] # type:ignore[arg-type] ) start_line = 4 # Depending on what the trajectory key is (see __init__) we need # to extract every second/ third/ fourth line for elements and coor. elements = [] coordinates = [] velocities = [] forces = [] for i in range(len(frame[start_line:])): i_ = i + start_line if frame_data["frame_info"]["keytrj"] == 0: if i % 2 == 0: elements.append(frame[i_][0]) if i % 2 == 1: coordinates.append(frame[i_]) if frame_data["frame_info"]["keytrj"] == 1: if i % 3 == 0: elements.append(frame[i_][0]) if i % 3 == 1: coordinates.append(frame[i_]) if i % 3 == 2: # noqa: PLR2004 velocities.append(frame[i_]) if frame_data["frame_info"]["keytrj"] == 2: # noqa: PLR2004 if i % 4 == 0: elements.append(frame[i_][0]) if i % 4 == 1: coordinates.append(frame[i_]) if i % 4 == 2: # noqa: PLR2004 velocities.append(frame[i_]) if i % 4 == 3: # noqa: PLR2004 forces.append(frame[i_]) frame_data["atom_ids"] = np.array(elements) # type:ignore[assignment] frame_data["coordinates"] = np.array(coordinates, dtype=float) # type:ignore[assignment] if velocities: frame_data["velocities"] = np.array(velocities, dtype=float) # type:ignore[assignment] if forces: frame_data["forces"] = np.array(forces, dtype=float) # type:ignore[assignment] return frame_data def _check_history(self) -> None: self.check_log = "" line = 0 binary_step = 0 timestep = 0 timestep_flag = "timestep" warning_1 = "No comment line is present as the file header.\n" warning_2 = ( "Second header line is missing from the file that contains" " information on the system's periodicity and the type of the " "trajectory file.\n" ) error_1 = "The trajectory is discontinous.\n" error_2 = "The file contains an empty line.\n" # We open the HISTORY trajectory file with ( self.filepath.open() as trajectory_file, closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as file_binary_map, ): # We use this binary mapping feature that instead of loading # the full file into memory beforehand it only # maps the content. Especially useful with enormous files while binary_step < len(file_binary_map): line += 1 binary_line = file_binary_map.readline() binary_step = binary_step + len(binary_line) string_line = binary_line.decode("utf-8").strip("\n").split() # Warning 1 if line == 1 and string_line[0] != "DLFIELD": self.check_log = " ".join( ( self.check_log, f"Line {line}:", warning_1, ) ) # Warning 2 if line == 2 and len(string_line) != 3: # noqa: PLR2004 self.check_log = " ".join( ( self.check_log, f"Line {line}:", warning_2, ) ) # Error 1 if string_line and string_line[0] == timestep_flag: old_timestep = timestep timestep = int(string_line[1]) if old_timestep > timestep: error = " ".join((f"Line {line}:", error_1)) raise _TrajectoryError(error) # Error 2 if len(string_line) == 0: error = " ".join((f"Line {line}:", error_2)) raise _TrajectoryError(error)
[docs] class XYZ(Trajectory): """A container for an XYZ type trajectory. This function takes an XYZ trajectory file and maps it for the binary points in the file where each frame starts/ends. This way the process is fast, as it not require loading the trajectory into computer memory. When a frame is being extracted, it is only this frame that gets loaded to the memory. Frames can be accessed individually and loaded as an unmodified string, returned as a :class:`pywindow.MolecularSystem` (and analysed), dumped as PDB or XYZ or JSON (if dumped as a :attr:`pywindow.MolecularSystem.system`) Attributes: filepath : :class:`str` The filepath. filename : :class:`str` The filename. system_id : :class:`str` The system id inherited from the filename. frames : :class:`dict` A dictionary that is populated, on the fly, with the extracted frames. analysis_output : :class:`dict` A dictionary that is populated, on the fly, with the analysis output. """ def __init__(self, filepath: pathlib.Path | str) -> None: self.filepath = pathlib.Path(filepath) self.filename = self.filepath.name self.system_id = self.filename.split(".")[0] self.frames = {} # type: ignore[var-annotated] self.analysis_output = {} # type: ignore[var-annotated] # Map the trajectory file at init. self._map_trajectory() def _map_trajectory(self) -> None: """Map xyz trajectory.""" self.trajectory_map = {} with self.filepath.open() as trajectory_file: with closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as mapped_file: progress = 0 line = 0 frame = -1 frame_start = 0 while progress <= len(mapped_file): line = line + 1 # We read a binary data from a mapped file. bline = mapped_file.readline() # If the bline length equals zero we terminate. # We reached end of the file but still add the last frame! if len(bline) == 0: frame = frame + 1 self.trajectory_map[frame] = [frame_start, progress] break # We need to decode byte line into an utf-8 string. sline = bline.decode("utf-8").strip("\n").split() # We extract map's byte coordinates for each frame if ( len(sline) == 1 and is_number(sline[0]) and progress > 0 ): frame = frame + 1 self.trajectory_map[frame] = [frame_start, progress] frame_start = progress # Here we extract the map's byte coordinates for the header # And also the periodic system type needed for later. progress = progress + len(bline) self.no_of_frames = frame + 1 def _decode_frame(self, frame: list) -> dict: # type: ignore[type-arg] frame_data = { "frame_info": { "natms": int(frame[0][0]), "remarks": " ".join([*frame[1]]), } } start_line = 2 elements = [] coordinates = [] for i in range(start_line, len(frame)): elements.append(frame[i][0]) coordinates.append(frame[i][1:]) frame_data["atom_ids"] = np.array(elements) # type: ignore[assignment] frame_data["coordinates"] = np.array(coordinates, dtype=float) # type: ignore[assignment] return frame_data
[docs] class PDB(Trajectory): def __init__(self, filepath: pathlib.Path | str) -> None: """A container for an PDB type trajectory. This function takes an PDB trajectory file and maps it for the binary points in the file where each frame starts/ends. This way the process is fast, as it not require loading the trajectory into computer memory. When a frame is being extracted, it is only this frame that gets loaded to the memory. Frames can be accessed individually and loaded as an unmodified string, returned as a :class:`pywindow.MolecularSystem` (and analysed), dumped as PDB or XYZ or JSON (if dumped as a :attr:`pywindow.MolecularSystem.system`) Attributes: filepath: The filepath. filename: The filename. system_id: The system id inherited from the filename. frames: A dictionary that is populated, on the fly, with the extracted frames. analysis_output: A dictionary that is populated, on the fly, with the analysis output. """ self.filepath = pathlib.Path(filepath) self.filename = self.filepath.name self.system_id = self.filename.split(".")[0] self.frames: dict = {} # type: ignore[type-arg] self.analysis_output: dict = {} # type: ignore[type-arg] # Map the trajectory file at init. self._map_trajectory() def _map_trajectory(self) -> None: """Map pdb trajectory.""" self.trajectory_map = {} with self.filepath.open() as trajectory_file: with closing( mmap(trajectory_file.fileno(), 0, access=ACCESS_READ) ) as mapped_file: progress = 0 line = 0 frame = -1 frame_start = 0 while progress <= len(mapped_file): line = line + 1 # We read a binary data from a mapped file. bline = mapped_file.readline() # If the bline length equals zero we terminate. # We reached end of the file but still add the last frame! if len(bline) == 0: frame = frame + 1 if progress - frame_start > 10: # noqa: PLR2004 self.trajectory_map[frame] = [ frame_start, progress, ] break # We need to decode byte line into an utf-8 string. sline = bline.decode("utf-8").strip("\n").split() # We extract map's byte coordinates for each frame if len(sline) == 1 and sline[0] == "END": frame = frame + 1 self.trajectory_map[frame] = [frame_start, progress] frame_start = progress # Here we extract the map's byte coordinates for the header # And also the periodic system type needed for later. progress = progress + len(bline) self.no_of_frames = frame def _decode_frame(self, frame: list) -> dict: # type: ignore[type-arg] frame_data = {} # type: ignore[var-annotated] elements = [] coordinates = [] for i in range(len(frame)): if frame[i][:6] == "REMARK": if "REMARKS" not in frame_data: frame_data["REMARKS"] = [] # type: ignore[assignment] frame_data["REMARKS"].append(frame[i][6:]) # type: ignore[attr-defined] if frame[i][:6] == "CRYST1": cryst = np.array( [ frame[i][6:15], frame[i][15:24], frame[i][24:33], frame[i][33:40], frame[i][40:47], frame[i][47:54], ], dtype=float, ) # This is in case of nonperiodic systems, often they have # a,b,c unit cell parameters as 0,0,0. if sum(cryst[0:3]) != 0: frame_data["CRYST1"] = cryst if frame[i][:6] in ["HETATM", "ATOM "]: elements.append(frame[i][12:16].strip()) coordinates.append( [frame[i][30:38], frame[i][38:46], frame[i][46:54]] ) frame_data["atom_ids"] = np.array(elements, dtype="<U8") frame_data["coordinates"] = np.array(coordinates, dtype=float) return frame_data