"""
follows Gary's TTF.m in
iris_mt_scratch/egbert_codes-20210121T193218Z-001/egbert_codes/matlabPrototype_10-13-20/TF/classes
2021-07-02 Removed some prototype methods intended to edit specific station data
when many stations are being processed. MMT methods to be addressed later.
2021-07-20: Addressing Issue #12. If we are going to use xarray it is
tempting to use the frequency band centers as the axis for the arrays
here, rather than simple integer indexing. This has the advantage of
making the data structures more explicit and self describing. We can also
continue to use integer indices to assign and access tf values if needed.
However, one concern I have is that if we use floating point numbers for the
frequencies (or periods) we run the risk of machine roundoff error giving
problems down stream. One way around this is to add a .band_centers()
method to FrequencyBands() which will provide is a list of band centers and
then rather than access by the band center, we can use an access method that
gets us the frequencies between the band_egdes, which will be a unique frequency
... however, if we use overlapping bands this will in general get complicated.
However, for an MT TF representation, we do not in general use overlapping
bands. A reasonably general, and simple solution is to make FrequencyBands
support an iterable of bands, accessable by integer position, and by center
frequency.
"""
import numpy as np
import xarray as xr
[docs]class TransferFunction(object):
"""
Class to contain transfer function array.
2021-07-21: adding processing_config so that it lives with this object.
This will facilitate the writing of z-files if we use a band setup file
that is not of EMTF style
Parameters:
TF : numpy array
array of transfer functions: TF(Nout, Nin, Nperiods)
T : numpy array
list of periods
Header : transfer_function_header.TransferFunctionHeader() object.
TF header contains local site header, remote site header if
appropriate, and information about estimation approach???
cov_ss_inv : numpy array
inverse signal power matrix. aka Cov_SS in EMTF matlab codes
cov_nn : numpy array
noise covariance matrix: aka Cov_NN in EMTF matlab codes
num_segments : integer array?
Number of samples used to estimate TF for each band, and for each \
output channel (might be different for different channels)
R2 : xarray.DataArray
multiple coherence for each output channel / band
FullCov : boolean
true if full covariance is provided
properties (Dependent)
StdErr % standard errors of TF components, same size and order as TF
NBands
freqs % inverse of period
Nout
Nin
"""
def __init__(self, tf_header, frequency_bands, **kwargs):
"""
change 2021-07-23 to require a frequency_bands object. We may want
to just pass the band_edges. I'm not a fan of forcing dependency of
TF on the FrequencyBands class, but it will simplify writting the z_file
interfaces. The old TTF.py class shows an example that just accepted
num_bands as an initialation variable.
Parameters
----------
tf_header
frequency_bands
"""
print("TODO: change self.T to self.period")
self.tf_header = tf_header
self.frequency_bands = frequency_bands
self.num_segments = None
self.cov_ss_inv = None
self.cov_nn = None
self.R2 = None
self.initialized = False
self.processing_config = kwargs.get("processing_config", None)
self.num_segments
if self.tf_header is not None:
if self.num_bands is not None:
self._initialize_arrays()
@property
def tf(self):
return self.transfer_function
@property
def num_bands(self):
"""
temporary function to allow access to old property num_bands used in
the matlab codes for initialization
Returns num_bands : int
a count of the frequency bands associated with the TF
-------
"""
return self.frequency_bands.number_of_bands
@property
def periods(self):
periods = self.frequency_bands.band_centers(frequency_or_period="period")
periods = np.flipud(periods)
return periods
# return self.frequency_bands.band_centers(frequency_or_period="period")
def _initialize_arrays(self):
"""
There are four separate data strucutres, indexed by period here:
TF (num_channels_out, num_channels_in)
cov_ss_inv (num_channels_in, num_channels_in, num_bands),
Cov_NN (num_channels_out, num_channels_out),
R2 num_channels_out)
We use frequency in Fourier domain and period in TF domain. Might
be better to be consistent. It would be inexpensive to duplicate
the axis and simply provide both.
Each of these is indexed by period (or frequency)
TODO: These may be better cast as xarrays. Review after curves up
and running
TODO: also, I prefer np.full(dim_tuple, np.nan) for init
Returns
-------
"""
if self.tf_header is None:
print("header needed to allocate transfer function arrays")
raise Exception
# <transfer function xarray>
tf_array_dims = (self.num_channels_out, self.num_channels_in, self.num_bands)
tf_array = np.zeros(tf_array_dims, dtype=np.complex128)
self.transfer_function = xr.DataArray(
tf_array,
dims=["output_channel", "input_channel", "period"], # frequency"],
coords={
# "frequency": self.frequency_bands.band_centers(),
"period": self.periods,
"output_channel": self.tf_header.output_channels,
"input_channel": self.tf_header.input_channels,
},
)
# </transfer function xarray>
# <num_segments xarray>
num_segments = np.zeros((self.num_channels_out, self.num_bands), dtype=np.int32)
num_segments_xr = xr.DataArray(
num_segments,
dims=["channel", "period"], # "frequency"],
coords={
# "frequency": self.frequency_bands.band_centers(),
"period": self.periods,
"channel": self.tf_header.output_channels,
},
)
self.num_segments = num_segments_xr
# <num_segments xarray>
# <Inverse signal covariance>
cov_ss_dims = (self.num_channels_in, self.num_channels_in, self.num_bands)
cov_ss_inv = np.zeros(cov_ss_dims, dtype=np.complex128)
self.cov_ss_inv = xr.DataArray(
cov_ss_inv,
dims=["input_channel_1", "input_channel_2", "period"],
coords={
"input_channel_1": self.tf_header.input_channels,
"input_channel_2": self.tf_header.input_channels,
"period": self.periods,
},
)
# </Inverse signal covariance>
# <Noise covariance>
cov_nn_dims = (self.num_channels_out, self.num_channels_out, self.num_bands)
cov_nn = np.zeros(cov_nn_dims, dtype=np.complex128)
self.cov_nn = xr.DataArray(
cov_nn,
dims=["output_channel_1", "output_channel_2", "period"],
coords={
"output_channel_1": self.tf_header.output_channels,
"output_channel_2": self.tf_header.output_channels,
"period": self.periods,
},
)
# </Noise covariance>
# <Coefficient of determination>
self.R2 = xr.DataArray(
np.zeros((self.num_channels_out, self.num_bands)),
dims=["output_channel", "period"],
coords={
"output_channel": self.tf_header.output_channels,
"period": self.periods,
},
)
# </Coefficient of determination>
self.initialized = True
@property
def minimum_period(self):
return np.min(self.periods)
@property
def maximum_period(self):
return np.max(self.periods)
@property
def num_channels_in(self):
return self.tf_header.num_input_channels
@property
def num_channels_out(self):
return self.tf_header.num_output_channels
[docs] def frequency_index(self, frequency):
return self.period_index(1.0 / frequency)
# frequency_index = np.isclose(self.num_segments.frequency, frequency)
# frequency_index = np.where(frequency_index)[0][0]
# return frequency_index
[docs] def period_index(self, period):
period_index = np.isclose(self.num_segments.period, period)
period_index = np.where(period_index)[0][0]
return period_index
# return self.frequency_index(1.0 / period)
[docs] def tf_to_df(self):
pass
# import pandas as pd
# columns = ["input_channel", "output_channel", "frequency", ]
# #"decimation_level"]
# df = pd.DataFrame(columns=columns)
[docs] def set_tf(self, regression_estimator, period):
"""
This sets TF elements for one band, using contents of TRegression
object. This version assumes there are estimates for Nout output
channels
"""
index = self.period_index(period)
tf = regression_estimator.b_to_xarray()
output_channels = list(regression_estimator._Y.data_vars)
input_channels = list(regression_estimator._X.data_vars)
for out_ch in output_channels:
for inp_ch in input_channels:
self.tf[:, :, index].loc[out_ch, inp_ch] = tf.loc[out_ch, inp_ch]
if regression_estimator.noise_covariance is not None:
for out_ch_1 in output_channels:
for out_ch_2 in output_channels:
tmp = regression_estimator.cov_nn.loc[out_ch_1, out_ch_2]
self.cov_nn[:, :, index].loc[out_ch_1, out_ch_2] = tmp
if regression_estimator.inverse_signal_covariance is not None:
for inp_ch_1 in input_channels:
for inp_ch_2 in input_channels:
tmp = regression_estimator.cov_ss_inv.loc[inp_ch_1, inp_ch_2]
self.cov_ss_inv[:, :, index].loc[inp_ch_1, inp_ch_2] = tmp
if regression_estimator.R2 is not None:
for out_ch in output_channels:
tmp = regression_estimator.R2.loc[out_ch]
self.R2[:, index].loc[out_ch] = tmp
self.num_segments.data[:, index] = regression_estimator.n_data
return
[docs] def standard_error(self):
"""
TODO: make this a property that returns self._standard_error so it doesn't
compute every time you call it.
Returns
-------
"""
stderr = np.zeros(self.tf.data.shape)
standard_error = xr.DataArray(
stderr,
dims=["output_channel", "input_channel", "period"],
coords={
"output_channel": self.tf_header.output_channels,
"input_channel": self.tf_header.input_channels,
"period": self.periods,
},
)
for out_ch in self.tf_header.output_channels:
for inp_ch in self.tf_header.input_channels:
for T in self.periods:
cov_ss = self.cov_ss_inv.loc[inp_ch, inp_ch, T]
cov_nn = self.cov_nn.loc[out_ch, out_ch, T]
standard_error.loc[out_ch, inp_ch, T] = np.sqrt(cov_ss * cov_nn)
return standard_error
[docs] def from_emtf_zfile(self):
pass
[docs] def to_emtf_zfile(self):
pass