-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEquilibrationProtocol.py
More file actions
177 lines (151 loc) · 7.68 KB
/
EquilibrationProtocol.py
File metadata and controls
177 lines (151 loc) · 7.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import os
import time
from copy import deepcopy
import openmm
import numpy as np
import openmm.app as app
from openmm.unit import *
from openmmml import MLPotential
import pandas as pd
from utils.openmm_utils import parse_quantity, SystemArgs, cutoff_method, add_barostat, \
get_integrator, make_graphs
from utils.general_utils import printlog
from tqdm import tqdm
class EquilibrationProtocol:
def __init__(self,
production_ensemble: str,
system: openmm.System,
modeller: app.Modeller,
args: SystemArgs,
output_dir: str,
fixed_density: bool = True,
simulation_sequence: list=None,
force_amber = False,
use_existing_velocities = False):
self.output_dir = output_dir
self.modeller = modeller
self.args = args
self.system = system
self.simulation_sequence = simulation_sequence
self.force_amber = force_amber
self.use_existing_velocities = False
def run(self):
for simulation_id, equilibration_simulation in enumerate(self.simulation_sequence):
simulation_type = equilibration_simulation[0]
initial_temperature = equilibration_simulation[1]
final_temperature = equilibration_simulation[2]
duration = equilibration_simulation[3]
stepsize = equilibration_simulation[4]
ml_potential = equilibration_simulation[5]
print(f"[✓] Running {duration} {simulation_type} equilibration simulation, initial temp {initial_temperature} final temp {final_temperature}", os.path.join(self.output_dir, "output.log"))
self._run_simulation(simulation_type, duration, simulation_id, len(self.simulation_sequence)-1, initial_temperature, final_temperature, stepsize, ml_potential)
print(f"[✓] Finished equilibration simulation {simulation_id+1}/{len(self.simulation_sequence)}", os.path.join(self.output_dir, "output.log"))
def _run_simulation(self, simulation_type: str, duration: str, simulation_id: int = 0, max_simulation_id: int = 0, initial_temperature: float = 0, final_temperature: float = 0, stepsize: str = "1fs", ml_potential=None):
duration = parse_quantity(duration)
stepsize = parse_quantity(stepsize)
total_steps = int(duration / stepsize)
print(duration, stepsize, total_steps)
# gro = app.GromacsGroFile(self.args.pdb[:-4] + '.gro')
# top = app.GromacsTopFile(self.args.pdb[:-4] + '.top', periodicBoxVectors=gro.getPeriodicBoxVectors(), includeDir='.')
# system = top.createSystem(nonbondedMethod=app.PME, nonbondedCutoff=1.0*unit.nanometers, constraints=app.HBonds)
# # Read velocities from .gro file
# # Create a system with the Amber force field
# # forcefield = app.ForceField('/home/sebidom/dom/ml_ff_md/amber03.xml', '/home/sebidom/dom/ml_ff_md/tip4p-fb.xml')
# # system = forcefield.createSystem(
# # self.modeller.topology,
# # nonbondedMethod=cutoff_method[self.args.cutoffmethod],
# # nonbondedCutoff=self.args.nonbondedcutoff,
# # )
# if ml_potential is not None:
# print("Adding ML potential to Amber system in equilibration")
# chains = list(self.modeller.topology.chains())
# ml_atoms = [atom.index for atom in chains[0].atoms()]
# # Initialise the ML potential
# potential = MLPotential(ml_potential)
# # Add the ML system to the standard forcefield system
# system = potential.createMixedSystem(self.modeller.topology, system, ml_atoms)
# else:
system = deepcopy(self.system)
# Set up simulation
unit_cell_dims = self.modeller.getTopology().getUnitCellDimensions()
if simulation_type == "NVE":
raise NotImplementedError
elif simulation_type == "NVT":
pass
elif simulation_type == "NPT":
assert (unit_cell_dims is not None), "PDB file missing unit cell dimensions - cannot run NPT simulation."
add_barostat(barostat_type="MonteCarloBarostat",
system=system,
pressure=self.args.pressure,
temperature=self.args.temperature)
else:
raise ValueError(f"Invalid simulation type: {simulation_type}")
copied_args = SystemArgs(*[stepsize if f == "stepsize" else getattr(self.args, f) for f in self.args._fields])
integrator = get_integrator(args=copied_args, integrator_type=self.args.integrator)
properties = {}
if self.args.gpu is not "":
platform = openmm.Platform.getPlatformByName("CUDA")
properties["DeviceIndex"] = self.args.gpu
properties["Precision"] = self.args.precision
else:
platform = openmm.Platform.getPlatformByName("CPU")
# with open(self.args.pdb[:-4] + '.gro', 'r') as f:
# lines = f.readlines()
# velocities = []
# for line in lines[2:-1]:
# velocities.append([
# float(line[44:52]), # First velocity
# float(line[52:60]), # Second velocity
# float(line[60:68]) # Third velocity
# ])
# Create simulation object using the specified integrator
simulation = app.Simulation(
self.modeller.topology,
system,
integrator,
platform,
properties,
)
# Add reporter
simulation.reporters.append(
app.StateDataReporter(
os.path.join(self.output_dir, f"equilibration_{simulation_id}.log"),
100, # steps per save
step=True,
time=True,
speed=True,
temperature=True,
potentialEnergy=True,
kineticEnergy=True,
totalEnergy=True,
volume=True
if self.args.pressure
else False, # record volume and density for NPT simulations
density=True if self.args.pressure else False,
append=True if self.args.resume else False,
)
)
# Set initial temperature
# Load checkpoint if not the first simulation
checkpoint_path = os.path.join(self.output_dir, f"equilibration_{simulation_id - 1}.chk")
if simulation_id != 0 and os.path.exists(checkpoint_path):
simulation.loadCheckpoint(checkpoint_path)
elif self.use_existing_velocities:
simulation.context.setPositions(self.modeller.positions)
velocities = np.array(velocities) * sqrt(3)
simulation.context.setVelocities(velocities)
else:
simulation.context.setPositions(self.modeller.positions)
simulation.context.setVelocitiesToTemperature(initial_temperature)
for i in tqdm(range(int(total_steps))):
current_temp = initial_temperature + (final_temperature - initial_temperature) * i / total_steps
simulation.integrator.setTemperature(current_temp)
simulation.step(1)
# Make graphs
report = pd.read_csv(os.path.join(self.output_dir, f"equilibration_{simulation_id}.log"))
make_graphs(report, self.args.stepsize, self.output_dir, name=f"equilibration_{simulation_id}")
# Save state
if simulation_id < max_simulation_id:
simulation.saveCheckpoint(f"{self.output_dir}/equilibration_{simulation_id}.chk")
else:
simulation.saveCheckpoint(f"{self.output_dir}/equilibration_final.chk")