diff --git a/seismic/ASDFdatabase/utils.py b/seismic/ASDFdatabase/utils.py
index 0565150c..16a74cd1 100644
--- a/seismic/ASDFdatabase/utils.py
+++ b/seismic/ASDFdatabase/utils.py
@@ -11,8 +11,8 @@
 from seismic.misc import split_list
 from obspy import Inventory
 
-MAX_DATE = UTCDateTime(4102444800.0)
-MIN_DATE = UTCDateTime(-2208988800.0)
+MAX_DATE = UTCDateTime(4102444800.0) # 2100-01-01
+MIN_DATE = UTCDateTime(-2208988800.0) # 1900-01-01
 
 def cleanse_inventory(iinv: Inventory) -> Inventory:
     oinv = iinv.copy()
diff --git a/seismic/extract_event_traces.py b/seismic/extract_event_traces.py
index a50cff0a..9e596fc6 100644
--- a/seismic/extract_event_traces.py
+++ b/seismic/extract_event_traces.py
@@ -447,12 +447,15 @@ def extract_data(catalog, inventory, waveform_getter, event_trace_datafile,
                 nsl_dict[cproc][k] = v
                 cproc = (cproc + 1)%nproc
             # end for
+
+            log.info('Processing {} events...'.format(descs[wave]))
         # end if
 
         nsl_dict = comm.scatter(nsl_dict, root=0)
 
         for nsl, cha in nsl_dict.items():
             if(cha == '-1'):
+                if(dry_run): continue
                 # Nothing to do for made up entries, which exist for the sole purpose of balancing
                 # MPI-barrier calls across all processors
                 for irank in np.arange(nproc):
@@ -475,13 +478,14 @@
 
                 stream_count = 0
                 sta_stream = Stream()
 
-                pbar=tqdm(desc=descs[wave], total=len(catalog) * len(curr_inv))
+                status = defaultdict(int)
                 for s in safe_iter_event_data(catalog, curr_inv, waveform_getter,
                                               use_rfstats=rfstats_map[wave],
                                               phase=phase_map[wave],
-                                              tt_model=tt_model, pbar=None,#pbar,
+                                              tt_model=tt_model, pbar=None,
                                               request_window=request_window, pad=pad,
+                                              status=status,
                                               dist_range=distance_range):
                     # Write traces to output file in append mode so that arbitrarily large file
                     # can be processed. If the file already exists, then existing streams will
@@ -529,10 +533,8 @@
 
                     sta_stream += out_stream
                     stream_count += 1
-                    pbar.set_description("[{}] {} | {}".format(descs[wave], grp_id, event_time))
-                    pbar.update()
+                    log.info("[{}] {} | {}".format(descs[wave], grp_id, event_time))
                 # end for
-                pbar.close()
 
                 for irank in np.arange(nproc):
                     if(irank == rank):
@@ -552,10 +554,24 @@
                     comm.Barrier()
                 # end for
 
+                summary_str = \
+                    """
+                    Station: {}
+                    Recording time: {} - {}
+                    Matching events: {}
+                    Events with no data: {}
+                    Discarded event data: {}
+                    {} streams written: {}
+                    """.format(nsl, curr_inv.networks[0].start_date, curr_inv.networks[0].end_date,
+                               status['events_processed'], status['no_data'],
+                               status['data_discarded'], descs[wave], stream_count)
+                warn_str = \
+                    """
+                    No {} traces found for {}! Added a null trace.
+                    """.format(descs[wave], nsl)
+                log.info(summary_str)
                 if stream_count == 0:
-                    log.warning("{}: No {} traces found! Added a null trace.".format(nsl, descs[wave]))
-                else:
-                    log.info("{}: Wrote {} {} streams to output file".format(nsl, stream_count, descs[wave]))
+                    log.warning(warn_str)
                 # end if
             # end if
         # end for
diff --git a/seismic/stream_io.py b/seismic/stream_io.py
index 4af72280..219604c2 100644
--- a/seismic/stream_io.py
+++ b/seismic/stream_io.py
@@ -20,7 +20,7 @@
 from rf.util import _get_stations
 from obspy.geodetics import gps2dist_azimuth
 from obspy.geodetics import kilometers2degrees
-
+from collections import defaultdict
 
 
 # pylint: disable=invalid-name
@@ -35,7 +35,8 @@
 )
 
 
 def safe_iter_event_data(events, inventory, get_waveforms, use_rfstats=True, phase='P',
-                         request_window=None, pad=10, pbar=None, **kwargs):
+                         request_window=None, pad=10, pbar=None,
+                         status: defaultdict = None, **kwargs):
     """
     Return iterator yielding three component streams per station and event.
@@ -50,6 +51,9 @@
     :param float pad: add specified time in seconds to request window and
         trim afterwards again
     :param pbar: tqdm_ instance for displaying a progressbar
+    :param status: a dictionary for accumulating running statistics, updated
+        in place under the keys: 'events_processed', 'no_data',
+        'data_discarded'
     :param kwargs: all other kwargs are passed to `~rf.rfstream.rfstats()`
 
     :return: three component streams with raw data
@@ -71,6 +75,10 @@
     stations = _get_stations(inventory)
     if pbar is not None:
         pbar.total = len(events) * len(stations)
+
+    events_processed = 0
+    no_data = 0
+    data_discarded = 0
     for event, seedid in itertools.product(events, stations):
         if pbar is not None:
             pbar.update(1)
@@ -108,6 +116,7 @@
             # end if
         # end if
 
+        events_processed += 1
         net, sta, loc, cha = seedid.split('.')
 
         if(use_rfstats):
@@ -126,6 +135,7 @@
             stream.trim(starttime, endtime)
             stream.merge()
         except Exception:  # no data available
+            no_data += 1
             continue
 
         if len(stream) != 3:
@@ -133,6 +143,7 @@
             warn('Need 3 component seismograms. %d components '
                  'detected for event %s, station %s.'
                  % (len(stream), event.resource_id, seedid))
+            no_data += 1
             continue
         # end if
 
@@ -154,6 +165,7 @@ def has_masked_values(data_stream):
             from warnings import warn
             warn('Gaps or overlaps detected for event %s, station %s.'
                  % (event.resource_id, seedid))
+            data_discarded += 1
             continue
         else:
             for tr in stream: tr.data = np.array(tr.data)
@@ -164,6 +176,12 @@
             tr.stats.update(stats)
         # end for
 
+        if(status is not None):
+            status['events_processed'] = events_processed
+            status['no_data'] = no_data
+            status['data_discarded'] = data_discarded
+        # end if
+
         yield RFStream(stream)
 
 # end func
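
Note for reviewers: a minimal usage sketch of the new `status` argument to `safe_iter_event_data`. The QuakeML/StationXML file names and the FDSN client below are hypothetical stand-ins; only the `status` handling mirrors this patch.

    from collections import defaultdict

    from obspy import read_events, read_inventory
    from obspy.clients.fdsn import Client

    from seismic.stream_io import safe_iter_event_data

    # Hypothetical inputs: any QuakeML catalog, StationXML inventory and
    # waveform-fetching callable will do here.
    catalog = read_events('events.xml')
    inventory = read_inventory('stations.xml')
    get_waveforms = Client('IRIS').get_waveforms

    # The caller owns the dict; safe_iter_event_data refreshes the counters
    # as it yields streams, so they can be read after (or during) iteration.
    status = defaultdict(int)
    for stream in safe_iter_event_data(catalog, inventory, get_waveforms,
                                       status=status):
        pass  # process each three-component RFStream here

    print('Matching events:      {}'.format(status['events_processed']))
    print('Events with no data:  {}'.format(status['no_data']))
    print('Discarded event data: {}'.format(status['data_discarded']))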