#
# Copyright 2015 by Justin MacCallum, Alberto Perez, Ken Dill
# All rights reserved
#
"""
Module for MELD input/output
The main class is :class:`DataStore` which handles all IO for a MELD run.
"""
import contextlib
import os
import pickle
import shutil
import time
from typing import Iterator, Optional, Sequence
import netCDF4 as cdf # type: ignore
import numpy as np # type: ignore
from meld import interfaces
from meld.system import options, param_sampling, pdb_writer, state
ENERGY_GROUPS = 7
def _load_pickle(data):
"""Read in pickle file.
Work around incompatibility between python 2 and 3 when
reading in pickle files saved with python 2.
"""
try:
return pickle.load(data)
except UnicodeDecodeError:
return pickle.load(data, encoding="latin1")
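# A minimal sketch (hypothetical helper, not part of the MELD API) of how the same
# fallback can be used from an analysis script to inspect any of the pickled files
# written by the DataStore below, e.g. "Data/remd_runner.dat":
def _inspect_pickled_file(path):
    """Unpickle a single MELD data file, tolerating files written under python 2."""
    with open(path, "rb") as pickle_file:
        return _load_pickle(pickle_file)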
class DataStore:
"""
Load / store data for MELD runs.
Data will be stored in the 'Data' subdirectory. Backups will be stored
in 'Data/Backup'.
Some information is stored as python pickled files:
- Data/data_store.dat -- the :class:`DataStore` object
- Data/communicator.dat -- the :class:`MPICommunicator` object
- Data/remd_runner.dat -- the :class:`LeaderReplicaExchangeRunner` object
Other data (positions, velocities, etc) is stored in files in the Data/Blocks
directory.
"""
#
# data paths
#
_data_dir: str = "Data"
_backup_dir: str = os.path.join(_data_dir, "Backup")
_blocks_dir: str = os.path.join(_data_dir, "Blocks")
log_dir: str = "Logs"
"Sub-directory for log files"
_data_store_filename: str = "data_store.dat"
_data_store_path: str = os.path.join(_data_dir, _data_store_filename)
_data_store_backup_path: str = os.path.join(_backup_dir, _data_store_filename)
_communicator_filename: str = "communicator.dat"
_communicator_path: str = os.path.join(_data_dir, _communicator_filename)
_communicator_backup_path: str = os.path.join(_backup_dir, _communicator_filename)
_remd_runner_filename: str = "remd_runner.dat"
_remd_runner_path: str = os.path.join(_data_dir, _remd_runner_filename)
_remd_runner_backup_path: str = os.path.join(_backup_dir, _remd_runner_filename)
_system_filename: str = "system.dat"
_system_path: str = os.path.join(_data_dir, _system_filename)
_system_backup_path: str = os.path.join(_backup_dir, _system_filename)
_run_options_filename: str = "run_options.dat"
_run_options_path: str = os.path.join(_data_dir, _run_options_filename)
_run_options_backup_path: str = os.path.join(_backup_dir, _run_options_filename)
_net_cdf_filename_template: str = "block_{:06d}.nc"
_net_cdf_path_template: str = os.path.join(_blocks_dir, _net_cdf_filename_template)
_traj_filename: str = "trajectory.pdb"
_traj_path: str = os.path.join(_data_dir, _traj_filename)
_cdf_data_set: Optional[cdf.Dataset] = None
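# Taken together, the paths above imply an on-disk layout roughly like the
# following (block files appear as the run progresses; names follow the
# templates defined above):
#
#   Data/
#       data_store.dat
#       communicator.dat
#       remd_runner.dat
#       system.dat
#       run_options.dat
#       trajectory.pdb
#       Backup/          (copies of the pickled files)
#       Blocks/
#           block_000000.nc
#           block_000001.nc
#           ...
#   Logs/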
def __init__(
self,
state_template: interfaces.IState,
n_replicas: int,
pdb_writer: pdb_writer.PDBWriter,
block_size: int = 100,
):
"""
Initialize the DataStore
Args:
state_template: a template state used to determine the numbers of atoms, parameters, mappings, and alignments
n_replicas: number of replicas
pdb_writer: the object to handle writing pdb files
block_size: size of netcdf blocks and frequency to do backups
"""
self._n_atoms = state_template.positions.shape[0]
self._n_discrete_parameters = state_template.parameters.discrete.shape[0]
self._n_continuous_parameters = state_template.parameters.continuous.shape[0]
self._n_mappings = state_template.mappings.shape[0]
self._n_alignments = state_template.rdc_alignments.shape[0]
self._n_replicas = n_replicas
self._block_size = block_size
self._cdf_data_set = None
self._readonly_mode = False
self._pdb_writer = pdb_writer
self._current_stage = 0
self._current_block = 0
self._max_safe_block = -1
def __getstate__(self):
# don't save some fields to disk
excluded = ["_cdf_data_set"]
return dict((k, v) for (k, v) in self.__dict__.items() if k not in excluded)
def __setstate__(self, state):
# set _cdf_data_set to None
self.__dict__ = state
self._cdf_data_set = None
def __del__(self):
# close the _cdf_data_set when we go out of scope
if hasattr(self, "_cdf_data_set"):
if self._cdf_data_set:
self._cdf_data_set.close()
#
# properties
#
@property
def n_replicas(self) -> int:
"The number of replicas"
return self._n_replicas
@property
def n_atoms(self) -> int:
"The number of atoms"
return self._n_atoms
#
# public methods
#
def initialize(self, mode: str):
"""
Prepare to use the DataStore object.
Args:
mode: mode to open with
Available modes are:
- 'w' -- create a new directory structure and initialize the netCDF files
- 'a' -- append to the existing files
- 'r' -- open the file in read-only mode
"""
if mode == "w":
if os.path.exists(self._data_dir):
raise RuntimeError("Data directory already exists")
else:
os.mkdir(self._data_dir)
os.mkdir(self._blocks_dir)
os.mkdir(self._backup_dir)
if os.path.exists(self.log_dir):
raise RuntimeError("Logs directory already exists")
else:
os.mkdir(self.log_dir)
self._current_block = 0
self._current_stage = 0
self._create_cdf_file()
elif mode == "a":
block_path = self._net_cdf_path_template.format(self._current_block)
if os.path.exists(block_path):
self._cdf_data_set = cdf.Dataset(block_path, "a")
else:
self._create_cdf_file()
elif mode == "r":
self._current_block = 0
self._readonly_mode = True
self._load_cdf_file_readonly()
else:
raise RuntimeError(f"Unknown value for mode={mode}")
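# A typical driver script might use the modes above roughly as follows (a sketch;
# `template_state` and `writer` are placeholders, not objects defined in this module):
#
#     store = DataStore(template_state, n_replicas=30, pdb_writer=writer, block_size=100)
#     store.initialize(mode="w")   # fresh run: creates Data/, Data/Blocks/, Data/Backup/, Logs/
#     store.save_data_store()
#     ...
#     store.close()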
def close(self):
"""Close the DataStore"""
if self._cdf_data_set:
self._cdf_data_set.close()
self._cdf_data_set = None
def save_data_store(self):
"""Save this object to disk."""
with open(self._data_store_path, "wb") as store_file:
pickle.dump(self, store_file)
@classmethod
def load_data_store(cls, load_backup: bool = False):
"""
Load the DataStore object from disk.
Args:
load_backup: whether to load the backup
"""
path = cls._data_store_backup_path if load_backup else cls._data_store_path
with open(path, "rb") as store_file:
return _load_pickle(store_file)
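# The usual analysis-time pattern (a sketch) is the mirror image of setup: unpickle
# the stored DataStore and re-open the block files read-only:
#
#     store = DataStore.load_data_store()
#     store.initialize(mode="r")
#     print(store.n_replicas, store.n_atoms)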
def save_communicator(self, comm: interfaces.ICommunicator):
"""Save the communicator to disk"""
self._can_save()
with open(self._communicator_path, "wb") as comm_file:
pickle.dump(comm, comm_file)
def load_communicator(self) -> interfaces.ICommunicator:
"""Load the communicator from disk"""
if self._readonly_mode:
path = self._communicator_backup_path
else:
path = self._communicator_path
with open(path, "rb") as comm_file:
return _load_pickle(comm_file)
def save_positions(self, positions: np.ndarray, stage: int):
"""
Save the positions to disk.
Args:
positions: n_replicas x n_atoms x 3 array
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["positions"][..., stage] = positions
def load_positions(self, stage: int) -> np.ndarray:
"""
Load positions from disk.
Args:
stage: stage to load
Returns:
n_replicas x n_atoms x 3 array
.. warning::
:meth:`load_positions` can only access data moving forward in time.
Attempting to move backwards in time will raise an error.
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["positions"][..., stage]
def load_positions_random_access(self, stage: int) -> np.ndarray:
"""
Load positions from disk.
Args:
stage: stage to load
Returns:
n_replicas x n_atoms x 3 array
Note:
This differs from :meth:`load_positions` in that positions can be loaded
from any stage, while :meth:`load_positions` can only move forward
in time. However, this comes with a performance penalty.
"""
# get the block for this stage
block = self._block_for_stage(stage)
# if it's the current block, then just return the positions
if block == self._current_block:
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["positions"][..., stage]
# otherwise open the file, grab the positions, and then close it
else:
path = self._net_cdf_path_template.format(block)
with contextlib.closing(cdf.Dataset(path, "r")) as dataset:
return dataset.variables["positions"][..., stage]
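# Sketch of the trade-off described above: sequential readers should prefer
# load_positions / iterate_positions, while a one-off lookup of an arbitrary
# stage can use the random-access variant at the cost of opening and closing
# that stage's block file:
#
#     final_positions = store.load_positions_random_access(stage=4999)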
def load_all_positions(self) -> np.ndarray:
"""
Load all positions from disk.
Returns:
n_replicas x n_atoms x 3 x n_steps array
.. warning::
This could use a lot of memory.
"""
return np.concatenate(
[
np.array(self.load_positions(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
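# Rough memory estimate for the warning above (assuming float64 storage):
# n_replicas * n_atoms * 3 * n_steps * 8 bytes. For example, 30 replicas of a
# 10,000-atom system over 5,000 steps is about 36 GB, so prefer iterate_positions
# below for long runs.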
def iterate_positions(
self, start: Optional[int] = None, end: Optional[int] = None
) -> Iterator[np.ndarray]:
"""
Iterate the positions over time.
Args:
start: starting step
end: ending step
Returns:
An iterator over steps of n_replicas x n_atoms x 3 array
"""
if start is None:
start = 0
if end is None:
end = self.max_safe_frame
for i in range(start, end):
yield self.load_positions(i)
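# Streaming sketch (hypothetical analysis code): compute the geometric center of
# replica 0 at every step without loading the whole trajectory into memory:
#
#     for positions in store.iterate_positions():
#         center = positions[0].mean(axis=0)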
def save_velocities(self, velocities: np.ndarray, stage: int):
"""
Save velocities to disk.
Args:
velocities: n_replicas x n_atoms x 3 array
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["velocities"][..., stage] = velocities
def load_velocities(self, stage: int) -> np.ndarray:
"""
Load velocities from disk.
Args:
stage: stage to load
Returns:
n_replicas x n_atoms x 3 array
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["velocities"][..., stage]
def load_all_velocities(self) -> np.ndarray:
"""
Load all velocities from disk.
Returns:
n_replicas x n_atoms x 3 x n_steps array
.. warning::
This could use a lot of memory.
"""
return np.concatenate(
[
np.array(self.load_velocities(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
def save_box_vectors(self, box_vectors: np.ndarray, stage: int):
"""
Save the box_vectors to disk.
Args:
box_vectors: n_replicas x 3 x 3 array
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["box_vectors"][..., stage] = box_vectors
def load_box_vectors(self, stage: int) -> np.ndarray:
"""
Load box_vectors from disk.
Args:
stage: stage to load
Returns:
n_replicas x 3 x 3 array
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["box_vectors"][..., stage]
def load_all_box_vectors(self) -> np.ndarray:
"""
Load all box_vectors from disk.
Returns:
n_replicas x 3 x 3 x n_steps array
.. warning::
This could use a lot of memory.
"""
return np.concatenate(
[
np.array(self.load_box_vectors(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
def iterate_box_vectors(
self, start: Optional[int] = None, end: Optional[int] = None
) -> Iterator[np.ndarray]:
"""
Iterate over the box_vectors from disk.
Args:
start: starting frame
end: ending frame
Returns:
iterator over n_replicas x 3 x 3 array
"""
if start is None:
start = 0
if end is None:
end = self.max_safe_frame
for i in range(start, end):
yield self.load_box_vectors(i)
def save_states(self, states: Sequence[interfaces.IState], stage: int):
"""
Save states to disk.
Args:
states: states to store
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
positions = np.array([s.positions for s in states])
velocities = np.array([s.velocities for s in states])
alphas = np.array([s.alpha for s in states])
energies = np.array([s.energy for s in states])
group_energies = np.array([s.group_energies for s in states])
box_vectors = np.array([s.box_vector for s in states])
discrete_parameters = np.array(
[s.parameters.discrete for s in states], dtype=np.int32
)
continuous_parameters = np.array(
[s.parameters.continuous for s in states], dtype=np.float64
)
mappings = np.array([s.mappings for s in states], dtype=int)
alignments = np.array([s.rdc_alignments for s in states], dtype=np.float64)
self.save_positions(positions, stage)
self.save_velocities(velocities, stage)
self.save_box_vectors(box_vectors, stage)
self.save_alphas(alphas, stage)
self.save_energies(energies, stage)
self.save_group_energies(group_energies, stage)
self.save_discrete_parameters(discrete_parameters, stage)
self.save_continuous_parameters(continuous_parameters, stage)
self.save_mappings(mappings, stage)
self.save_alignments(alignments, stage)
def load_states(self, stage: int) -> Sequence[interfaces.IState]:
"""
Load states from disk
Args:
stage: stage to load
Returns:
list of states
"""
self._handle_load_stage(stage)
positions = self.load_positions(stage)
velocities = self.load_velocities(stage)
box_vectors = self.load_box_vectors(stage)
alphas = self.load_alphas(stage)
energies = self.load_energies(stage)
group_energies = self.load_group_energies(stage)
discrete_parameters = self.load_discrete_parameters(stage)
continuous_parameters = self.load_continuous_parameters(stage)
mappings = self.load_mappings(stage)
alignments = self.load_alignments(stage)
states = []
for i in range(self._n_replicas):
s = state.SystemState(
positions[i],
velocities[i],
alphas[i],
energies[i],
group_energies[i],
box_vectors[i],
param_sampling.ParameterState(
discrete_parameters[i], continuous_parameters[i]
),
mappings[i],
alignments[i],
)
states.append(s)
return states
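# Sketch of reading full states back (attribute names follow the SystemState
# constructor call above):
#
#     states = store.load_states(stage=100)
#     coords = states[0].positions
#     alpha = states[0].alpha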
def append_traj(self, state: interfaces.IState, stage: int):
"""
Append structure from state to end of trajectory
Args:
state: state to append
stage: stage number
"""
pdb_string = self._pdb_writer.get_pdb_string(state.positions, stage)
with open(self._traj_path, "a") as traj_file:
traj_file.write(pdb_string)
def save_alphas(self, alphas: np.ndarray, stage: int):
"""
Save alphas to disk.
Args:
alphas: n_replicas array
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["alphas"][..., stage] = alphas
def load_alphas(self, stage: int) -> np.ndarray:
"""
Load alphas from disk.
Args:
stage: stage to load from disk
Returns:
n_replicas array
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["alphas"][..., stage]
def load_all_alphas(self) -> np.ndarray:
"""
Load all alphas from disk.
Returns:
n_replicas x n_stages array
.. warning::
This could use a lot of memory.
"""
return np.concatenate(
[
np.array(self.load_alphas(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
def save_energies(self, energies: np.ndarray, stage: int):
"""
Save energies to disk.
Args:
energies: n_replicas array of energy
stage: stage to save
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["energies"][..., stage] = energies
def load_energies(self, stage) -> np.ndarray:
"""
Load energies from disk.
Args:
stage: stage to load
Returns:
n_replicas array of energies
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["energies"][..., stage]
def save_group_energies(self, group_energies: np.ndarray, stage):
"""Save per-group energies (n_replicas x n_energy_groups array) to disk."""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["group_energies"][..., stage] = group_energies
def load_group_energies(self, stage) -> np.ndarray:
"""Load per-group energies (n_replicas x n_energy_groups array) from disk."""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["group_energies"][..., stage]
def load_all_energies(self) -> np.ndarray:
"""
Load all energies from disk.
Returns:
n_replicas x n_stages array of energies
.. warning::
This could use a lot of memory
"""
return np.concatenate(
[
np.array(self.load_energies(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
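# Sketch: mean energy of each replica over all safely readable stages (the stage
# axis is last, because the loaders above concatenate along axis=-1):
#
#     energies = store.load_all_energies()      # n_replicas x n_stages
#     mean_per_replica = energies.mean(axis=-1)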
def save_energy_matrix(self, energy_matrix: np.ndarray, stage: int):
"""
Save energy matrix to disk
Args:
energy_matrix: n_replicas x n_replicas matrix of energies
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["energy_matrix"][..., stage] = energy_matrix
def load_energy_matrix(self, stage: int) -> np.ndarray:
"""
Load energy matrix from disk
Args:
stage: stage to load
Returns:
n_replicas x n_replicas array of energies
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["energy_matrix"][..., stage]
def load_all_energy_matrices(self) -> np.ndarray:
"""
Load all energy matrices from disk.
Returns:
n_replicas x n_replicas x n_stages array of energies
"""
return np.concatenate(
[
np.array(self.load_energy_matrix(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
def save_permutation_vector(self, perm_vec: np.ndarray, stage: int):
"""
Save permutation vector to disk.
Args:
perm_vec: n_replicas array of int
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
self._cdf_data_set.variables["permutation_vectors"][..., stage] = perm_vec
def load_permutation_vector(self, stage: int) -> np.ndarray:
"""
Load permutation vector from disk.
Args:
stage: stage to load
Returns:
n_replicas array of int
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
return self._cdf_data_set.variables["permutation_vectors"][..., stage]
def load_all_permutation_vectors(self) -> np.ndarray:
"""
Load all permutation vectors from disk.
Returns:
n_replicas x n_stages array of int
.. warning::
This might take a lot of memory
"""
return np.concatenate(
[
np.array(self.load_permutation_vector(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
def iterate_permutation_vectors(
self, start: Optional[int] = None, end: Optional[int] = None
) -> Iterator[np.ndarray]:
"""
Iterate over the permutation vectors from disk.
Args:
start: starting stage
end: ending stage
Returns:
an iterator over n_replicas array of int
"""
if start is None:
start = 0
if end is None:
end = self.max_safe_frame
for i in range(start, end):
yield self.load_permutation_vector(i)
def save_acceptance_probabilities(self, accept_probs: np.ndarray, stage: int):
"""
Save acceptance probabilities vector to disk.
Args:
accept_probs: n_replica_pairs array
stage: stage to store
"""
self._can_save()
self._handle_save_stage(stage)
assert self._cdf_data_set is not None
ds = self._cdf_data_set
ds.variables["acceptance_probabilities"][..., stage] = accept_probs
def load_acceptance_probabilities(self, stage: int) -> np.ndarray:
"""
Load acceptance probability vector from disk.
Args:
stage: stage to load
Returns:
n_replica_pairs array
"""
self._handle_load_stage(stage)
assert self._cdf_data_set is not None
ds = self._cdf_data_set
return ds.variables["acceptance_probabilities"][..., stage]
def load_all_acceptance_probabilities(self) -> np.ndarray:
"""
Load all acceptance probabilities from disk
Returns:
n_replica_pairs x n_stages array
.. warning::
This might take a lot of memory
"""
return np.concatenate(
[
np.array(self.load_acceptance_probabilities(i))[..., np.newaxis]
for i in range(self.max_safe_frame)
],
axis=-1,
)
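# Sketch: average exchange acceptance for each neighbouring replica pair, a common
# diagnostic for replica-exchange runs:
#
#     accept = store.load_all_acceptance_probabilities()   # n_replica_pairs x n_stages
#     mean_accept = accept.mean(axis=-1)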
def save_discrete_parameters(self, data, stage):
"""Save sampled discrete parameters (n_replicas x n_discrete_parameters array of int) to disk."""
self._can_save()
self._handle_save_stage(stage)
ds = self._cdf_data_set
ds.variables["discrete_parameters"][..., stage] = data
def load_discrete_parameters(self, stage):
"""Load sampled discrete parameters from disk."""
self._handle_load_stage(stage)
ds = self._cdf_data_set
return ds.variables["discrete_parameters"][..., stage]
def save_continuous_parameters(self, data, stage):
"""Save sampled continuous parameters (n_replicas x n_continuous_parameters array) to disk."""
self._can_save()
self._handle_save_stage(stage)
ds = self._cdf_data_set
ds.variables["continuous_parameters"][..., stage] = data
def load_continuous_parameters(self, stage):
"""Load sampled continuous parameters from disk."""
self._handle_load_stage(stage)
ds = self._cdf_data_set
return ds.variables["continuous_parameters"][..., stage]
def save_mappings(self, data, stage):
"""Save sampled mappings (n_replicas x n_mappings array of int) to disk."""
self._can_save()
self._handle_save_stage(stage)
ds = self._cdf_data_set
ds.variables["mappings"][..., stage] = data
def load_mappings(self, stage):
"""Load sampled mappings from disk."""
self._handle_load_stage(stage)
ds = self._cdf_data_set
return ds.variables["mappings"][..., stage]
def save_alignments(self, data, stage):
"""Save RDC alignments (n_replicas x n_alignments array) to disk."""
self._can_save()
self._handle_save_stage(stage)
ds = self._cdf_data_set
ds.variables["rdc_alignments"][..., stage] = data
def load_alignments(self, stage):
"""Load RDC alignments from disk."""
self._handle_load_stage(stage)
ds = self._cdf_data_set
return ds.variables["rdc_alignments"][..., stage]
def save_remd_runner(self, runner):
"""
Save replica runner to disk
Args:
runner (LeaderReplicaExchangeRunner): replica exchange runner to save
"""
self._can_save()
with open(self._remd_runner_path, "wb") as runner_file:
pickle.dump(runner, runner_file)
def load_remd_runner(self):
"""
Load replica runner from disk
Returns:
LeaderReplicaExchangeRunner
"""
path = (
self._remd_runner_backup_path
if self._readonly_mode
else self._remd_runner_path
)
with open(path, "rb") as runner_file:
return _load_pickle(runner_file)
def save_system(self, system: interfaces.ISystem):
"""
Save MELD system to disk
Args:
system: system to save
"""
self._can_save()
with open(self._system_path, "wb") as system_file:
pickle.dump(system, system_file)
def load_system(self) -> interfaces.ISystem:
"""Load MELD system from disk"""
path = self._system_backup_path if self._readonly_mode else self._system_path
with open(path, "rb") as system_file:
return _load_pickle(system_file)
def save_run_options(self, run_options: options.RunOptions):
"""
Save RunOptions to disk
Args:
run_options: options to save to disk
"""
self._can_save()
with open(self._run_options_path, "wb") as options_file:
pickle.dump(run_options, options_file)
def load_run_options(self) -> options.RunOptions:
"""Load RunOptions from disk"""
path = (
self._run_options_backup_path
if self._readonly_mode
else self._run_options_path
)
with open(path, "rb") as options_file:
options = _load_pickle(options_file)
return options
def backup(self, stage: int):
"""
Back up all files to Data/Backup.
A backup occurs when `stage % block_size == 0`.
Args:
stage: stage
"""
self._can_save()
if not stage % self._block_size:
self._backup(self._communicator_path, self._communicator_backup_path)
self._backup(self._data_store_path, self._data_store_backup_path)
self._backup(self._remd_runner_path, self._remd_runner_backup_path)
self._backup(self._system_path, self._system_backup_path)
self._backup(self._run_options_path, self._run_options_backup_path)
#
# private methods
#
def _create_cdf_file(self):
# create the file
path = self._net_cdf_path_template.format(self._current_block)
ds = cdf.Dataset(path, "w", format="NETCDF4")
# setup dimensions
ds.createDimension("n_replicas", self._n_replicas)
ds.createDimension("n_replica_pairs", self._n_replicas - 1)
ds.createDimension("n_atoms", self._n_atoms)
ds.createDimension("cartesian", 3)
ds.createDimension("timesteps", None)
ds.createDimension("n_discrete_parameters", self._n_discrete_parameters)
ds.createDimension("n_continuous_parameters", self._n_continuous_parameters)
ds.createDimension("n_mappings", self._n_mappings)
ds.createDimension("n_alignments", self._n_alignments)
ds.createDimension("n_energy_groups", ENERGY_GROUPS)
# setup variables
ds.createVariable(
"positions",
float,
["n_replicas", "n_atoms", "cartesian", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"velocities",
float,
["n_replicas", "n_atoms", "cartesian", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"box_vectors",
float,
["n_replicas", "cartesian", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"alphas",
float,
["n_replicas", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"energies",
float,
["n_replicas", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"group_energies",
float,
["n_replicas", "n_energy_groups", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"permutation_vectors",
int,
["n_replicas", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"energy_matrix",
float,
["n_replicas", "n_replicas", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"acceptance_probabilities",
float,
["n_replica_pairs", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"discrete_parameters",
int,
["n_replicas", "n_discrete_parameters", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"continuous_parameters",
float,
["n_replicas", "n_continuous_parameters", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"mappings",
int,
["n_replicas", "n_mappings", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
ds.createVariable(
"rdc_alignments",
float,
["n_replicas", "n_alignments", "timesteps"],
zlib=True,
fletcher32=True,
shuffle=True,
complevel=9,
)
self._cdf_data_set = ds
def _backup(self, src, dest):
if os.path.exists(src):
try:
shutil.copy(src, dest)
except IOError:
# if we encounter an error, wait five seconds and try again
time.sleep(5)
shutil.copy(src, dest)
def _can_save(self):
if self._readonly_mode:
raise RuntimeError("Cannot save in read-only mode.")
def _handle_save_stage(self, stage):
if stage < self._current_stage:
raise RuntimeError("Cannot go back in time")
self._current_stage = stage
block_index = self._block_for_stage(stage)
if block_index > self._current_block:
self.close()
self._max_safe_block = self._current_block
self._current_block = block_index
self._create_cdf_file()
def _handle_load_stage(self, stage):
block_index = self._block_for_stage(stage)
if self._readonly_mode:
if block_index > self._max_safe_block:
raise RuntimeError("Tried to read an unsafe block")
else:
if block_index < self._current_block:
raise RuntimeError(
"Tried to load from an index before the current block,"
"which is not allowed."
)
if block_index != self._current_block:
self.close()
self._current_block = block_index
self._load_cdf_file_readonly()
def _block_for_stage(self, stage):
return stage // self._block_size
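# For example, with the default block_size of 100, stages 0-99 live in
# block_000000.nc, stages 100-199 in block_000001.nc, and so on.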
def _load_cdf_file_readonly(self):
path = self._net_cdf_path_template.format(self._current_block)
self._cdf_data_set = cdf.Dataset(path, "r")
@property
def max_safe_frame(self):
"""Maximum safe fram that can be read"""
return (self._max_safe_block + 1) * self._block_size
@property
def max_safe_block(self):
"""Maximum safe block that can be read"""
return self._max_safe_block
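# End-to-end analysis sketch (hypothetical; assumes an existing Data/ directory
# produced by a completed MELD run): open the store read-only and collect the
# alpha value of every replica over time.
def _example_collect_alphas():
    store = DataStore.load_data_store()
    store.initialize(mode="r")
    try:
        # n_replicas x n_stages array; the stage axis is last
        return store.load_all_alphas()
    finally:
        store.close()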