Skip to content

Commit

Permalink
Resolved Weird HDF5 Hangs in _FederatedASDFDataSet
Browse files Browse the repository at this point in the history
* Caught a stealthy memory leak, likely within Obspy Inventory, that was
  causing pyasdf Datasets to not close cleanly. They would instead hang
  at low level HDF5 calls to close a given file opened in parallel.
  The fix required calls to the garbage collector as inventories are
  aggregated within _FederatedASDFDataSetImpl

* Minor refactoring in utils.py
  • Loading branch information
geojunky committed May 7, 2024
1 parent c0356ff commit fc00359
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
11 changes: 8 additions & 3 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import sqlite3
import hashlib
from functools import partial
from seismic.ASDFdatabase.utils import MIN_DATE, MAX_DATE
from seismic.ASDFdatabase.utils import MIN_DATE, MAX_DATE, cleanse_inventory
from seismic.misc import split_list, setup_logger
import pickle as cPickle
import pandas as pd
from rtree import index
import traceback
import gc

def split_list_by_timespan(l, n):
lmin = np.min(l[:, 1])
Expand Down Expand Up @@ -355,7 +356,7 @@ def decode_tag(tag, type='raw_recording'):
wsta = set(list(ds.waveforms.list()))
msta = set(list(coords_dict.keys()))
if (len(wsta) != len(msta)):
missing = set(wsta) - set(msta)
missing = wsta - msta
print('WARNING: {} stations with missing metadata found in {}..'.\
format(len(missing), self.asdf_file_names[ids]))
# end if
Expand All @@ -365,7 +366,7 @@ def decode_tag(tag, type='raw_recording'):
masterinv = ds.waveforms[k].StationXML
else:
try:
masterinv += ds.waveforms[k].StationXML
masterinv += cleanse_inventory(ds.waveforms[k].StationXML)
except Exception as e:
print(e)
# end try
Expand All @@ -384,6 +385,10 @@ def decode_tag(tag, type='raw_recording'):
'(?, ?, ?, ?, ?, ?)', metadatalist)
self.conn.execute('insert into masterinv(inv) values(?)',
[cPickle.dumps(masterinv, cPickle.HIGHEST_PROTOCOL)])

# clean up memory bloat caused by aggregated inventory
del masterinv
gc.collect()
self.conn.commit()
self.conn.close()
# end if
Expand Down
6 changes: 3 additions & 3 deletions seismic/ASDFdatabase/asdf2salvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pyasdf
from mpi4py import MPI
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
from seismic.ASDFdatabase.utils import remove_comments
from seismic.ASDFdatabase.utils import cleanse_inventory
from seismic.misc import split_list, recursive_glob
import click
from tqdm import tqdm
Expand Down Expand Up @@ -265,7 +265,7 @@ def is_preferred_component(cha: str):
try:
oinv = inventory.select(network=net, station=sta,
location=loc, channel=cha)
oinv = remove_comments(oinv)
oinv = cleanse_inventory(oinv)
ods.add_stationxml(oinv)

ods.add_waveforms(stream, tag)
Expand Down Expand Up @@ -370,7 +370,7 @@ def is_preferred_component(cha: str):
station=tr.stats.station,
location=tr.stats.location,
channel=tr.stats.channel)
oinv = remove_comments(oinv)
oinv = cleanse_inventory(oinv)
ods.add_stationxml(oinv)

ods.add_waveforms(tr, tag)
Expand Down
4 changes: 2 additions & 2 deletions seismic/ASDFdatabase/cwb2asdf/cwb2asdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from tqdm import tqdm
from seismic.misc import split_list
from seismic.misc import recursive_glob
from seismic.ASDFdatabase.utils import remove_comments
from seismic.ASDFdatabase.utils import cleanse_inventory

def make_ASDF_tag(tr, tag):
# def make_ASDF_tag(ri, tag):
Expand Down Expand Up @@ -182,7 +182,7 @@ def _write(ds, ostream, inventory_dict, netsta_set):
ustationInv[ustation] = None
else:
# remove comments from inventory
ustationInv[ustation] = remove_comments(sinv)
ustationInv[ustation] = cleanse_inventory(sinv)
# end if
# end for
# end if
Expand Down
6 changes: 5 additions & 1 deletion seismic/ASDFdatabase/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
MAX_DATE = UTCDateTime(4102444800.0)
MIN_DATE = UTCDateTime(-2208988800.0)

def remove_comments(iinv: Inventory) -> Inventory:
def cleanse_inventory(iinv: Inventory) -> Inventory:
oinv = iinv.copy()

# drop networks with no meaningful data
oinv = Inventory(networks=[net for net in oinv.networks
if net.total_number_of_stations is not None])
for net in oinv.networks:
net.comments = []
for sta in net.stations:
Expand Down

0 comments on commit fc00359

Please sign in to comment.