Source code for fluxer.eddycov.tilt_windows

# pylint: disable=too-many-locals,invalid-name,no-member

"""Calculate tilt angles in flux files using non-overlapping moving windows

"""

import itertools as itrt
import datetime
import os.path as osp
import logging
import numpy as np
import pandas as pd
from scipy.stats import circmean
import matplotlib as mpl
import matplotlib.pyplot as plt
from fluxer.eddycov.settings import FLUX_FLAGS
from fluxer.eddycov.parse_ecfile import (prepare_period, NavigationError,
                                         SonicError)
from fluxer.eddycov.flux import planarfit

__all__ = ["TiltWindows"]

logger = logging.getLogger(__name__)
# Add the null handler if importing as library; whatever using this library
# should set up logging.basicConfig() as needed
logger.addHandler(logging.NullHandler())


def _fname2time(fname):
    """Construct a time stamp from a file name"""
    return datetime.datetime.strptime(fname[-18:-4], "%Y%m%d%H%M%S")


def _make_key(delta_t):
    """Return a function with attribute to track current upper bound"""
    def key(fname):
        """Set upper bound for current time window given a file name"""
        if key.upper_bound is None:
            key.upper_bound = _fname2time(fname) + delta_t
        else:
            tstamp = _fname2time(fname)
            while tstamp >= key.upper_bound:
                key.upper_bound += delta_t
        return key.upper_bound
    key.upper_bound = None
    return key


def _make_windows(ec_files, win_minutes):
    """Build list of tuples linking list of files to a time window"""
    # Time delta
    tdelta = datetime.timedelta(minutes=win_minutes)
    # Build a list of tuples (key=window start timestamp, file-list)
    win_files = [(pd.to_datetime(k), list(grp)) for k, grp in
                 itrt.groupby(ec_files, key=_make_key(tdelta))]
    return win_files


[docs]class TiltWindows: """Well-defined pandas DataFrame""" def __init__(self, ec_files=[], win_minutes=0): """Set up attributes for handling tilt window data Parameters ---------- ec_files : list, optional List of file names. Default is empty list. win_minutes : int, optional Number of minutes to build windows, based on file names. Default is zero. """ win_files = _make_windows(ec_files, win_minutes) # Set up dataframe to place tilt angles and other details cols = (["failed_tilt_flag", "nfiles", "nfiles_ok"] + ["n{0}".format(i) for i in FLUX_FLAGS] + ["phi_motion", "theta_motion", "phi_sonic", "theta_sonic"] + ["wind_direction"]) tilts = pd.DataFrame(index=[x[0] for x in win_files], columns=cols) tilts.loc[:, cols[1]] = [len(x[1]) for x in win_files] tilts.loc[:, cols[-5:]] = tilts[cols[-5:]].astype(np.float).values # Zero all the columns except nfiles and parameters tilts.loc[:, cols[0]] = False tilts.loc[:, cols[2:-5]] = 0 self.width = win_minutes self.tilts = tilts self.win_files = win_files # [Original comment: create flags for the 6 possible sources of # "bad" data, flag=0 means data good] in each input file flags = dict.fromkeys(FLUX_FLAGS, False) # We set up a dataframe with all files to process as index, and all # the flags as columns. This is the basis for our summary output # file; other columns (e.g. flux summary calculations for the # period) will be appended as we loop. prep_flags = pd.DataFrame(flags, index=[osp.basename(x) for x in ec_files]) self.prep_flags = prep_flags def __str__(self): # Show tilts attribute first, followed by the number of windows # found, followed by the width (window width) attribute msg = ("TiltWindows({0.tilts}, nwindows={1}, " "width={0.width!r})") return msg.format(self, len(self.win_files)) __repr__ = __str__
[docs] def get_tilts_planarfit(self, config): """Compute tilt angles in all windows using planar fit method Parameters ---------- config : OrderedDict Dictionary with parsed configuration file. Returns ------- pandas.DataFrame tilt angles for each window. """ wind_names = ["wind_speed_u", "wind_speed_v", "wind_speed_w"] mot3d_names = ["acceleration_x", "acceleration_y", "acceleration_z", "rate_x", "rate_y", "rate_z"] # Iterate over the list of tuples (key=window time stamp, file-list) msg_suffix = " tilt angles via planar fit for window %s" for (k, l) in self.win_files: logger.info("Begin" + msg_suffix, k) ec_list = [] ec_ok_files = [] # Iterate over the file-list for ec_file in l: logger.info("Begin preparing %s", osp.basename(ec_file)) # Get a file name prefix to be shared by the output files # from this period. Note iname is THE SAME AS THE INDEX IN # prep_flags iname = osp.basename(ec_file) # Try to prepare file; set flags, and populate the list of # OK files and summaries. Continue if preparation fails. try: ec_prep, prep_flags = prepare_period(ec_file, config) except (NavigationError, SonicError) as err: self.prep_flags.loc[iname, "failed_prep_flag"] = True for flagname, flag in err.flags.items(): self.prep_flags.loc[iname, flagname] = flag self.tilts.loc[k, "n" + flagname] += np.int(flag) logger.info("Skip %s", ec_file) continue else: for flagname, flag in prep_flags.iteritems(): self.prep_flags.loc[iname, flagname] = flag self.tilts.loc[k, "n" + flagname] += np.int(flag) ec_list.append(pd.DataFrame(ec_prep.mean()).transpose()) ec_ok_files.append(ec_file) logger.info("End preparing %s", osp.basename(ec_file)) l = ec_ok_files n_ok = len(ec_list) self.tilts.loc[k, "nfiles_ok"] = n_ok if n_ok < 3: logger.info("Aborting window without enough adequate files %s", k) for ec_file in l: self.tilts.loc[k, "failed_tilt_flag"] = True continue else: ec_win = pd.concat(ec_list, keys=ec_ok_files) logger.info("Combining %s periods in window %s", n_ok, k) # Extract means from each period file for planar fitting wnd = ec_win.loc[:, wind_names] mot3d = ec_win.loc[:, mot3d_names[0:3]] # Tilt angles for motion sensor mot3d_pfit = planarfit(mot3d.values) self.tilts.loc[k, "phi_motion"] = mot3d_pfit.phi self.tilts.loc[k, "theta_motion"] = mot3d_pfit.theta # Tilt angles for sonic anemometer sonic_pfit = planarfit(wnd.values) self.tilts.loc[k, "phi_sonic"] = sonic_pfit.phi self.tilts.loc[k, "theta_sonic"] = sonic_pfit.theta wind_direction_mean = circmean(ec_win["wind_direction"].values, high=np.degrees(2 * np.pi)) self.tilts.loc[k, "wind_direction"] = wind_direction_mean logger.info("End" + msg_suffix, k)
[docs] def plot(self, fig_file, step=1, title=None): """Generate a plot of tilt angles and diagnostics Parameters ---------- fig_file : str Path to file to write plot to step : int, optional Step (stride) to subsample the time series by. Default is 1. title : str Title for plot. Default is ``None``. """ # Workaround for stupid problem in Debian now: plt.rcParams['mathtext.fontset'] = 'stix' tilts = self.tilts # Exclude missing data from plots so lines work # Set up wind direction vector def plot_quiver(tseries, ax, step, yoffset=1, **kwargs): """Return an axis with quiver plot for a time series Parameters ---------- tseries : pandas.Series Time series with angles (radians) to plot. ax : matplotlib.axes Axis to plot in. step : int Step (stride) to subsample the time series by. yoffset : int Location for quiver in the y-coordinate. **kwargs : optional keyword arguments Arguments passed to ``pyplot.quiver``. Returns ------- Matplotlib axis """ pivot = kwargs.pop("pivot", "mid") width = kwargs.pop("width", 0.015) scale = kwargs.pop("scale", 1 / 0.2) units = kwargs.pop("units", "inches") ts = tseries.dropna()[::step] tstamps = mpl.dates.date2num(ts.index.to_pydatetime()) ts_cos = np.cos(ts).values.flatten() ts_sin = np.sin(ts).values.flatten() qvr = ax.quiver(tstamps, np.ones(ts.shape[0]) * yoffset, ts_sin, ts_cos, pivot=pivot, units=units, width=width, scale=scale, **kwargs) return qvr # Set up phi_motion vector mot_phi = tilts[["phi_motion"]] # Set up theta_motion vector mot_theta = tilts[["theta_motion"]] # Set up phi_sonic vector wnd3d_phi = tilts[["phi_sonic"]] # Set up theta_sonic vector wnd3d_theta = tilts[["theta_sonic"]] fig = plt.figure(figsize=(15, 12)) # Main plotting grid grd = mpl.gridspec.GridSpec(2, 1) grd.update(hspace=0.1) # Top plots grd1 = mpl.gridspec.GridSpecFromSubplotSpec(5, 1, subplot_spec=grd[0], hspace=0.015) # Top line plot ax1 = plt.subplot(grd1[1:, :]) # Corresponding quiver plot above ax0 = plt.subplot(grd1[0, :], sharex=ax1) # Bottom plots grd2 = mpl.gridspec.GridSpecFromSubplotSpec(5, 1, subplot_spec=grd[1], hspace=0.015) # Bottom line plot ax3 = plt.subplot(grd2[1:, :], sharex=ax1) # Corresponding quiver plot above ax2 = plt.subplot(grd2[0, :], sharex=ax1) # Plotting # Quiver colors as list of dictionaries to match the rest q_colors = [x for x in plt.rcParams["axes.prop_cycle"]] q_yticks = np.array([1, 1.1]) # quiver y ticks q_ylims = [np.min(q_yticks) - 0.05, np.max(q_yticks) + 0.05] # Top plot_quiver(mot_phi, ax0, step=step, yoffset=q_yticks[0], **q_colors[0]) plot_quiver(wnd3d_phi, ax0, step=step, yoffset=q_yticks[1], **q_colors[1]) ax0.yaxis.set_visible(False) ax0.set_ylim(q_ylims) ax0.tick_params(axis="x", bottom="off", labelbottom="off") ax1.plot("phi_motion", data=mot_phi.dropna()) ax1.plot("phi_sonic", data=wnd3d_phi.dropna()) ax1.set_ylabel(r"Roll $\phi$") ax1.xaxis.set_visible(False) # Bottom plot_quiver(mot_theta, ax2, step=step, yoffset=q_yticks[0], **q_colors[0]) plot_quiver(wnd3d_theta, ax2, step=step, yoffset=q_yticks[1], **q_colors[1]) ax2.yaxis.set_visible(False) ax2.set_ylim(q_ylims) ax2.tick_params(axis="x", bottom="off", labelbottom="off") ax3.plot("theta_motion", data=mot_theta.dropna()) ax3.plot("theta_sonic", data=wnd3d_theta.dropna()) ax3.set_ylabel(r"Pitch $\theta$") ax3.set_xlabel("") # Legend leg = ax3.legend(loc=9, bbox_to_anchor=(0.5, -0.05), frameon=False, borderaxespad=0, ncol=2) leg.get_texts()[0].set_text("motion sensor") leg.get_texts()[1].set_text("sonic anemometer") fig.savefig(fig_file, bbox_inches="tight")
if __name__ == "__main__": # Test defaults tlt = TiltWindows() # Generate 6 hours (20 min x 30) of time stamps for filenames tstamps = pd.date_range("2016-01-01 00:30:00", periods=30, freq="20min") files = tstamps.strftime("EC_%Y%m%d%H%M%S.csv") tlt = TiltWindows(files, 120) tlt