from __future__ import division
import numpy as np
from ..tools.psd import psd_freq, cohere, psd, cpsd_quasisync, \
cpsd, phase_angle
from ..tools.misc import slice1d_along_axis, detrend
from .base import ma, TimeData
import copy
import warnings
import six
class TimeBinner(object):
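    """Bin time-series data into ensembles ('bins'), and compute
    ensemble averages, variances, covariances, and spectra.
    """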
def calc_omega(self, fs=None, coh=False):
"""
        Calculate the radial-frequency vector for the PSDs.
Parameters
----------
fs : float (optional)
The sample rate (Hz).
coh : bool
Calculate the frequency vector for coherence/cross-spectra
(default: False) i.e. use self.n_fft_coh instead of
self.n_fft.
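
        Examples
        --------
        A minimal usage sketch (the returned vector has `n_fft` // 2
        points, in rad/s)::

            binner = TimeBinner(n_bin=1024, fs=10.0, n_fft=512)
            omega = binner.calc_omega()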
"""
n_fft = self.n_fft
freq_dim = 'freq'
fs = self._parse_fs(fs)
if coh:
n_fft = self.n_fft_coh
freq_dim = 'coh_freq'
        dat = ma.marray(psd_freq(n_fft, fs * 2 * np.pi),
                        ma.varMeta(r'\omega', {'s': -1}, [freq_dim]))
return dat
def _outshape(self, inshape, n_pad=0, n_bin=None):
"""
Returns `outshape` (the 'reshape'd shape) for an `inshape` array.
"""
n_bin = int(self._parse_nbin(n_bin))
return list(inshape[:-1]) + [int(inshape[-1] // n_bin), int(n_bin + n_pad)]
def _outshape_fft(self, inshape, n_fft=None, n_bin=None):
"""
Returns `outshape` (the fft 'reshape'd shape) for an `inshape` array.
"""
n_fft = self._parse_nfft(n_fft)
n_bin = self._parse_nbin(n_bin)
return list(inshape[:-1]) + [int(inshape[-1] // n_bin), int(n_fft // 2)]
def _parse_fs(self, fs=None):
if fs is not None:
return fs
return self.fs
def _parse_nbin(self, n_bin=None):
if n_bin is None:
return self.n_bin
return n_bin
def _parse_nfft(self, n_fft=None):
if n_fft is None:
return self.n_fft
return n_fft
def reshape(self, arr, n_pad=0, n_bin=None):
"""
Reshape the array `arr` to shape (...,n,n_bin+n_pad).
Parameters
----------
arr : np.ndarray
        n_pad : int
            Pad each ensemble with `n_pad`/2 points from the end of the
            previous ensemble and `n_pad`/2 points from the start of
            the next one. The first/last ensembles are zero-padded at
            the beginning/end of the time series. In this case, the
            output shape is (..., `n`, `n_bin`+`n_pad`).
n_bin : float, int (optional)
Override this binner's n_bin.
Notes
-----
        `n_bin` may be non-integer. In that case the output ensemble
        size is still int(`n_bin`) + `n_pad`, but points in `arr` are
        skipped periodically so that the average bin length is `n_bin`.
        The fractional part mod(`n_bin`, 1) sets the skip rate. For
        example:
        - for n_bin=2048.2, a point is skipped in 1 of every 5 bins.
        - for n_bin=4096.9, a point is skipped in 9 of every 10 bins.
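
        Examples
        --------
        A minimal sketch with an integer `n_bin` (12 points split into
        3 ensembles of 4)::

            binner = TimeBinner(n_bin=4, fs=1.0)
            out = binner.reshape(np.arange(12.0))  # out.shape == (3, 4)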
"""
n_bin = self._parse_nbin(n_bin)
npd0 = int(n_pad // 2)
npd1 = int((n_pad + 1) // 2)
shp = self._outshape(arr.shape, n_pad=0, n_bin=n_bin)
out = np.zeros(
self._outshape(arr.shape, n_pad=n_pad, n_bin=n_bin),
dtype=arr.dtype)
        if np.mod(n_bin, 1) == 0:
            # If n_bin is an integer, we can reshape the data directly.
            n_bin = int(n_bin)
            out[..., npd0:n_bin + npd0] = (
                arr[..., :(shp[-2] * shp[-1])]).reshape(shp, order='C')
        else:
            # Otherwise, subsample `arr` so that the average ensemble
            # length is `n_bin` (see Notes in the docstring).
            inds = (np.arange(np.prod(shp[-2:])) * n_bin // int(n_bin)
                    ).astype(int)
            n_bin = int(n_bin)
            out[..., npd0:n_bin + npd0] = (arr[..., inds]
                                           ).reshape(shp, order='C')
if n_pad != 0:
out[..., 1:, :npd0] = out[..., :-1, n_bin:n_bin + npd0]
out[..., :-1, -npd1:] = out[..., 1:, npd0:npd0 + npd1]
if isinstance(arr, np.ma.MaskedArray):
out = np.ma.masked_where(self.reshape(arr.mask,
n_pad=n_pad,
n_bin=n_bin),
out)
if ma.valid and isinstance(out, ma.marray):
out.meta.dim_names += ['time2']
return out
def detrend(self, dat, n_pad=0, n_bin=None):
"""
        Reshape the array `dat` and remove the best-fit linear trend
        from each ensemble.

        TODO: handle NaNs.
"""
return detrend(self.reshape(dat, n_pad=n_pad, n_bin=n_bin), axis=-1)
def demean(self, dat, n_pad=0, n_bin=None):
"""
Reshape the array `dat` and remove the mean from each ensemble.
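
        For example, with n_bin=4, `demean(np.arange(8.0))` returns two
        ensembles of [-1.5, -0.5, 0.5, 1.5].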
"""
dt = self.reshape(dat, n_pad=n_pad, n_bin=n_bin)
return dt - (dt.mean(-1)[..., None])
def mean(self, dat, axis=-1, n_bin=None, mask_thresh=None):
"""Average an array object.
Parameters
----------
n_bin : int (default is self.n_bin)
mask_thresh : float (between 0 and 1)
if the input data is a masked array, and mask_thresh is
not None mask the averaged values where the fraction of
bad points is greater than mask_thresh
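
        Examples
        --------
        A minimal sketch::

            binner = TimeBinner(n_bin=4, fs=1.0)
            binner.mean(np.arange(8.0))  # array([1.5, 5.5])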
"""
# Can I turn this 'swapaxes' stuff into a decorator?
if axis != -1:
dat = np.swapaxes(dat, axis, -1)
n_bin = self._parse_nbin(n_bin)
tmp = self.reshape(dat, n_bin=n_bin)
out = tmp.mean(-1)
        if isinstance(dat, np.ma.MaskedArray) and mask_thresh is not None:
            out.mask = tmp.mask.sum(-1).astype(float) > mask_thresh * n_bin
        if axis != -1:
            out = np.swapaxes(out, axis, -1)
if dat.__class__ is np.ndarray:
return out
return out.view(dat.__class__)
def mean_angle(self, dat, axis=-1, units='radians',
n_bin=None, mask_thresh=None):
"""Average an angle array.
Parameters
----------
units : {'radians' | 'degrees'}
n_bin : int (default is self.n_bin)
mask_thresh : float (between 0 and 1)
if the input data is a masked array, and mask_thresh is
not None mask the averaged values where the fraction of
bad points is greater than mask_thresh
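
        Examples
        --------
        A minimal sketch (a naive average of [3.1, -3.1] rad would give
        0; the complex-plane average correctly gives ~ +/- pi)::

            binner = TimeBinner(n_bin=2, fs=1.0)
            binner.mean_angle(np.array([3.1, -3.1]))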
"""
if units.lower().startswith('deg'):
dat = dat * np.pi / 180
elif units.lower().startswith('rad'):
pass
else:
raise ValueError("Units must be either 'rad' or 'deg'.")
        out = np.angle(self.mean(np.exp(1j * dat), axis=axis,
                                 n_bin=n_bin, mask_thresh=mask_thresh))
        if units.lower().startswith('deg'):
            out = out * 180 / np.pi
        return out
def var(self, dat, n_bin=None):
return self.reshape(dat, n_bin=n_bin).var(-1)
def std(self, dat, n_bin=None):
return self.reshape(dat, n_bin=n_bin).std(-1)
def calc_acov(self, indat, n_bin=None):
"""
Calculate the auto-covariance of the raw-signal `indat`.
As opposed to calc_xcov, which returns the full
cross-covariance between two arrays, this function only
returns a quarter of the full auto-covariance. It computes the
auto-covariance over half of the range, then averages the two
sides (to return a 'quartered' covariance).
This has the advantage that the 0 index is actually zero-lag.
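
        A minimal usage sketch (each output ensemble has `n_bin` // 4
        lags)::

            binner = TimeBinner(n_bin=8, fs=1.0)
            acov = binner.calc_acov(np.random.randn(64))  # shape (8, 2)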
"""
n_bin = self._parse_nbin(n_bin)
out = np.empty(self._outshape(indat.shape, n_bin=n_bin)[:-1] +
[int(n_bin // 4)], dtype=indat.dtype)
        dt1 = self.reshape(indat, n_pad=int(n_bin // 2) - 2, n_bin=n_bin)
# Here we de-mean only on the 'valid' range:
dt1 = dt1 - dt1[..., :, int(n_bin // 4):int(-n_bin // 4)].mean(-1)[..., None]
        # Don't pad the second variable; `demean` also removes the
        # ensemble mean:
        dt2 = self.demean(indat, n_bin=n_bin)
se = slice(int(n_bin // 4) - 1, None, 1)
sb = slice(int(n_bin // 4) - 1, None, -1)
for slc in slice1d_along_axis(dt1.shape, -1):
tmp = np.correlate(dt1[slc], dt2[slc], 'valid')
            # The zero-padding in `reshape` means the first and last
            # ensembles are computed from one-sided time series.
if slc[-2] == 0:
out[slc] = tmp[se]
elif slc[-2] == dt2.shape[-2] - 1:
out[slc] = tmp[sb]
else:
# For the others we take the average of the two sides.
out[slc] = (tmp[se] + tmp[sb]) / 2
return out
def calc_lag(self, npt=None, one_sided=False):
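        """Calculate the lag vector (in samples) that pairs with the
        output of `calc_xcov`.

        Parameters
        ----------
        npt : int (default: self.n_bin)
            The number of lag points.
        one_sided : bool (default: False)
            If True, return only the zero and positive lags
            (`npt` // 2 points); otherwise the lags are centered on zero.
        """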
if npt is None:
npt = self.n_bin
if one_sided:
return np.arange(int(npt // 2), dtype=np.float32)
else:
return np.arange(npt, dtype=np.float32) - int(npt // 2)
def calc_xcov(self, indt1, indt2, npt=None,
n_bin1=None, n_bin2=None, normed=False):
"""
        Calculate the cross-covariance between arrays `indt1` and
        `indt2` over `npt` lags for each bin.
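
        A minimal usage sketch (a hypothetical pair of 64-point series
        in 8-point bins)::

            binner = TimeBinner(n_bin=8, fs=1.0)
            x, y = np.random.randn(2, 64)
            xcov = binner.calc_xcov(x, y, npt=8)  # shape (8, 8)
            lags = binner.calc_lag(8)             # matching lag vector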
"""
        n_bin1 = self._parse_nbin(n_bin1)
        n_bin2 = self._parse_nbin(n_bin2)
        if npt is None:
            # Default to a full bin of lags (matches `calc_lag`):
            npt = self.n_bin
shp = self._outshape(indt1.shape, n_bin=n_bin1)
shp[-2] = min(shp[-2], self._outshape(indt2.shape, n_bin=n_bin2)[-2])
out = np.empty(shp[:-1] + [npt], dtype=indt1.dtype)
tmp = int(n_bin2) - int(n_bin1) + npt
dt1 = self.reshape(indt1, n_pad=tmp - 1, n_bin=n_bin1)
# Note here I am demeaning only on the 'valid' range:
dt1 = dt1 - dt1[..., :, int(tmp // 2):int(-tmp // 2)].mean(-1)[..., None]
# Don't need to pad the second variable:
dt2 = self.demean(indt2, n_bin=n_bin2)
dt2 = dt2 - dt2.mean(-1)[..., None]
for slc in slice1d_along_axis(shp, -1):
out[slc] = np.correlate(dt1[slc], dt2[slc], 'valid')
if normed:
out /= (self.std(indt1, n_bin=n_bin1)[..., :shp[-2]] *
self.std(indt2, n_bin=n_bin2)[..., :shp[-2]] *
n_bin2)[..., None]
return out
def do_avg(self, rawdat, outdat=None, names=None, n_time=None):
"""
Parameters
----------
        rawdat : raw_data_object
            The raw data structure to be binned.
        outdat : avg_data_object
            The binned (output) data object to which averaged data is
            added. If None, a new object of the same type as `rawdat`
            is created.
        names : list of strings
            The names of variables to be averaged. If `names` is None,
            all data in `rawdat` will be binned.
        n_time : int (optional)
            The length of the time dimension of `rawdat`; variables
            whose time dimension does not match this length are copied
            through unaveraged (default: rawdat.n_time).
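
        A minimal usage sketch (`raw_dat` stands in for a raw TimeData
        object from this package)::

            binner = TimeBinner(n_bin=600, fs=1.0)  # 10-minute bins at 1 Hz
            avg_dat = binner.do_avg(raw_dat)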
"""
props = {}
if n_time is None:
n_time = rawdat.n_time
if outdat is None:
outdat = type(rawdat)()
props['n_bin'] = self.n_bin
props['n_fft'] = self.n_fft
if names is None:
names = rawdat.keys()
for ky in names:
if isinstance(ky, six.string_types) and '.' in ky:
g, nm = ky.split('.', 1)
if g not in outdat:
outdat[g] = type(rawdat[g])()
outdat[g][nm] = self.mean(rawdat[g][nm],
axis=rawdat[g]._time_dim)
elif isinstance(rawdat[ky], TimeData):
outdat[ky] = TimeData()
self.do_avg(rawdat[ky], outdat[ky], n_time=n_time)
elif (isinstance(rawdat[ky], np.ndarray) and
rawdat[ky].shape[rawdat._time_dim] == n_time):
outdat[ky] = self.mean(rawdat[ky], axis=rawdat._time_dim)
else:
outdat[ky] = copy.deepcopy(rawdat[ky])
if 'props' in outdat:
outdat['props'].update(props)
return outdat
def do_var(self, rawdat, outdat=None, names=None, suffix='_var'):
"""Calculate the variance of data attributes.
Parameters
----------
rawdat : raw_data_object
The raw data structure to be binned.
        outdat : avg_data_object
            The binned (output) data object to which variance data is added.
names : list of strings
The names of variables of which to calculate variance. If
`names` is None, all data in `rawdat` will be binned.
"""
if outdat is None:
outdat = TimeData()
if names is None:
names = rawdat.keys()
for ky in names:
if isinstance(rawdat[ky], TimeData):
outdat[ky] = TimeData()
                self.do_var(rawdat[ky], outdat[ky], suffix=suffix)
elif isinstance(rawdat[ky], np.ndarray):
outdat[ky] = self.reshape(rawdat[ky]).var(-1)
else:
outdat[ky] = copy.deepcopy(rawdat[ky])
return outdat
def __init__(self, n_bin, fs, n_fft=None, n_fft_coh=None):
"""
Initialize an averaging object.
Parameters
----------
        n_bin : int
            The number of data points to include in a 'bin' (average).
        fs : float
            The sample rate of the data (Hz).
        n_fft : int (optional)
            The number of data points to use for ffts
            (`n_fft` <= `n_bin`). Default: `n_fft` = `n_bin`
        n_fft_coh : int (optional)
            The number of data points to use for coherence and
            cross-spectra ffts (`n_fft_coh` < `n_bin`).
            Default: `n_fft_coh` = `n_bin` // 6
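
        A minimal usage sketch::

            binner = TimeBinner(n_bin=1200, fs=2.0)  # 10-minute bins at 2 Hz
            binner.n_fft      # 1200 (defaults to n_bin)
            binner.n_fft_coh  # 200  (defaults to n_bin // 6)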
"""
self.n_bin = n_bin
self.fs = fs
self.n_fft = n_fft
self.n_fft_coh = n_fft_coh
        if n_fft is None:
            self.n_fft = n_bin
        elif n_fft > n_bin:
            self.n_fft = n_bin
            warnings.warn("n_fft larger than n_bin doesn't make sense; "
                          "setting n_fft = n_bin")
        if n_fft_coh is None:
            self.n_fft_coh = int(self.n_bin // 6)
        elif n_fft_coh >= n_bin:
            self.n_fft_coh = int(n_bin // 6)
            warnings.warn("n_fft_coh must be smaller than n_bin; "
                          "setting n_fft_coh = n_bin // 6")
def _check_indata(self, rawdat):
if np.any(np.array(rawdat.shape) == 0):
raise RuntimeError(
"The input data cannot be averaged "
"because it is empty.")
if 'DutyCycle_NBurst' in rawdat.props and \
rawdat.props['DutyCycle_NBurst'] < self.n_bin:
warnings.warn(
"The averaging interval (n_bin = {}) is "
"larger than the burst interval (NBurst = {})!"
.format(self.n_bin, rawdat.props['DutyCycle_NBurst']))
if rawdat['props']['fs'] != self.fs:
raise Exception(
"The input data sample rate (dat.fs) does not "
"match the sample rate of this binning-object!")
def cohere(self, dat1, dat2, window='hann', debias=True,
noise=(0, 0), n_fft=None, n_bin1=None, n_bin2=None,):
"""
        Calculate the coherence between `dat1` and `dat2` for each bin.
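
        A minimal usage sketch (each output ensemble has
        `n_fft_coh` // 2 frequencies)::

            binner = TimeBinner(n_bin=1024, fs=10.0)
            u, v = np.random.randn(2, 8192)
            coh = binner.cohere(u, v)  # shape (8, binner.n_fft_coh // 2)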
"""
if n_fft is None:
n_fft = self.n_fft_coh
n_bin1 = self._parse_nbin(n_bin1)
n_bin2 = self._parse_nbin(n_bin2)
oshp = self._outshape_fft(dat1.shape, n_fft=n_fft, n_bin=n_bin1)
oshp[-2] = np.min([oshp[-2], int(dat2.shape[-1] // n_bin2)])
out = np.empty(oshp, dtype=dat1.dtype)
# The data is detrended in psd, so we don't need to do it here.
dat1 = self.reshape(dat1, n_pad=n_fft, n_bin=n_bin1)
dat2 = self.reshape(dat2, n_pad=n_fft, n_bin=n_bin2)
for slc in slice1d_along_axis(out.shape, -1):
            # NB: this assumes `tools.psd.cohere` accepts a `window`
            # argument, like `psd` and `cpsd` do.
            out[slc] = cohere(dat1[slc], dat2[slc], n_fft, window=window,
                              debias=debias, noise=noise)
return out
def cpsd(self, dat1, dat2, fs=None, window='hann',
n_fft=None, n_bin1=None, n_bin2=None,):
"""
        Calculate the cross power spectral density of `dat1` and `dat2`.
Parameters
----------
dat1 : np.ndarray
The first raw-data array of which to calculate the cpsd.
dat2 : np.ndarray
The second raw-data array of which to calculate the cpsd.
        window : string
            String indicating the window function to use (default: 'hann').
Returns
-------
out : np.ndarray
The cross-spectral density of `dat1` and `dat2`
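
        A minimal usage sketch (complex output; one spectrum per bin)::

            binner = TimeBinner(n_bin=1024, fs=10.0)
            u, v = np.random.randn(2, 8192)
            Suv = binner.cpsd(u, v)  # shape (8, binner.n_fft_coh // 2)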
"""
fs = self._parse_fs(fs)
if n_fft is None:
n_fft = self.n_fft_coh
n_bin1 = self._parse_nbin(n_bin1)
n_bin2 = self._parse_nbin(n_bin2)
oshp = self._outshape_fft(dat1.shape, n_fft=n_fft, n_bin=n_bin1)
oshp[-2] = np.min([oshp[-2], int(dat2.shape[-1] // n_bin2)])
# The data is detrended in psd, so we don't need to do it here:
        dat1 = self.reshape(dat1, n_pad=n_fft, n_bin=n_bin1)
        dat2 = self.reshape(dat2, n_pad=n_fft, n_bin=n_bin2)
out = np.empty(oshp, dtype='c{}'.format(dat1.dtype.itemsize * 2))
if dat1.shape == dat2.shape:
cross = cpsd
else:
cross = cpsd_quasisync
for slc in slice1d_along_axis(out.shape, -1):
# PSD's are computed in radian units:
out[slc] = cross(dat1[slc], dat2[slc], n_fft,
2 * np.pi * fs, window=window)
return out
def phase_angle(self, dat1, dat2, window='hann',
n_fft=None, n_bin1=None, n_bin2=None,):
"""
        Calculate the phase difference between two signals as a
        function of frequency (complementary to coherence).
Parameters
----------
        dat1 : np.ndarray
            The first raw-data array.
        dat2 : np.ndarray
            The second raw-data array.
        window : string
            String indicating the window function to use (default: 'hann').
Returns
-------
out : np.ndarray
The phase difference between signal dat1 and dat2.
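
        A minimal usage sketch::

            binner = TimeBinner(n_bin=1024, fs=10.0)
            u, v = np.random.randn(2, 8192)
            ang = binner.phase_angle(u, v)  # shape (8, binner.n_fft_coh // 2)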
"""
if n_fft is None:
n_fft = self.n_fft_coh
n_bin1 = self._parse_nbin(n_bin1)
n_bin2 = self._parse_nbin(n_bin2)
oshp = self._outshape_fft(dat1.shape, n_fft=n_fft, n_bin=n_bin1)
oshp[-2] = np.min([oshp[-2], int(dat2.shape[-1] // n_bin2)])
# The data is detrended in psd, so we don't need to do it here:
        dat1 = self.reshape(dat1, n_pad=n_fft, n_bin=n_bin1)
        dat2 = self.reshape(dat2, n_pad=n_fft, n_bin=n_bin2)
out = np.empty(oshp, dtype='c{}'.format(dat1.dtype.itemsize * 2))
for slc in slice1d_along_axis(out.shape, -1):
            # Compute the phase angle for each ensemble:
out[slc] = phase_angle(dat1[slc], dat2[slc], n_fft,
window=window)
return out
def psd(self, dat, fs=None, window='hann', noise=0,
n_bin=None, n_fft=None, step=None, n_pad=None):
"""
        Calculate the power spectral density (PSD) of `dat`.
Parameters
----------
        dat : np.ndarray
            The raw-data array of which to calculate the psd.
        window : string
            String indicating the window function to use (default: 'hann').
        noise : float
            The white-noise level of the measurement (in the same units
            as `dat`).
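
        A minimal usage sketch (spectra are in radian-frequency units;
        `calc_omega` gives the matching frequency vector)::

            binner = TimeBinner(n_bin=1024, fs=10.0, n_fft=512)
            Suu = binner.psd(np.random.randn(8192))  # shape (8, 256)
            omega = binner.calc_omega()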
"""
fs = self._parse_fs(fs)
n_bin = self._parse_nbin(n_bin)
n_fft = self._parse_nfft(n_fft)
if n_pad is None:
n_pad = min(n_bin - n_fft, n_fft)
out = np.empty(self._outshape_fft(dat.shape, n_fft=n_fft, n_bin=n_bin))
# The data is detrended in psd, so we don't need to do it here.
        dat = self.reshape(dat, n_pad=n_pad, n_bin=n_bin)
for slc in slice1d_along_axis(dat.shape, -1):
# PSD's are computed in radian units:
out[slc] = psd(dat[slc], n_fft, 2 * np.pi * fs,
window=window, step=step)
if ma.valid and ma.marray in dat.__class__.__mro__:
out = ma.marray(
out,
ma.varMeta('S(%s)' % dat.meta.name,
ma.unitsDict({'s': 1}) * dat.meta._units**2,
dim_names=dat.meta.dim_names[:-1] + ['freq'])
)
# The dat.meta.dim_names[:-1] drops the 'time2' dim_name.
        if noise != 0:
            # Subtract the white-noise floor; in radian-frequency units
            # the 2 in 2 * pi cancels the 2 in the Nyquist frequency fs / 2:
            out -= noise ** 2 / (np.pi * fs)
            # Make sure all values of the PSD are > 0 (but still small):
            out[out < 0] = np.min(np.abs(out)) / 100
return out