Source code for qmmm_pme.common.file_manager

#! /usr/bin/env python3
"""A module defining the class and functions needed to load and write
files.
"""
from __future__ import annotations

import array
import os
import struct
from typing import TYPE_CHECKING

import numpy as np
import openmm.app
import simtk.unit

from .utils import compute_lattice_constants

if TYPE_CHECKING:
    from numpy.typing import NDArray


[docs]class FileManager: """A class to load and generate inputs and outputs. :param working_directory: The current working directory. """ def __init__(self, working_directory: str = "./") -> None: self._working_directory = working_directory if not os.path.isdir(working_directory): os.makedirs(working_directory)
[docs] def load( self, pdb_list: list[str], topology_list: list[str], forcefield_list: list[str], ) -> tuple[ dict[str, NDArray[np.float64]], dict[str, list[str]], dict[str, list[list[int]]], ]: """Load files necessary to generate a system. :param pdb_list: |pdb_list| :param topology_list: |topology_list| :param forcefield_list: |forcefield_list| :return: Data for the :class:`State` and :class:`Topology` record classes. """ # Check the file extensions and add them to the :class:`Files` # record. for fh in pdb_list: _check_ext(fh, "pdb") for fh in topology_list: _check_ext(fh, "xml") for fh in forcefield_list: _check_ext(fh, "xml") # Generate and return :class:`System` data using OpenMM. state_data, name_data, residue_data = _get_system_data( pdb_list, topology_list, forcefield_list, ) return state_data, name_data, residue_data
[docs] def write_to_pdb( self, name: str, positions: NDArray[np.float64], box: NDArray[np.float64], all_groups: list[list[int]], residues: list[str], elements: list[str], atoms: list[str], ) -> None: """Utility to write PDB files with :class:`State` current coordinates. :param name: The directory/name of PDB file to be written. :param positions: |positions| :param box: |box| :param all_groups: The indices of all atoms in the :class:`System` grouped by residue. :param residues: |residues| :param elements: |elements| :param atoms: |atoms| .. note:: Based on PDB writer from OpenMM. """ filename = self._parse_name(name, ext="pdb") ( len_a, len_b, len_c, alpha, beta, gamma, ) = compute_lattice_constants(box) with open(filename, "w") as fh: fh.write( ( f"CRYST1{len_a:9.3f}{len_b:9.3f}{len_c:9.3f}" + f"{alpha:7.2f}{beta:7.2f}" + f"{gamma:7.2f} P 1 1 \n" ), ) for i, residue in enumerate(all_groups): for atom in residue: line = "HETATM" line += f"{atom+1:5d} " line += f"{atoms[atom]:4s}" line += f"{residues[i]:3s} " line += f"A{i+1:4d} " line += f"{positions[atom,0]:8.3f}" line += f"{positions[atom,1]:8.3f}" line += f"{positions[atom,2]:8.3f}" line += " 1.00 0.00 " line += f"{elements[atom]:2s} \n" fh.write(line) fh.write("END")
[docs] def start_dcd( self, name: str, write_interval: int, num_particles: int, timestep: int | float, ) -> None: """Utility to start writing a DCD file. :param name: The directory/name of DCD file to be written. :param write_interval: The interval between successive writes to the DCD file. :param num_particles: The number of particles in the :class:`System`. :param timestep: |timestep| .. note:: Based on DCD writer from OpenMM. """ filename = self._parse_name(name, ext="dcd") with open(filename, "wb") as fh: header = struct.pack( "<i4c9if", 84, b"C", b"O", b"R", b"D", 0, 0, write_interval, 0, 0, 0, 0, 0, 0, timestep, ) header += struct.pack( "<13i", 1, 0, 0, 0, 0, 0, 0, 0, 0, 24, 84, 164, 2, ) header += struct.pack("<80s", b"Created by QM/MM/PME") header += struct.pack("<80s", b"Created now") header += struct.pack("<4i", 164, 4, num_particles, 4) fh.write(header)
[docs] def write_to_dcd( self, name: str, write_interval: int, num_particles: int, positions: NDArray[np.float64], box: NDArray[np.float64], frame: int, ) -> None: """Write data to an existing DCD file. :param name: The directory/name of DCD file to be written. :param write_interval: The interval between successive writes to the DCD file. :param num_particles: The number of particles in the :class:`System`. :param positions: |positions| :param box: |box| :param frame: |frame| .. note:: Based on DCD writer from OpenMM. """ filename = self._parse_name(name, ext="dcd") ( len_a, len_b, len_c, alpha, beta, gamma, ) = compute_lattice_constants(box) with open(filename, "r+b") as fh: fh.seek(8, os.SEEK_SET) fh.write(struct.pack("<i", frame//write_interval)) fh.seek(20, os.SEEK_SET) fh.write(struct.pack("<i", frame)) fh.seek(0, os.SEEK_END) fh.write( struct.pack( "<i6di", 48, len_a, gamma, len_b, beta, alpha, len_c, 48, ), ) num = struct.pack("<i", 4*num_particles) for i in range(3): fh.write(num) coordinate = array.array( "f", (position[i] for position in positions), ) coordinate.tofile(fh) fh.write(num) fh.flush()
[docs] def start_log( self, name: str, ) -> None: """Utility to start writing a log file. :param name: The directory/name of log file to be written. """ filename = self._parse_name(name, ext="log") with open(filename, "w") as fh: fh.write(f"{' QM/MM/PME Logger ':=^72}\n")
[docs] def write_to_log( self, name: str, lines: str, frame: int, ) -> None: """Write data to an existing log file. :param name: The directory/name of log file to be written. :param lines: The lines to be written to the log file. :param frame: |frame| """ filename = self._parse_name(name, ext="log") with open(filename, "a") as fh: fh.write(f"{' Frame ' + f'{frame:0>6}' + ' ':-^72}\n") fh.write(lines + "\n") fh.flush()
[docs] def end_log( self, name: str, ) -> None: """Terminate an existing log file. :param name: The directory/name of log file to be terminated. """ filename = self._parse_name(name, ext="log") with open(filename, "a") as fh: fh.write(f"{' End of Log ':=^72}")
[docs] def start_csv( self, name: str, header: str, ) -> None: """Utility to start writing a CSV file. :param name: The directory/name of CSV file to be written. :param header: The header for the CSV file. """ filename = self._parse_name(name, ext="csv") with open(filename, "w") as fh: fh.write(header + "\n")
[docs] def write_to_csv( self, name: str, line: str, ) -> None: """Write data to an existing CSV file. :param name: The directory/name of CSV file to be written. :param lines: The lines to be written to the CSV file. """ filename = self._parse_name(name, ext="csv") with open(filename, "a") as fh: fh.write(line + "\n") fh.flush()
def _parse_name(self, name: str, ext: str = "") -> str: """Ensure that the given name has the correct directory path and extension. :param name: The directory/name of the file to be written. :param ext: The desired extension of the file to be written. """ filename = "" if not name.startswith(self._working_directory): filename = self._working_directory filename += name if not name.endswith(ext): filename += "." + ext return filename
def _check_ext(filename: str, ext: str) -> None: """Ensure that the given filename has the correct extension. :param filename: The name of the file. :param ext: The desired extension. """ if not filename.endswith(ext): bad_ext = filename.split(".")[-1] raise ValueError( ( f"Got extension'{bad_ext}' from file '{filename}'. " + f"Expected extension '{ext}'" ), ) def _get_system_data( pdb_list: list[str], topology_list: list[str], forcefield_list: list[str], ) -> tuple[ dict[str, NDArray[np.float64]], dict[str, list[str]], dict[str, list[list[int]]], ]: """Extract :class:`State` and :class:`Topology` data from PDB and XML files using OpenMM. :param pdb_list: |pdb_list| :param topology_list: |topology_list| :param forcefield_list: |forcefield_list| :return: Data for the :class:`State` and :class:`Topology` record classes. """ # Create the OpenMM Modeller object and extract relevant topology # data from the Atoms and Residues. for fh in topology_list: openmm.app.topology.Topology().loadBondDefinitions(fh) openmm_pdb = openmm.app.pdbfile.PDBFile(pdb_list[0]) openmm_modeller = openmm.app.modeller.Modeller( openmm_pdb.topology, openmm_pdb.positions, ) openmm_residues = list(openmm_pdb.topology.residues()) openmm_atoms = list(openmm_pdb.topology.atoms()) atoms = [ [atom.index for atom in residue.atoms()] for residue in openmm_residues ] elements = [atom.element.symbol for atom in openmm_atoms] residue_names = [residue.name for residue in openmm_residues] atom_names = [atom.name for atom in openmm_atoms] name_data = { "elements": elements, "residue_names": residue_names, "atom_names": atom_names, } residue_data = { "atoms": atoms, } # Load the OpenMM ForceField and create a System in order to get # charge and mass data. openmm_forcefield = openmm.app.forcefield.ForceField( *forcefield_list, ) openmm_modeller.addExtraParticles(openmm_forcefield) openmm_system = openmm_forcefield.createSystem( openmm_modeller.topology, ) nonbonded_force = [ force for force in [ openmm_system.getForce(i) for i in range(openmm_system.getNumForces()) ] if isinstance(force, openmm.NonbondedForce) ][0] masses = np.array( [ openmm_system.getParticleMass( atom.index, )/simtk.unit.dalton for atom in openmm_atoms ], ) charges = np.array( [ nonbonded_force.getParticleParameters( atom.index, )[0]/simtk.unit.elementary_charges for atom in openmm_atoms ], ) positions = np.array( [ vector/simtk.unit.angstrom for vector in openmm_modeller.getPositions() ], ) box = np.array( [ vector/simtk.unit.angstrom for vector in openmm_modeller.topology.getPeriodicBoxVectors() ], ) state_data = { "masses": masses, "charges": charges, "positions": positions, "box": box, } return state_data, name_data, residue_data