"""Utility functions for post processing the results data by adding statistics
columns and filtering on those columns."""
import warnings
import numpy as np
import numpy.ma as ma
import pandas as pd
from nested_pandas import NestedFrame
from lightcurvelynx.astro_utils.mag_flux import flux2mag
from lightcurvelynx.obstable.obs_table import ObsTable
[docs]
def concat_results(results_list):
    """Stack several result tables into one and renumber their IDs.

    The inputs are concatenated row-wise with a fresh 0..N-1 index. If the
    combined table has an "id" column, it is overwritten with consecutive
    integers so that every row has a unique ID.

    Parameters
    ----------
    results_list : list of nested_pandas.NestedFrame
        The tables to concatenate, in the order they should appear.

    Returns
    -------
    nested_pandas.NestedFrame
        The combined table.
    """
    with warnings.catch_warnings():
        # Silence pandas' FutureWarning about concatenating empty frames.
        warnings.filterwarnings(
            "ignore",
            message=".*DataFrame concatenation with empty.*",
            category=FutureWarning,
        )
        combined = pd.concat(results_list, ignore_index=True)

    if "id" not in combined.columns:
        return combined

    # Per-input IDs can collide after stacking, so assign new consecutive ones.
    combined["id"] = np.arange(len(combined))
    return combined
[docs]
def results_drop_empty(results):
    """Remove the rows whose "lightcurve" entry is missing.

    Parameters
    ----------
    results : nested_pandas.NestedFrame
        The table of results with a "lightcurve" column.

    Returns
    -------
    nested_pandas.NestedFrame
        The table returned by ``dropna``, without the rows that have a
        null "lightcurve" value.
    """
    required_columns = ["lightcurve"]
    return results.dropna(subset=required_columns)
[docs]
def results_append_param_as_col(results, param_name):
    """Append a simulation parameter as a new column to the results DataFrame.

    The value is read from each row's entry in the "params" column and stored
    in a column named after the parameter with every "." replaced by "_".

    Parameters
    ----------
    results : nested_pandas.NestedFrame
        The DataFrame containing lightcurve data. This is modified in place.
        It must have a "params" column whose entries can be indexed by
        ``param_name``.
    param_name : str
        The name of the parameter to append in the form <node_label>.<param_name>.

    Returns
    -------
    nested_pandas.NestedFrame
        The DataFrame with the new parameter column added.

    Raises
    ------
    KeyError
        If ``results`` has no "params" column, or (when the per-row entries
        are dictionaries) if ``param_name`` is missing from one of them.
    """
    # Remove the dot from the parameter name to create a valid column name.
    new_colname = param_name.replace(".", "_")
    if new_colname in results.columns:
        # stacklevel=2 attributes the warning to the caller's line.
        warnings.warn(
            f"Parameter {new_colname} already exists in results. Overwriting.",
            stacklevel=2,
        )

    # The parameters are stored as one mapping per row, so walk the column
    # directly and pull the requested value out of each mapping.
    values = [row_params[param_name] for row_params in results["params"]]
    results[new_colname] = values
    return results
[docs]
def results_append_obstable_data(results, column_name, obstables):
    """Copy one ObsTable column onto every observation in the nested lightcurves.

    Each observation's "survey_idx" selects the ObsTable and its "obs_idx"
    selects the row within that table. Observations from a survey whose table
    does not have the requested column are left as NaN.

    Parameters
    ----------
    results : nested_pandas.NestedFrame
        The results table with nested "lightcurve.obs_idx" and
        "lightcurve.survey_idx" columns. This is modified in place.
    column_name : str
        The ObsTable column to copy.
    obstables : ObsTable or list of ObsTable
        The table(s) to read from, in the same order as used in the simulation.

    Returns
    -------
    nested_pandas.NestedFrame
        The same table with a new "lightcurve.<column_name>" nested column.
    """
    # Accept a single table by wrapping it in a list.
    if isinstance(obstables, ObsTable):
        obstables = [obstables]

    survey_idx = results["lightcurve.survey_idx"]
    obs_idx = results["lightcurve.obs_idx"]

    # Start with NaN everywhere and fill in one survey at a time.
    appended = np.full(len(obs_idx), np.nan)
    for survey in np.unique(survey_idx):
        if column_name not in obstables[survey].columns:
            continue
        in_survey = survey_idx == survey
        appended[in_survey] = obstables[survey][column_name].iloc[obs_idx[in_survey]]

    results[f"lightcurve.{column_name}"] = appended
    return results
[docs]
def lightcurve_compute_snr(flux, fluxerr):
    """Compute the signal-to-noise ratio, flux / fluxerr, element by element.

    Parameters
    ----------
    flux : array-like
        The flux values.
    fluxerr : array-like
        The flux error values.

    Returns
    -------
    result : numpy.ma.MaskedArray
        The SNR values with the shape of ``flux``. Entries where the flux or
        the flux error is not strictly positive are masked.
    """
    flux_arr = np.asarray(flux)
    err_arr = np.asarray(fluxerr)

    # Begin fully masked; only entries that pass the validity test get a value.
    snr = ma.masked_all(flux_arr.shape)
    usable = np.logical_and(flux_arr > 0, err_arr > 0)
    snr[usable] = flux_arr[usable] / err_arr[usable]
    return snr
[docs]
def lightcurve_compute_mag(flux, fluxerr):
    """Convert fluxes and flux errors to magnitudes and magnitude errors.

    The magnitude comes from ``flux2mag`` and the magnitude error is
    (2.5 / ln(10)) * (fluxerr / flux).

    Parameters
    ----------
    flux : array-like
        The flux values.
    fluxerr : array-like
        The flux error values.

    Returns
    -------
    tuple of numpy.ma.MaskedArray
        The magnitudes and the magnitude errors, each with the shape of
        ``flux``. Entries where the flux or the flux error is not strictly
        positive are masked in both arrays.
    """
    flux_arr = np.asarray(flux)
    err_arr = np.asarray(fluxerr)
    usable = np.logical_and(flux_arr > 0, err_arr > 0)

    # Both outputs start fully masked; only the usable entries are filled in.
    mag = ma.masked_all(flux_arr.shape)
    magerr = ma.masked_all(flux_arr.shape)

    mag[usable] = flux2mag(flux_arr[usable])

    err_scale = 2.5 / np.log(10)
    magerr[usable] = err_scale * (err_arr[usable] / flux_arr[usable])
    return mag, magerr
[docs]
def augment_single_lightcurve(results, *, min_snr=0.0, t0=None):
    """Add derived columns to the DataFrame of one light curve.

    The columns added are:
      - "snr": flux / fluxerr
      - "detection": True where the SNR is valid and >= min_snr, else False
      - "mag": the magnitude computed from the flux
      - "magerr": (2.5 / ln(10)) * (fluxerr / flux)
      - "time_rel": mjd - t0 (only if t0 is given and an "mjd" column exists)

    Entries with a non-positive flux or flux error are treated as invalid:
    they are masked in the computed arrays and never count as detections.

    Parameters
    ----------
    results : pandas.DataFrame
        The light curve data with "flux" and "fluxerr" columns. Modified in place.
    min_snr : float, optional
        Minimum SNR required to mark an entry as a detection. Default is 0.0.
    t0 : float or None, optional
        Reference time for the light curve.

    Returns
    -------
    results : pandas.DataFrame
        The modified DataFrame (to enable chaining).

    Raises
    ------
    ValueError
        If the "flux" or "fluxerr" column is missing.
    """
    for required in ("flux", "fluxerr"):
        if required not in results.columns:
            raise ValueError("flux and fluxerr must be present in the light curve DataFrame.")

    flux, fluxerr = results["flux"], results["fluxerr"]

    # Signal-to-noise and the detection flag derived from it.
    snr = lightcurve_compute_snr(flux, fluxerr)
    detected = np.where(snr.mask, False, snr >= min_snr)
    results["snr"] = snr
    results["detection"] = detected

    # Magnitude and its uncertainty.
    mag, magerr = lightcurve_compute_mag(flux, fluxerr)
    results["mag"] = mag
    results["magerr"] = magerr

    # Time relative to the reference epoch, when both pieces are available.
    has_reference_time = t0 is not None
    if has_reference_time and "mjd" in results.columns:
        results["time_rel"] = results["mjd"] - t0
    return results
[docs]
def results_augment_lightcurves(results, *, min_snr=0.0):
    """Add columns to the results NestedFrame with additional information
    about each light curve, including:
      - SNR = flux / fluxerr
      - detection flag (True if SNR >= min_snr, False otherwise)
      - AB magnitude
      - AB magnitude error = (2.5 / ln(10)) * (fluxerr / flux)
      - relative time = mjd - t0 (if a usable t0 column is in the results table)

    Entries with a non-positive flux or flux error are treated as invalid:
    they are masked in the computed arrays and never count as detections.

    The input must be a NestedFrame with a nested "lightcurve" column that
    contains the "flux" and "fluxerr" columns. To augment a single light
    curve stored as a plain pandas.DataFrame, use augment_single_lightcurve().

    Parameters
    ----------
    results : nested_pandas.NestedFrame
        The NestedFrame containing lightcurve data. Modified in place.
    min_snr : float, optional
        Minimum SNR required to mark an entry as a detection. Default is 0.0.

    Returns
    -------
    results : nested_pandas.NestedFrame
        The modified NestedFrame (to enable chaining).

    Raises
    ------
    ValueError
        If results is not a NestedFrame with a "lightcurve" column, if
        lightcurve.flux or lightcurve.fluxerr is missing, or if a usable
        "t0" column is present but lightcurve.mjd is missing.
    """
    if not isinstance(results, NestedFrame) or "lightcurve" not in results.columns:
        raise ValueError("results must be a NestedFrame with a 'lightcurve' column.")
    if (
        "flux" not in results["lightcurve"].nest.columns
        or "fluxerr" not in results["lightcurve"].nest.columns
    ):
        raise ValueError("lightcurve.flux and lightcurve.fluxerr must be present in the DataFrame.")

    flux = results["lightcurve.flux"]
    fluxerr = results["lightcurve.fluxerr"]

    # Compute SNR and detection flag.
    snr = lightcurve_compute_snr(flux, fluxerr)
    results["lightcurve.snr"] = snr
    results["lightcurve.detection"] = np.where(snr.mask, False, snr >= min_snr)

    # Compute magnitude and magnitude error.
    mag, magerr = lightcurve_compute_mag(flux, fluxerr)
    results["lightcurve.mag"] = mag
    results["lightcurve.magerr"] = magerr

    # If t0 is provided as a column in results, compute relative time.
    # NOTE(review): np.all() is a truthiness test, so a t0 of exactly 0 also
    # disables the relative time column -- confirm that this is intended.
    if "t0" in results.columns and np.all(results["t0"]) and results["t0"].notna().all():
        if "mjd" not in results["lightcurve"].nest.columns:
            raise ValueError("lightcurve.mjd must be present in the DataFrame.")

        # Each flattened observation carries the index label of the row it
        # belongs to. Translate those labels into row positions before indexing
        # the t0 array, so that the lookup is still correct when the frame does
        # not have a default 0..N-1 index (for example after rows were dropped).
        t0 = np.asanyarray(results["t0"])
        obs_row_labels = results["lightcurve"]["mjd"].index
        if results.index.is_unique:
            t0_idx = results.index.get_indexer(obs_row_labels)
        else:
            # Duplicate labels cannot be mapped to a single row, so fall back
            # to treating the labels themselves as positions.
            t0_idx = np.array(obs_row_labels)
        results["lightcurve.time_rel"] = results["lightcurve.mjd"] - t0[t0_idx]

    return results
[docs]
def results_use_full_filter_names(results, passbands):
    """Modifies the nested 'lightcurve.filter' column in the results NestedFrame
    so that each entry is the full name of its passband (as given by the
    passband's ``full_name`` attribute), e.g. 'LSST_g'.

    If the light curves have no 'survey_idx' column, every observation is
    assumed to come from the first entry of ``passbands``.

    Parameters
    ----------
    results : nested_pandas.NestedFrame
        The NestedFrame containing lightcurve data, with a nested
        "lightcurve" column that has a "filter" column. Modified in place.
    passbands : list of PassbandGroup
        The list of PassbandGroups used in the simulation, in the same order
        as in the simulation.

    Returns
    -------
    results : nested_pandas.NestedFrame
        The modified NestedFrame (to enable chaining).

    Raises
    ------
    ValueError
        If results is not a NestedFrame with a "lightcurve" column or if
        lightcurve.filter is missing.
    """
    if not isinstance(results, NestedFrame) or "lightcurve" not in results.columns:
        raise ValueError("results must be a NestedFrame with a 'lightcurve' column.")
    if "filter" not in results["lightcurve"].nest.columns:
        raise ValueError("lightcurve.filter must be present in the DataFrame.")

    # One entry per observation (flattened over all light curves).
    short_names = results["lightcurve.filter"].values

    if "survey_idx" in results["lightcurve"].nest.columns:
        survey_idx = results["lightcurve.survey_idx"].values
    else:
        # The default must have one entry per observation (not one per row of
        # results) so that it lines up with the flattened filter names.
        survey_idx = np.zeros(len(short_names), dtype=int)

    # Go through every pair of survey index and filter name and replace
    # the filter name with the full name from the passband group. The masks
    # are computed from the untouched short names, so a name that was already
    # replaced can never be matched a second time.
    filter_names = short_names.copy()
    for s_idx in np.unique(survey_idx):
        in_survey = survey_idx == s_idx
        for fil in np.unique(short_names[in_survey]):
            mask = in_survey & (short_names == fil)
            filter_names[mask] = passbands[s_idx][fil].full_name

    results["lightcurve.filter"] = filter_names
    return results