Skip to content

Commit

Permalink
Miscellaneous Changes
Browse files Browse the repository at this point in the history
* Removed MPI-dependence from MseedIndex
* Updated response-factory to cater to requirements from QAQC App
  • Loading branch information
geojunky committed Jan 24, 2024
1 parent 8bf5240 commit ac8caa0
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 77 deletions.
7 changes: 2 additions & 5 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,8 @@ def get_waveforms(self, network, station, location, channel, starttime,
# end if
# end for

# Trim traces
for t in s:
t.trim(starttime=starttime,
endtime=endtime)
# end for
# Trim stream
s.trim(starttime=starttime, endtime=endtime, nearest_sample=True)

# apply corrections if available
if(self.corrections_enabled):
Expand Down
204 changes: 137 additions & 67 deletions seismic/ASDFdatabase/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import numpy as np
from rtree import index
from glob import glob
from mpi4py import MPI
from collections import defaultdict
from obspy import read
from obspy.core import Stream, Trace
Expand Down Expand Up @@ -31,69 +30,134 @@ def remove_comments(iinv: Inventory) -> Inventory:
# end func

class MseedIndex:
class StreamCache:
def __init__(self):
self.streams = defaultdict(list)
self.read_times = defaultdict(list)
# end func

def get(self, fn):
if (fn in self.streams.keys()):
# print('found stream..')
return self.streams[fn]
else:
# print('reading stream..')
result = self._add(fn)
self._cleanup()

return result
# end if
# end func

def flush(self):
self.streams = defaultdict(list)
self.read_times = defaultdict(list)
# end func

def _cleanup(self):
MAX_STREAMS = 5
# before = len(self.streams)
while (len(self.streams) > MAX_STREAMS):
time_key = sorted(self.read_times.keys())[0]
file_key = self.read_times[time_key]

self.read_times.pop(time_key)
self.streams.pop(file_key)
# wend
# after = len(self.streams)
# if(before > after): print('cleaned up {} streams..'.format(before-after))
# end func

def _add(self, fn):
try:
self.streams[fn] = read(fn)
self.read_times[UTCDateTime.now().timestamp] = fn
return self.streams[fn]
except Exception as e:
print("Failed to read {} with error {}. Moving along..".format(fn, e))
# end try
# end func
# end class

def __init__(self, mseed_folder, pattern):
self.mseed_folder = mseed_folder
self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()
self.tree = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))

work_load = None
offsets = None
if (self.rank == 0):
self.mseed_files = np.array(sorted(glob(os.path.join(self.mseed_folder, pattern))))

#self.mseed_files = self.mseed_files[:1000]

work_load = split_list(self.mseed_files, self.nproc)
counts = np.array([len(item) for item in work_load])
offsets = np.append(0, np.cumsum(counts[:-1]))
self.stream_cache = MseedIndex.StreamCache()
self.mseed_files = np.array(sorted(glob(os.path.join(self.mseed_folder, pattern))))

fc = len(self.mseed_files)
if(fc > 0):
print('Found {} files:'.format(fc))
print(os.path.basename(self.mseed_files[0]))
print('..')
print('..')
else:
raise RuntimeError('No mseed files found with pattern {}. Aborting..'.format(pattern))
# end if
self.local_mseed_files = self.comm.scatter(work_load)
self.offsets = self.comm.bcast(offsets, root=0)

print('Reading metadata from mseed files..')
meta_list = []
for i, mseed_file in enumerate(tqdm(self.local_mseed_files, desc='Rank {}'.format(self.rank))):
self.meta_list = []
for i, mseed_file in enumerate(tqdm(self.mseed_files)):
st = None
try:
st = read(mseed_file, headonly=True)
except:
print("Failed to read {}. Moving along..". format(mseed_file))
except Exception as e:
print("Failed to read {} with error {}. Moving along..".format(mseed_file, e))
continue
# end try

for tr in st:
nc, sc, lc, cc, st, et = \
tr.stats.network, tr.stats.station, tr.stats.location, \
tr.stats.channel, tr.stats.starttime.timestamp, \
tr.stats.endtime.timestamp
tr.stats.channel, tr.stats.starttime.timestamp, \
tr.stats.endtime.timestamp

meta_list.append([self.offsets[self.rank] + i, nc, sc, lc, cc, st, et])
# skip bogus traces
if(nc == sc == lc == cc == ''): continue
self.meta_list.append([i, nc, sc, lc, cc, st, et])
# end for
#if (i > 0): break
# if (i > 0): break
# end for

meta_list = self.comm.gather(meta_list, root=0)
if (self.rank == 0):
meta_list = [item for ritem in meta_list for item in ritem] # flatten list of lists

print('\nCreating metadata index for {} traces..'.format(len(meta_list)))
print('\nCreating metadata index for {} traces..'.format(len(self.meta_list)))

for row in tqdm(meta_list):
idx, nc, sc, lc, cc, st, et = row
for row in tqdm(self.meta_list):
idx, nc, sc, lc, cc, st, et = row

if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
# end for
# end if
# end func

def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'
def __getstate__(self):
#print('pickling..')
return self.__dict__
# end func

def __setstate__(self, d):
#print('unpickling..')
self.__dict__ = d

# recreate tree, because serialization/deserialization across
# processes do not preserve hashed objects
self.tree = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
for row in self.meta_list:
idx, nc, sc, lc, cc, st, et = row

if (type(self.tree[nc][sc][lc][cc]) != index.Index):
self.tree[nc][sc][lc][cc] = index.Index()
# end if
self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1))
# end for
# end func

def flush_cache(self):
self.stream_cache.flush()
# end func

def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
epsilon = 1e-5
st_ts = st.timestamp + epsilon
et_ts = et.timestamp - epsilon
Expand All @@ -102,55 +166,64 @@ def get_waveforms(self, net, sta, loc, cha, st: UTCDateTime, et: UTCDateTime):
try:
target_index = self.tree[net][sta][loc][cha]

if(type(target_index) == index.Index):
if (type(target_index) == index.Index):
file_indices = np.array(list(target_index.intersection((st_ts, 1, et_ts, 1))), dtype='i4')

# since file names are repeated for multiple traces, we need a unique set
for mfile in set(self.mseed_files[file_indices]):
result += read(mfile).select(network=net,
station=sta,
location=loc,
channel=cha).slice(st, et, nearest_sample=False).copy()
temp_stream = self.stream_cache.get(mfile).select(network=net,
station=sta,
location=loc,
channel=cha)
result += temp_stream.slice(st, et, nearest_sample=False).copy()
# end for
else:
print('empty index')
# end if
except Exception as e:
print(str(e))
print('error in mseedindex', str(e))
# end try

return result
# end func

def get_stations(self, st:UTCDateTime, et:UTCDateTime, net=None, sta=None, loc=None, cha=None):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'

def get_stations(self, st: UTCDateTime, et: UTCDateTime, net=None, sta=None, loc=None, cha=None):
epsilon = 1e-5
st_ts = st.timestamp + epsilon
et_ts = et.timestamp - epsilon

_net = _sta = _loc = _cha = None

if(net==None): _net = self.tree.keys()
else: _net = [net]
if (net == None):
_net = self.tree.keys()
else:
_net = [net]

result = []
for nc in _net:
if (sta==None): _sta = self.tree[nc].keys()
else: _sta = [sta]
if (sta == None):
_sta = self.tree[nc].keys()
else:
_sta = [sta]

for sc in _sta:
if (loc==None): _loc = self.tree[nc][sc].keys()
else: _loc = [loc]
if (loc == None):
_loc = self.tree[nc][sc].keys()
else:
_loc = [loc]

for lc in _loc:
if (cha==None): _cha = self.tree[nc][sc][lc].keys()
else: _cha= [cha]
if (cha == None):
_cha = self.tree[nc][sc][lc].keys()
else:
_cha = [cha]

for cc in _cha:
target_index = self.tree[nc][sc][lc][cc]
if (type(target_index) == index.Index):
entries = list(target_index.intersection((st_ts, 1, et_ts, 1)))

if(len(entries)): result.append((nc, sc, lc, cc))
if (len(entries)): result.append((nc, sc, lc, cc))
# end if
# end for
# end for
Expand All @@ -161,7 +234,6 @@ def get_stations(self, st:UTCDateTime, et:UTCDateTime, net=None, sta=None, loc=N
# end func

def get_time_range(self, net, sta, loc, cha):
assert self.rank == 0, 'This function is only accessible from Rank 0. Aborting..'

target_index = self.tree[net][sta][loc][cha]

Expand All @@ -175,13 +247,11 @@ def get_time_range(self, net, sta, loc, cha):
# end func

if __name__=="__main__":
msi = MseedIndex('/g/data/ha3/ac5759/semi-perm-iris', '*.mseed')

if(msi.rank == 0):
print(msi.tree['AU'].keys())
print(msi.tree['AU']['AXCOZ'].keys())
r = msi.get_waveforms('AU', 'AXCOZ', '00', 'HHZ', UTCDateTime(2021, 1, 3, 22, 9, 26), UTCDateTime(2021, 1, 4, 0, 0))
print(r)
print(msi.get_time_range('AU', 'AXCOZ', '00', 'HHN'))
# end if
msi = MseedIndex('/g/data/ha3/ac5759/semi-perm-iris', '*AXCOZ*HHZ*j168.mseed')

print(msi.tree['AU'].keys())
print(msi.tree['AU']['AXCOZ'].keys())
r = msi.get_waveforms('AU', 'AXCOZ', '00', 'HHZ', UTCDateTime(2022, 1, 3, 22, 9, 26), UTCDateTime(2023, 1, 4, 0, 0))
print(r)
print(msi.get_time_range('AU', 'AXCOZ', '00', 'HHZ'))
# end if
10 changes: 5 additions & 5 deletions seismic/inventory/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, source_inventory):
:type source_inventory: obspy.core.inventory.inventory.Inventory
"""
self.m_inventory = source_inventory
self.m_response = None
self._get_response_from_inventory()
#end func

Expand All @@ -81,15 +82,14 @@ def _get_response_from_inventory(self):
# end if
# end if

if (found < 3):
if (found < 1):
msg = 'Network, station or channel information missing in RESP file.'
raise RuntimeError(msg)
else:
seedid = self.m_inventory.get_contents()['channels'][0]
self.m_response = self.m_inventory.get_response(seedid, c.start_date)
# end if

seed_id = '%s.%s..%s' % (n.code, s.code, c.code)
self.m_response = self.m_inventory.get_response(seed_id, s.start_date)
#end func

# end class

class ResponseFromStationXML(ResponseFromInventory):
Expand Down

0 comments on commit ac8caa0

Please sign in to comment.