"""Module contains classes for input/output processing."""
from __future__ import annotations
import json
import pathlib
from typing import TYPE_CHECKING, Callable
import numpy as np
from pywindow._internal.utilities import (
decipher_atom_key,
unit_cell_to_lattice_array,
)
if TYPE_CHECKING:
from collections import abc
import rdkit
class _CorruptedPDBFileError(Exception):
def __init__(self, message: str) -> None:
self.message = message
class _CorruptedXYZFileError(Exception):
def __init__(self, message: str) -> None:
self.message = message
class _NotADictionaryError(Exception):
def __init__(self, message: str) -> None:
self.message = message
class _FileTypeError(Exception):
def __init__(self, message: str) -> None:
self.message = message
class Output:
    """Class used to process and save output files.

    Supports dumping dictionaries to JSON (:meth:`dump2json`) and
    dictionaries holding molecular data to XYZ or PDB files
    (:meth:`dump2file`).
    """

    def __init__(self) -> None:
        """Record the working directory and the per-suffix writers."""
        # Working directory at construction time.
        self.cwd = pathlib.Path.cwd()
        # Supported file suffixes mapped to the private writer methods.
        self._save_funcs = {".xyz": self._save_xyz, ".pdb": self._save_pdb}

    def dump2json(
        self,
        obj: dict,  # type: ignore[type-arg]
        filepath: str | pathlib.Path,
        default: Callable,  # type:ignore[type-arg]
        override: bool = False,  # noqa: FBT001, FBT002
    ) -> None:
        """Dump a dictionary into a JSON file.

        Uses the json.dump() function.

        Parameters:
            obj:
                A dictionary to be dumped as a JSON file.
            filepath:
                The filepath for the dumped file. If its suffix is not
                ``.json`` the suffix is replaced with ``.json``.
            default:
                A callable that will be used to convert
                non-serializable objects to serializable ones
                (forwarded to ``json.dump``).
            override:
                If True, any file at the filepath will be overwritten.
                (default=False)

        Raises:
            _NotADictionaryError:
                If ``obj`` is not a dictionary.
            FileExistsError:
                If the target file exists and ``override`` is False.

        """
        filepath = pathlib.Path(filepath)
        # We make sure that the object passed by the user is a dictionary.
        if not isinstance(obj, dict):
            msg = "This function only accepts dictionaries as input"  # type:ignore[unreachable]
            raise _NotADictionaryError(msg)
        # Compare the real suffix rather than searching the file name for
        # the substring ".json": a name such as "my.jsonified.txt" must be
        # handled exactly like any other non-JSON name.
        if filepath.suffix != ".json":
            filepath = filepath.with_suffix(".json")
        # The existence check is made on the final (possibly re-suffixed)
        # path, i.e. on the file that would actually be overwritten.
        if override is False and filepath.is_file():
            msg = (
                f"The file {filepath} already exists. Use a different "
                "filepath, or set the 'override' to True."
            )
            raise FileExistsError(msg)
        with filepath.open("w") as json_file:
            json.dump(obj, json_file, default=default)

    def dump2file(  # noqa: PLR0913
        self,
        obj: dict,  # type: ignore[type-arg]
        filepath: str | pathlib.Path,
        atom_ids_key: str,
        override: bool = False,  # noqa: FBT001, FBT002
        elements_key: str = "elements",
        coordinates_key: str = "coordinates",
        remarks: None | abc.Sequence[str] | str | float = None,
        cryst: str = "unit_cell",
        space_group: str | None = None,
        forcefield: str | None = None,
        decipher: bool = False,  # noqa: FBT001, FBT002
        resname: str = "MOL",
        chainid: str = "A",
        resseq: int = 1,
    ) -> None:
        """Dump a dictionary into a file. (Extensions: XYZ or PDB).

        The writer is selected from the suffix of ``filepath``.

        Parameters:
            obj:
                A dictionary containing molecular information.
            filepath:
                The filepath for the dumped file (``.xyz`` or ``.pdb``).
            atom_ids_key:
                Key in ``obj`` of the atom ids written in the atom name
                column of a PDB file. Only used for PDB output.
            override:
                If True, any file at the filepath will be overwritten.
                (default=False)
            elements_key:
                Key in ``obj`` of the element symbols (or atom keys when
                ``decipher`` is True).
            coordinates_key:
                Key in ``obj`` of the atomic coordinates.
            remarks:
                Optional remark(s). A list/tuple gives one ``REMARK``
                line per item in a PDB file and is joined with ``;`` on
                the comment line of an XYZ file.
            cryst:
                Key in ``obj`` of the unit cell parameters (three edge
                lengths followed by the angles). Only used for PDB
                output, where it produces the ``CRYST1`` record.
            space_group:
                Space group written in the ``CRYST1`` record; ``P1`` is
                used when None.
            forcefield:
                Forcefield passed to ``decipher_atom_key``. Required
                when ``decipher`` is True.
            decipher:
                If True, each entry of the elements array is translated
                with ``decipher_atom_key`` before writing.
            resname:
                Residue name written in PDB ``ATOM`` records.
            chainid:
                Chain identifier written in PDB ``ATOM`` records.
            resseq:
                Residue sequence number written in PDB ``ATOM`` records.

        Raises:
            FileExistsError:
                If the file exists and ``override`` is False.
            _FileTypeError:
                If the suffix is neither ``.xyz`` nor ``.pdb``.
            ValueError:
                If ``decipher`` is True and ``forcefield`` is None.

        """
        filepath = pathlib.Path(filepath)
        # First we check if the file already exists. If yes and the override
        # keyword is False (default), we will raise an exception. Otherwise
        # the file will be overwritten.
        if override is False and filepath.is_file():
            msg = (
                f"The file {filepath} already exists. "
                "Use a different filepath, "
                "or set the 'override' kwarg to True."
            )
            raise FileExistsError(msg)
        if filepath.suffix == ".pdb":
            self._save_pdb(
                system=obj,
                filepath=filepath,
                atom_ids_key=atom_ids_key,
                elements_key=elements_key,
                coordinates_key=coordinates_key,
                remarks=remarks,
                cryst=cryst,
                space_group=space_group,
                forcefield=forcefield,
                decipher=decipher,
                resname=resname,
                chainid=chainid,
                resseq=resseq,
            )
        elif filepath.suffix == ".xyz":
            self._save_xyz(
                system=obj,
                filepath=filepath,
                elements_key=elements_key,
                coordinates_key=coordinates_key,
                remarks=remarks,
                forcefield=forcefield,
                decipher=decipher,
            )
        else:
            msg = (
                f"The {filepath.suffix} file extension is "
                "not supported for dumping a MolecularSystem or a Molecule. "
                "Please use XYZ or PDB."
            )
            raise _FileTypeError(msg)

    @staticmethod
    def _decipher_elements(
        elements: abc.Iterable[str],
        forcefield: str | None,
    ) -> np.ndarray:  # type: ignore[type-arg]
        """Translate atom keys with ``decipher_atom_key``.

        Raises ValueError when no forcefield is given.
        """
        if forcefield is None:
            msg = "forcefield must be provided when decipher is True"
            raise ValueError(msg)
        return np.array(
            [decipher_atom_key(key, forcefield=forcefield) for key in elements]
        )

    def _save_xyz(  # noqa: PLR0913
        self,
        system: dict,  # type: ignore[type-arg]
        filepath: str | pathlib.Path,
        elements_key: str = "elements",
        coordinates_key: str = "coordinates",
        remarks: None | str | float | abc.Sequence[str] = None,
        forcefield: str | None = None,
        decipher: bool = False,  # noqa: FBT001, FBT002
    ) -> None:
        """Write ``system`` to an XYZ file.

        The file holds the atom count, one comment line built from
        ``remarks`` and one ``element x y z`` line per atom with the
        coordinates written to two decimal places.

        Raises:
            ValueError:
                If ``decipher`` is True and ``forcefield`` is None.

        """
        filepath = pathlib.Path(filepath)
        if isinstance(remarks, (list, tuple)):
            # XYZ has a single comment line, so several remarks are merged.
            # Items are stringified so non-string remarks do not break join.
            remarks = ";".join(str(remark) for remark in remarks)
        elif remarks is None:
            remarks = ""
        # Extract necessary data.
        elements = system[elements_key]
        coordinates = system[coordinates_key]
        if decipher is True:
            elements = self._decipher_elements(elements, forcefield)
        # Collect the lines and join once instead of growing a string.
        lines = [f"{len(elements):0d}", f"{remarks!s}"]
        lines.extend(
            "{} {:.2f} {:.2f} {:.2f}".format(element, *position)
            for element, position in zip(elements, coordinates)
        )
        with filepath.open("w") as file_:
            file_.write("\n".join(lines) + "\n")

    def _save_pdb(  # noqa: PLR0913
        self,
        system: dict,  # type: ignore[type-arg]
        filepath: str | pathlib.Path,
        atom_ids_key: str,
        elements_key: str = "elements",
        coordinates_key: str = "coordinates",
        remarks: None | abc.Sequence[str] | str | float = None,
        cryst: str = "unit_cell",
        space_group: str | None = None,
        forcefield: str | None = None,
        decipher: bool = False,  # noqa: FBT001, FBT002
        resname: str = "MOL",
        chainid: str = "A",
        resseq: int = 1,
    ) -> None:
        """Write ``system`` to a PDB file.

        The file consists of ``REMARK`` lines, an optional ``CRYST1``
        record, one ``ATOM`` record per entry of ``system[atom_ids_key]``
        and a closing ``END`` line. ``.pdb`` is appended to ``filepath``
        if its suffix differs.

        Raises:
            ValueError:
                If ``decipher`` is True and ``forcefield`` is None.

        """
        filepath = pathlib.Path(filepath)
        # The records are collected in a list and joined once at the end.
        lines = ["REMARK File generated using pyWINDOW."]
        # We process the remarks, if any, given by the user (optional).
        if isinstance(remarks, (list, tuple)):
            # Each remark of a list or tuple gets its own REMARK line so
            # that no single remark line becomes too long.
            lines.extend(f"REMARK {remark}" for remark in remarks)
        elif isinstance(remarks, (str, int, float)):
            # A single string or number goes on one REMARK line; any other
            # type is ignored.
            lines.append(f"REMARK {remarks}")
        # If there is a unit cell (crystal data) provided we need to add it.
        unit_cell = system.get(cryst)
        if unit_cell is not None:
            # Accept plain lists/tuples as well as arrays; the values are
            # expected to be the edge lengths a, b, c followed by the angles.
            unit_cell = np.asarray(unit_cell, dtype=float)
            if unit_cell.any():
                edges = "".join(f"{value:9.3f}" for value in unit_cell[:3])
                angles = "".join(f"{value:7.2f}" for value in unit_cell[3:])
                # By default the data written in a PDB file is P1 symmetry,
                # i.e. all atom coordinates are present and no symmetry
                # operations are considered. The user can still override it.
                group = "P1" if space_group is None else space_group
                lines.append(f"CRYST1{edges}{angles} {group}")
        # Atom ids are written in the atom name column; the user is free to
        # use forcefield-assigned ones through ``atom_ids_key``. The
        # elements array fills the element column at the end of the record.
        atom_ids = system[atom_ids_key]
        elements = system[elements_key]
        # Atom keys in the elements array are deciphered only on request,
        # and this requires a forcefield.
        if decipher is True:
            elements = self._decipher_elements(elements, forcefield)
        coordinates = system[coordinates_key]
        resseq_field = str(resseq).rjust(4)
        # Columns 55-76 (occupancy, temperature factor, padding) stay blank.
        blank_fields = " " * 22
        for index, atom_id in enumerate(atom_ids):
            position = coordinates[index]
            # Fixed-column ATOM record: record name (1-6), serial (7-11),
            # atom name (13-16), residue name (18-20), chain (22),
            # residue number (23-26), x/y/z (31-54), element (77-78),
            # blank charge (79-80).
            lines.append(
                f"ATOM  {index + 1:5d} {atom_id.center(4):4} {resname:3} "
                f"{chainid}{resseq_field}    "
                f"{position[0]:8.3f}{position[1]:8.3f}{position[2]:8.3f}"
                f"{blank_fields}{elements[index].rjust(2):2}  "
            )
        lines.append("END")
        # Check if .pdb extension is missing from filepath.
        if filepath.suffix != ".pdb":
            filepath = pathlib.Path(f"{filepath}.pdb")
        # Write the records to a PDB file.
        with filepath.open("w") as file:
            file.write("\n".join(lines))