#!/usr/bin/env python3
# Copyright (C) 2020-2024 Gabriele Bozzola
#
# Based on code originally developed by Wolfgang Kastaun. This file may contain
# algorithms and/or structures first implemented in
# GitHub:wokast/PyCactus/PostCactus/cactus_multipoles.py
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, see <https://www.gnu.org/licenses/>.
"""This module provides access to data saved by the Multipole thorn.
There are multiple classes defined in this module:
- :py:class`~.MultipolesDir` interfaces with :py:class:`~.SimDir` and organizes the
data according to the variable available. This is a dictionary-like object with keys
the variable names.
- :py:class`~.MultipoleAllDets` takes all the files that correspond to a given variable
and organize them according to the extraction radius.
- :py:class`~.MultipoleOneDet` represents one single extraction radius, for a single
variable. It is a dictionary-like object with keys the multipolar numbers and values
the multipolar decomposition represented as represented as :py:class:`~.TimeSeries`
objects.
These are hierarchical classes, one containing the others, so one typically ends
up with a series of brackets or dots to access the actual data. For example, if
``sim`` is a :py:class:`~.SimDir`, ``sim.multipoles['rho_b'][100][2,2]`` is
(2,2) decomposition of ``rho_b`` at radius 100 represented as
:py:class:`~.TimeSeries`.
"""
import os
import re
from typing import Optional
import h5py
import numpy as np
from kuibit import timeseries
from kuibit.attr_dict import pythonize_name_dict
[docs]class MultipoleOneDet:
"""This class collects multipole components of a specific variable
a given spherical surface.
Multipoles are tightly connected with gravitational waves, so morally a
sphere where multipoles are computed is a "detector". Hence, the name
of the class.
:py:class:`~.MultipoleOneDet` is a dictionary-like object with components as
the tuples (l,m) and values the corresponding multipolar decomposition as a
:py:class:`~.TimeSeries` object. Alternatively, this can also be called
directly ``multipoleonedet(l,m)``. Iteration is supported and yields tuples
``(l, m, data)``, which can be used to loop through all the multipoles
available.
The reason we allow for ``l_min`` is to remove those files that are not
necessary when considering gravitational waves (l<2).
:ivar dist: Radius of the sphere.
:vartype dist: float
:ivar radius: Radius of the sphere.
:type radius: float
:ivar l_min: l smaller than ``l_min`` are dropped.
:type l_min: int
:ivar available_l: Available l values.
:type available_l: set
:ivar available_m: Available m values.
:ivar available_m: set
:ivar available_lm: Available ``(l, m)`` values.
:ivar available_lm: set of tuples
:ivar missing_lm: Missing (l, m) values to have all from ``l_min`` to ``l_max``.
:ivar missing_lm: set of tuples
"""
def __init__(self, dist, data, l_min=0):
"""Constructor.
:param dist: Radius of the spherical surface.
:type dist: float
:param data: List of tuples with the two multipolar numbers and
the data as :py:class:`~.TimeSeries`.
:type data: list of tuple ``(l, m, timeseries)``
:ivar l_min: l smaller than ``l_min`` are dropped.
:type l_min: int
"""
self.dist = float(dist)
self.radius = self.dist # This is just an alias
self.l_min = l_min
# Associate multipoles to list of timeseries
multipoles_list_ts = {}
# Now we populate the multipoles_ts_list dictionary. This is a
# dictionary with keys (l, m) and with values lists of timeseries. If
# the key is not already present, we create it with value an empty
# list, then we append the timeseries to this list
for mult_l, mult_m, ts in data: # mult = "multipole"
if mult_l >= l_min:
lm_list = multipoles_list_ts.setdefault((mult_l, mult_m), [])
# This means: multipoles[(mult_t, mult_m)] = []
lm_list.append(ts)
# At the end we have:
# multipoles[(mult_t, mult_m)] = [ts1, ts2, ...]
# Now self._multipoles is a dictionary in which all the timeseries are
# collapse in a single one. So it is a straightforward map (l, m) -> ts
self._multipoles = {
lm: timeseries.combine_ts(ts)
for lm, ts in multipoles_list_ts.items()
}
self.available_l = sorted(
{mult_l for mult_l, _ in self._multipoles.keys()}
)
self.l_max = max(self.available_l)
self.available_m = sorted(
{mult_m for _, mult_m in self._multipoles.keys()}
)
self.available_lm = set(self._multipoles.keys())
# Check if all the (l, m) from l_min to l_max are available
all_lm = set()
for mult_l in range(self.l_min, max(self.available_l) + 1):
for mult_m in range(-mult_l, mult_l + 1):
all_lm.add((mult_l, mult_m))
# set subtraction
self.missing_lm = all_lm - self.available_lm
# Data is in the format expected by __init__
self.data = [(lm[0], lm[1], ts) for lm, ts in self._multipoles.items()]
[docs] def copy(self):
"""Return a deep copy.
:returns: Deep copy of ``self``.
:rtype: :py:class:`~.MultipoleOneDet`
"""
return type(self)(self.dist, self.data, self.l_min)
[docs] def crop(self, init: Optional[float] = None, end: Optional[float] = None):
"""Remove all the data before ``init`` and after ``end``.
If ``init`` or ``end`` are not specified, do not crop on that side.
"""
for _, _, ts in self.data:
ts.crop(init=init, end=end)
[docs] def cropped(
self, init: Optional[float] = None, end: Optional[float] = None
):
"""Return a copy where the data is cropped in the provided interval.
If ``init`` or ``end`` are not specified, do not crop on that side.
:returns: Copy of ``self`` with data cropped.
:rtype: :py:class:`~.MultipoleOneDet`
"""
ret = self.copy()
ret.crop(init=init, end=end)
return ret
def __contains__(self, key):
return key in self._multipoles
def __getitem__(self, key):
return self._multipoles[key]
def __call__(self, mult_l, mult_m):
return self[(mult_l, mult_m)]
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return (
self.dist == other.dist and self._multipoles == other._multipoles
)
# From Python's docs: In order to conform to the object model, classes that
# define their own equality method should also define their own hash method,
# or be unhashable.
# Since we consider series unhashable, this object also has to be unhashable.
__hash__ = None
def __iter__(self):
for (mult_l, mult_m), ts in sorted(self._multipoles.items()):
yield mult_l, mult_m, ts
def __len__(self):
return len(self._multipoles)
[docs] def keys(self):
"""Return available multipolar numbers.
:returns: Available multipolar numbers.
:rtype: dict_keys
"""
return self._multipoles.keys()
def __str__(self):
ret = f"(l, m) available: {list(self.keys())}"
if self.missing_lm:
ret += f" (missing: {list(self.missing_lm)})"
return ret
[docs] def total_function_on_available_lm(
self, function, *args, l_max=None, **kwargs
):
"""Evaluate ``function`` on each multipole and accumulate the result.
``total_function_on_available_lm`` will call ``function`` with the
following arguments:
``function(timeseries, mult_l, mult_m, dist, *args, **kwargs)``
If ``function`` does not need some paramters, it should use take
the ``*args`` argument to ignore the additional paramters that
are always passed ``(l, m, r)``.
Values of l larger than ``l_max`` are ignored.
This method is used to compute quantities like the total power in
gravitational waves.
``function`` can take additional paramters passed directly from
``total_function_on_available_lm`` (e.g. ``pcut`` for FFI).
:params function: Function that has to be applied on each multipole.
:type function: callable
:returns: Sum of function applied to each monopole
:rtype: return type of function
"""
# This function is used to compute many quantities with waves (e.g.,
# total strain, total power emitted, ...)
# It is a little bit ugly, but it works
if l_max is None:
l_max = self.l_max
if l_max > self.l_max:
raise ValueError("l max larger than l available")
# The iterator is increasing in (l, m), so we can start from the first
# element
iter_self = iter(self)
try:
first_l, first_m, first_det = next(iter_self)
except StopIteration:
raise RuntimeError("No multipole moments available")
if first_l > l_max:
raise ValueError("l max smaller than all l available")
result = function(
first_det, first_l, first_m, self.dist, *args, **kwargs
)
for mult_l, mult_m, det in iter_self:
if mult_l <= l_max:
result += function(
det, mult_l, mult_m, self.dist, *args, **kwargs
)
return result
[docs]class MultipoleAllDets:
"""This class collects available surfaces with multipole data.
It is a dictionary-like object with keys the spherical surface radius, and
values :py:class:`MultipoleOneDet` object. Iteration is supported, sorted by
ascending radius. You can iterate over all the radii and all the available l
and m with a nested loop.
:ivar radii: Available surface radii.
:type radii: float
:ivar r_outer: Radius of the outermost detector.
:type r_outer: float
:ivar l_min: l smaller than l_min are dropped.
:type l_min: int
:ivar outermost: Outermost detector.
:type outermost: :py:class:`~MultipoleOneDet`
:ivar available_lm: Available components as tuple (l,m).
:type available_lm: list of tuples
:ivar available_l: List of available l.
:type available_l: list
:ivar available_m: List of available m.
:type available_m: list
"""
def __init__(self, data, l_min=0):
"""Constructor.
:param data: List of tuples with ``(multipole_l, multipole_m,
extraction_radius, [timeseries])``, where
``[timeseries]`` is a list of the :py:class:`~.TimeSeries`
associated.
:type data: list of tuples
"""
self.l_min = l_min
detectors = {}
self.available_lm = set()
# Populate detectors
# detectors is a dictionary with keys the radii and
# items that look like ([l, m, timeseries, ...]).
# We accumulate in the list all the ones for the same
# radius
for mult_l, mult_m, radius, ts in data:
if mult_l >= self.l_min:
# If we don't have the radius yet, let's create an empty list
d = detectors.setdefault(radius, [])
# Add the new values
d.append((mult_l, mult_m, ts))
# Tally the available l and m
self.available_lm.add((mult_l, mult_m))
self._detectors = {
radius: MultipoleOneDet(radius, multipoles, self.l_min)
for radius, multipoles in detectors.items()
}
# In Python3 .keys() is not a list
self.radii = sorted(list(self._detectors.keys()))
if len(self.radii) > 0:
self.r_outer = self.radii[-1]
self.outermost = self._detectors[self.r_outer]
#
self.available_l = sorted({mult_l for mult_l, _ in self.available_lm})
if self.available_l:
self.l_max = max(self.available_l)
else:
self.l_max = None
self.available_m = sorted({mult_m for _, mult_m in self.available_lm})
# Alias
self._dets = self._detectors
# Data is in the format expected by __init__
# (multipole_l, multipole_m, extraction_radius, [timeseries])
self.data = []
for radius, det in self._dets.items():
for mult_l, mult_m, ts in det:
self.data.append((mult_l, mult_m, radius, ts))
[docs] def copy(self):
"""Return a deep copy.
:returns: Deep copy of ``self``.
:rtype: :py:class:`~.MultipoleAllDets`
"""
return type(self)(self.data, self.l_min)
[docs] def has_detector(self, mult_l, mult_m, dist):
"""Check if a given multipole component extracted at a given
distance is available.
:param mult_l: Multipole component l.
:type mult_l: int
:param mult_m: Multipole component m.
:type mult_m: int
:param dist: Distance of the detector.
:type dist: float
:returns: If available or not.
:rtype: bool
"""
if dist in self:
return (mult_l, mult_m) in self[dist]
return False
def __contains__(self, key):
return key in self._dets
def __getitem__(self, key):
return self._dets[key]
def __iter__(self):
for r in self.radii:
yield self[r]
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.radii == other.radii and self._dets == other._dets
# From Python's docs: In order to conform to the object model, classes that
# define their own equality method should also define their own hash method,
# or be unhashable.
# Since we consider series unhashable, this object also has to be unhashable.
__hash__ = None
def __len__(self):
return len(self._dets)
[docs] def keys(self):
"""Return available extraction radii.
:returns: Available extraction radii.
:rtype: dict_keys
"""
return self._dets.keys()
def __str__(self):
ret = f"Avilable radii: {list(self.keys())}\n\n"
for d in sorted(self.keys()):
ret += f"At radius {d}, {self._dets[d]}\n"
return ret
[docs]class MultipolesDir:
"""This class provides acces to various types of multipole data in a given
simulation directory.
This class is like a dictionary, you can access its values using the
brackets operator, with values that are :py:mod:`~.MultipoleAllDets`. These
contain the full multipolar description for all the available radii. Files
are lazily loaded. If both HDF5 and ASCII are present, HDF5 are preferred.
There's no attempt to combine the two. Alternatively, you can access
variables with ``get`` or with ``fields.var_name``.
"""
def __init__(self, sd):
"""Constructor.
:param sd: Simulation directory.
:type sd: :py:class:`~.SimDir`
"""
# self._vars_*_files are dictionary. For _vars_ascii_files, the keys are
# the variables and the items are sets of tuples of the form
# (multipole_l, multipole_m, radius, filename) for text files.
#
# For example:
# self._vars_ascii_files =
# {'psi4': {(2, 2, 110.69, 'output-0000/mp_Psi4_l_m2_r110.69.asc'),
# (2, 2, 110.69, 'output-0001/mp_Psi4_l_m2_r110.69.asc')}}
self._vars_ascii_files = {}
# For _vars_h5_files, the keys are still the variables, but values are
# sets with only the files (and not tuples), since we have to read the
# content to find the l and m
#
# For example
# self._vars_h5_files =
# {'psi4': {'output-0000/mp_Psi4.h5', 'output-0001/mp_Psi4.h5'}}
self._vars_h5_files = {}
# self._vars is the dictionary where we cache the results. The keys are
# the variables, the values are the corresponding MultipoleAllDets
# objects. We fill this with __getitem__
self._vars = {}
# First, we need to find the multipole files.
# There are text files and h5 files
#
# We use a regular expressions on the name
# The structure is like mp_Psi4_l_m2_r110.69.asc
#
# Let's understand the regexp:
# 0. ^ and $ means that we match the entire name
# 1. We match mp_ followed my the variable name, which is
# any combination of characters
# 2. We match _l with a number
# 3. We match _m with possibly a minus sign and a number
# 4. We match _r with any combination of numbers with possibly
# dots
# 5. We possibly match a compression
rx_ascii = re.compile(
r"""^
mp_([a-zA-Z0-9\[\]_]+)
_l(\d+)
_m([-]?\d+)
_r([0-9.]+)
.asc
(?:.bz2|.gz)?
$""",
re.VERBOSE,
)
# For h5 files is easy: it is just the var name
rx_h5 = re.compile(r"^mp_([a-zA-Z0-9\[\]_]+).h5$")
for f in sd.allfiles:
filename = os.path.split(f)[1]
matched_h5 = rx_h5.match(filename)
matched_ascii = rx_ascii.match(filename)
if matched_h5 is not None:
variable_name = matched_h5.group(1).lower()
var_list = self._vars_h5_files.setdefault(variable_name, set())
# We are flagging that this h5
var_list.add(f)
elif matched_ascii is not None:
variable_name = matched_ascii.group(1).lower()
mult_l = int(matched_ascii.group(2))
mult_m = int(matched_ascii.group(3))
radius = float(matched_ascii.group(4))
var_list = self._vars_ascii_files.setdefault(
variable_name, set()
)
var_list.add((mult_l, mult_m, radius, f))
# What pythonize_name_dict does is to make the various variables
# accessible as attributes, e.g. self.fields.rho
self.fields = pythonize_name_dict(list(self.keys()), self.__getitem__)
def __contains__(self, key):
return str(key).lower() in self.keys()
# The following are staticmethods because they do not depend on the bound
# object. Using this decorator we save memory because Python will
# initialize them only once.
@staticmethod
def _multipole_from_textfile(path):
"""Read multipole data from a text file.
:param path: File to read.
:type path: str
:returns: Multipole data.
:rtype: :py:class:`~.TimeSeries`
"""
a = np.loadtxt(path, unpack=True, ndmin=2)
if len(a) != 3:
raise RuntimeError(f"Wrong format in {path}")
complex_mp = a[1] + 1j * a[2]
return timeseries.remove_duplicated_iters(a[0], complex_mp)
@staticmethod
def _multipoles_from_h5file(path):
"""Read multipole data from a HDF5 file.
:param path: File to read.
:type path: str
:returns: Multipole data.
:rtype: :py:class:`~.TimeSeries`
"""
alldets = []
# This regex matches : l(number)_m(-number)_r(number)
fieldname_pattern = re.compile(r"l(\d+)_m([-]?\d+)_r([0-9.]+)")
try:
with h5py.File(path, "r") as data:
# Loop over the groups in the hdf5
for entry in data.keys():
matched = fieldname_pattern.match(entry)
if matched:
mult_l = int(matched.group(1))
mult_m = int(matched.group(2))
radius = float(matched.group(3))
# Read the actual data
a = data[entry][()].T
complex_mp = a[1] + 1j * a[2]
ts = timeseries.remove_duplicated_iters(
a[0], complex_mp
)
alldets.append((mult_l, mult_m, radius, ts))
except RuntimeError as exce:
raise RuntimeError(f"File {data} cannot be processed") from exce
return alldets
def _multipoles_from_textfiles(self, mpfiles):
"""Read all the multipole data in several text files.
:param mpfiles: Files to read.
:type mpfiles: list of str
:returns: :py:class:`~.MultipoleAllDets` with all the data read.
:rtype: :py:class:`~.MultipoleAllDets`
"""
# We prepare the data for MultipoleAllDets checking for errors
alldets = [
(
mult_l,
mult_m,
radius,
self._multipole_from_textfile(filename),
)
for mult_l, mult_m, radius, filename in mpfiles
]
return MultipoleAllDets(alldets)
def _multipoles_from_h5files(self, mpfiles):
"""Read all the multipole data in several HDF5 files.
:param mpfiles: Files to read.
:type mpfiles: list of str
:returns: :py:class:`~.MultipoleAllDets` with all the data read.
:rtype: :py:class:`~.MultipoleAllDets`
"""
mult_l = []
for filename in mpfiles:
mult_l.extend(self._multipoles_from_h5file(filename))
return MultipoleAllDets(mult_l)
def __getitem__(self, key):
"""Read data associated to variable ``key``.
HDF5 files are preferred over ASCII ones.
:returns: Multipolar data.
:rtype: :py:class:`~.MultipoleAllDets`
"""
k = str(key).lower()
# If we don't have already cached the result, we first read it.
if k not in self._vars:
# We prefer h5
if k in self._vars_h5_files:
self._vars[k] = self._multipoles_from_h5files(
self._vars_h5_files[k]
)
elif k in self._vars_ascii_files:
self._vars[k] = self._multipoles_from_textfiles(
self._vars_ascii_files[k]
)
else:
raise KeyError
return self._vars[k]
[docs] def get(self, key, default=None):
"""Return a the multipolar data for the given variable if available, else return
the default value.
:param key: Requested variable.
:type key: str
:param default: Returned value if ``key`` is not available.
:type default: any
:returns: Collection of all the multipolar data for the given variable.
:rtype: :py:class:`~.MultipoleAllDets`
"""
if key not in self:
return default
return self[key]
[docs] def keys(self):
"""Return available variables with multipolar data.
:returns: Available variables that have multipolar data.
:rtype: dict_keys
"""
# We merge the dictionaries and return the keys.
# This automatically takes care of making sure that they keys are unique.
return {**self._vars_h5_files, **self._vars_ascii_files}.keys()
def __str__(self):
"""NOTE: __str__ requires opening all the h5 files! This can be slow!"""
ret = f"Variables available: {self.keys()}\n"
for variable in self.keys():
ret += f"For variable {variable}:\n\n"
ret += f"{self[variable]}\n"
return ret