Source code for dolfyn.adv.motion

from __future__ import division
import numpy as np
import scipy.signal as sig
from scipy.integrate import cumtrapz
from ..rotate import vector as rot
import warnings


def get_body2imu(make_model):
    if make_model == 'nortek vector':
        # In inches it is: (0.25, 0.25, 5.9)
        return np.array([0.00635, 0.00635, 0.14986])
    elif make_model.startswith('nortek signature'):
        return np.array([0.0, 0.0, 0.0])
    else:
        raise Exception("The imu->body vector is unknown for this instrument.")


class CalcMotion(object):

    """
    A 'calculator' for computing the velocity of points that are
    rigidly connected to an ADV-body with an IMU.

    Parameters
    ----------

    advo : `adv_raw<dolfyn.adv.base.adv_raw>`
           The IMU-adv object that will be used to compute motion.

    accel_filtfreq : float
      the frequency at which to high-pass filter the acceleration
      signal to remove low-frequency drift. (default: 0.03 Hz)

    vel_filtfreq : float (optional)
      a second frequency to high-pass filter the integrated
      acceleration.  (default: 1/3 of accel_filtfreq)

    Examples
    --------

    >>> from dolfyn.adv import api as avm
    >>> from dolfyn.adv import motion as avmot

    >>> dat = avm.read_nortek('my_data_file.vec')

    >>> mcalc = avmot.CalcMotion(dat)

    # Calculate the motion of a point that is (.3, .1, .06) meters
    # from the adv-body origin:
    >>> mot = mcalc([.3, .1, .06])

    """
    _default_accel_filtfreq = 0.03

    def __init__(self, advo,
                 accel_filtfreq=None,
                 vel_filtfreq=None,
                 to_earth=True):

        self.advo = advo
        self._check_filtfreqs(accel_filtfreq,
                              vel_filtfreq)
        self.to_earth = to_earth

        self._set_Accel()
        self._set_acclow()
        self.angrt = advo['orient']['angrt']  # No copy because not modified.

    def _check_filtfreqs(self, accel_filtfreq, vel_filtfreq):
        datval = self.advo.props.get('motion accel_filtfreq Hz', None)
        if datval is None:
            if accel_filtfreq is None:
                accel_filtfreq = self._default_accel_filtfreq
                # else use the accel_filtfreq value
        else:
            if accel_filtfreq is None:
                accel_filtfreq = datval
            else:
                if datval != accel_filtfreq:
                    # Neither are None!?!
                    warnings.warn(
                        "The data object specifies a value for accel_filtfreq "
                        "of {} Hz. Overriding this with the user-specified "
                        "value: {} Hz.".format(datval, accel_filtfreq))
        if vel_filtfreq is None:
            vel_filtfreq = self.advo.props.get('motion vel_filtfreq Hz', None)
        if vel_filtfreq is None:
            vel_filtfreq = accel_filtfreq / 3.0
        self.accel_filtfreq = accel_filtfreq
        self.accelvel_filtfreq = vel_filtfreq

    def _set_Accel(self, ):
        advo = self.advo
        if advo.props['coord_sys'] == 'inst':
            self.accel = np.einsum('ij...,i...->j...',
                                   advo['orient']['orientmat'],
                                   advo['orient']['accel'])
        elif self.advo.props['coord_sys'] == 'earth':
            self.accel = advo['orient']['accel'].copy()
        else:
            raise Exception(("Invalid coordinate system '%s'. The coordinate "
                             "system must either be 'earth' or 'inst' to "
                             "perform motion correction.")
                            % (self.advo.props['coord_sys'], ))

    def _set_acclow(self, ):
        """
        """
        self.acclow = acc = self.accel.copy()
        if self.accel_filtfreq == 0:
            acc[:] = acc.mean(-1)[..., None]
        else:
            flt = sig.butter(1, self.accel_filtfreq / (self.advo['props']['fs'] / 2))
            for idx in range(3):
                acc[idx] = sig.filtfilt(flt[0], flt[1], acc[idx])

    def __call__(self, vec):
        """
        Calculate the motion of the point specified by vec (in meters,
        in the adv-body coordinate system).

        Parameters
        ----------

        vec : |np.ndarray| (len(3) or 3 x M)
          The vector in meters (or set of vectors) from the
          body-origin (center of head end-cap) to the point of
          interest (in the body coord-sys).

        Returns
        -------
        umot : |np.ndarray| (3 x M x N_time)
          The motion (velocity) array (3, n_time).

        """
        return self.calc_velacc() + self.calc_velrot(np.array(vec), )

    def calc_velacc(self, ):
        """
        Calculates the translational velocity from the high-pass
        filtered acceleration signal.

        Returns
        -------
        velacc : |np.ndarray| (3 x n_time)
               The acceleration-induced velocity array (3, n_time).
        """
        samp_freq = self.advo['props']['fs']

        # Get high-pass accelerations
        hp = self.accel - self.acclow

        # Integrate in time to get velocities
        dat = np.concatenate((np.zeros(list(hp.shape[:-1]) + [1]),
                              cumtrapz(hp, dx=1 / samp_freq)), axis=-1)

        # Filter again?
        if self.accelvel_filtfreq > 0:
            filt_freq = self.accelvel_filtfreq
            # 2nd order Butterworth filter
            # Applied twice by 'filtfilt' = 4th order butterworth
            filt = sig.butter(2, float(filt_freq) / (samp_freq / 2))
            for idx in range(hp.shape[0]):
                dat[idx] = dat[idx] - sig.filtfilt(filt[0], filt[1], dat[idx])
        return dat

    def calc_velrot(self, vec, to_earth=None):

        """
        Calculate the induced velocity due to rotations of the instrument
        about the IMU center.

        Parameters
        ----------

        vec : |np.ndarray| (len(3) or 3 x M)
          The vector in meters (or vectors) from the body-origin
          (center of head end-cap) to the point of interest (in the
          body coord-sys).

        Returns
        -------
        velrot : |np.ndarray| (3 x M x N_time)
          The rotation-induced velocity array (3, n_time).

        """

        if to_earth is None:
            to_earth = self.to_earth

        dimflag = False
        if vec.ndim == 1:
            vec = vec.copy().reshape((3, 1))
            dimflag = True

        # Correct for the body->imu distance.
        # The nortek_body2imu vector is subtracted because of
        # vector addition:
        # body2head = body2imu + imu2head
        # Thus:
        # imu2head = body2head - body2imu
        vec = vec - get_body2imu(self.advo._make_model)[:, None]

        # This motion of the point *vec* due to rotations should be the
        # cross-product of omega (rotation vector) and the vector.
        #   u=dz*omegaY-dy*omegaZ,v=dx*omegaZ-dz*omegaX,w=dy*omegaX-dx*omegaY
        # where vec=[dx,dy,dz], and angrt=[omegaX,omegaY,omegaZ]
        velrot = np.array([(vec[2][:, None] * self.angrt[1] -
                            vec[1][:, None] * self.angrt[2]),
                           (vec[0][:, None] * self.angrt[2] -
                            vec[2][:, None] * self.angrt[0]),
                           (vec[1][:, None] * self.angrt[0] -
                            vec[0][:, None] * self.angrt[1]),
                           ])

        if to_earth:
            velrot = np.einsum('ji...,j...->i...', self.advo['orientmat'], velrot)

        if dimflag:
            return velrot[:, 0, :]

        return velrot


def _calc_probe_pos(advo, separate_probes=False):
    """
    !!!Currently this only works for Nortek Vectors!

    In the future, we could use the transformation matrix (and a
    probe-length lookup-table?)
    """
    # According to the ADV_DataSheet, the probe-length radius is
    # 8.6cm @ 120deg from probe-stem axis.  If I subtract 1cm
    # (!!!checkthis) to get acoustic receiver center, this is
    # 7.6cm.  In the coordinate sys of the center of the probe
    # then, the positions of the centers of the receivers is:
    p = advo.props
    if separate_probes and p['inst_make'].lower() == 'nortek' and\
       p['inst_model'].lower == 'vector':
        r = 0.076
        # The angle between the x-y plane and the probes
        phi = np.deg2rad(-30)
        # The angles of the probes from the x-axis:
        theta = np.deg2rad(np.array([0., 120., 240.]))
        return (np.dot(advo.props['inst2head_rotmat'].T,
                       np.array([r * np.cos(theta),
                                 r * np.sin(theta),
                                 r * np.tan(phi) * np.ones(3)])) +
                advo.props['inst2head_vec'][:, None]
                )
    else:
        return advo.props['inst2head_vec']


def correct_motion(advo,
                   accel_filtfreq=None,
                   vel_filtfreq=None,
                   to_earth=True,
                   separate_probes=False, ):
    """
    This function performs motion correction on an IMU-ADV data
    object. The IMU and ADV data should be tightly synchronized and
    contained in a single data object.

    Parameters
    ----------

    advo : dolfyn.adv.adv class

    accel_filtfreq : float
      the frequency at which to high-pass filter the acceleration
      signal to remove low-frequency drift.

    vel_filtfreq : float (optional)
      a second frequency to high-pass filter the integrated
      acceleration.  (default: 1/3 of accel_filtfreq)

    to_earth : bool (optional, default: True)
      All variables in the advo.props['rotate_vars'] list will be
      rotated into either the earth frame (to_earth=True) or the
      instrument frame (to_earth=False).

    separate_probes : bool (optional, default: False)
      a flag to perform motion-correction at the probe tips, and
      perform motion correction in beam-coordinates, then transform
      back into XYZ/earth coordinates. This correction seems to be
      lower than the noise levels of the ADV, so the defualt is to not
      use it (False).

    Returns
    -------
    This function returns None, it operates on the input data object,
    ``advo``. The following attributes are added to `advo`:

      ``velraw`` is the uncorrected velocity

      ``velrot`` is the rotational component of the head motion (from
                 angrt)

      ``velacc`` is the translational component of the head motion (from
                 accel, the high-pass filtered accel signal)

      ``acclow`` is the low-pass filtered accel signal (i.e., 

    The primary velocity vector attribute, ``vel``, is motion corrected
    such that:

          vel = velraw + velrot + velacc

    The signs are correct in this equation. The measured velocity
    induced by head-motion is *in the opposite direction* of the head
    motion.  i.e. when the head moves one way in stationary flow, it
    measures a velocity in the opposite direction. Therefore, to
    remove the motion from the raw signal we *add* the head motion.

    Notes
    -----

    Acceleration signals from inertial sensors are notorious for
    having a small bias that can drift slowly in time. When
    integrating these signals to estimate velocity the bias is
    amplified and leads to large errors in the estimated
    velocity. There are two methods for removing these errors,

    1) high-pass filter the acceleration signal prior and/or after
       integrating. This implicitly assumes that the low-frequency
       translational velocity is zero.
    2) provide a slowly-varying reference position (often from a GPS)
       to an IMU that can use the signal (usually using Kalman
       filters) to debias the acceleration signal.

    Because method (1) removes `real` low-frequency acceleration,
    method (2) is more accurate. However, providing reference position
    estimates to undersea instruments is practically challenging and
    expensive. Therefore, lacking the ability to use method (2), this
    function utilizes method (1).

    For deployments in which the ADV is mounted on a mooring, or other
    semi-fixed structure, the assumption of zero low-frequency
    translational velocity is a reasonable one. However, for
    deployments on ships, gliders, or other moving objects it is
    not. The measured velocity, after motion-correction, will still
    hold some of this contamination and will be a sum of the ADV
    motion and the measured velocity on long time scales.  If
    low-frequency motion is known separate from the ADV (e.g. from a
    bottom-tracking ADP, or from a ship's GPS), it may be possible to
    remove that signal from the ADV signal in post-processing. The
    accuracy of this approach has not, to my knowledge, been tested
    yet.

    Examples
    --------

    >>> from dolfyn.adv import api as avm
    >>> dat = avm.read_nortek('my_data_file.vec')
    >>> avm.motion.correct_motion(dat)

    ``dat`` will now have motion-corrected.

    """

    if hasattr(advo, 'velrot') or advo.props.get('motion corrected', False):
        raise Exception('The data object appears to already have been '
                        'motion corrected.')

    if advo.props['coord_sys'] != 'inst':
        advo.rotate2('inst', inplace=True)

    # Returns True/False if head2inst_rotmat has been set/not-set.
    # Bad configs raises errors (this is to check for those)
    rot._check_inst2head_rotmat(advo)
        
    # Create the motion 'calculator':
    calcobj = CalcMotion(advo,
                         accel_filtfreq=accel_filtfreq,
                         vel_filtfreq=vel_filtfreq,
                         to_earth=to_earth)

    ##########
    # Calculate the translational velocity (from the accel):
    advo['orient']['velacc'] = calcobj.calc_velacc()
    # Copy acclow to the adv-object.
    advo['orient']['acclow'] = calcobj.acclow

    ##########
    # Calculate rotational velocity (from angrt):
    pos = _calc_probe_pos(advo, separate_probes)
    # Calculate the velocity of the head (or probes).
    velrot = calcobj.calc_velrot(pos, to_earth=False)
    if separate_probes:
        # The head->beam transformation matrix
        transMat = advo.config.head.get('TransMatrix', None)
        # The body->head transformation matrix
        rmat = advo.props['inst2head_rotmat']

        # 1) Rotate body-coordinate velocities to head-coord.
        velrot = np.dot(rmat, velrot)
        # 2) Rotate body-coord to beam-coord (einsum),
        # 3) Take along beam-component (diagonal),
        # 4) Rotate back to head-coord (einsum),
        velrot = np.einsum('ij,kj->ik',
                           transMat,
                           np.diagonal(np.einsum('ij,j...->i...',
                                                 np.linalg.inv(transMat),
                                                 velrot)))
        # 5) Rotate back to body-coord.
        velrot = np.dot(rmat.T, velrot)
    advo['orient']['velrot'] = velrot

    ##########
    # Rotate the data into the correct coordinate system.
    # inst2earth expects a 'rotate_vars' property.
    # Add velrot, velacc, acclow, to it.
    if 'rotate_vars' not in advo.props:
        advo.props['rotate_vars'] = {'vel', 'orient.velrot', 'orient.velacc',
                                     'orient.accel', 'orient.acclow',
                                     'orient.angrt', 'orient.mag'}
    else:
        advo.props['rotate_vars'].update({'orient.velrot',
                                          'orient.velacc',
                                          'orient.acclow'})

    # NOTE: accel, acclow, and velacc are in the earth-frame after
    #       calc_velacc() call.
    if to_earth:
        advo['orient']['accel'] = calcobj.accel
        rot.inst2earth(advo, rotate_vars=advo.props['rotate_vars'] -
                       {'orient.accel', 'orient.acclow',
                        'orient.velacc', })
    else:
        # rotate these variables back to the instrument frame.
        rot.inst2earth(advo, reverse=True,
                       rotate_vars={'orient.acclow', 'orient.velacc', },
                       force=True, )

    ##########
    # Copy vel -> velraw prior to motion correction:
    advo['velraw'] = advo.vel.copy()
    # Add it to rotate_vars:
    advo.props['rotate_vars'].update({'velraw', })

    ##########
    # Remove motion from measured velocity!
    # NOTE: The plus sign is because the measured-induced velocities
    #       are in the opposite direction of the head motion.
    #       i.e. when the head moves one way in stationary flow, it
    #       measures a velocity in the opposite direction.
    velmot = advo['orient']['velrot'] + advo['orient']['velacc']
    if advo.props['inst_type'] == 'ADP':
        velmot = velmot[:, None]
    advo['vel'][:3] += velmot
    if advo._make_model.startswith('nortek signature') and \
       advo['vel'].shape[0] > 3:
        # This assumes these are w.
        advo['vel'][3:] += velmot[2:]
    advo.props['motion corrected'] = True
    advo.props['motion accel_filtfreq Hz'] = calcobj.accel_filtfreq


[docs]class CorrectMotion(object): """ This object performs motion correction on an IMU-ADV data object. The IMU and ADV data should be tightly synchronized and contained in a single data object. Parameters ---------- accel_filtfreq : float the frequency at which to high-pass filter the acceleration signal to remove low-frequency drift. vel_filtfreq : float (optional) a second frequency to high-pass filter the integrated acceleration. (default: 1/3 of accel_filtfreq) separate_probes : bool (optional: False) a flag to perform motion-correction at the probe tips, and perform motion correction in beam-coordinates, then transform back into XYZ/earth coordinates. This correction seems to be lower than the noise levels of the ADV, so the defualt is to not use it (False). Notes ----- Acceleration signals from inertial sensors are notorious for having a small bias that can drift slowly in time. When integrating these signals to estimate velocity the bias is amplified and leads to large errors in the estimated velocity. There are two methods for removing these errors, 1) high-pass filter the acceleration signal prior and/or after integrating. This implicitly assumes that the low-frequency translational velocity is zero. 2) provide a slowly-varying reference position (often from a GPS) to an IMU that can use the signal (usually using Kalman filters) to debias the acceleration signal. Because method (1) removes `real` low-frequency acceleration, method (2) is more accurate. However, providing reference position estimates to undersea instruments is practically challenging and expensive. Therefore, lacking the ability to use method (2), this function utilizes method (1). For deployments in which the ADV is mounted on a mooring, or other semi-fixed structure, the assumption of zero low-frequency translational velocity is a reasonable one. However, for deployments on ships, gliders, or other moving objects it is not. The measured velocity, after motion-correction, will still hold some of this contamination and will be a sum of the ADV motion and the measured velocity on long time scales. If low-frequency motion is known separate from the ADV (e.g. from a bottom-tracking ADP, or from a ship's GPS), it may be possible to remove that signal from the ADV signal in post-processing. The accuracy of this approach has not, to my knowledge, been tested yet. Examples -------- >>> from dolfyn.adv import api as avm >>> dat = avm.read_nortek('my_data_file.vec') >>> mc = avm.CorrectMotion(0.1) >>> corrected_data = mc(dat) """
[docs] def __init__(self, accel_filtfreq=None, vel_filtfreq=None, separate_probes=False): self.accel_filtfreq = accel_filtfreq self.accelvel_filtfreq = vel_filtfreq self.separate_probes = separate_probes warnings.warn("The 'CorrectMotion' class is being deprecated " "and will be removed in a future DOLfYN release. " "Use the 'correct_motion' function instead.", DeprecationWarning)
def _calc_rot_vel(self, calcobj): """ Calculate the 'rotational' velocity as measured by the IMU rate sensor. """ advo = calcobj.advo # This returns a 3x3 array of probe positions if # separate_probes is True. pos = self._calc_probe_pos(advo) # Calculate the velocity of the head (or probes). velrot = calcobj.calc_velrot(pos, to_earth=False) if self.separate_probes: # The head->beam transformation matrix transMat = advo.config.head.get('TransMatrix', None) # The body->head transformation matrix rmat = advo.props['inst2head_rotmat'] # 1) Rotate body-coordinate velocities to head-coord. velrot = np.dot(rmat, velrot) # 2) Rotate body-coord to beam-coord (einsum), # 3) Take along beam-component (diagonal), # 4) Rotate back to head-coord (einsum), velrot = np.einsum('ij,kj->ik', transMat, np.diagonal(np.einsum('ij,j...->i...', np.linalg.inv(transMat), velrot))) # 5) Rotate back to body-coord. velrot = np.dot(rmat.T, velrot) advo['orient']['velrot'] = velrot def _calc_probe_pos(self, advo): """ !!!Currently this only works for Nortek Vectors! In the future, we could use the transformation matrix (and a probe-length lookup-table?) """ # According to the ADV_DataSheet, the probe-length radius is # 8.6cm @ 120deg from probe-stem axis. If I subtract 1cm # (!!!checkthis) to get acoustic receiver center, this is # 7.6cm. In the coordinate sys of the center of the probe # then, the positions of the centers of the receivers is: p = advo.props if self.separate_probes and p['inst_make'].lower() == 'nortek' and \ p['inst_model'].lower == 'vector': r = 0.076 # The angle between the x-y plane and the probes phi = np.deg2rad(-30) # The angles of the probes from the x-axis: theta = np.deg2rad(np.array([0., 120., 240.])) return (np.dot(advo.props['inst2head_rotmat'].T, np.array([r * np.cos(theta), r * np.sin(theta), r * np.tan(phi) * np.ones(3)])) + advo.props['inst2head_vec'][:, None] ) else: return advo.props['inst2head_vec'] def _calc_accel_vel(self, calcobj): advo = calcobj.advo advo['orient']['velacc'] = calcobj.calc_velacc()
[docs] def __call__(self, advo, to_earth=True): """ Perform motion correction on an IMU-equipped ADV object. Parameters ---------- advo : :class:`ADVdata <base.ADVdata>` The adv object on which to perform motion correction. It must contain the following data attributes: - vel : The velocity array. - accel : The translational acceleration array. - angrt : The rotation-rate array. - orientmat : The orientation matrix. - props : a dictionary that has 'inst2head_vec' and 'coord_sys'. to_earth : bool (optional, default: True) A boolean that specifies whether the data should be rotated into the earth frame. Notes ----- After calling this function, `advo` will have *velrot* and *velacc* data attributes. The velocity vector attribute ``vel`` will be motion corrected according to: u_corr = u_raw + velacc + velrot Therefore, to recover the 'raw' velocity, subtract velacc and velrot from ``vel``. This method does not return a data object, it operates on (motion corrects) the input `advo`. """ if hasattr(advo, 'velrot') or \ advo.props.get('motion corrected', False): raise Exception('The data object appears to already have been ' 'motion corrected.') if not advo.props['inst_type'] == 'ADV': raise Exception("This motion correction tool currently only works " "for ADV data objects.") if advo.props['coord_sys'] != 'inst': advo.rotate2('inst', inplace=True) # Returns True/False if head2inst_rotmat has been set/not-set. # Bad configs raises errors (this is to check for those) rot._check_inst2head_rotmat(advo) calcobj = CalcMotion(advo, accel_filtfreq=self.accel_filtfreq, vel_filtfreq=self.accelvel_filtfreq, to_earth=to_earth) if 'rotate_vars' not in advo.props: advo.props['rotate_vars'] = {'vel', 'velraw', 'orient.velrot', 'orient.velacc', 'orient.accel', 'orient.acclow', 'orient.angrt', 'orient.mag'} else: advo.props['rotate_vars'].update({'velraw', 'orient.velrot', 'orient.velacc', 'orient.acclow', }) self._calc_rot_vel(calcobj) self._calc_accel_vel(calcobj) # calcobj.accel, calcobj.acclow, and velacc are already in # the earth frame. advo['orient']['acclow'] = calcobj.acclow advo['velraw'] = advo.vel.copy() if to_earth: advo['orient']['accel'] = calcobj.accel rot.inst2earth(advo, rotate_vars=advo.props['rotate_vars'] - {'orient.accel', 'orient.acclow', 'orient.velacc', }) else: # rotate these variables back to the instrument frame. rot.inst2earth(advo, reverse=True, rotate_vars={'orient.acclow', 'orient.velacc', }, force=True, ) # NOTE: The plus sign is because the measured-induced velocities # are in the opposite direction of the head motion. # i.e. when the head moves one way in stationary flow, it # measures a velocity in the opposite direction. advo['vel'] += (advo['orient']['velrot'] + advo['orient']['velacc']) advo.props['motion corrected'] = True advo.props['motion accel_filtfreq Hz'] = calcobj.accel_filtfreq