#!/usr/bin/env python3
# Copyright (C) 2020-2024 Gabriele Bozzola
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, see <https://www.gnu.org/licenses/>.
"""The :py:mod:`~.series` module provides a base class :py:class:`~.BaseSeries`
for representing and handling series (from which :py:class:`~.TimeSeries` and
:py:class:`~.FrequencySeries` are derived).
:py:class:`~.BaseSeries` handles series that have a independent variable ``x``
and a dependent variable ``y``. The derived classes have to implement setters
and getters if they need to rename these variables (e.g. ``x -> t``). The
independent variable has to be monotonically increasing.
:py:class:`~.BaseSeries` implements several methods for operations on series.
Most of these methods are available in two flavors: those that return a new
:py:class:`~.BaseSeries`, and those which modify the object in place. The latter
have names with imperative verbs.
This module also provides the useful function :py:func:`~.sample_common`, which
takes a list of series and resamples them to their common points.
"""
import warnings
from typing import Tuple
import numpy as np
import numpy.typing as npty
from scipy import integrate, interpolate, signal
from kuibit.attr_dict import AttributeDictionary
from kuibit.numerical import BaseNumerical
from kuibit.tensor import Tensor
class _AttributeDictionaryNumPy(AttributeDictionary):
"""Class that maps a dictionary to attributes and that has a method
``to_numpy``.
This is used only to fake being Pandas so that Series can be plotted with
matplotlib. See comments in BaseSeries.
"""
def to_numpy(self):
# This method only makes sense in the context of faking pandas. See
# comments in BaseSeries about this.
return self.values
# Note, we test this class testing its derived class TimeSeries
[docs]class BaseSeries(BaseNumerical):
"""Base class (not intended for direct use) for generic series data in
which the independendent variable x is sorted.
This class is already rich of features.
.. note:
Derived class should define setters and getters to handle ``x``
and ``y``. This is where the data is stored.
The idea is the following. The actual data is stored in the
``BaseSeries` properties ``data_x`` and ``data_y``. These are
accessible from the derived classes. However, we don't want the
derived classes to use directly ``data_x`` and ``data_y``: they
should use something that clearly inform the user of their meaning,
like ``t`` or ``f`` (time or frequency). To do this, we have to
define getters and setters that access and modify ``data_x``
and ``y`` but use more meaningful names. To define a getters,
simply use the ``@property`` decorator:
.. code-block:: python
@property
def t(self):
return self.data_x
With these, ``.t`` will return ``self.data_x``. For a setter,
.. code-block:: python
@t.setter
def t(self, t):
# This is defined BaseClass
self.data_x = t
This is called when with ``.t = something``. Once these are defined,
the derived classes should use their getters and setters.
:ivar data_x: Independent variable.
:vartype data_x: 1D NumPy array or float
:ivar y: Dependent variable.
:vartype y: 1D NumPy array or float
:ivar spline_real: Coefficients for a spline represent of the real part
of y.
:vartype spline_real: Tuple
:ivar spline_imag: Coefficients for a spline represent of the real part
of y.
:vartype spline_imag: Tuple
"""
@staticmethod
def _make_array(x):
"""Return a NumPy array version of x (if x is not already an array)."""
return np.atleast_1d(x) if not isinstance(x, np.ndarray) else x
def _return_array_if_monotonic(self, x_array):
"""Return the import array if it has length 1 or if it is
monotonically increasing. Otherwise return error.
We assume x_array is an array. We will not check for this, it is up to
the developer to guarantee this. If this is not true, some errors will
be thrown.
:param x_array: Array to check if it is monotonically increasing.
:type x_array: 1d NumPy array
:returns: Input array, if increasing monotonically.
:rtype: 1d NumPy array
"""
if len(x_array) > 1:
# Here we compute directly the diff because it seems faster
# than using np.diff
# Example:
# self.x = [1,2,3]
# self.x[1:] = [2, 3]
# self.x[:-1] = [1, 2]
# dx = [1,1]
dx = x_array[1:] - x_array[:-1]
if dx.min() <= 0:
# HACK: To provide more useful information we assume
# that the derived classes are named like TimeSeries.
# Then, we remove the "series"
name = type(self).__name__
x_name = name[:-6]
raise ValueError(f"{x_name} not monotonically increasing")
return x_array
def __init__(self, x, y, guarantee_x_is_monotonic=False):
"""When guarantee_x_is_monotonic is True no checks will be perform to
make sure that x is monotonically increasing (increasing performance).
This should is used internally whenever a new series is returned from
self, since we have already checked that data_x is good.
:param x: Independent variable.
:type x: 1d NumPy array or list
:param y: Dependent variable.
:type y: 1d NumPy array or list
:param guarantee_x_is_monotonic: Whether we can skip the check on monotonicity.
:param guarantee_x_is_monotonic: bool
"""
x_array = self._make_array(x)
y_array = self._make_array(y)
if len(x_array) != len(y_array):
raise ValueError("Data length mismatch")
if len(x_array) == 0:
raise ValueError("Trying to construct empty Series.")
if not guarantee_x_is_monotonic:
x_array = self._return_array_if_monotonic(x_array)
# The copy is because we don't want to change the input values
self.__data_x = x_array.copy()
self.__data_y = y_array.copy()
# The data is stored in the members self.data_x and self.data_y. We
# will never access these directly. We have setters and getters to that
# we can do stuff when variables change. For example, we want to
# compute or update splines. The setter and getters are for x and y
# We keep this flag around to know when we have to recompute the
# splines. Operations that invalidate the splines MUST reset this flag
# to True.
self.invalid_spline = True
# Here we also define the splines as empty objects so that we know
# that they are attributes of the class and they are not uninitialized
self.spline_real = None
self.spline_imag = None
@property
def x(self):
return self.__data_x
@x.setter
def x(self, x):
x_array = self._make_array(x)
if len(x_array) != len(self.x):
raise ValueError("You cannot change the length of the series")
x_array = self._return_array_if_monotonic(x_array)
# This series should own the data, so we copy (to avoid accidentally
# changing some other variable).
# If you do self.x = z
# and then self.x = *2
# z will change (if we don't copy)
self.__data_x = x_array.copy()
# Invalidate the spline
self.invalid_spline = True
@property
def y(self):
return self.__data_y
@y.setter
def y(self, y):
y_array = self._make_array(y)
if len(y_array) != len(self.__data_y):
raise ValueError("You cannot change the length of the series")
# This series should own the data, so we copy (to avoid accidentally
# changing some other variable)
self.__data_y = y_array.copy()
# Invalidate the spline
self.invalid_spline = True
# Here is where we pretend to be pandas. We want to be able to plot our
# series with matplotlib. Unfortunately, there is no easy way to provide a
# custom object to the plot functions. However, matplotlib has a special
# hook for pandas in the function matplotlib.cbook.index_of. In this
# function is checked if the index property is available, in which case,
# index.values and values are returned. We use this to make our objects
# plottable.
# The function in matplotlib < 3.5.3 is:
# try:
# return y.index.values, y.values
# except AttributeError:
# y = _check_1d(y)
# return np.arange(y.shape[0], dtype=float), y
#
# It got updated to
# try:
# return y.index.to_numpy(), y.to_numpy()
# except AttributeError:
# pass
# try:
# y = _check_1d(y)
# except (np.VisibleDeprecationWarning, ValueError):
# # NumPy 1.19 will warn on ragged input, and we can't actually use it.
# pass
# else:
# return np.arange(y.shape[0], dtype=float), y
# raise ValueError('Input could not be cast to an at-least-1D NumPy array')
#
# Let us try to be compatible with both (luckily, we can easily do that).
#
# For the older version, if we provide index.values and values, we can
# return x and y.
#
# For the newer version, we also have to provide the correct methods named
# `to_numpy` to both this class and the object that is return by the index
# attribute. So, we define a _AttributeDictionaryNumPy class that does that.
[docs] def to_numpy(self):
"""Return the data as NumPy array. Equivalent to ``self.y``.
This function is here to enable compatibility matplotlib.
"""
return self.y
@property
def values(self):
"""Fake pandas properties, to make Series objects plottable by
matplotlib.
"""
return self.y
@property
def index(self):
"""Fake pandas properties, to make Series objects plottable by
matplotlib.
"""
return _AttributeDictionaryNumPy({"values": self.x})
@property
def mask(self):
"""Return where the data is valid (according to the mask).
:returns: Array of True/False of the same length of the data.
False where the data is valid, true where is not.
:rtype: 1D array of bool
"""
if self.is_masked():
return np.ma.getmask(self.y)
return np.zeros(len(self), dtype=bool)
@property
def xmin(self):
"""Return the minimum of the independent variable x.
:rvalue: Minimum of x.
:rtype: float
"""
return self.x[0]
@property
def xmax(self):
"""Return the maximum of the independent variable x.
:rvalue: Maximum of x
:rtype: float
"""
return self.x[-1]
[docs] def is_regularly_sampled(self):
"""Return whether the series is regularly sampled.
If the series is only one point, an error is raised.
:returns: Is the series regularly sampled?
:rtype: bool
"""
if len(self) == 1:
raise RuntimeError(
"Series is only one point, "
"it does not make sense to compute dx"
)
dx = self.x[1:] - self.x[:-1]
return np.allclose(dx, dx[0], atol=1e-14)
def __len__(self):
"""The number of data points."""
return len(self.x)
def __iter__(self):
for x, y in zip(self.x, self.y):
yield x, y
[docs] def is_complex(self):
"""Return whether the data is complex.
:returns: True if the data is complex, false if it is not.
:rtype: bool
"""
return issubclass(self.y.dtype.type, complex)
[docs] def is_masked(self) -> bool:
"""Return whether the x or y are masked.
:returns: True if the x or y are masked, false if it is not.
:rtype: bool
"""
return isinstance(self.y, np.ma.MaskedArray) or isinstance(
self.x, np.ma.MaskedArray
)
[docs] def x_at_maximum_y(self):
"""Return the value of x when y is maximum.
:returns: Value of x when y is maximum.
:rtype: float
"""
return self.x[np.argmax(self.y)]
[docs] def x_at_minimum_y(self):
"""Return the value of x when y is minimum.
:returns: Value of x when y is minimum.
:rtype: float
"""
return self.x[np.argmin(self.y)]
[docs] def x_at_abs_maximum_y(self):
"""Return the value of x when abs(y) is maximum.
:returns: Value of x when abs(y) is maximum.
:rtype: float
"""
return self.x[np.argmax(np.abs(self.y))]
[docs] def x_at_abs_minimum_y(self):
"""Return the value of x when abs(y) is minimum.
:returns: Value of x when abs(y) is minimum.
:rtype: float
"""
return self.x[np.argmin(np.abs(self.y))]
def _local_extrema(
self, y: npty.NDArray, *args, include_edges: bool = True, **kwargs
) -> Tuple[npty.NDArray]:
"""Use SciPy's ``find_peaks`` to find the local minima and maxima.
Unkown arguments are passed to ``find_peaks``.
If the signal is complex, the absolute value is taken.
If ``include_edges`` is True, the edges are considered among the
possible extrema.
:returns: Coordinate and value of the peaks.
:rtype: Tuple of NumPy arrays
"""
# TODO (FEATURE): Improve accuracy
#
# We can use splines or quadratic approximations to improve the location
# the peak (ie, find it within a cell)
offset = 0
if include_edges:
# https://stackoverflow.com/a/60096220
min_y = (min(y),)
y = np.concatenate((min_y, y, min_y))
offset = 1
peak_indices, _ = signal.find_peaks(y, *args, **kwargs)
# If we included the edges, we added an extra point, se we have to
# remove the offset
peak_indices = peak_indices - offset
return self.x[peak_indices], self.y[peak_indices]
[docs] def local_maxima(
self, *args, include_edges: bool = True, **kwargs
) -> Tuple[npty.NDArray]:
"""Use SciPy's ``find_peaks`` to find the local maxima.
Unkown arguments are passed to ``find_peaks``.
If the signal is complex, the absolute value is taken.
If ``include_edges`` is True, the edges are considered among the
possible maxima.
:returns: Coordinate and value of the peaks.
:rtype: Tuple of NumPy arrays
"""
y = abs(self.y) if self.is_complex() else self.y
return self._local_extrema(
y, *args, include_edges=include_edges, **kwargs
)
[docs] def local_minima(
self, *args, include_edges: bool = True, **kwargs
) -> Tuple[npty.NDArray]:
"""Use SciPy's ``find_peaks`` to find the local minima.
Unkown arguments are passed to ``find_peaks``.
If the signal is complex, the absolute value is taken.
If ``include_edges`` is True, the edges are considered among the
possible minima.
:returns: Coordinate and value of the minima.
:rtype: Tuple of NumPy arrays
"""
y = -abs(self.y) if self.is_complex() else -self.y
return self._local_extrema(
y, *args, include_edges=include_edges, **kwargs
)
def _make_spline(self, *args, k=3, s=0, **kwargs):
"""Private function to make spline representation of the data.
This function is not meant to be called directly.
``k`` is the degree of the spline fit. It is recommended to use cubic
splines. Even values of ``k`` should be avoided especially with small ``s``
values. 1 <= k <= 5
Unknown arguments are pass to ``scipy.interpolate.splrep``.
:param k: Order of the spline representation.
:type k: int
:param s: Smoothing of the spline.
:type s: float
"""
if len(self) < k:
raise ValueError(
f"Too few points to compute a spline of order {k}"
)
if self.is_masked():
raise RuntimeError("Splines with masked data are not supported.")
self.spline_real = interpolate.splrep(
self.x, self.y.real, k=k, s=s, *args, **kwargs
)
if self.is_complex():
self.spline_imag = interpolate.splrep(
self.x, self.y.imag, k=k, s=s, *args, **kwargs
)
self.invalid_spline = False
[docs] def evaluate_with_spline(self, x, ext=2):
"""Evaluate the spline on the points ``x``.
Values outside the interval are extrapolated if ``ext=0``, set to 0 if
``ext=1``, raise a ``ValueError`` if ``ext=2``, or if ``ext=3``, return
the boundary value.
This method is meant to be used only if you want to use a different ext
for a specific call, otherwise, just use __call__.
:param x: Array of x where to evaluate the series or single x.
:type x: 1D NumPy array of float
:param ext: How to deal values outside the bounaries. Values outside the
interval are extrapolated if ``ext=0``, set to 0 if
``ext=1``, raise a ValueError if ``ext=2``, or if ``ext=3``,
return the boundary value.
:type ext: int
:returns: Values of the series evaluated on the input x.
:rtype: 1D NumPy array or float
"""
if self.invalid_spline:
self._make_spline()
y_real = interpolate.splev(x, self.spline_real, ext=ext)
if self.is_complex():
y_imag = interpolate.splev(x, self.spline_imag, ext=ext)
ret = y_real + 1j * y_imag
else:
ret = y_real
# When this method is called with a scalar input, at this point, ret
# would be a 0d NumPy scalar array. What's that? - you may ask. I have
# no idea, but the user is expecting a scalar as output. Hence, we cast
# the 0d array into at "at_least_1d" array, then we can see its length
# and act consequently.
ret = np.atleast_1d(ret)
return ret if len(ret) > 1 else ret[0]
def __call__(self, x):
"""Evaluate the spline on the points x. If the value is outside the
range, a ValueError will be raised.
"""
# We call the spline only if we need to.
# TODO (REFACTORING): This is not a Pythonic way to write this function.
#
# The main problem is that it is is not vectorized. It is also not
# efficient, we are going over the array a lot of times, re-checking the
# same elements over and over. Also, we are not considering the floating
# point arithmetic, we should allow for some tolerance.
# First we consider the scalar case
if not hasattr(x, "__len__"):
if x in self.x:
return self.y[np.searchsorted(self.x, x)]
return self.evaluate_with_spline(x, ext=2)
ret = np.zeros(len(x), dtype=type(self.y[0]))
# Hash-maps are more efficient than searching every time through the
# array, but there is some overhead cost in defining the dictionary.
# Experiments show that it is sill more performant.
dic_data = dict(zip(self.x, self.y))
for index, elem in enumerate(x):
if elem in self.x:
ret[index] = dic_data[elem]
else:
ret[index] = self.evaluate_with_spline(elem, ext=2)
return ret
[docs] def copy(self):
"""Return a deep copy.
:returns: Deep copy of the series.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
# The following is more complicated copy constructor that is designed
# to copy also the spline information without re-computing it.
# This can speed up some computations.
copied = type(self).__new__(self.__class__)
# We don't use the setters
# skipcq PYL-W0212
copied.__data_x = self.__data_x.copy()
# skipcq PYL-W0212
copied.__data_y = self.__data_y.copy()
if not self.invalid_spline:
# splines are tuples, with a direct call to the function
# tuple() we make a deep copy
copied.spline_real = tuple(self.spline_real)
if self.is_complex():
copied.spline_imag = tuple(self.spline_imag)
copied.invalid_spline = False
copied.invalid_spline = True
return copied
[docs] def resampled(self, new_x, ext=2, piecewise_constant=False):
"""Return a new series resampled from this to new_x.
You can specify the details of the spline with the method make_spline.
If you want to resample without using the spline, and you want a nearest
neighbor resampling, pass the keyword ``piecewise_constant=True``.
This may be a good choice for data with large discontinuities, where the
splines are ineffective.
:param new_x: New independent variable.
:type new_x: 1D NumPy array or list of float
:param ext: How to handle points outside the data interval.
:type ext: 0 for extrapolation, 1 for returning zero, 2 for ``ValueError``,
3 for extending the boundary
:param piecewise_constant: Do not use splines, use the nearest neighbors.
:type piecewise_constant: bool
:returns: Resampled series.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
# If x is the same, there's no need to resample
if len(self.x) == len(new_x) and np.allclose(
self.x, new_x, atol=1e-14
):
return self.copy()
# Unfortunately there is no nearest neighor resampling in SciPy's splines.
# Hence, we use directly the method interp1d.
if piecewise_constant:
interp_function = interpolate.interp1d(
self.x, self.y, kind="nearest", assume_sorted=True
)
new_y = interp_function(new_x)
else:
new_y = self.evaluate_with_spline(new_x, ext=ext)
return type(self)(new_x, new_y)
[docs] def resample(self, new_x, ext=2, piecewise_constant=False):
"""Resample the series to new independent variable new_x.
If you want to resample without using the spline, and you want a nearest
neighbor resampling, pass the keyword ``piecewise_constant=True``.
This may be a good choice for data with large discontinuities, where the
splines are ineffective.
:param new_x: New independent variable.
:type new_x: 1D NumPy array or list of float
:param ext: How to handle points outside the interval.
:type ext: 0 for extrapolation, 1 for returning zero, 2 for ValueError,
3 for extending the boundary
:param piecewise_constant: Do not use splines, use the nearest neighbors.
:type piecewise_constant: bool
"""
self._apply_to_self(
self.resampled,
new_x,
ext=ext,
piecewise_constant=piecewise_constant,
)
def _apply_binary(self, other, function, *args, **kwargs):
"""This is an abstract function that is used to implement mathematical
operations with other series (if they have the same x) or
scalars.
:py:meth:`~._apply_binary` takes another object that can be of the same
type or a scalar, and applies ``function(self.y, other.y)``, performing type
checking.
:param other: Other object.
:type other: :py:class:`~.BaseSeries` or derived class or float
:param function: Dyadic function (function that takes two arguments).
:type function: callable
:returns: Return value of ``function`` when called with self and other.
:rtype: :py:class:`~.BaseSeries` or derived class (typically)
"""
# If the other object is of the same type
if isinstance(other, type(self)):
if (len(self.x) != len(other.x)) or (
not np.allclose(other.x, self.x, atol=1e-14)
):
raise ValueError("The objects do not have the same x!")
return type(self)(
self.x,
function(self.y, other.y, *args, **kwargs),
True,
)
# If it is a number
if isinstance(other, (int, float, complex)):
return type(self)(
self.x, function(self.y, other, *args, **kwargs), True
)
# If it is a Tensor of type(self), we have to return a Tensor
if isinstance(other, Tensor) and type(self) is other.type:
# We keep this at the high level
return type(other).from_shape_and_flat_data(
other.shape,
[
function(ot, self, *args, **kwargs)
for ot in other.flat_data
],
)
# If we are here, it is because we cannot add the two objects
raise TypeError("I don't know how to combine these objects")
def __eq__(self, other):
"""Check for equality up to numerical precision."""
if isinstance(other, type(self)):
return np.allclose(self.x, other.x, atol=1e-14) and np.ma.allclose(
self.y, other.y, atol=1e-14
)
return False
# From Python's docs: In order to conform to the object model, classes that
# define their own equality method should also define their own hash method,
# or be unhashable.
# We consider series unhashable. We could make them hashable by taking
# the combined hash of all the points, but this would not be useful.
__hash__ = None
def _apply_to_self(self, f, *args, **kwargs):
"""Apply the method ``f`` to ``self``, modifying ``self``.
This is used to transform the commands from returning an object to
modifying ``self``. The function ``f`` has to return a new copy of the
object (not a reference).
"""
ret = f(*args, **kwargs)
# We avoid the setters to avoid checking for consistency because this
# was already done
self.__data_x, self.__data_y = ret.x, ret.y
# We have to recompute the splines
self.invalid_spline = True
[docs] def save(self, file_name, *args, **kwargs):
"""Saves into simple ASCII format with 2 columns ``(x, y)``
for real valued data and 3 columns ``(x, Re(y), Im(y))``
for complex valued data.
Unknown arguments are passed to ``NumPy.savetxt``.
:param file_name: Path (with extension) of the output file.
:type file_name: str
"""
if self.is_masked():
warnings.warn(
"Discarding mask information.",
RuntimeWarning,
)
if self.is_complex():
np.savetxt(
file_name,
np.transpose(
(self.x, self.y.real, self.y.imag),
*args,
**kwargs,
),
)
else:
np.savetxt(
file_name,
np.transpose((self.x, self.y), *args, **kwargs),
)
[docs] def nans_removed(self):
"""Filter out nans/infinite values.
Return a new series with finite values only.
:returns: A new series with only finite values.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
mask = np.isfinite(self.y)
return type(self)(self.x[mask], self.y[mask], True)
[docs] def nans_remove(self):
"""Filter out nans/infinite values."""
self._apply_to_self(self.nans_removed)
[docs] def mask_removed(self):
"""Remove masked value.
Return a new series with valid values only.
:returns: A new series with only valid values.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
if self.is_masked():
mask = np.invert(self.mask)
return type(self)(
np.ma.compressed(self.x[mask]),
np.ma.compressed(self.y[mask]),
True,
)
# We can copy the spline
return self.copy()
[docs] def mask_remove(self):
"""Remove masked values."""
self._apply_to_self(self.mask_removed)
[docs] def mask_applied(self, mask, ignore_existing=False):
"""Return a new series with given mask applied to the data.
If a previous mask already exists, the new mask will be added on top,
unless ``ignore_existing`` is True.
:param mask: Array of booleans that identify where the data is invalid.
This can be obtained with the method :py:meth:`~.mask`.
:type mask: 1D NumPy array
:param ignore_existing: If True, overwrite any previously existing mask.
:type ignore_existing: bool
:returns: New series with mask applied.
:rtype: :py:class:`~.BaseSeries`
"""
if self.is_masked() and not ignore_existing:
mask = np.ma.mask_or(mask, self.mask)
return type(self)(self.x, np.ma.MaskedArray(self.y, mask=mask), True)
[docs] def mask_apply(self, mask, ignore_existing=False):
"""Apply given mask.
If a previous mask already exists, the new mask will be added on top,
unless ``ignore_existing`` is True.
:param mask: Array of booleans that identify where the data is invalid.
This can be obtained with the method :py:meth:`~.mask`.
:type mask: 1D NumPy array
:param ignore_existing: If True, overwrite any previously existing mask.
:type ignore_existing: bool
"""
self._apply_to_self(
self.mask_applied, mask, ignore_existing=ignore_existing
)
[docs] def integrated(self, dx=None):
"""Return a series that is the integral computed with method of
the rectangles.
The spacing ``dx`` can be optionally provided. If provided, it will be
used (increasing performance), otherwise it will be computed internally.
:param dx: Delta x in the independent variable. If None it will be
computed internally.
:type dx: float or None
:returns: New series with the cumulative integral.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
if self.is_masked():
raise RuntimeError(
"Integration with masked data is not supported."
)
# We pass self.x only if dx was not provided
passing_x = self.x if dx is None else None
return type(self)(
self.x,
integrate.cumtrapz(self.y, x=passing_x, dx=dx, initial=0),
True,
)
[docs] def integrate(self, dx=None):
"""Integrate series with method of the rectangles.
The spacing ``dx`` can be optionally provided. If provided, it will be
used (increasing performance), otherwise it will be computed internally.
"""
self._apply_to_self(self.integrated, dx=dx)
[docs] def spline_differentiated(self, order=1):
"""Return a series that is the derivative of the current one using
the spline representation.
The optional parameter ``order`` specifies the order of the derivative.
.. warning::
The values at the boundary are typically not accurate.
:param order: Order of derivative (e.g. 2 = second derivative).
:type order: int
:returns: New series with derivative
:rtype: :py:class:`~.BaseSeries` or derived class
"""
if (order > 3) or (order < 0):
raise ValueError(f"Cannot compute differential of order {order}")
if self.invalid_spline:
self._make_spline()
if self.is_complex():
ret_value = interpolate.splev(
self.x, self.spline_real, der=order
) + 1j * interpolate.splev(self.x, self.spline_imag, der=order)
else:
ret_value = interpolate.splev(self.x, self.spline_real, der=order)
return type(self)(self.x, ret_value, True)
[docs] def spline_differentiate(self, order=1):
"""Differentiate the series using the spline representation.
The optional parameter ``order`` specifies the order of the derivative.
.. warning::
The values at the boundary are typically not accurate.
:param order: Order of derivative (e.g. 2 = second derivative).
:type order: int
"""
self._apply_to_self(self.spline_differentiated, order)
[docs] def differentiated(self, order=1):
"""Return a series that is the numerical order-differentiation of
the present series.
The optional parameter ``order`` specifies the order of the derivative.
The derivative is calulated as centered differencing in the interior
and one-sided derivatives at the boundaries. Higher orders are computed
applying the same rule recursively.
:param order: Order of derivative (e.g. 2 = second derivative).
:type order: int
:returns: New series with derivative.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
if self.is_masked():
raise RuntimeError(
"Differentiation with masked data is not supported."
)
ret_value = self.y
for _num_deriv in range(order):
ret_value = np.gradient(ret_value, self.x, edge_order=2)
return type(self)(self.x, ret_value, True)
[docs] def differentiate(self, order=1):
"""Differentiate with the numerical order-differentiation.
The optional parameter ``order`` specifies the order of the derivative.
The derivative is calulated as centered differencing in the interior
and one-sided derivatives at the boundaries. Higher orders are computed
applying the same rule recursively.
:param order: Order of derivative (e.g. 2 = second derivative).
:type order: int
"""
self._apply_to_self(self.differentiated, order)
[docs] def savgol_smoothed(self, window_size, order=3):
"""Return a smoothed series with a Savitzky-Golay filter with
window of size ``window_size`` and order ``order``.
This is just like a regular "Moving average" filter, but instead of
just calculating the average, a polynomial (usually 2nd or 4th order)
fit is made for every point, and only the "middle" point is chosen.
Since 2nd (or 4th) order information is concerned at every point, the
bias introduced in "moving average" approach at local maxima or minima,
is circumvented.
:param window_size: Number of points of the smoothing window (needs to
be odd).
:type window_size: int
:param order: Order of the filter.
:type order: int
:returns: New smoothed series.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
if self.is_masked():
raise RuntimeError("Smoothing with masked data is not supported.")
if self.is_complex():
return type(self)(
self.x,
signal.savgol_filter(self.y.imag, window_size, order)
+ 1j * signal.savgol_filter(self.y.real, window_size, order),
True,
)
return type(self)(
self.x,
signal.savgol_filter(self.y, window_size, order),
True,
)
[docs] def savgol_smooth(self, window_size, order=3):
"""Smooth the series with a Savitzky-Golay filter with window of
size ``window_size`` and order ``order``.
This is just like a regular "Moving average" filter, but instead of
just calculating the average, a polynomial (usually 2nd or 4th order)
fit is made for every point, and only the "middle" point is chosen.
Since 2nd (or 4th) order information is concerned at every point, the
bias introduced in "moving average" approach at local maxima or minima,
is circumvented.
:param window_size: Number of points of the smoothing window (needs to
be odd).
:type window_size: int
:param order: Order of the filter.
:type order: int
"""
self._apply_to_self(self.savgol_smoothed, window_size, order)
[docs] def cropped(self, init=None, end=None):
"""Return a series with data removed outside the interval ``[init, end]``. If
``init`` or ``end`` are not specified or None, it does not remove
anything from this side.
:param init: Data with ``x <= init`` will be removed.
:type init: float or None
:param end: Data with ``x >= end`` will be removed.
:type end: float or None
:returns: Series with enforced minimum and maximum
:rtype: :py:class:`~.BaseSeries` or derived class
"""
x = self.x
y = self.y
if init is not None:
m = x >= init
x = x[m]
y = y[m]
if end is not None:
m = x <= end
x = x[m]
y = y[m]
return type(self)(x, y, True)
[docs] def crop(self, init=None, end=None):
"""Remove data outside the the interval ``[init, end]``. If
``init`` or ``end`` are not specified or None, it does not remove
anything from this side.
:param init: Data with ``x <= init`` will be removed.
:type init: float or None
:param end: Data with ``x >= end`` will be removed.
:type end: float or None
"""
self._apply_to_self(self.cropped, init, end)
# Define aliases
clip = crop
clipped = cropped
def _apply_unary(self, function, *args, **kwargs):
"""Apply a unary function to the data.
:param function: Function to apply to the series.
:type function: callable
:return: New series with function applied to the data.
:rtype: :py:class:`~.BaseSeries` or derived class
"""
return type(self)(self.x, function(self.y, *args, **kwargs), True)
def _apply_reduction(self, reduction, *args, **kwargs):
"""Apply a reduction to the data.
:param function: Function to apply to the series.
:type function: callable
:return: Reduction applied to the data
:rtype: float
"""
return reduction(self.y, *args, **kwargs)
[docs]def sample_common(series, resample=False, piecewise_constant=False):
"""Take a list of series and return new ones so that they are all defined on the
same points.
If ``resample`` is False (default), take as input a list of series and
return a new list with the same series but only defined on those points that
are common to all the lists. If ``resample`` is True, instead of removing
points, find the common interval of definition, and resample all the series
on that internal. The number of sample points is the minimum over all
series. Additionally, if ``piecewise_constant=True``, the approximant used
for resampling is a piecewise constant function, splines are not used,
instead, the nearest neighbors are used. Use this when you have series with
discontinuities.
:param series: The series to resample or redefine on the common points
:type series: list of :py:class:`~.Series`
:param resample: Whether to resample the series, or just find the common
points.
:type resample: bool
:param piecewise_constant: Whether to use the nearest neighbor resampling
method instead of splines.
If ``piecewise_constant=True``, the approximant used
for resampling is a piecewise constant function.
:type piecewise_constant: bool
:returns: Resampled series so that they are all defined in
the same interval.
:rtype: list of :py:class:`~.Series`
"""
# In many cases there is no real need for resampling because the array
# already have the desired shape. It is worth checking if we need to
# resample. If the series are regularly sampled, it is easy to check
# if the are the same. We also need to check that they are regularly
# sampled, to do this, we check that the first is regularly sampled,
# and that all the other ones have the same x.
s1, *s_others = series
if s1.is_regularly_sampled():
for s in s_others:
if not (len(s) == len(s1)):
break
if not np.allclose(s1.x, s.x, atol=1e-14):
break
# This is an else to the for loop
else:
# We have to copy, otherwise one can accidentally modify input data
return [ss.copy() for ss in series]
if resample:
# Find the series with max xmin
s_xmin = max(series, key=lambda x: x.xmin)
# Find the series with min xmax
s_xmax = min(series, key=lambda x: x.xmax)
# Find the series with min number of points
s_ns = min(series, key=len)
x = np.linspace(s_xmin.xmin, s_xmax.xmax, len(s_ns))
return [
s.resampled(x, piecewise_constant=piecewise_constant)
for s in series
]
def float_intersection(array_1, array_2):
"""Here we find the intersection between the two arrays also
considering the floating points.
"""
# https://stackoverflow.com/a/32516182
return array_2[
np.isclose(array_1[:, None], array_2, atol=1e-14).any(0)
]
# Here we find the common intersection between all the x, starting with
# the first one
x = s1.x
for s in s_others:
x = float_intersection(x, s.x)
if len(x) == 0:
raise ValueError("Series do not have any point in common")
return [s.resampled(x) for s in series]