-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcover.py
200 lines (154 loc) · 6.14 KB
/
cover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python
# coding=utf-8
import argparse
import glob
import os
import csv
import re
import sys
import scipy
import pdb
import vtk
from scipy.interpolate import interp1d
import numpy as np
from collections import defaultdict, OrderedDict
from common import get_dict
from get_database import Database, Post, input_args
from vtk_functions import read_geo, write_geo, collect_arrays, get_all_arrays, ClosestPoints
from get_bc_integrals import get_res_names
from vtk_to_xdmf import write_xdmf
from simulation_io import get_caps_db, collect_results_db, collect_results_db_3d_3d, \
collect_results_db_3d_3d_spatial
from vtk.util.numpy_support import numpy_to_vtk as n2v
from vtk.util.numpy_support import vtk_to_numpy as v2n
# interpolate 1d to 3d in space and time (allow extrapolation due to round-off errors at bounds)
interp = lambda x_1d, y_1d, x_3d: interp1d(x_1d, y_1d.T, fill_value='extrapolate')(x_3d)
f = 'pressure'
out_path = '/home/pfaller/work/osmsc/cover/'
def main(db, geometries):
# get post-processing constants
post = Post()
for geo in geometries:
# generate plots
print(geo)
# read results
deform = 'deformable' in db.study
res, time = collect_results_db(db, geo, post.models, deformable=deform)
# name of reference solution
m_ref = '3d_rerun'
roms = ['0d', '1d']
# times
t_ref = time[m_ref]
res_all = defaultdict(list)
err = []
for m_rom in roms:
# reference time
t_rom = time[m_rom]
for br in list(res.keys()):
# retrieve 3d results
res_ref = res[br][f][m_ref + '_int_last']
# map paths to interval [0, 1]
path_rom = res[br][m_rom + '_path'] / res[br][m_rom + '_path'][-1]
path_ref = res[br][m_ref + '_path'] / res[br][m_ref + '_path'][-1]
# interpolate in space and time
res_rom = interp(path_rom, res[br][f][m_rom + '_int_last'], path_ref)
res_rom = interp(t_rom, res_rom, t_ref)
if m_rom == '0d':
# add 3d results only once in loop
res_all[m_ref] += [res_ref]
# error (use 0d since generally worse than 1d)
err += [np.linalg.norm(res_ref - res_rom, axis=0)]
# add rom results
res_all[m_rom] += [res_rom]
# error over all time steps
err_t = np.linalg.norm(np.array(err), axis=0)
# find time step with smallest error
i_min = np.argmin(err_t)
t_min = t_ref[i_min]
# all results at time step with smallest error
res_min = defaultdict(list)
for m in roms:
for br in list(res.keys()):
res_min[m] += [res_all[m][br][:, i_min]]
# generate "straight" centerline
cent = straighten(read_geo(db.get_centerline_path(geo)).GetOutput(), res)
# export for plotting
i_3d = np.where(time['3d_rerun_last_cycle_i'])[0][i_min]
for m in roms:
write_rom(db, geo, cent, res_min, m)
write_3d(db, geo, i_3d)
def write_3d(db, geo, i_3d):
# read 3d results
res_3d = read_geo('/home/pfaller/work/osmsc/studies/ini_1d_quad/3d_flow/' + geo + '.vtu').GetOutput()
# read 3d mesh
geo_3d = read_geo(db.get_volume_mesh(geo)).GetOutput()
# get results at 3d time step
name = f + '_' + str(db.get_3d_increment(geo) * i_3d).zfill(5)
# get results
array = n2v(v2n(res_3d.GetPointData().GetArray(name)) * Post().convert[f])
array.SetName(f)
# export
geo_3d.GetPointData().AddArray(array)
write_geo(out_path + '/3d/' + geo + '.vtu', geo_3d)
def write_rom(db, geo, cent, res, m_rom):
# assign results
branches = v2n(cent.GetPointData().GetArray('BranchId'))
res_cent = np.zeros(cent.GetNumberOfPoints())
for br, res_br in enumerate(res[m_rom]):
res_cent[branches == br] = res_br * Post().convert[f]
# set results
array = n2v(res_cent)
array.SetName(f)
cent.GetPointData().AddArray(array)
# export
write_geo(out_path + '/' + m_rom + '/' + geo + '.vtp', cent)
def straighten(cent, res, m_rom='0d'):
# read centerline arrays
branches = v2n(cent.GetPointData().GetArray('BranchId'))
area = v2n(cent.GetPointData().GetArray('CenterlineSectionArea'))
path = v2n(cent.GetPointData().GetArray('Path'))
points = v2n(cent.GetPoints().GetData())
# new "linearized" arrays
area_lin = np.zeros(cent.GetNumberOfPoints())
points_lin = np.zeros((cent.GetNumberOfPoints(), 3))
for br in range(np.max(v2n(cent.GetPointData().GetArray('BranchId'))) + 1):
# all branch nodes
i_br = branches == br
path_br = path[i_br]
area_br = area[i_br]
points_br = points[i_br]
# get branch segments
path_rom = res[br][m_rom + '_path']
# loop all branch segments
for i in range(len(path_rom) - 1):
# first and last point in segment
p0 = np.argmin(np.abs(path_br - path_rom[i]))
p1 = np.argmin(np.abs(path_br - path_rom[i + 1]))
# segment points
i_seg = np.where(i_br)[0][p0:p1+1]
# segments for interpolation
dist_seg = np.array([0.0, 1.0])
path_seg = path_br[p0:p1+1].copy()
path_seg -= path_seg[0]
path_seg /= path_seg[-1]
# linearly interpolate areas
area_seg = np.array([area_br[p0], area_br[p1]])
area_lin[i_seg] = interp(dist_seg, area_seg, path_seg)
# linearly interpolate points
points_seg = np.vstack((points_br[p0], points_br[p1]))
points_lin[i_seg] = interp(dist_seg, points_seg, path_seg).T
# add radius
array = n2v(np.sqrt(area_lin / np.pi))
array.SetName('segment_radius')
cent.GetPointData().AddArray(array)
# replace points
points = vtk.vtkPoints()
points.Initialize()
for p in points_lin:
points.InsertNextPoint(p)
cent.SetPoints(points)
return cent
if __name__ == '__main__':
descr = 'Plot comparison of xd-results'
d, g, _ = input_args(descr)
main(d, g)