Skip to content

Commit

Permalink
CLI for generating waveform analytics on waveform data from mseed and…
Browse files Browse the repository at this point in the history
… ASDF input data (#258)

* Adding a First-cut QCQC CLI App

* Miscellaneous Changes

* Removed MPI-dependence from MseedIndex
* Updated response-factory to cater to requirements from QAQC App

* Adding Support for ASDF Input to WaveformAnalytics

* Added support for serial processing of waveform data from FedASDF
  infrastructure
* Minor changes to FederatedASDFViewer.py

* Instrument Response

* Added support for reading instrument responses in both stationXML and
  .resp formats
  • Loading branch information
geojunky authored Feb 7, 2024
1 parent e547ff9 commit 321eab3
Show file tree
Hide file tree
Showing 5 changed files with 949 additions and 77 deletions.
6 changes: 6 additions & 0 deletions seismic/ASDFdatabase/FederatedASDFViewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,12 @@ def setTraceImage(nc, sc, lc, cc, st=None, et=None):
show_noise_models=False, show=False)
fig.set_size_inches(TRACE_FIG_WIDTH, TRACE_FIG_HEIGHT)
# end if
fig.axes[0].text(0.01, 0.01,
'SR: {} Hz'.format(stream[0].stats.sampling_rate),
bbox=dict(facecolor='white', linewidth=0, alpha=0.7),
color='k',
fontsize=7, weight='bold',
transform=fig.axes[0].transAxes)
# end if

ti = FigureImage(fig=fig)
Expand Down
7 changes: 2 additions & 5 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,8 @@ def get_waveforms(self, network, station, location, channel, starttime,
# end if
# end for

# Trim traces
for t in s:
t.trim(starttime=starttime,
endtime=endtime)
# end for
# Trim stream
s.trim(starttime=starttime, endtime=endtime, nearest_sample=True)

# apply corrections if available
if(self.corrections_enabled):
Expand Down
204 changes: 137 additions & 67 deletions seismic/ASDFdatabase/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import numpy as np
from rtree import index
from glob import glob
from mpi4py import MPI
from collections import defaultdict
from obspy import read
from obspy.core import Stream, Trace
Expand Down Expand Up @@ -31,69 +30,134 @@ def remove_comments(iinv: Inventory) -> Inventory:
# end func

class MseedIndex:
class StreamCache:
def __init__(self):
self.streams = defaultdict(list)
self.read_times = defaultdict(list)
# end func

def get(self, fn):
if (fn in self.streams.keys()):
# print('found stream..')
return self.streams[fn]
else:
# print('reading stream..')
result = self._add(fn)
self._cleanup()

return result
# end if
# end func

def flush(self):
self.streams = defaultdict(list)
self.read_times = defaultdict(list)
# end func

def _cleanup(self):
MAX_STREAMS = 5
# before = len(self.streams)
while (len(self.streams) > MAX_STREAMS):
time_key = sorted(self.read_times.keys())[0]
file_key = self.read_times[time_key]

self.read_times.pop(time_key)
self.streams.pop(file_key)
# wend
# after = len(self.streams)
# if(before > after): print('cleaned up {} streams..'.format(before-after))
# end func

def _add(self, fn):
try:
self.streams[fn] = read(fn)
self.read_times[UTCDateTime.now().timestamp] = fn
return self.streams[fn]
except Exception as e:
print("Failed to read {} with error {}. Moving along..".format(fn, e))
# end try
# end func
# end class

def __init__(self, mseed_folder, pattern):
self.mseed_folder = mseed_folder
self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()
self.tree = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))

work_load = None
offsets = None
if (self.rank == 0):
self.mseed_files = np.array(sorted(glob(os.path.join(self.mseed_folder, pattern))))

#self.mseed_files = self.mseed_files[:1000]

work_load = split_list(self.mseed_files, self.nproc)
counts = np.array([len(item) for item in work_load])
offsets = np.append(0, np.cumsum(counts[:-1]))
self.stream_cache = MseedIndex.StreamCache()
self.mseed_files = np.array(sorted(glob(os.path.join(self.mseed_folder, pattern))))

fc = len(self.mseed_files)
if(fc > 0):
print('Found {} files:'.format(fc))
print(os.path.basename(self.mseed_files[0]))
print('..')
print('..')
else:
raise RuntimeError('No mseed files found with pattern {}. Aborting..'.format(pattern))
# end if
self.local_mseed_files = self.comm.scatter(work_load)
self.offsets = self.comm.bcast(offsets, root=0)

print('Reading metadata from mseed files..')
meta_list = []
for i, mseed_file in enumerate(tqdm(self.local_mseed_files, desc='Rank {}'.format(self.rank))):
self.meta_list = []
for i, mseed_file in enumerate(tqdm(self.mseed_files)):
st = None
try:
st = read(mseed_file, headonly=True)
except:
print("Failed to read {}. Moving along..". format(mseed_file))
except Exception as e:
print("Failed to read {} with error {}. Moving along..".format(mseed_file, e))
continue
# end try

for tr in st:
nc, sc, lc, cc, st, et = \
tr.stats.network, tr.stats.station, tr.stats.location, \
tr.stats.channel, tr.stats.starttime.timestamp, \
tr.stats.endtime.timestamp
tr.stats.channel, tr.stats.starttime.timestamp, \
tr.stats.endtime.timestamp

meta_list.append([self.offsets[self.rank] + i, nc, sc, lc, cc, st, et])
# skip bogus traces
if(nc == sc == lc == cc == ''): continue
self.meta_list.append([i, nc, sc, lc, cc, st, et])
# end for
#if (i > 0): break
# if (i > 0): break
# end for

meta_list = self.comm.gather(meta_list, root=0)
if (self.rank == 0):
meta_list = [item for ritem in meta_list for item in ritem] # flatten list of lists

print('\nCreating metadata index for {} traces..'.format(len(meta_list)))
print('\nCreating metadata index for {} mseed files..'.format(len(self.meta_list)))

for row in tqdm(meta_list):
idx, nc, sc, lc, cc, st, et = row
for row in tqdm(self.meta_list):
idx, nc, sc, lc, cc, st, et = row

if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
# end for
# end if
# end func

def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'
def __getstate__(self):
#print('pickling..')
return self.__dict__
# end func

def __setstate__(self, d):
#print('unpickling..')
self.__dict__ = d

# recreate tree, because serialization/deserialization across
# processes do not preserve hashed objects
self.tree = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
for row in self.meta_list:
idx, nc, sc, lc, cc, st, et = row

if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
# end for
# end func

def flush_cache(self):
self.stream_cache.flush()
# end func

def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
epsilon = 1e-5
st_ts = st.timestamp + epsilon
et_ts = et.timestamp - epsilon
Expand All @@ -102,55 +166,64 @@ def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
try:
target_index = self.tree[net][sta][loc][cha]

if(type(target_index) == index.Index):
if (type(target_index) == index.Index):
file_indices = np.array(list(target_index.intersection((st_ts, 1, et_ts, 1))), dtype='i4')

# since file names are repeated for multiple traces, we need a unique set
for mfile in set(self.mseed_files[file_indices]):
result += read(mfile).select(network=net,
station=sta,
location=loc,
channel=cha).slice(st, et, nearest_sample=False).copy()
temp_stream = self.stream_cache.get(mfile).select(network=net,
station=sta,
location=loc,
channel=cha)
result += temp_stream.slice(st, et, nearest_sample=False).copy()
# end for
else:
print('empty index')
# end if
except Exception as e:
print(str(e))
print('error in mseedindex', str(e))
# end try

return result
# end func

def get_stations(self, st:UTCDateTime, et:UTCDateTime, net=None, sta=None, loc=None, cha=None):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'

def get_stations(self, st: UTCDateTime, et: UTCDateTime, net=None, sta=None, loc=None, cha=None):
epsilon = 1e-5
st_ts = st.timestamp + epsilon
et_ts = et.timestamp - epsilon

_net = _sta = _loc = _cha = None

if(net==None): _net = self.tree.keys()
else: _net = [net]
if (net == None):
_net = self.tree.keys()
else:
_net = [net]

result = []
for nc in _net:
if (sta==None): _sta = self.tree[nc].keys()
else: _sta = [sta]
if (sta == None):
_sta = self.tree[nc].keys()
else:
_sta = [sta]

for sc in _sta:
if (loc==None): _loc = self.tree[nc][sc].keys()
else: _loc = [loc]
if (loc == None):
_loc = self.tree[nc][sc].keys()
else:
_loc = [loc]

for lc in _loc:
if (cha==None): _cha = self.tree[nc][sc][lc].keys()
else: _cha= [cha]
if (cha == None):
_cha = self.tree[nc][sc][lc].keys()
else:
_cha = [cha]

for cc in _cha:
target_index = self.tree[nc][sc][lc][cc]
if (type(target_index) == index.Index):
entries = list(target_index.intersection((st_ts, 1, et_ts, 1)))

if(len(entries)): result.append((nc, sc, lc, cc))
if (len(entries)): result.append((nc, sc, lc, cc))
# end if
# end for
# end for
Expand All @@ -161,7 +234,6 @@ def get_stations(self, st:UTCDateTime, et:UTCDateTime, net=None, sta=None, loc=N
# end func

def get_time_range(self, net, sta, loc, cha):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'

target_index = self.tree[net][sta][loc][cha]

Expand All @@ -175,13 +247,11 @@ def get_time_range(self, net, sta, loc, cha):
# end func

if __name__=="__main__":
msi = MseedIndex('/g/data/ha3/ac5759/semi-perm-iris', '*.mseed')

if(msi.rank == 0):
print(msi.tree['AU'].keys())
print(msi.tree['AU']['AXCOZ'].keys())
r = msi.get_waveforms('AU', 'AXCOZ', '00', 'HHZ', UTCDateTime(2021, 1, 3, 22, 9, 26), UTCDateTime(2021, 1, 4, 0, 0))
print(r)
print(msi.get_time_range('AU', 'AXCOZ', '00', 'HHN'))
# end if
msi = MseedIndex('/g/data/ha3/ac5759/semi-perm-iris', '*AXCOZ*HHZ*j168.mseed')

print(msi.tree['AU'].keys())
print(msi.tree['AU']['AXCOZ'].keys())
r = msi.get_waveforms('AU', 'AXCOZ', '00', 'HHZ', UTCDateTime(2022, 1, 3, 22, 9, 26), UTCDateTime(2023, 1, 4, 0, 0))
print(r)
print(msi.get_time_range('AU', 'AXCOZ', '00', 'HHZ'))
# end if
Loading

0 comments on commit 321eab3

Please sign in to comment.