# pylint: disable=too-many-locals,invalid-name,no-member
"""Core functionality for the package
Library of main functions required for flux calculations.
"""
from collections import namedtuple
from itertools import groupby
import numpy as np
from scipy import interpolate as itpl
from scipy import signal, integrate
from scipy.stats import zscore
from astropy.convolution import convolve, Box1DKernel
__all__ = ["smooth_angle", "planarfit", "rotate_coordinates",
"rotate_wind3d", "wind3D_correct", "despike_VickersMahrt"]
# Valid 3-D wind rotation methods
_VECTOR_ROTATION_METHODS = {"DR", "TR", "PF"}
AngleCoordinates = namedtuple("AngleCoordinates", ["x", "y"])
Vector = namedtuple("Vector", ["angle", "magnitude"])
PlanarFitCoefs = namedtuple("PlanarFitCoefs",
["k_vct", "tilt_coefs", "phi", "theta"])
RotatedVectors = namedtuple("RotatedVectors", ["rotated", "phi_theta"])
CorrectedWind3D = namedtuple("CorrectedWind3D",
["uvw_ship",
"euler_angles",
"euler_angles_angular_rates",
"euler_angles_accelerations",
"euler_angles_slow",
"euler_angles_fast",
"mount_offset_rotations",
"uvw_earth",
"uvw_angular",
"uvw_linear",
"ship_enu",
"uvw_enu",
"imu_enu"])
VickersMahrt = namedtuple("VickersMahrt",
["x", "nspikes", "ntrends", "kclass"])
def decompose(angle, vmagnitude):
"""Decompose angle and magnitude into `x` and `y` coordinates
Parameters
----------
angle : array_like
The angle(s) in degree units.
vmagnitude : array_like
The magnitude array associated with each angle. It can be a scalar
that is common to all ``angle`` elements.
Returns
-------
AngleCoordinates : namedtuple
namedtuple with ndarrays `x` and `y`, in that order.
Examples
--------
>>> angles = np.arange(0, 360, 36)
>>> vmags = np.arange(10)
>>> decompose(angles, vmags) # doctest: +NORMALIZE_WHITESPACE
(array([ 0. , 0.80901699, 0.61803399, -0.92705098, -3.23606798,
-5. , -4.85410197, -2.16311896, 2.47213595, 7.28115295]),
array([ 0.00000000e+00, 5.87785252e-01, 1.90211303e+00,
2.85316955e+00, 2.35114101e+00, 6.12323400e-16,
-3.52671151e+00, -6.65739561e+00, -7.60845213e+00,
-5.29006727e+00]))
"""
x = vmagnitude * np.cos(np.radians(angle))
y = vmagnitude * np.sin(np.radians(angle))
return AngleCoordinates(x, y)
def recompose(x, y):
"""Recompose angles and associated magnitudes from ``x`` and ``y`` vectors
Parameters
----------
x : array_like or scalar
``x``-coordinates
y : array_like or scalar
``y``-coordinates
Returns
-------
Vector : namedtuple
namedtuple with ndarrays ``angle`` and ``magnitude``, in that
order.
"""
vmag = np.sqrt((x ** 2) + (y ** 2))
ang = np.arctan2(y, x)
try: # first assume array inputs
ang[ang < 0] = ang[ang < 0] + (2 * np.pi) # output range 0 - 2*pi
ang[vmag == 0] = 0 # when magnitude is 0 the angle is also 0
ang[ang == 0] = 2 * np.pi # convention
except (IndexError, TypeError): # then check if scalar inputs
if np.isscalar(ang) and ang < 0:
ang = ang + (2 * np.pi)
if np.isscalar(ang) and vmag == 0:
ang = 0
if np.isscalar(ang) and ang == 0:
ang = 2 * np.pi
return Vector(np.degrees(ang), vmag)
[docs]def smooth_angle(angle, vmagnitude=1, kernel_width=21):
"""Smooth angles by decomposing them, applying a boxcar average
Smoothing is done by using a 1D kernel smoothing filter of a given
width.
Parameters
----------
angle : numpy.ndarray
The angle(s) in degree units.
vmagnitude : numpy.ndarray, optional
The magnitude array associated with each angle. It can be a scalar
that is common to all ``angle`` elements.
kernel_width : int, optional
The width of the filter kernel.
Returns
-------
namedtuple
namedtuple with ndarrays ``angle`` and ``magnitude``, in that
order.
"""
x, y = decompose(angle, vmagnitude)
x_smooth = convolve(x, Box1DKernel(kernel_width), boundary="extend")
y_smooth = convolve(y, Box1DKernel(kernel_width), boundary="extend")
return recompose(x_smooth, y_smooth)
def level3D_motion(accel, ang_rate, roll_range, pitch_range):
"""Level 3D acceleration and angular rate measured by motion sensor"""
pass # IMPLEMENT THIS?
def level3D_anemometer(wind_speed, roll, pitch):
"""Level 3D anemometer measurements, given mean roll and pitch"""
pass # IMPLEMENT THIS?
[docs]def planarfit(vectors):
"""Calculate planar fit coefficients for coordinate rotation
See Handbook of Micrometeorology (Lee et al. 2004). Ported from
getPlanarFitCoeffs.m from Patric Sturm <pasturm@ethz.ch>.
Parameters
----------
vectors : numpy.ndarray
A 2-D (Nx3) array with `x`, `y`, and `z` vectors, expressed in a
right-handed coordinate system. These vectors may correspond to
`u`, `v`, and `w` wind speed vectors, or inertial acceleration
components.
Returns
-------
PlanarFitCoefs : namedtuple
namedtuple with (index and name in brackets):
numpy.ndarray [0, `k_vct`]
1-D array (1x3) unit vector parallel to the new z-axis.
numpy.ndarray [1, `tilt_coefs`]
1-D array (1x3) Tilt coefficients `b0`, `b1`, `b2`.
numpy.float [2, `phi`]
Scalar representing roll angle :math:`\\phi`.
numpy.float [3, `theta`]
Scalar representing pitch angle :math:`\\theta`.
"""
vct_u = vectors[:, 0]
vct_v = vectors[:, 1]
vct_w = vectors[:, 2]
vct_nrows = vectors.shape[0]
sum_u = sum(vct_u)
sum_v = sum(vct_v)
sum_w = sum(vct_w)
dot_uv = np.dot(vct_u, vct_v)
dot_uw = np.dot(vct_u, vct_w)
dot_vw = np.dot(vct_v, vct_w)
dot_u2 = np.dot(vct_u, vct_u)
dot_v2 = np.dot(vct_v, vct_v)
H_arr = np.array([[vct_nrows, sum_u, sum_v],
[sum_u, dot_u2, dot_uv],
[sum_v, dot_uv, dot_v2]])
g_arr = np.array([sum_w, dot_uw, dot_vw])
tilt_coef = np.linalg.solve(H_arr, g_arr)
# Estimated \phi (roll) and \theta (pitch) tilt angles
phi_denom = np.sqrt(1 + (tilt_coef[2] ** 2))
phi_sin = tilt_coef[2] / phi_denom
phi_cos = 1 / phi_denom
phi = np.arctan2(phi_sin, phi_cos)
theta_denom = np.sqrt(1 + (tilt_coef[1] ** 2) + (tilt_coef[2] ** 2))
theta_sin = -tilt_coef[1] / theta_denom
theta_cos = np.sqrt((tilt_coef[2] ** 2) + 1) / theta_denom
theta = np.arctan2(theta_sin, theta_cos)
# Determine unit vector parallel to new z-axis
k_2 = 1 / np.sqrt(1 + tilt_coef[1] ** 2 + tilt_coef[2] ** 2)
k_0 = -tilt_coef[1] * k_2
k_1 = -tilt_coef[2] * k_2
k_vct = np.array([k_0, k_1, k_2])
return PlanarFitCoefs(k_vct, tilt_coef, phi, theta)
def rotation_matrix(theta, axis, active=False):
"""Generate rotation matrix for a given axis
The default (``active=False``) output matrix corresponds to passive
("alias") rotations, if used to post-multiply row vectors.
Parameters
----------
active: bool, optional
Whether to return active transformation matrix.
Note
----
See ``rotate_coordinates`` for rest of parameters.
Returns
-------
R_theta : numpy.ndarray
3x3 rotation matrix
"""
theta = np.radians(theta)
if axis == 0:
R_theta = np.array([[1, 0, 0],
[0, np.cos(theta), -np.sin(theta)],
[0, np.sin(theta), np.cos(theta)]])
elif axis == 1:
R_theta = np.array([[np.cos(theta), 0, np.sin(theta)],
[0, 1, 0],
[-np.sin(theta), 0, np.cos(theta)]])
else:
R_theta = np.array([[np.cos(theta), -np.sin(theta), 0],
[np.sin(theta), np.cos(theta), 0],
[0, 0, 1]])
if active:
R_theta = np.transpose(R_theta)
return R_theta
[docs]def rotate_coordinates(vectors, theta, axis=2, **kwargs):
"""Rotate vector coordinate system or vectors themselves around an axis
A right-handed coordinate system is assumed, where positive rotations
are clockwise when looking along the rotation axis from the origin
towards the positive direction. With the default ``active=False``,
each row vector :math:`i` in input coordinate system :math:`0` is
rotated around the given axis by angle :math:`\\theta`, so that it can
be expressed in coordinate system :math:`1` as follows:
.. math:: \\vec{v}_{i,1} = \\vec{v}_{i,0} R_{\\theta}
where the input and output coordinate systems are different. If the
input and output coordinate systems are the same, i.e. ``active=True``,
the vectors are transformed according to:
.. math:: \\vec{v}_{i,1} = \\vec{v}_{i,0} R_{\\theta}^\\intercal
Parameters
----------
vectors : array_like
An nx3 array of vectors with their `x`, `y`, `z` components
theta : numeric
The angle (degrees) by which to perform the rotation.
axis : int
Axis around which to perform the rotation (`x` = 0; `y` = 1;
`z` = 2).
**kwargs
Optional keywords
active : bool
Whether to return the coordinates of each vector in the rotated
coordinate system or to rotate the vectors themselves onto the
same coordinate system. Default is to perform a passive
transformation, i.e. rotation of the coordinate system.
Returns
-------
array_like
The vector array with the same shape as input with rotated
components.
"""
R_theta = rotation_matrix(theta, axis, **kwargs)
# Multiplying a vector by R_theta, as defined above, transforms the
# vector coordinates from the original coordinate system to the new,
# rotated coordinate frame. Note that the rotation matrix
# post-multiplies the vector matrix. If rotating the vectors
# themselves, then the *transposed* rotation matrix post-multiplies the
# matrix of vectors.
return np.dot(vectors, R_theta)
[docs]def rotate_wind3d(wind3D, method="PF", **kwargs):
"""Transform 3D wind vectors to reference mean streamline coordinate system
Use double rotation, triple rotation, or planar fit methods (Wilczak et
al. 2001; Handbook of Micrometeorology).
This is a general coordinate rotation tool, so can handle inputs such
as wind speed and acceleration from inertial measurement units.
Parameters
----------
wind3D : numpy.ndarray
A 2-D (Nx3) array with `x`, `y`, and `z` vector components,
expressed in a right-handed coordinate system. These may represent
`u`, `v`, and `w` wind speed vectors, or inertial acceleration.
method : {"DR", "TR", "PF"}, optional
One of: "DR", "TR", "PF" for double rotation, triple rotation, or
planar fit.
k_vector : numpy.ndarray, optional
1-D array (1x3) unit vector parallel to the new z-axis, when
"method" is "PF" (planar fit). If not supplied, then it is
calculated.
Returns
-------
RotatedVectors : namedtuple
namedtuple with (index, name in brackets):
numpy.ndarray [0, `rotated`]
2-D array (Nx3) Array with rotated vectors
numpy.ndarray [1, `phi_theta`]
1-D array (1x2) :math:`\\phi` and :math:`\\theta` rotation
angles. The former is the estimated angle between the vertical
unit coordinate vector in the rotated frame and the vertical
unit vector in the measured uv plane, while the latter is wind
direction in the measured uv plane. Note these are *not* roll
and pitch angles of the measurement coordinate frame relative
to the reference frame.
"""
if method not in _VECTOR_ROTATION_METHODS:
msg = "method must be one of "
raise ValueError(msg + ', '.join("\"{}\"".format(m) for m in
_VECTOR_ROTATION_METHODS))
if method == "PF":
if "k_vector" in kwargs:
k_vct = kwargs.get("k_vector")
else:
pfit = planarfit(wind3D)
k_vct = pfit.k_vct
j_vct = np.cross(k_vct, np.mean(wind3D, 0))
j_vct = j_vct / np.sqrt(np.sum(j_vct ** 2))
i_vct = np.cross(j_vct, k_vct)
vcts_mat = np.column_stack((i_vct, j_vct, k_vct))
vcts_new = np.dot(wind3D, vcts_mat)
phi = np.arccos(np.dot(k_vct, np.array([0, 0, 1])))
theta = np.arctan2(np.mean(wind3D[:, 1], 0),
np.mean(wind3D[:, 0], 0))
else:
# First rotation to set mean v to 0
theta = np.arctan2(np.mean(wind3D[:, 1]),
np.mean(wind3D[:, 0]))
rot1 = np.array([[np.cos(theta), -np.sin(theta), 0],
[np.sin(theta), np.cos(theta), 0],
[0, 0, 1]])
vcts1 = np.dot(wind3D, rot1)
# Second rotation to set mean w to 0
phi = np.arctan2(np.mean(vcts1[:, 2]),
np.mean(vcts1[:, 0]))
rot2 = np.array([[np.cos(phi), 0, -np.sin(phi)],
[0, 1, 0],
[np.sin(phi), 0, np.cos(phi)]])
vcts_new = np.dot(vcts1, rot2)
# Third rotation to set mean vw to 0
if method == "TR":
mean_vw = np.mean(vcts_new[:, 1] * vcts_new[:, 2])
psi = 0.5 * np.arctan2(2 * mean_vw,
np.mean(vcts_new[:, 1] ** 2) -
np.mean(vcts_new[:, 2] ** 2))
rot3 = np.array([[1, 0, 0],
[0, np.cos(psi), -np.sin(psi)],
[0, np.sin(psi), np.cos(psi)]])
vcts_new = np.dot(vcts_new, rot3)
return RotatedVectors(vcts_new, np.array([phi, theta]))
def _rot_xyz(phi, theta, psi):
"""Compute passive rotation matrix from 3 angles in 1-D array x
The matrix is meant to post-multiply row vectors.
Parameters
----------
phi : float
Angle (degrees) for the rotation around the `x`-axis.
theta : float
Angle (degrees) for the rotation around the `y`-axis.
psi : float
Angle (degrees) for the rotation around the `z`-axis.
Returns
-------
numpy.ndarray
2-D array (3x3) with rotation matrix corresponding to the rotation
sequence :math:`\\phi`, :math:`\\theta`, :math:`\\psi`.
"""
rotx = rotation_matrix(phi, 0)
roty = rotation_matrix(theta, 1)
rotz = rotation_matrix(psi, 2)
return np.transpose(np.dot(rotz, np.dot(roty, rotx)))
def euler_rotate(xyz, xyz_angles):
"""Rotate vectors using Euler angles
Parameters
----------
xyz : array_like
2-D array (Nx3) with row vectors to be rotated.
xyz_angles : array_like
2-D array (Nx3) with row vectors of angles (degrees) to be used for
rotation of ``xyz``, where the first column specifies the angle for
the first rotation around the `x`-axis, the second column the angle
for the second rotation around the `y`-axis, and the third column
the angle for the last rotation around the `z`-axis.
Returns
-------
numpy.ndarray
Array of the same shape as ``xyz`` with rotated vectors.
"""
xyz_rots = np.empty_like(xyz)
for i, v in enumerate(xyz_angles):
phi, theta, psi = v
euler_mati = _rot_xyz(phi, theta, psi)
xyz_roti = np.dot(xyz[i], euler_mati)
xyz_rots[i] = xyz_roti
return xyz_rots
def euler_rate_rotate(euler_angles, omega):
"""Rotate angular rates, given Euler angles
The Euler angular rate column vector :math:`\\Omega` consists of the
angular rates :math:`\\dot \\phi`, :math:`\\dot \\theta`, and
:math:`\\dot \\psi` around the axes in the inertial frame. It is
related to the angular rate column vector :math:`\\omega`, consisting
of the angular rates :math:`\\omega_x`, :math:`\\omega_y`, and
:math:`\\omega_z` in the body frame via a non-orthogonal transformation
matrix :math:`R`:
.. math:: \\Omega = R \\omega
This function performs this operation by using the transpose of the
input body-frame angular rate row vectors, and transposing the result
to return row vectors also.
Parameters
----------
euler_angles : array_like
2-D array (Nx2) with Euler :math:`\\phi` (roll) and :math:`\\theta`
(pitch) angles (radians) around the `x`- and `y`-axes in columns 1
and 2, respectively, representing the orientation of the sensor
orientation relative to the output frame.
omega : float 2-D array
(Nx3) with angular rates :math:`\\omega_x`, :math:`\\omega_y`, and
:math:`\\omega_z` around the body-frame `x`-, `y`-, and `z`-axes in
columns 1, 2, and 3, respectively.
Returns
-------
array_like
Array with same shape as `omega` with rotated angular rates.
"""
def rot_mat(phi, theta):
"""Transformation matrix pre-multiplying column vectors"""
return np.array([[1, np.sin(phi) * np.tan(theta),
np.cos(phi) * np.tan(theta)],
[0, np.cos(phi), -np.sin(phi)],
[0, np.sin(phi) / np.cos(theta),
np.cos(phi) / np.cos(theta)]])
omega_rots = np.empty_like(omega)
for i, v in enumerate(euler_angles):
phi, theta = v
# Multiply and transpose to original shape as row vectors
omega_rots[i] = np.dot(rot_mat(phi, theta), omega[i].T).T
return omega_rots
def _cumtrapz(x, sample_freq):
"""Cumulatively integrate signal given sampling frequency
Parameters
----------
x : array_like
A 1- or 2-D array with signal such as angular rate (deg/s) or
linear rate measurements.
sample_freq : int, float
The sampling frequency in units required for integration.
Returns
-------
array_like
Array with same shape as `x`
"""
y = integrate.cumtrapz(x, dx=(1.0 / sample_freq), axis=0, initial=0)
return y
def _butterworth_coefs(cutoff_period, sample_freq, Astop=10.0, Apass=0.5):
"""Compute Butterworth filter coefficients
Parameters
----------
cutoff_period : int, float
Cutoff period (s) defining the cutoff frequency for designing the
filter.
sample_freq : int, float
The sampling frequency in units required for integration.
Apass : float, optional
Stopband attenuation
Astop : float, optional
Passband ripple (dB)
Returns
-------
tuple
Tuple with (index, name in brackets):
numpy.ndarray [0]
Numerator (b) polynomial of the filter.
numpy.ndarray [1]
Denominator (a) polynomial of the filter.
float [2]
Padding length for the filter.
"""
# Passband and stopband cutoffs, normalized to Nyquist frequency
wp = 1.0 / (2.0 * cutoff_period) / (sample_freq / 2.0)
ws = 1.0 / cutoff_period / (sample_freq / 2.0)
N, Wn = signal.buttord(wp, ws, Apass, Astop)
bc, ac = signal.butter(N, Wn, "high")
# WATCH THIS: we need to make padlen the same as in Matlab
pdl = 3 * (max(len(ac), len(bc)) - 1)
return bc, ac, pdl
[docs]def wind3D_correct(wind_speed, acceleration, angle_rate, heading, speed,
anemometer_pos, sample_freq, Tcf, Ta,
tilt_motion=np.array([0.0, 0.0]),
tilt_anemometer=np.array([0.0, 0.0])):
"""Correct wind vector measurements from a moving platform
This is a port of S. Miller's ``motion`` Matlab function, which
implements Miller et al. (2008) approach. Coordinate frame is assumed
to be right-handed:
* `x` - positive towards the bow.
* `y` - positive towards port side.
* `z` - positive upwards.
Parameters
----------
wind_speed : numpy.ndarray
A 2-D (Nx3) array with `u`, `v`, and `w` wind speed (m/s) vectors.
acceleration : numpy.ndarray
A 2-D (Nx3) array with `x`, `y`, and `z` acceleration (m/s/s)
vectors.
angle_rate : numpy.ndarray
A 2-D (Nx3) array with angular rates (radians/s) vectors.
heading : array_like
A vector with ship heading measurements (deg) in right-hand
coordinate system.
speed : array_like
A vector with ship speed (m/s).
anemometer_pos : array_like
[`x`, `y`, `z`] position vector of anemometer, relative to motion
sensor (m).
sample_freq : float
Sampling frequency (Hz).
Tcf : float
Complimentary filter period (s).
Ta : float
High-pass filter cutoff for accelerations (s). This can be a
scalar if the same cutoff is used for the three components, or a 3
element vector to indicate cutoff period for the `x`, `y`, `z`
components.
tilt_motion : array_like, optional
[roll, pitch] vector with mean tilt (radians) relative to a
horizontal plane. Roll angle offset is positive is port side up,
and pitch is positive for bow down (see Miller 2008 for info, set
to [0 0] if unknown).
tilt_anemometer : array_like, optional
[roll, pitch] vector with mean tilt (radians) relative to a
horizontal plane. Roll angle offset is positive is port side up,
and pitch is positive for bow down (see Miller 2008 for info, set
to [0 0] if unknown).
Returns
-------
CorrectedWind3D : namedtuple
namedtuple with (index, name in brackets):
numpy.ndarray [0, `uvw_ship`]
2-D array (Nx3) with corrected wind vectors in ship-referenced
frame with z-axis parallel to gravity.
numpy.ndarray [1, `euler_angles`]
2-D array (Nx3) with Euler angles.
numpy.ndarray [2, `euler_angles_angular_rates`]
2-D array (NX3) with Euler angles from rate sensors
(unfiltered).
numpy.ndarray [3, `euler_angles_accelerations`]
2-D array (NX3) with Euler angles from accelerometers
(unfiltered).
numpy.ndarray [4, `euler_angles_slow`]
2-D array (NX3) with slow Euler angles (low pass filtered).
numpy.ndarray [5, `euler_angles_fast`]
2-D array (NX3) with fast Euler angles (high pass filtered).
numpy.ndarray [6, `mount_offset_rotations`]
2-D array (3X3) with mounting offset rotation matrix (see
Miller 2008).
numpy.ndarray [7, `uvw_earth`]
2-D array (NX3) with measured velocity, rotated to the earth
frame.
numpy.ndarray [8, `uvw_angular`]
2-D array (NX3) with velocity induced by angular motion.
numpy.ndarray [9, `uvw_linear`]
2-D array (NX3) with velocity induced by platform linear
motion.
numpy.ndarray [10, `ship_enu`]
2-D array (NX3) with ship velocity in eastward, northward, up
frame. (low pass filtered).
numpy.ndarray [11, `uvw_enu`]
2-D array (NX3) with corrected wind in eastward, northward, up
frame.
numpy.ndarray [12, `imu_enu`]
2-D (NX3) array with displacement of the motion sensor.
References
----------
Miller,S.D., Hristov,T.S., Edson,J.B., and C.A. Friehe, 2008:
Platform Motion Effects on Measurements of Turbulence and Air-Sea
Exchange Over the Open Ocean, J. Atmo. Ocean. Tech. 25(9),
1683-1694, DOI: 10.1175/2008JTECHO547.1.
"""
Astop, Apass = 10.0, 0.5 # stopband attenuation; passband Ripple (dB)
# Stop, passband cutoffs
bc, ac, pdl = _butterworth_coefs(Tcf, sample_freq,
Astop=Astop, Apass=Apass)
# EULER ANGLES: (see Edson et al., 1998)
# Low frequency tilt using accelerometers and gyro
g = np.sqrt(np.sum(np.mean(acceleration, 0) ** 2))
gyro = np.unwrap(-np.radians(heading)) # Put heading in radians and RHS
gyro0 = gyro[0] # Initial heading
gyro = gyro - gyro0 # Remove initial heading
# Uncorrected Euler angles from accelerometers and gyro
EA_acc = np.column_stack((np.arcsin(acceleration[:, 1] / g), # phi
np.arcsin(-acceleration[:, 0] / g), # theta
gyro)) # psi
# High frequency angles using angular rates
rm = signal.detrend(angle_rate, 0, "constant")
EA_rate = _cumtrapz(rm, sample_freq) # Estimated Euler angles
EA_rate_hf = signal.filtfilt(bc, ac, EA_rate, padlen=pdl, axis=0)
theta_lf = (EA_acc[:, 1] -
signal.filtfilt(bc, ac, EA_acc[:, 1], padlen=pdl))
theta = theta_lf + EA_rate_hf[:, 1]
phi_pitched = EA_acc[:, 0] / np.cos(theta_lf)
phi_lf = phi_pitched - signal.filtfilt(bc, ac, phi_pitched, padlen=pdl)
phi = phi_lf + EA_rate_hf[:, 0] # complementary-filtered phi
# Low-pass filter to retain low-frequency content, but not the offset
psi_lf = gyro - signal.filtfilt(bc, ac, gyro, padlen=pdl)
axg = np.arctan2(-acceleration[:, 0] * np.cos(phi), g)
theta_lf = axg - signal.filtfilt(bc, ac, axg, padlen=pdl)
theta = theta_lf + EA_rate_hf[:, 1]
EA_slow = np.column_stack((phi_lf, theta_lf, psi_lf))
# NONLINEAR ROTATION OF ANGULAR RATES FROM MOTION SENSOR-FRAME TO
# EARTH-FRAME This next step puts the measured angular rates into the
# earth reference frame. Motion sensor frame-based angular rates are
# first rolled based on the roll angle (phim) estimated above; they are
# then pitched *in a rolled frame* based upon the pitch angle (thetam)
# estimated above; finally, they are yawed *in a rolled and pitched
# frame* based upon both the pitch and roll (thetam and phim)
# estimates. These sequential rotations are exact for small angle RATES
# (see Goldstein) - I've found this step doesn't have much effect on
# the corrected winds.
omega = euler_rate_rotate(np.column_stack((phi, theta)), rm)
# The angular rates omega are now in the earth based frame, which is
# what we want for creating the rotation-transformation
# matrix. Re-integrate and high pass filter these angular rates to get
# the fast angles.
EA_fast = signal.filtfilt(bc, ac,
_cumtrapz(omega, sample_freq),
padlen=pdl, axis=0)
# combine high- and low-frequency angles; make 2D transformation matrix
EA = EA_slow + EA_fast
# SHIP AND ANEMOMETER OFFSET ANGLES
# Pitch and roll tilt of platform w.r.t. earth (see Wilczak et al. 2000)
phi_em, theta_em = tilt_motion[0], tilt_motion[1]
PHI_em = rotation_matrix(np.degrees(phi_em), axis=0, active=True)
THETA_em = rotation_matrix(np.degrees(theta_em), axis=1, active=True)
# Pitch and roll tilt of anemometer w.r.t. earth
phi_ea, theta_ea = tilt_anemometer[0], tilt_anemometer[1]
PHI_ea = rotation_matrix(np.degrees(phi_ea), axis=0, active=True)
THETA_ea = rotation_matrix(np.degrees(theta_ea), axis=1, active=True)
# Anemometer WRT motion sensor
M_em = np.dot(THETA_em, PHI_em)
M_ea = np.dot(THETA_ea, PHI_ea)
M_ma = np.dot(np.transpose(M_em), M_ea)
# WIND VECTOR ROTATION
# Using coordinate rotations
EA_degs = np.degrees(EA) # only for getting rotation matrix
Ur = euler_rotate(np.dot(M_ma, wind_speed.T).T, EA_degs)
# PLATFORM ANGULAR VELOCITY
uam = np.column_stack((anemometer_pos[2] * rm[:, 1] -
anemometer_pos[1] * rm[:, 2],
anemometer_pos[0] * rm[:, 2] -
anemometer_pos[2] * rm[:, 0],
anemometer_pos[1] * rm[:, 0] -
anemometer_pos[0] * rm[:, 1]))
Ua = euler_rotate(uam, EA_degs) # rotate to earth frame
# PLATFORM LINEAR VELOCITY
# High-pass filters for accelerometers
if np.isscalar(Ta):
Ta = np.repeat(Ta, 3)
bax, aax, pdlx = _butterworth_coefs(Ta[0], sample_freq,
Astop=Astop, Apass=Apass)
bay, aay, pdly = _butterworth_coefs(Ta[1], sample_freq,
Astop=Astop, Apass=Apass)
baz, aaz, pdlz = _butterworth_coefs(Ta[2], sample_freq,
Astop=Astop, Apass=Apass)
# Rotate accelerations
ae = euler_rotate(acceleration, EA_degs)
ae[:, 2] = ae[:, 2] - g # subtract gravity
up = _cumtrapz(ae, sample_freq)
Up = np.column_stack((signal.filtfilt(bax, aax, up[:, 0], padlen=pdlx),
signal.filtfilt(bay, aay, up[:, 1], padlen=pdly),
signal.filtfilt(baz, aaz, up[:, 2], padlen=pdlz)))
# SHIP SPEED: this is in the positive x-direction (assuming the bow is
# positive x direction). We low-pass filter this signal so there isnt
# double counting of linear motion from the integrated accelerometers.
# Also, the y- and z-components are zero (ie, captured by the
# integrated accelerometers).
n = len(speed)
Us = np.column_stack((speed -
signal.filtfilt(bax, aax, speed, padlen=pdlx),
np.zeros((n, 1)), np.zeros((n, 1))))
UVW = Ur + Ua + Up + Us # corrected wind vector
# Organize outputs
u_ea = np.column_stack((np.zeros((n, 1)),
np.zeros((n, 1)),
(gyro0 + np.pi / 2) * np.ones((n, 1))))
u_ea_degs = np.degrees(u_ea)
U_ship = euler_rotate(Us, u_ea_degs)
U_earth = euler_rotate(UVW, u_ea_degs)
# Platform displacement
xp = _cumtrapz(Up, sample_freq)
Xp = np.column_stack((signal.filtfilt(bax, aax, xp[:, 0], padlen=pdlx),
signal.filtfilt(bay, aay, xp[:, 1], padlen=pdly),
signal.filtfilt(baz, aaz, xp[:, 2], padlen=pdlz)))
return CorrectedWind3D(UVW, EA, EA_rate, EA_acc, EA_slow, EA_fast,
M_ma, Ur, Ua, Up, U_ship, U_earth, Xp)
def window_indices(idxs, width, step=None):
"""List of sliding window indices across an index vector
Parameters
----------
idx : numpy.ndarray
A 1-D index vector.
width : int
Window width.
step : int, optional
Step size for sliding windows.
Returns
-------
list
List of tuples, each with the indices for a window.
"""
return zip(*(idxs[i::step] for i in range(width)))
def get_VickersMahrt(x, zscore_thr, nrep_thr):
"""Vickers Mahrt computations in a window
Parameters
----------
x : numpy.ndarray
A 1-D signal vectors to be despiked.
zscore_thr : float
The zscore beyond which an observation is considered to be an
outlier.
nrep_thr : int
The maximum number of consecutive outliers that should occur for a
spike to be detected.
Returns
-------
VickersMahrt : tuple
Tuple with (index, name in brackets):
numpy.ndarray [0, `x`]
1-D array with interpolated input.
numpy.int [1, `nspikes`]
Number of spikes detected.
numpy.int [2, `ntrends`]
Number of outlier trends detected.
numpy.ndarray [3, `kclass`]
1-D array of the same size as input, indicating the
classification `k` for each measurement. k=0: measurement
within plausibility range, k=[-1 or 1]: measurement outside
plausibility range, abs(k) > 1: measurement is part of an
outlier trend.
"""
z = zscore(x)
# Discern between outliers above and below the threshold
isout_hi, isout_lo = (z > zscore_thr), (z < -zscore_thr)
n_outs = sum(isout_hi) + sum(isout_lo)
# Set categorical x: 0 (ok), 1 (upper outlier), -1 (lower outlier)
xcat = np.zeros(x.shape, dtype=np.int)
xcat[isout_hi] = 1
xcat[isout_lo] = -1
if n_outs > 0:
# Create tuples for each sequence indicating whether it's outliers
# and its length.
grps = [(val, len(list(seq))) for val, seq in groupby(xcat)]
vals = np.array([k[0] for k in grps])
lens = np.array([k[1] for k in grps])
is_spike = (vals != 0) & (lens <= nrep_thr)
nspikes = sum(is_spike)
# We tally trends as well
is_trend = (vals != 0) & (lens > nrep_thr)
ntrends = sum(is_trend)
# If we have trends, loop through each one, knowing the length of
# the spike and where we are along the input series.
if ntrends > 0:
trends = zip(vals[is_trend], lens[is_trend],
np.cumsum(lens)[is_trend])
for i in trends:
# Double the categorical value for trends and consider
# these OK for interpolation. So abs(xcat) > 1 are trends.
xcat[(i[2] - i[1]):i[2]] = i[0] * 2
# Now we are left with true outliers to interpolate
x_new = x.copy()
xidx = np.arange(len(x)) # simple index along x
isok = (xcat == 0) | (abs(xcat) > 1) # ok if 0 or trend
s = itpl.InterpolatedUnivariateSpline(xidx[isok],
x[isok], k=1)
x_itpl = s(xidx[~ isok])
x_new[~ isok] = x_itpl
return VickersMahrt(x_new, nspikes, ntrends, xcat)
else:
return VickersMahrt(x, 0, 0, xcat)
[docs]def despike_VickersMahrt(x, width, zscore_thr, nreps, step=None,
nrep_thr=None, interp_nan=True):
"""Vickers and Mahrt (1997) signal despiking procedure
The interpolating function is created by the
InterpolatedUnivariateSpline function from the scipy package, and uses
a single knot to approximate a simple linear interpolation, so as to
keep the original signal as untouched as possible.
Parameters
----------
x : numpy.ndarray
A 1-D signal vectors to be despiked.
width : int
Window width.
step : int, optional
Step size for sliding windows. Default is one-half window width.
zscore_thr : float
The zscore beyond which an observation is considered to be an
outlier. Default is zero.
nrep_thr : int, optional
The maximum number of consecutive outliers that should occur for a
spike to be detected. Default: 3.
nreps: int, optional
How many times to run the procedure. Default is zero.
interp.nan : bool, optional
Whether missing values should be interpolated. Interpolated values
are computed after despiking. Default is True.
Returns
-------
VickersMahrt : tuple
Tuple with (index, name in brackets):
numpy.ndarray [0, `x`]
1-D array with despiked input.
numpy.int [1, `nspikes`]
Number of spikes detected.
numpy.int [2, `ntrends`]
Number of outlier trends detected.
numpy.int [3, `kclass`]
Number of iterations performed.
"""
if step is None: # set default step as
step = width / 2 # one-half window size
if nrep_thr is None:
nrep_thr = 3
nspikes, ntrends = 0, 0
xout = x.copy()
# Following EddyUH implementation, fill missing values with nearest
# value for the purpose of spike and trend detection. This ensures
# that we can always calculate zscores.
is_missing = np.isnan(np.array(xout)) # need to coerce to np array
xidx = np.arange(len(xout)) # simple index along x
f_itpl = itpl.interp1d(xidx[~ is_missing], xout[~ is_missing],
kind="nearest", fill_value="extrapolate")
x_nonan = f_itpl(xidx)
# Get a series of tuples with indices for each window
idxl = window_indices(range(len(x)), width, step)
nloops = 0
while nloops < nreps:
nspikes_loop = 0
for w in idxl:
winidx = [i for i in w] # indices of current window
xwin = x_nonan[winidx] # values for the current window
xnew, nsp, ntr, xmask = get_VickersMahrt(xwin, zscore_thr,
nrep_thr)
nspikes_loop += nsp
ntrends += ntr
x_nonan[winidx] = xnew
nloops += 1
# Increase zscore_thr by 0.3, instead of 0.1 as in V&M (1997),
# following EddyUH implementation
zscore_thr += 0.3
if nspikes_loop > 0:
nspikes += nspikes_loop
else: # Stop if we haven't found any new spikes
break
# Interpolate through missing values, if requested (default).
nmissing = np.count_nonzero(is_missing)
if (nmissing > 0) and interp_nan:
s = itpl.InterpolatedUnivariateSpline(xidx[~ is_missing],
xout[~ is_missing], k=1)
x_itpl = s(xidx[is_missing])
xout[is_missing] = x_itpl
return VickersMahrt(xout, nspikes, ntrends, nloops)