diff --git a/seismic/ASDFdatabase/analytics/waveform_analytics.py b/seismic/ASDFdatabase/analytics/wa.py similarity index 80% rename from seismic/ASDFdatabase/analytics/waveform_analytics.py rename to seismic/ASDFdatabase/analytics/wa.py index e1f08415..749c8c4a 100644 --- a/seismic/ASDFdatabase/analytics/waveform_analytics.py +++ b/seismic/ASDFdatabase/analytics/wa.py @@ -1,7 +1,7 @@ #!/bin/env python """ Description: - Generate QAQC report on raw data in mseed or asdf format + Generates a waveform-analytics report on raw data in mseed or asdf format References: @@ -55,6 +55,9 @@ import multiprocess from multiprocess import Manager, freeze_support from seismic.ASDFdatabase.analytics.sun import day_night_coverage, DAY_NIGHT_SECONDS +import uuid +from functools import partial +from glob import glob if(is_windows | is_osx): matplotlib.use('TKAgg') else: matplotlib.use('Agg') @@ -100,45 +103,27 @@ def __init__(self, get_time_range_func: Callable[[str, str, str, str], tuple], get_waveforms_func: Callable[[str, str, str, str, UTCDateTime, UTCDateTime], Stream], - prog_tracker: ProgressTracker, - network: str, - station: str, - location: str, - channel: str, - sampling_rate: int, response: Response, output_folder, + prog_tracker: ProgressTracker, start_time: UTCDateTime = None, end_time: UTCDateTime = None, nproc=1): self.get_time_range_func = get_time_range_func self.get_waveforms_func = get_waveforms_func + self.response = response + self.output_folder = output_folder self.progress_tracker = prog_tracker - self.network = network - self.station = station - self.location = location - self.channel = channel - self.sampling_rate = sampling_rate self.start_time = start_time self.end_time = end_time - self.response = response - self.output_folder = output_folder self.nproc = nproc self.ppsd_list = None - self.resp_amplitudes = None - self.NFFT = 2 ** 16 - self.num_freqs = self.NFFT // 2 + 1 - self.freqs = np.fft.fftfreq(self.NFFT, 1 / self.sampling_rate)[:self.num_freqs] - self.freqs = self.freqs[1:][::-1] - self.freqs[0] *= -1 - self.periods = 1. / self.freqs - self.sparse_periods = None - self.nm_periods, self.lnm = get_nlnm() - _, self.hnm = get_nhnm() self.AMP_DB_MIN = -200 self.AMP_DB_MAX = -50 self.PER_MIN = 1e-2 + self.channel_min_st = defaultdict(lambda:UTCDateTime('2200-01-01')) + self.channel_max_et = defaultdict(lambda:UTCDateTime(-1)) # Red, Orange, Yellow, Green self.cmap = LinearSegmentedColormap.from_list('royg', [(1, 0, 0), @@ -147,33 +132,52 @@ def __init__(self, (0, 1, 0)], N=5) self.cmap_norm = matplotlib.colors.Normalize(vmin=0, vmax=100) + # fetch noise-models + self.master_nm_periods, self.master_lnm = get_nlnm() + _, self.master_hnm = get_nhnm() # flip noise-model values and restrict them to data range - self.nm_periods = self.nm_periods[::-1] - self.lnm = self.lnm[::-1] - self.hnm = self.hnm[::-1] + self.master_nm_periods = self.master_nm_periods[::-1] + self.master_lnm = self.master_lnm[::-1] + self.master_hnm = self.master_hnm[::-1] - hclip = self.nm_periods <= self.periods[-1] - self.nm_periods = self.nm_periods[hclip] - self.lnm = self.lnm[hclip] - self.hnm = self.hnm[hclip] - self.st_list = None - self.et_list = None + # create temp-folder + self.temp_folder = os.path.join(self.output_folder, str(uuid.uuid4())) + os.makedirs(self.temp_folder, exist_ok=True) + # end func + + def analyse_data(self, + network: str, + station: str, + location: str, + channel: str, + sampling_rate: int): + self.resp_amplitudes = None + self.NFFT = 2 ** 16 + self.num_freqs = self.NFFT // 2 + 1 + self.freqs = np.fft.fftfreq(self.NFFT, 1 / sampling_rate)[:self.num_freqs] + self.freqs = self.freqs[1:][::-1] + self.freqs[0] *= -1 + self.periods = 1. / self.freqs + self.sparse_periods = None # setup period binnings self._setup_period_bins() # create array for response amplitudes - resp_amplitudes, _ = self.response.get_evalresp_response(t_samp=1 / self.sampling_rate, + resp_amplitudes, _ = self.response.get_evalresp_response(t_samp=1 / sampling_rate, nfft=self.NFFT, output="VEL") resp_amplitudes = resp_amplitudes[1:] resp_amplitudes = resp_amplitudes[::-1] self.resp_amplitudes = np.absolute(resp_amplitudes * np.conjugate(resp_amplitudes)) # collate timespans to be allocated to each parallel process - st, et = self.get_time_range_func(self.network, - self.station, - self.location, - self.channel) + st, et = self.get_time_range_func(network, + station, + location, + channel) + if(st < self.channel_min_st[channel]): self.channel_min_st[channel] = st + if(et > self.channel_max_et[channel]): self.channel_max_et[channel] = et + if (self.start_time and self.start_time > st): st = self.start_time if (self.end_time and self.end_time < et): et = self.end_time @@ -192,11 +196,12 @@ def __init__(self, # launch parallel computations if (1): p = Pool(ncpus=self.nproc) - p.map(self._generate_psds, proc_st_list, proc_et_list) + p.map(partial(self._generate_psds, network, station, location, channel, sampling_rate), + proc_st_list, proc_et_list) else: - self._generate_psds(proc_st_list[0], proc_et_list[0]) + self._generate_psds(network, station, location, channel, sampling_rate, + self.st_list, self.et_list) # end if - # end func def _setup_period_bins(self): @@ -229,6 +234,12 @@ def _setup_period_bins(self): # initialize periods used for subsampling raw PSD periods self.sparse_periods = np.array(per_octaves_center) + # restrict nm to current set of periods + hclip = self.master_nm_periods <= self.periods[-1] + self.nm_periods = self.master_nm_periods[hclip] + self.lnm = self.master_lnm[hclip] + self.hnm = self.master_hnm[hclip] + # initialize nearest-neighbour indices for mapping: # i) raw PSD periods to sparse-periods # ii) noise-model periods to sparse-periods @@ -241,7 +252,8 @@ def _setup_period_bins(self): (self.sparse_periods <= self.nm_periods[-1]))[0] # end func - def _generate_psds(self, start_time_list, end_time_list): + def _generate_psds(self, network, station, location, channel, sampling_rate, + start_time_list, end_time_list): def fft_taper(data): """ Cosine taper, 10 percent at each end (like done by [McNamara2004]_). @@ -255,10 +267,10 @@ def fft_taper(data): for start_time, end_time in zip(start_time_list, end_time_list): # print(start_time, end_time) - stream = self.get_waveforms_func(self.network, - self.station, - self.location, - self.channel, + stream = self.get_waveforms_func(network, + station, + location, + channel, start_time, end_time) @@ -268,9 +280,9 @@ def fft_taper(data): continue else: sr = stream[0].stats.sampling_rate - if (sr != self.sampling_rate): + if (sr != sampling_rate): print('Warning: discrepant sampling rate found. Expected {}, but found {} ' - 'in trace {} ({} - {}). Moving along..'.format(self.sampling_rate, sr, + 'in trace {} ({} - {}). Moving along..'.format(sampling_rate, sr, stream[0].get_id(), stream[0].stats.starttime, stream[0].stats.endtime)) @@ -286,7 +298,7 @@ def fft_taper(data): for i in np.arange(stream_len): samples_processed += len(stream[i].data) _spec, _ = mlab.psd(stream[i].data.astype('float32'), NFFT=self.NFFT, - Fs=self.sampling_rate, + Fs=sampling_rate, detrend=mlab.detrend_linear, window=fft_taper, noverlap=0, sides='onesided', scale_by_freq=True) @@ -320,7 +332,7 @@ def fft_taper(data): spec = 10. * np.log10(spec) # compute coverage - coverage_fraction = np.min([samples_processed / self.sampling_rate / (end_time - start_time), 1.]) + coverage_fraction = np.min([samples_processed / sampling_rate / (end_time - start_time), 1.]) # compare spec to low- and high-noise-model sparse_spec = spec[self.dense_to_sparse_indices] @@ -339,15 +351,13 @@ def fft_taper(data): ############################################ # Plot results and write output npz ############################################ - output_fn_stem = '{}.{}.{}.{}.{}.{}'.format(self.network, - self.station, - self.location, - self.channel, - start_time, - end_time) - if (is_windows): output_fn_stem = output_fn_stem.replace(':', '__') - output_fn_png = os.path.join(self.output_folder, output_fn_stem + '.png') - output_fn_npz = os.path.join(self.output_folder, output_fn_stem + '.npz') + nslc = '.'.join((network, station, location, channel)) + output_fn_stem = '{}.{}.{}'.format(nslc, + start_time, + end_time) + output_fn_stem = output_fn_stem.replace(':', '__') + output_fn_png = os.path.join(self.temp_folder, output_fn_stem + '.png') + output_fn_npz = os.path.join(self.temp_folder, output_fn_stem + '.npz') # save to npz np.savez(output_fn_npz, bounds=[start_time.timestamp, end_time.timestamp], @@ -385,6 +395,9 @@ def fft_taper(data): (1 - deviation_fraction) * 100), fontsize=4, ha='left') + ax.text(0.015, -85, + nslc, + fontsize=4, ha='left') fig.suptitle(start_time.strftime("%Y-%m-%d"), fontsize=7, y=0.85) @@ -397,15 +410,15 @@ def fft_taper(data): self.progress_tracker.increment() cv, mv = self.progress_tracker.now() - print('Processing data: [{}/{} days] {:2.1f}%'.format(cv, mv, cv / mv * 100), end='\r') + print('Processing data under {}: [{}/{} days] {:2.1f}%'.format(nslc, cv, mv, + cv / mv * 100), end='\r') sys.stdout.flush() # end for return None # end func - def process_results(self, output_fn): - png_dict = defaultdict(list) + def process_results(self, channel, output_fn): mean_spec = None mean_deviation = None total_coverage = None @@ -419,24 +432,28 @@ def process_results(self, output_fn): night_coverage_fractions = [] # load png files and results per_ids = np.arange(nper) - for start_time, end_time in tqdm(zip(self.st_list, self.et_list), - desc='Loading results: '): - output_fn_stem = '{}.{}.{}.{}.{}.{}'.format(self.network, - self.station, - self.location, - self.channel, - start_time, - end_time) - if (is_windows): output_fn_stem = output_fn_stem.replace(':', '__') - output_fn_png = os.path.join(self.output_folder, output_fn_stem + '.png') - output_fn_npz = os.path.join(self.output_folder, output_fn_stem + '.npz') - - if (os.path.exists(output_fn_png)): - png_dict[start_time.timestamp] = plt.imread(output_fn_png) + + npz_files = glob(os.path.join(self.temp_folder, '*.npz')) + npz_files_dict = defaultdict(list) + png_dict = defaultdict(list) + for npz_file in npz_files: + toks = os.path.basename(npz_file).split('.') + net, sta, loc, cha, start_time = toks[:5] + + if(cha != channel): continue # process only required channel + start_time = UTCDateTime(start_time).timestamp + npz_files_dict[start_time].append(npz_file) + + png_file = os.path.splitext(npz_file)[0] + '.png' + if (os.path.exists(png_file)): + png_dict[start_time] = plt.imread(png_file) # end if + # end for - if (os.path.exists(output_fn_npz)): - results = np.load(output_fn_npz) + for start_time in tqdm(sorted(npz_files_dict.keys()), + desc='Loading results: '): + for npz_file in npz_files_dict[start_time]: + results = np.load(npz_file) spec_count += 1 amp_ids = np.digitize(results['sparse_spec'], amp_bins) @@ -456,8 +473,9 @@ def process_results(self, output_fn): days.append(results['bounds'][0]) day_coverage_fractions.append(results['day_coverage_fraction']) night_coverage_fractions.append(results['night_coverage_fraction']) - # end if + # end for # end for + days = np.array(days) day_coverage_fractions = np.array(day_coverage_fractions) night_coverage_fractions = np.array(night_coverage_fractions) @@ -497,10 +515,8 @@ def process_results(self, output_fn): ax1.get_xaxis().set_visible(False) ax1.get_yaxis().set_visible(False) # Set report heading and overall assessment - title = 'Analytics Report for {}.{}.{}.{}'.format(self.network, - self.station, - self.location, - self.channel) + title = 'Analytics Report for {}'.format(channel) + ax1.text(0.5, 0.7, title, ha='center', va='center', fontsize=18) ax1.text(0.5, 0.6, 'Report generated on: {} (UTC)'. \ format(UTCDateTime.now().strftime("%Y-%m-%dT%H:%M:%S")), @@ -516,10 +532,11 @@ def process_results(self, output_fn): ((1 - mean_deviation) * 100)), fontsize=12, ha='left', c='k') ax1.text(0.015, 0.25, - 'Recording Interval: {} - {} ({:.2f} DAYS)'.format(self.st_list[0].strftime("%Y-%m-%d"), - self.et_list[-1].strftime("%Y-%m-%d"), - (self.et_list[-1] - self.st_list[ - 0]) / DAY_NIGHT_SECONDS), + 'Recording Interval: {} - {} ({:.2f} DAYS)'.format(\ + UTCDateTime(self.channel_min_st[channel]).strftime("%Y-%m-%d"), + UTCDateTime(self.channel_max_et[channel]).strftime("%Y-%m-%d"), + (self.channel_max_et[channel] - self.channel_min_st[channel]) / + DAY_NIGHT_SECONDS), fontsize=12, ha='left', c='k') ax1.text(0.015, 0.15, """Data Processed: {:.2f} DAYS""".format(total_coverage), @@ -710,18 +727,50 @@ def get_cpu_count(nproc): return result # end func -def select_channel(meta_list): - answer_list = [str(i + 1) for i in np.arange(len(meta_list))] - answer = None - print('\n############################') - print('# Multiple channels found: #') - print('############################\n') - for i in np.arange(len(meta_list)): print('{}) {}'.format(answer_list[i], - '.'.join(meta_list[i][:4]))) - while (answer not in answer_list): - answer = input('Please select desired channel: ') - # wend - return meta_list[int(answer) - 1][:4] +def select_channel(mi:MseedIndex, sd:UTCDateTime, ed:UTCDateTime)->list([]): + meta_list = mi.get_stations(sd, ed) + cov_dict = mi.get_channel_coverages() + channel_meta_dict = defaultdict(list) + channel_cov_dict = defaultdict(float) + + for meta in meta_list: + cha = meta[3] + nslc = '.'.join(meta[:4]) + channel_meta_dict[cha].append(meta) + channel_cov_dict[cha] += cov_dict[nslc] + # end for + + if(len(meta_list) == 0): + raise RuntimeError('No stations found between {} -- {}. Aborting..'.format(sd, ed)) + elif(len(meta_list) == 1): + return channel_meta_dict[list(channel_meta_dict.keys())[0]] + else: + answer_list = [str(i + 1) for i in np.arange(len(channel_meta_dict))] + answer = None + print('\n############################') + print('# Multiple channels found: #') + print('############################\n') + sorted_channels = sorted(channel_meta_dict.keys()) + for i, cha in enumerate(sorted_channels): + print('{}) {} ({:2.3f} days)'.format(answer_list[i], + cha, channel_cov_dict[cha]/DAY_NIGHT_SECONDS)) + # end for + + while (answer not in answer_list): + answer = input('Please select desired channel: ') + # wend + + channel_meta_list = channel_meta_dict[sorted_channels[int(answer)-1]] + + print('\nData found under the following NSLC list will be analysed:\n') + for channel_meta in channel_meta_list: + nslc = '.'.join(channel_meta) + print('{} ({:2.3f} days)'.format(nslc, cov_dict[nslc]/DAY_NIGHT_SECONDS)) + # end for + print('\n') + + return channel_meta_list + # end if # end func @click.command(name='mseed', context_settings=CONTEXT_SETTINGS) @@ -731,26 +780,21 @@ def select_channel(meta_list): type=str) @click.argument('instrument-response', required=True, type=click.Path(exists=True)) -@click.argument('sampling-rate', required=True, - type=int) @click.argument('output-folder', required=True, - type=click.Path(exists=True)) + type=click.Path(exists=True, file_okay=False, dir_okay=True)) @click.option('--start-date', type=str, default=None, show_default=True, help="Start date in UTC format for processing data") @click.option('--end-date', type=str, default=None, show_default=True, help="End date in UTC format for processing data") @click.option('--nproc', type=int, default=-1, show_default=True, - help="Number of parallel processes use. Default is to use all available cores.") + help="Number of parallel processes to use. Default is to use all available cores.") def process_mseed(mseed_folder, mseed_pattern, instrument_response, - sampling_rate, output_folder, start_date, end_date, - nproc): + output_folder, start_date, end_date, nproc): """ MSEED_FOLDER: Path to folder containing mseed files\n MSEED_PATTERN: File pattern to be used to capture files pertaining to specific channels. Note that pattern must be specified within quotes. \n - INSTRUMENT_RESPONSE: Path to inventory containing instrument response in - StationXML or .resp format\n - SAMPLING_RATE: Sampling rate used to record the mssed files + INSTRUMENT_RESPONSE: Path to instrument response in .resp format\n OUTPUT_FOLDER: Path to output folder\n """ try: @@ -761,52 +805,63 @@ def process_mseed(mseed_folder, mseed_pattern, instrument_response, raise RuntimeError('Invalid start- or end-dates') # end try + if(start_date and end_date and ((end_date.date - start_date.date).days<=0)): + raise RuntimeError('Invalid start- and end-dates. Aborting..') + # end if + # instantiate MseedIndex print('Inspecting mseed files in {}..'.format(mseed_folder)) mseed_index = MseedIndex(mseed_folder, mseed_pattern) + # define bound getter methods + def get_waveforms_func(net, sta, loc, cha, st, et): + return mseed_index.get_waveforms(net, sta, loc, cha, st, et) + # end func + + def get_time_range_func(net, sta, loc, cha): + return mseed_index.get_time_range(net, sta, loc, cha) + # end func + sd = MIN_DATE if start_date is None else start_date ed = MAX_DATE if end_date is None else end_date - meta_list = mseed_index.get_stations(sd, ed) - meta = None - if(len(meta_list) == 0): - raise RuntimeError('No stations found between {} -- {}. Aborting..'.format(sd, ed)) - elif(len(meta_list) > 1): - meta = select_channel(meta_list) - else: - meta = meta_list[0] - # end if - net, sta, loc, cha = meta + + nproc = get_cpu_count(nproc) print('Loading response..') - resp = get_response(instrument_response, net, sta, loc, cha) - if(resp is not None): + resp = get_response(instrument_response) + if(resp is not None): print('Found response: {}'.format(resp)) sys.stdout.flush() - else: + else: raise(RuntimeError('No instrument response found. Aborting..')) # end if + # Recordings often feature garbled network, station and location codes. + # We only consider unique channel codes as meaningful and as such, get a list of + # net, sta, loc, cha combinations that share the same channel code. We also + # account for the possibility of each combination of net, sta, loc, cha to have + # recordings under different sampling rates + meta_list = select_channel(mseed_index, sd, ed) + sr_dict = mseed_index.get_channel_sampling_rates() + # instantiate progress tracker manager = Manager() prog_tracker = ProgressTracker(manager) + sa = StationAnalytics(get_time_range_func, get_waveforms_func, + resp, output_folder, prog_tracker, sd, ed, nproc) + cha = None + for meta in meta_list: + net, sta, loc, cha = meta - def get_waveforms_func(net, sta, loc, cha, st, et): - return mseed_index.get_waveforms(net, sta, loc, cha, st, et) - # end func + nslc = '.'.join(meta) + sampling_rate = sr_dict[nslc] - def get_time_range_func(net, sta, loc, cha): - return mseed_index.get_time_range(net, sta, loc, cha) - # end func + sa.analyse_data(net, sta, loc, cha, sampling_rate) + # end for - nproc = get_cpu_count(nproc) - sa = StationAnalytics(get_time_range_func, get_waveforms_func, - prog_tracker, net, sta, loc, cha, sampling_rate, resp, - output_folder, sd, ed, nproc) + if(cha is not None): sa.process_results(cha, '') - report_fn = os.path.join(output_folder, '.'.join(meta) + '.pdf') - sa.process_results(report_fn) - print('Done..') + print('\nDone..') # end func @click.command(name='asdf', context_settings=CONTEXT_SETTINGS) @@ -853,6 +908,10 @@ def process_asdf(asdf_source, network, station, location, channel, instrument_re raise RuntimeError('Invalid start- or end-dates') # end try + if(start_date and end_date and ((end_date.date - start_date.date).days<=0)): + raise RuntimeError('Invalid start- and end-dates. Aborting..') + # end if + # instantiate FederatedASDFDataSet fds = FederatedASDFDataSet(asdf_source) diff --git a/seismic/ASDFdatabase/utils.py b/seismic/ASDFdatabase/utils.py index fc231533..6ed75519 100644 --- a/seismic/ASDFdatabase/utils.py +++ b/seismic/ASDFdatabase/utils.py @@ -186,8 +186,8 @@ def __init__(self, mseed_folder, pattern): self.tree = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))) self.stream_cache = MseedIndex.StreamCache() self.mseed_files = np.array(sorted(glob(os.path.join(self.mseed_folder, pattern)))) - self.sr_coverage_dict = defaultdict(float) # dict keyed by unique sampling rates found, with coverage as values - self.ch_coverage_dict = defaultdict(float) # dict keyed by unique channels found, with coverage as values + self.sr_dict = defaultdict(list) # sampling rates for each nslc combination + self.coverage_dict = defaultdict(float) # dict keyed by nslc, with coverage as values fc = len(self.mseed_files) if(fc > 0): @@ -220,14 +220,28 @@ def __init__(self, mseed_folder, pattern): if(nc == sc == lc == cc == ''): continue self.meta_list.append([i, nc, sc, lc, cc, st, et]) + nslc = '.'.join((nc, sc, lc, cc)) # store coverage for unique channels and sampling rates cov = float(et - st) # coverage in seconds - self.sr_coverage_dict[sr] += cov - self.ch_coverage_dict[cc] += cov + self.sr_dict[nslc].append(sr) + self.coverage_dict[nslc] += cov # end for # if (i > 0): break # end for + # take median of sampling rates found + for k, v in self.sr_dict.items(): + vals = np.array(self.sr_dict[k]) + + unique_sr = set(vals) + chosen_sr = np.median(vals) + if(len(unique_sr) > 1): + print('Multiple sampling rates ({}) found under {}. ' + 'Selecting median sampling rate ({}).'.format(unique_sr, k, chosen_sr)) + # end if + self.sr_dict[k] = chosen_sr + # end for + print('\nCreating metadata index for a total of {} traces found..'.format(len(self.meta_list))) for row in tqdm(self.meta_list): @@ -237,15 +251,15 @@ def __init__(self, mseed_folder, pattern): self.tree[nc][sc][lc][cc] = index.Index() # end if self.tree[nc][sc][lc][cc].insert(idx, (st, 1, et, 1)) - # end for + # end for # end func - def get_sampling_rate_coverage(self) -> defaultdict: - return self.sr_coverage_dict + def get_channel_sampling_rates(self) -> defaultdict: + return self.sr_dict # end func - def get_channel_coverage(self) -> defaultdict: - return self.ch_coverage_dict + def get_channel_coverages(self) -> defaultdict: + return self.coverage_dict # end func def __getstate__(self):