Source code for meld.system.builders.amber.builder

#
# Copyright 2015 by Justin MacCallum, Alberto Perez, Ken Dill
# All rights reserved
#

"""
Module to build a System from AmberSubSystems
"""

import logging
import subprocess
from dataclasses import dataclass, field
from functools import partial
from typing import List, Optional

import numpy as np  # type: ignore
import openmm as mm  # type: ignore
from openmm import app  # type: ignore
from openmm import unit as u  # type: ignore
from openmm.app import forcefield as ff  # type: ignore

from meld import util
from meld.system import indexing
from meld.system.builders.amber import amap, subsystem
from meld.system.builders.spec import SystemSpec

logger = logging.getLogger(__name__)


[docs]@partial(dataclass, frozen=True) class AmberOptions: default_temperature: u.Quantity = field(default_factory=lambda: 300.0 * u.kelvin) forcefield: str = "ff14sbside" solvation: str = "implicit" gb_radii: str = "mbondi3" implicit_solvent_model: str = "gbNeck2" solute_dielectric: Optional[float] = None solvent_dielectric: Optional[float] = None implicit_solvent_salt_conc: Optional[float] = None solvent_forcefield: str = "tip3p" solvent_distance: float = 0.6 explicit_ions: bool = False p_ion: str = "Na+" p_ioncount: int = 0 n_ion: str = "Cl-" n_ioncount: int = 0 enable_pme: bool = False pme_tolerance: float = 0.0005 enable_pressure_coupling: bool = False pressure: u.Quantity = field(default_factory=lambda: 1.01325 * u.bar) pressure_coupling_update_steps: int = 25 cutoff: Optional[float] = None remove_com: bool = True use_big_timestep: bool = False use_bigger_timestep: bool = False enable_amap: bool = False amap_alpha_bias: float = 1.0 amap_beta_bias: float = 1.0 def __post_init__(self): # Sanity checks for implicit and explicit solvent if self.solvation == "implicit": if self.enable_pme: raise ValueError("Using implicit solvation, but `enable_pme` is True.") if self.enable_pressure_coupling: raise ValueError( "Using implicit solvation, but `enable_pressure_coupling` is True." ) elif self.solvation == "explicit": if not self.enable_pme: raise ValueError("Using explicit solvation, but `enable_pme` is False.") if not self.enable_pressure_coupling: raise ValueError( "Using explicit solvation, but `enable_pressure_coupling` is False." ) if self.enable_amap: raise ValueError("Using explicit solvation, but `enable_amap` is True.") if self.cutoff is None: raise ValueError("Using explicit solvation, but `cutoff` is None.") else: raise ValueError(f"Unknown solvation model {self.solvation}") if self.forcefield not in ["ff12sb", "ff14sb", "ff14sbside"]: raise ValueError(f"Unknown forcefield {self.forcefield}") if self.gb_radii not in ["mbondi2", "mbondi3"]: raise ValueError(f"Unknown gb_radii {self.gb_radii}") if self.solvent_forcefield not in ["spce", "spceb", "opc", "tip3p", "tip4pew"]: raise ValueError(f"Unknown solvent_forcefield {self.solvent_forcefield}") if self.p_ion not in ["Na+", "K+", "Li+", "Rb+", "Cs+", "Mg+"]: raise ValueError(f"Unknown p_ion {self.p_ion}") if self.n_ion not in ["Cl-", "I-", "Br-", "F-"]: raise ValueError(f"Unknown n_ion {self.n_ion}") if isinstance(self.default_temperature, u.Quantity): object.__setattr__( self, "default_temperature", self.default_temperature.value_in_unit(u.kelvin), ) if self.default_temperature < 0: raise ValueError(f"default_temperature must be >= 0") if isinstance(self.pressure, u.Quantity): object.__setattr__(self, "pressure", self.pressure.value_in_unit(u.bar)) if self.pressure < 0: raise ValueError(f"pressure must be >= 0")
[docs]class AmberSystemBuilder: r""" Class to handle building a System from SubSystems. """ options: AmberOptions
[docs] def __init__(self, options: AmberOptions): """ Initialize a SystemBuilder Args: options: Options for building the system """ self.options = options self._set_forcefield() if self.options.solvation == "explicit": self._set_solvent_forcefield() self._solvent_dist = self.options.solvent_distance * 10.0 # nm to angstrom
[docs] def build_system( self, subsystems: List[subsystem._AmberSubSystem], leap_header_cmds: Optional[List[str]] = None, ) -> SystemSpec: """ Build the system from AmberSubSystems """ if not subsystems: raise ValueError("len(subsystems) must be > 0") if leap_header_cmds is None: leap_header_cmds = [] if isinstance(leap_header_cmds, str): leap_header_cmds = [leap_header_cmds] with util.in_temp_dir(): mol_ids = [] chains = [] current_res_index = 0 leap_cmds = [] leap_cmds.extend(self._generate_leap_header()) leap_cmds.extend(leap_header_cmds) for index, sub in enumerate(subsystems): # First we'll update the indexing for this subsystem for chain in sub._info.chains: residues_with_offset = { k: v + current_res_index for k, v in chain.residues.items() } chains.append(indexing._ChainInfo(residues_with_offset)) current_res_index += sub._info.n_residues # now add the leap commands for this subsystem mol_id = f"mol_{index}" mol_ids.append(mol_id) sub.prepare_for_tleap(mol_id) leap_cmds.extend(sub.generate_tleap_input(mol_id)) if self.options.solvation == "explicit": leap_cmds.extend(self._generate_solvent(mol_ids)) leap_cmds.extend(self._generate_leap_footer([f"solute"])) else: leap_cmds.extend(self._generate_leap_footer(mol_ids)) with open("tleap.in", "w") as tleap_file: tleap_string = "\n".join(leap_cmds) tleap_file.write(tleap_string) try: subprocess.check_call("tleap -f tleap.in > tleap.out", shell=True) except subprocess.CalledProcessError: print("Call to tleap failed.") print() print() print() print("=========") print("tleap.in") print("=========") print(open("tleap.in").read()) print() print() print() print("=========") print("tleap.out") print("=========") print(open("tleap.out").read()) print() print() print() print("========") print("leap.log") print("========") print(open("leap.log").read()) print() print() print() raise prmtop = app.AmberPrmtopFile("system.top") crd = app.AmberInpcrdFile("system.mdcrd") topology = prmtop.topology topology = _add_chains(topology, chains) system, barostat = _create_openmm_system( prmtop, self.options.solvation, self.options.cutoff, self.options.use_big_timestep, self.options.use_bigger_timestep, self.options.implicit_solvent_model, self.options.enable_pme, self.options.pme_tolerance, self.options.enable_pressure_coupling, self.options.pressure, self.options.pressure_coupling_update_steps, self.options.remove_com, self.options.default_temperature, self.options.implicit_solvent_salt_conc, self.options.solute_dielectric, self.options.solvent_dielectric, ) if self.options.enable_amap: amap.add_amap( system, topology, self.options.amap_alpha_bias, self.options.amap_beta_bias, ) integrator = _create_integrator( self.options.default_temperature, self.options.use_big_timestep, self.options.use_bigger_timestep, ) coords = crd.getPositions(asNumpy=True).value_in_unit(u.nanometer) try: vels = crd.getVelocities(asNumpy=True) except AttributeError: print("WARNING: No velocities found, setting to zero") vels = np.zeros_like(coords) try: box = crd.getBoxVectors(asNumpy=True) # We only support orthorhombic boxes box_a = box[0][0].value_in_unit(u.nanometer) assert box[0][1] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" assert box[0][1] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" box_b = box[1][1].value_in_unit(u.nanometer) assert box[1][0] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" assert box[1][2] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" box_c = box[2][2].value_in_unit(u.nanometer) assert box[2][0] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" assert box[2][1] == 0.0 * u.nanometer, "Only orthorhombic boxes supported" box = np.array([box_a, box_b, box_c]) except AttributeError: box = None return SystemSpec( self.options.solvation, system, topology, integrator, barostat, coords, vels, box, { "solvation": self.options.solvation, "builder": "amber", "implicit_solvent_model": self.options.implicit_solvent_model, }, )
def _set_forcefield(self): ff_dict = { "ff12sb": "leaprc.ff12SB", "ff14sb": "leaprc.protein.ff14SB", "ff14sbside": "leaprc.protein.ff14SBonlysc", } self._forcefield = ff_dict[self.options.forcefield] def _set_solvent_forcefield(self): ff_dict = { "spce": "leaprc.water.spce", "spceb": "leaprc.water.spceb", "opc": "leaprc.water.opc", "tip3p": "leaprc.water.tip3p", "tip4pew": "leaprc.water.tip4pew", } box_dict = { "spce": "SPCBOX", "spceb": "SPCBOX", "opc": "OPCBOX", "tip3p": "TIP3PBOX", "tip4pew": "TIP4PEWBOX", } self._solvent_forcefield = ff_dict[self.options.solvent_forcefield] self._solvent_box = box_dict[self.options.solvent_forcefield] def _generate_leap_header(self): leap_cmds = [] leap_cmds.append(f"set default PBradii {self.options.gb_radii}") leap_cmds.append(f"source {self._forcefield}") if self.options.solvation == "explicit": leap_cmds.append(f"source {self._solvent_forcefield}") return leap_cmds def _generate_solvent(self, mol_ids): leap_cmds = [] list_of_mol_ids = "" for mol_id in mol_ids: list_of_mol_ids += f"{mol_id} " leap_cmds.append(f"solute = combine {{ {list_of_mol_ids} }}") leap_cmds.append(f"solvateBox solute {self._solvent_box} {self._solvent_dist}") if self.options.explicit_ions: leap_cmds.append( f"addIons solute {self.options.p_ion} {self.options.p_ioncount}" ) leap_cmds.append( f"addIons solute {self.options.n_ion} {self.options.n_ioncount}" ) return leap_cmds def _generate_leap_footer(self, mol_ids): leap_cmds = [] list_of_mol_ids = "" for mol_id in mol_ids: list_of_mol_ids += f"{mol_id} " leap_cmds.append(f"sys = combine {{ {list_of_mol_ids} }}") leap_cmds.append("check sys") leap_cmds.append("saveAmberParm sys system.top system.mdcrd") leap_cmds.append("quit") return leap_cmds
def _create_openmm_system( parm_object, solvation_type, cutoff, use_big_timestep, use_bigger_timestep, implicit_solvent, enable_pme, pme_tolerance, enable_pressure_coupling, pressure, pressure_coupling_update_steps, remove_com, default_temperature, implicitSolventSaltConc, soluteDielectric, solventDielectric, ): if solvation_type == "implicit": logger.info("Creating implicit solvent system") system = _create_openmm_system_implicit( parm_object, cutoff, use_big_timestep, use_bigger_timestep, implicit_solvent, remove_com, implicitSolventSaltConc, soluteDielectric, solventDielectric, ) baro = None elif solvation_type == "explicit": logger.info("Creating explicit solvent system") system, baro = _create_openmm_system_explicit( parm_object, cutoff, use_big_timestep, use_bigger_timestep, enable_pme, pme_tolerance, enable_pressure_coupling, pressure, pressure_coupling_update_steps, remove_com, default_temperature, ) else: raise ValueError(f"unknown value for solvation_type: {solvation_type}") return system, baro def _get_hydrogen_mass_and_constraints(use_big_timestep, use_bigger_timestep): if use_big_timestep: logger.info("Enabling hydrogen mass=3, constraining all bonds") constraint_type = ff.AllBonds hydrogen_mass = 3.0 * u.gram / u.mole elif use_bigger_timestep: logger.info("Enabling hydrogen mass=4, constraining all bonds") constraint_type = ff.AllBonds hydrogen_mass = 4.0 * u.gram / u.mole else: logger.info("Enabling hydrogen mass=1, constraining bonds with hydrogen") constraint_type = ff.HBonds hydrogen_mass = None return hydrogen_mass, constraint_type def _create_openmm_system_implicit( parm_object, cutoff, use_big_timestep, use_bigger_timestep, implicit_solvent, remove_com, implicitSolventSaltConc, soluteDielectric, solventDielectric, ): if cutoff is None: logger.info("Using no cutoff") cutoff_type = ff.NoCutoff cutoff_dist = 999.0 else: logger.info(f"Using a cutoff of {cutoff}") cutoff_type = ff.CutoffNonPeriodic cutoff_dist = cutoff hydrogen_mass, constraint_type = _get_hydrogen_mass_and_constraints( use_big_timestep, use_bigger_timestep ) if implicit_solvent == "obc": logger.info('Using "OBC" implicit solvent') implicit_type = app.OBC2 elif implicit_solvent == "gbNeck": logger.info('Using "gbNeck" implicit solvent') implicit_type = app.GBn elif implicit_solvent == "gbNeck2": logger.info('Using "gbNeck2" implicit solvent') implicit_type = app.GBn2 elif implicit_solvent == "vacuum" or implicit_solvent is None: logger.info("Using vacuum instead of implicit solvent") implicit_type = None else: RuntimeError("Should never get here") if implicitSolventSaltConc is None: implicitSolventSaltConc = 0.0 if soluteDielectric is None: soluteDielectric = 1.0 if solventDielectric is None: solventDielectric = 78.5 sys = parm_object.createSystem( nonbondedMethod=cutoff_type, nonbondedCutoff=cutoff_dist, constraints=constraint_type, implicitSolvent=implicit_type, removeCMMotion=remove_com, hydrogenMass=hydrogen_mass, implicitSolventSaltConc=implicitSolventSaltConc, soluteDielectric=soluteDielectric, solventDielectric=solventDielectric, ) return sys def _create_openmm_system_explicit( parm_object, cutoff, use_big_timestep, use_bigger_timestep, enable_pme, pme_tolerance, enable_pressure_coupling, pressure, pressure_couping_update_steps, remove_com, default_temperature, ): if cutoff is None: raise ValueError("cutoff must be set for explicit solvent, but got None") else: if enable_pme: logger.info(f"Using PME with tolerance {pme_tolerance}") cutoff_type = ff.PME else: logger.info("Using reaction field") cutoff_type = ff.CutoffPeriodic logger.info(f"Using a cutoff of {cutoff}") cutoff_dist = cutoff hydrogen_mass, constraint_type = _get_hydrogen_mass_and_constraints( use_big_timestep, use_bigger_timestep ) s = parm_object.createSystem( nonbondedMethod=cutoff_type, nonbondedCutoff=cutoff_dist, constraints=constraint_type, implicitSolvent=None, removeCMMotion=remove_com, hydrogenMass=hydrogen_mass, rigidWater=True, ewaldErrorTolerance=pme_tolerance, ) baro = None if enable_pressure_coupling: logger.info("Enabling pressure coupling") logger.info(f"Pressure is {pressure}") logger.info( f"Volume moves attempted every {pressure_couping_update_steps} steps" ) baro = mm.MonteCarloBarostat( pressure, default_temperature, pressure_couping_update_steps ) s.addForce(baro) return s, baro def _create_integrator(temperature, use_big_timestep, use_bigger_timestep): if use_big_timestep: logger.info("Creating integrator with 3.5 fs timestep") timestep = 3.5 * u.femtosecond elif use_bigger_timestep: logger.info("Creating integrator with 4.5 fs timestep") timestep = 4.5 * u.femtosecond else: logger.info("Creating integrator with 2.0 fs timestep") timestep = 2.0 * u.femtosecond return mm.LangevinIntegrator(temperature * u.kelvin, 1.0 / u.picosecond, timestep) def _add_chains(topology, chain_list): # Verify that the input from Amber only has one chain assert len(list(topology.chains())) == 1 newtop = app.Topology() # Add the chains to the new topology and # create a map between residues and chains. chain_map = {} for chain_info in chain_list: chain = newtop.addChain() for res in chain_info.residues.values(): chain_map[res] = chain # Now we'll create a final chain for solvent, etc # and everything left to this last chain. last_chain = newtop.addChain() for res in topology.residues(): if res.index not in chain_map: chain_map[res.index] = last_chain # Add all of the residues to the new topology, while # correcting the chain index. # Create a map between the old and new residues so # that we can add the atoms. residue_map = {} for residue in topology.residues(): chain = chain_map[residue.index] new_residue = newtop.addResidue(residue.name, chain, residue.index) residue_map[residue] = new_residue # Now add back all of the atoms with tne new residues. # We keep a map between the old and new atoms so that # we can add the bonds. atom_map = {} for atom in topology.atoms(): new_atom = newtop.addAtom( atom.name, atom.element, residue_map[atom.residue], atom.index ) atom_map[atom] = new_atom # Now we add all of the bonds for bond in topology.bonds(): newtop.addBond(atom_map[bond[0]], atom_map[bond[1]]) return newtop