Source code for aurora.pipelines.time_series_helpers

import scipy.signal as ssig

from aurora.time_series.windowing_scheme import WindowingScheme


[docs]def validate_sample_rate(run_ts, config): if run_ts.sample_rate != config.sample_rate: print( f"sample rate in run time series {run_ts.sample_rate} and " f"processing config {config.sample_rate} do not match" ) raise Exception return
[docs]def apply_prewhitening(config, run_xrts_input): if config["prewhitening_type"] == "first difference": run_xrts = run_xrts_input.diff("time") else: run_xrts = run_xrts_input return run_xrts
[docs]def apply_recoloring(config, stft_obj): if config["prewhitening_type"] == "first difference": from aurora.time_series.frequency_domain_helpers import get_fft_harmonics from numpy import pi freqs = get_fft_harmonics(config.num_samples_window, config.sample_rate) prewhitening_correction = 1.0j * 2 * pi * freqs # jw stft_obj /= prewhitening_correction return stft_obj
[docs]def run_ts_to_stft_scipy(config, run_xrts_orig): """ Parameters ---------- config run_xrts Returns ------- """ import xarray as xr run_xrts = apply_prewhitening(config, run_xrts_orig) windowing_scheme = WindowingScheme( taper_family=config.taper_family, num_samples_window=config.num_samples_window, num_samples_overlap=config.num_samples_overlap, taper_additional_args=config.taper_additional_args, sample_rate=config.sample_rate, ) # stft_obj = run_xrts.copy(deep=True) stft_obj = xr.Dataset() for channel_id in run_xrts.data_vars: ff, tt, specgm = ssig.spectrogram( run_xrts[channel_id].data, fs=config.sample_rate, window=windowing_scheme.taper, nperseg=config.num_samples_window, noverlap=config.num_samples_overlap, detrend="linear", scaling="density", mode="complex", ) # drop Nyquist> ff = ff[:-1] specgm = specgm[:-1, :] import numpy as np specgm *= np.sqrt(2) # make time_axis tt = tt - tt[0] tt *= config.sample_rate time_axis = run_xrts.time.data[tt.astype(int)] xrd = xr.DataArray( specgm.T, dims=["time", "frequency"], coords={"frequency": ff, "time": time_axis}, ) stft_obj.update({channel_id: xrd}) stft_obj = apply_recoloring(config, stft_obj) return stft_obj
[docs]def run_ts_to_stft(config, run_xrts_orig): """ Parameters ---------- config : ShortTimeFourierTransformConfig object run_ts ; mth5.RunTS (but could be replaced by the xr.dataset....) Returns ------- """ from aurora.time_series.windowed_time_series import WindowedTimeSeries windowing_scheme = WindowingScheme( taper_family=config.taper_family, num_samples_window=config.num_samples_window, num_samples_overlap=config.num_samples_overlap, taper_additional_args=config.taper_additional_args, sample_rate=config.sample_rate, ) run_xrts = apply_prewhitening(config, run_xrts_orig) windowed_obj = windowing_scheme.apply_sliding_window( run_xrts, dt=1.0 / config.sample_rate ) windowed_obj = WindowedTimeSeries.detrend(data=windowed_obj, detrend_type="linear") tapered_obj = windowed_obj * windowing_scheme.taper # stft_obj = WindowedTimeSeries.apply_stft(data=tapered_obj, # sample_rate=windowing_scheme.sample_rate, # detrend_type="linear", # scale_factor=windowing_scheme.linear_spectral_density_calibration_factor) stft_obj = windowing_scheme.apply_fft( tapered_obj, detrend_type=config.extra_pre_fft_detrend_type ) stft_obj = apply_recoloring(config, stft_obj) return stft_obj
[docs]def run_ts_to_calibrated_stft(run_ts, run_obj, config, units="MT"): """ Parameters ---------- run_ts run_obj config units Returns ------- """ stft_obj = run_ts_to_stft(config, run_ts.dataset) stft_obj = calibrate_stft_obj(stft_obj, run_obj, units=units) return stft_obj
[docs]def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None): """ Parameters ---------- stft_obj run_obj units scale_factors : dict keyed by channel, supports a single scalar to apply to that channels data Useful for debugging. Should not be used in production and should throw a warning if it is not None Returns ------- """ for channel_id in stft_obj.keys(): mth5_channel = run_obj.get_channel(channel_id) channel_filter = mth5_channel.channel_response_filter if not channel_filter.filters_list: print("WARNING UNEXPECTED CHANNEL WITH NO FILTERS") # ONE OFF HACK FOR SAO missing data if channel_id == "hy": print("WARNING ONE-OFF PKD SAO RR") channel_filter = run_obj.get_channel("hx").channel_response_filter calibration_response = channel_filter.complex_response(stft_obj.frequency.data) if channel_scale_factors: try: channel_scale_factor = channel_scale_factors[channel_id] except KeyError: channel_scale_factor = 1.0 calibration_response /= channel_scale_factor if units == "SI": print("Warning: SI Units are not robustly supported issue #36") stft_obj[channel_id].data /= calibration_response return stft_obj