-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoptimization_run_from_0d.py
executable file
·179 lines (144 loc) · 5.44 KB
/
optimization_run_from_0d.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python
# coding=utf-8
import json
import os
import copy
import time
import numpy as np
from scipy.interpolate import CubicSpline
# svZeroDSolver from https://github.com/SimVascular/svZeroDSolver
import pysvzerod
from utils import f_geometries, f_geo_in, f_cali_0d_in, f_cali_0d_out
# Number of observations to extract in result refinement
NUM_OBS = 100
def refine(x: np.ndarray, y: np.ndarray, num: int) -> np.ndarray:
"""Refine a curve using cubic spline interpolation with derivative.
Args:
x: X-coordinates
y: Y-coordinates
num: New number of points of the refined data.
Returns:
new_y: New y-coordinates
new_dy: New dy-coordinates
"""
y = y.copy()
y[-1] = y[0]
x_new = np.linspace(x[0], x[-1], num)
spline = CubicSpline(x, y, bc_type="periodic")
new_y = spline(x_new)
new_dy = spline.derivative()(x_new)
return new_y.tolist(), new_dy.tolist()
def collect(out, res, name_vessel, name_other, is_out):
for f in ["flow", "pressure"]:
if is_out:
name = ":".join([f, name_vessel, name_other])
inout = "_out"
else:
name = ":".join([f, name_other, name_vessel])
inout = "_in"
i_vesel = res["name"] == name_vessel
data = np.array(res[f + inout][i_vesel])
time = np.array(res["time"][i_vesel])
out["y"][name], out["dy"][name] = refine(time, data, NUM_OBS)
def collect_results_from_0d(inp, res):
vessels = np.array(inp["vessels"])
out = {"y": {}, "dy": {}}
# collect connections between branches and junctions
for jc in inp["junctions"]:
for io in ["inlet", "outlet"]:
for vs in vessels[jc[io + "_vessels"]]:
collect(out, res, vs["vessel_name"], jc["junction_name"], io == "inlet")
# collect connections between branches and boundary conditions
for vs in inp["vessels"]:
if "boundary_conditions" in vs:
for bc_type, bc_name in vs["boundary_conditions"].items():
collect(out, res, vs["vessel_name"], bc_name, bc_type == "outlet")
# export results, time, and flow on same resampled time discretization
flow = out["y"]["flow:INFLOW:branch0_seg0"]
return out, np.linspace(0, np.max(res["time"]), len(flow)).tolist(), flow
def compare(geo, times):
print("\n", geo, "\n\n")
# run the estimation
inp0, cali = estimate(geo, times)
# compare 0D element values
for i in range(len(inp0["vessels"])):
for ele in cali["vessels"][i]["zero_d_element_values"].keys():
sol = cali["vessels"][i]["zero_d_element_values"][ele]
if ele in inp0["vessels"][i]["zero_d_element_values"]:
ref = inp0["vessels"][i]["zero_d_element_values"][ele]
else:
ref = 0.0
# calculate error
if ref == 0.0:
err = 1.0
else:
err = np.abs(sol / ref - 1.0)
# print results
out = str(i) + "\t" + ele[0]
for j in [ref, sol]:
out += "\t\t{:.1e}".format(j)
out += "\t\t{:+d}".format(int(np.log(err)))
# print(out)
def estimate(geo, times):
# read input file
with open(os.path.join(f_geo_in, geo + ".json")) as f:
inp = json.load(f)
inp0 = copy.deepcopy(inp)
# run forward simulation
try:
start_time = time.time()
res = pysvzerod.simulate(inp)
times["simulate"] += [time.time() - start_time]
except RuntimeError as e:
print("Simulation failed: ", e)
return
# collect results in format required for calibrator
out, bc_time, bc_flow = collect_results_from_0d(inp, res)
times["size"] += [len(out["y"])]
# replace inflow to match calibration data
for bc in inp["boundary_conditions"]:
if bc["bc_name"] == "INFLOW":
bc["bc_values"]["t"] = bc_time
bc["bc_values"]["Q"] = bc_flow
# set all elements to zero
for vessel in inp["vessels"]:
for ele in vessel["zero_d_element_values"].keys():
vessel["zero_d_element_values"][ele] = 0.0
# add calibration parameters
inp["calibration_parameters"] = {
"tolerance_gradient": 1e-5,
"tolerance_increment": 1e-10,
"maximum_iterations": 100,
"calibrate_stenosis_coefficient": True,
"set_capacitance_to_zero": False,
}
# only calibrate to last cycle
inp["simulation_parameters"]["output_all_cycles"] = False
# write output json
fname_in = os.path.join(f_cali_0d_in, geo + ".json")
with open(fname_in, "w") as f:
json.dump(inp | out, f, indent=4)
# run the calibration
with open(fname_in) as ff:
config = json.load(ff)
try:
start_time = time.time()
cali = pysvzerod.calibrate(config)
times["calibrate"] += [time.time() - start_time]
except RuntimeError as e:
print("Calibration failed: ", e)
return
# write calibrated json
fname_out = os.path.join(f_cali_0d_out, geo + ".json")
with open(fname_out, "w") as f:
json.dump(cali, f, indent=4)
return inp0, cali
if __name__ == "__main__":
# loop over all vmr models
files = np.loadtxt(f_geometries, dtype="str")
times = {"simulate": [], "calibrate": [], "size": []}
for f in files:
compare(f, times)
# report runtimes
for k, v in times.items():
print(k, np.sum(v), np.mean(v), np.std(v), np.min(v), np.max(v))