# Source code for lightcurvelynx.noise_models.pzflow_noise_model

"""A wrapper that trains and queries pzflow for sampling noise.

We strongly recommend using the ``learn_pzflow_noise_model`` function to train
a flow and create a PZFlowNoiseModel.

For the full pzflow package see:
https://github.com/jfcrenshaw/pzflow
"""

import pickle

import numpy as np
import pandas as pd
from citation_compass import CiteClass, cite_function

from lightcurvelynx.noise_models.base_noise_models import FluxNoiseModel


class _ColumnNormalizationData:
    """Hold the parameters needed to normalize one column of the training and
    prediction data for a PZFlowNoiseModel.

    The forward transform is ``y = f(x) * scale + offset``, where ``f(x) = log10(x)``
    when ``log_transform`` is True and ``f(x) = x`` otherwise. ``scale`` and
    ``offset`` are chosen so that the (possibly log-transformed) data used to
    build the object maps onto the interval [0, 1].

    Attributes
    ----------
    offset : float
        The additive offset applied after scaling, equal to ``-min(f(data)) * scale``
        so that the minimum of the transformed data maps to 0.
    scale : float
        The multiplicative scale factor, equal to ``1 / (max(f(data)) - min(f(data)))``
        so that the transformed data spans a range of width 1. If all values are
        identical, the range is treated as 1.0.
    log_transform : bool
        Whether a base-10 log is applied to the data before scaling. This can be
        useful for data that is highly skewed or has a large dynamic range.

    Parameters
    ----------
    data : array_like
        The data to be used for computing the normalization parameters. NaN values
        are ignored when computing the minimum and maximum.
    log_transform : bool, optional
        Whether to apply a base-10 log to the data before scaling. All non-NaN
        values must be strictly positive when this is True.

    Raises
    ------
    ValueError
        If ``data`` is empty, contains only NaN values, or contains non-positive
        values while ``log_transform`` is True.
    """

    def __init__(self, data, log_transform=False):
        # Perform some basic checks on the data. Use ``size`` rather than ``len``
        # so that scalar (0-d) input is handled instead of raising a TypeError.
        data = np.asarray(data)
        if data.size == 0:
            raise ValueError("Data is empty, cannot compute normalization parameters.")
        if np.all(np.isnan(data)):
            raise ValueError("Data contains only NaN values, cannot compute normalization parameters.")
        if log_transform and np.any(data <= 0):
            raise ValueError(
                "Data contains non-positive values, cannot apply log transform for normalization."
            )

        # Take the log of the data (if needed).
        self.log_transform = log_transform
        if log_transform:
            data = np.log10(data)

        # Use NaN-aware reductions: a handful of missing values must not turn the
        # scale and offset into NaN (which would corrupt every normalized value).
        data_min = np.nanmin(data)
        data_range = np.nanmax(data) - data_min
        if data_range <= 0:
            # Handle the case where all the data is the same.
            data_range = 1.0
        self.scale = 1.0 / data_range

        # Shift the scaled data so its minimum lands on 0 (and maximum on 1).
        self.offset = -data_min * self.scale

    def normalize(self, data):
        """Normalize the data using the stored log transform, scale, and offset.

        Parameters
        ----------
        data : array_like
            The data to be normalized.

        Returns
        -------
        numpy.ndarray
            The normalized data, ``f(data) * scale + offset``.

        Raises
        ------
        ValueError
            If ``log_transform`` is True and the data contains non-positive values.
        """
        data = np.asarray(data)
        if self.log_transform:
            if np.any(data <= 0):
                raise ValueError(
                    "Data contains non-positive values, cannot apply log transform for normalization."
                )
            data = np.log10(data)
        return data * self.scale + self.offset

    def denormalize(self, data):
        """Invert ``normalize``, mapping normalized values back to the original units.

        Parameters
        ----------
        data : array_like
            The data to be denormalized.

        Returns
        -------
        numpy.ndarray
            The denormalized data, ``(data - offset) / scale`` followed by
            ``10**x`` when ``log_transform`` is True.
        """
        data = (np.asarray(data) - self.offset) / self.scale
        if self.log_transform:
            data = 10**data
        return data


class PZFlowNoiseModel(FluxNoiseModel, CiteClass):
    """A noise model that uses pzflow to sample noise parameters (standard deviation
    of the noise) for bandflux measurements.

    The names of the pzflow input (conditional) parameters should either match the
    column names in the ObsTable or be mapped to ObsTable column names using the
    ``input_col_map`` when creating the PZFlowNoiseModel. The exception is a
    ``bandflux`` input, whose values are taken from the ``bandflux`` argument of
    the ``apply_noise`` method instead of the ObsTable.

    We strongly recommend using the ``learn_pzflow_noise_model`` function to train
    a flow and create a PZFlowNoiseModel, as this will correctly handle potentially
    error-prone aspects like normalization.

    References
    ----------
    * Paper: Crenshaw et. al. 2024 - https://ui.adsabs.harvard.edu/abs/2024AJ....168...80C
    * Zenodo: Crenshaw et. al. 2024 - https://doi.org/10.5281/zenodo.10710271

    Parameters
    ----------
    flow_obj : pzflow.flow.Flow or pzflow.flowEnsemble.FlowEnsemble
        The object from which to sample. It must have exactly one data column
        (the standard deviation of the noise).
    input_col_map : dict, optional
        A dictionary where the keys are the input parameter names for the pzflow
        model and the values are the names of the columns in the ObsTable that
        should be used for those parameters. A copy of this dictionary is stored.
    normalizer_data : dict, optional
        A dictionary where the keys are the column names of the data used for
        training the flow and the values are _ColumnNormalizationData objects (or
        None) that contain the information needed to normalize and denormalize the
        data for prediction.

    Raises
    ------
    ValueError
        If ``flow_obj`` does not have exactly one data column.
    """

    def __init__(
        self,
        flow_obj,
        *,
        input_col_map=None,
        normalizer_data=None,
    ):
        # Validate the pzflow object has exactly one output (data) column
        # and save its name.
        self._flow = flow_obj
        if len(self._flow.data_columns) != 1:
            raise ValueError(
                "PZFlowNoiseModel currently only supports flows with a "
                "single data column (the standard deviation of the noise)."
            )
        self._output_column = self._flow.data_columns[0]

        # Save the metadata. The column map is copied so that later calls to
        # add_column_mapping() do not mutate the caller's dictionary.
        self._input_col_map = dict(input_col_map) if input_col_map is not None else {}
        self._normalizer_data = normalizer_data if normalizer_data is not None else {}
        self._update_required_values()

    def add_column_mapping(self, flow_input_name, obs_table_col_name):
        """Add a mapping from a pzflow input parameter name to an ObsTable column name.

        Use this when the flow was trained on data whose column names differ from
        the column names of the ObsTable on which it will be applied.

        Parameters
        ----------
        flow_input_name : str
            The name of the input parameter for the pzflow model.
        obs_table_col_name : str
            The name of the column in the ObsTable that should be used for this parameter.
        """
        self._input_col_map[flow_input_name] = obs_table_col_name
        # Keep the list of required ObsTable columns in sync with the new mapping.
        self._update_required_values()

    def _update_required_values(self):
        """Rebuild ``self._required_values`` from the flow's conditional columns and
        the current input_col_map.

        The ``bandflux`` column is skipped because its values are passed directly
        to ``apply_noise`` rather than read from the ObsTable.
        """
        conditional_columns = self._flow.conditional_columns
        if conditional_columns is None:
            # Same treatment as apply_noise: a flow without conditions needs no columns.
            conditional_columns = ()
        self._required_values = [
            self._input_col_map.get(col, col) for col in conditional_columns if col != "bandflux"
        ]

    @classmethod
    def from_file(cls, filename, *, input_col_map=None):
        """Create a PZFlowNoiseModel from a saved file.

        Parameters
        ----------
        filename : str or Path
            The location of the saved flow.
        input_col_map : dict, optional
            A dictionary where the keys are the input parameter names for the pzflow
            model and the values are the names of the columns in the ObsTable that
            should be used for those parameters. If provided, this overrides the
            input_col_map that was saved with the model.

        Returns
        -------
        PZFlowNoiseModel
            The loaded noise model.

        Raises
        ------
        TypeError
            If the file does not contain a PZFlowNoiseModel.
        """
        # We load the entire model object (not just the flow) so that we keep
        # the normalization data and other metadata together with the flow.
        # NOTE(security): pickle.load can execute arbitrary code; only load files
        # from trusted sources.
        with open(filename, "rb") as f:
            noise_model = pickle.load(f)
        # Explicit check (not ``assert``) so the validation survives ``python -O``.
        if not isinstance(noise_model, PZFlowNoiseModel):
            raise TypeError("The loaded object is not a PZFlowNoiseModel.")

        # Since we can pass in a different input_col_map when loading the model,
        # we need to update the required values.
        if input_col_map is not None:
            noise_model._input_col_map = dict(input_col_map)
            noise_model._update_required_values()
        return noise_model

    def _build_conditions(self, bandflux, obs_table, indices):
        """Assemble the normalized conditional inputs for the flow.

        Returns a DataFrame with one column per flow conditional column, or None
        if the flow has no conditional columns.
        """
        conditional_columns = self._flow.conditional_columns
        if conditional_columns is None or len(conditional_columns) == 0:
            return None

        input_params = {}
        for col in conditional_columns:
            if col == "bandflux":
                # bandflux is supplied by the caller rather than the ObsTable.
                values = bandflux
            else:
                key = self._input_col_map.get(col, col)
                values = obs_table.get_value_per_row(key, indices=indices)

            # Normalization is keyed by the pzflow column name (col), not the ObsTable key.
            normalizer = self._normalizer_data.get(col)
            if normalizer is not None:
                values = normalizer.normalize(values)
            input_params[col] = values
        return pd.DataFrame(input_params)

    def apply_noise(
        self,
        bandflux,
        *,
        obs_table=None,
        indices=None,
        rng=None,
        **kwargs,
    ):
        """Compute the noise parameters for given observations in an ObsTable
        and apply noise to the input bandflux.

        Parameters
        ----------
        bandflux : array_like of float
            Source bandflux in energy units, e.g. nJy.
        obs_table : ObsTable, optional
            Table containing the observation parameters, including all parameters
            needed to compute the noise. Required despite the default.
        indices : array_like of int, optional
            Indices of the observations in the ObsTable to which noise should be
            applied. Required despite the default; must have the same length as
            ``bandflux``.
        rng : np.random.Generator, optional
            The random number generator to use for applying noise. If None, a
            default generator will be used.
        **kwargs
            Additional parameters for the noise model (unused).

        Returns
        -------
        flux : array_like
            The updated flux measurements after applying noise, in the same units
            as the input bandflux.
        flux_err : array_like
            The (non-negative) bandflux measurement error used for applying noise,
            in the same units as the input bandflux.

        Raises
        ------
        ValueError
            If ``obs_table`` or ``indices`` is missing, or if the lengths of
            ``indices`` and ``bandflux`` differ.
        """
        if obs_table is None:
            raise ValueError("ObsTable must be provided for PZFlowNoiseModel.")
        if indices is None:
            raise ValueError("Indices must be provided for PZFlowNoiseModel.")
        num_samples = len(bandflux)
        if len(indices) != num_samples:
            raise ValueError("Length of indices must match length of bandflux.")

        input_df = self._build_conditions(bandflux, obs_table, indices)

        # Sample from the flow to get the noise parameters. With conditions, pzflow
        # draws ``nsamples`` per condition row; without conditions it draws
        # ``nsamples`` rows in total, so request one per observation in that case.
        rng = np.random.default_rng(rng)
        pzflow_seed = int(rng.integers(0, 1_000_000_000))
        nsamples = 1 if input_df is not None else num_samples
        samples = self._flow.sample(nsamples=nsamples, conditions=input_df, seed=pzflow_seed)
        flux_err = samples[self._output_column].to_numpy()

        # If we have normalization data for the output column, denormalize the output.
        normalizer = self._normalizer_data.get(self._output_column)
        if normalizer is not None:
            flux_err = normalizer.denormalize(flux_err)

        # Clip in physical units (after denormalizing) so the standard deviation
        # is non-negative; clipping in normalized space would instead floor it at
        # the training-data minimum.
        flux_err = np.clip(flux_err, a_min=0, a_max=None)

        # Apply noise to the input bandflux using the sampled noise parameters.
        noisy_bandflux = rng.normal(loc=bandflux, scale=flux_err)
        return noisy_bandflux, flux_err

    def save_to_file(self, filename):
        """Save the PZFlowNoiseModel (flow plus metadata) to a pickle file.

        Parameters
        ----------
        filename : str or Path
            The location where the model should be saved.
        """
        with open(filename, "wb") as f:
            pickle.dump(self, f)
@cite_function
def learn_pzflow_noise_model(
    data,
    *,
    noise_column=None,
    normalize=True,
    **kwargs,
):
    """Train a pzflow model to predict noise parameters (standard deviation of the noise)
    for bandflux measurements.

    The trained flow has a single data column (``noise_column``) and uses every
    other column of ``data`` as a conditional input.

    Parameters
    ----------
    data : dict or pd.DataFrame
        The data to be used for training the flow. It must contain the
        ``noise_column``; all other columns are treated as conditional inputs.
        The caller's data is copied and never modified.
    noise_column : str
        The name of the column in ``data`` that contains the noise values to predict.
        Must be provided.
    normalize : bool, optional
        Whether to normalize each column to [0, 1] before training the flow. The
        noise column and any ``bandflux`` column are log10-transformed first, so
        they must be strictly positive. Normalization can help the flow learn the
        distribution, especially if the data has a large dynamic range or is
        highly skewed. Default: True.
    **kwargs
        Additional parameters passed to ``Flow.train``. ``verbose`` defaults to False.

    Returns
    -------
    PZFlowNoiseModel
        A noise model wrapping the trained flow and its normalization data.

    Raises
    ------
    ImportError
        If pzflow is not installed.
    ValueError
        If ``noise_column`` is not a string or is not present in ``data``, or if a
        column cannot be normalized (empty, all NaN, or non-positive values in a
        log-transformed column).

    References
    ----------
    * Paper: Crenshaw et. al. 2024 - https://ui.adsabs.harvard.edu/abs/2024AJ....168...80C
    * Zenodo: Crenshaw et. al. 2024 - https://doi.org/10.5281/zenodo.10710271
    """
    # Check that we can load the pzflow package before doing any other work.
    try:
        from pzflow import Flow
    except ImportError as err:  # pragma: no cover
        raise ImportError(
            "pzflow package is not installed by default. You can install it with "
            "`pip install pzflow` or `conda install conda-forge::pzflow`."
        ) from err

    # Validate the arguments before copying potentially large data.
    if not isinstance(noise_column, str):
        raise ValueError("A noise_column string must be specified to train the flow.")

    # Make a copy of the data so we can normalize it without touching the caller's data.
    local_data = data.copy() if isinstance(data, pd.DataFrame) else pd.DataFrame(data)
    if noise_column not in local_data.columns:
        raise ValueError(f"noise_column '{noise_column}' not found in data.")

    # Normalize each column of the data and save the normalization information.
    # A None entry tells PZFlowNoiseModel to skip (de)normalization for that column.
    normalizer_data = {}
    for col in local_data.columns:
        if not normalize:
            normalizer_data[col] = None
            continue
        log_transform = col in (noise_column, "bandflux")
        normalizer = _ColumnNormalizationData(local_data[col], log_transform=log_transform)
        normalizer_data[col] = normalizer
        local_data[col] = normalizer.normalize(local_data[col])

    # Train the actual flow using the normalized data. ``setdefault`` lets callers
    # override ``verbose`` instead of hitting a duplicate-keyword TypeError.
    cond_columns = [col for col in local_data.columns if col != noise_column]
    flow = Flow(data_columns=[noise_column], conditional_columns=cond_columns)
    kwargs.setdefault("verbose", False)
    _ = flow.train(local_data, **kwargs)

    return PZFlowNoiseModel(
        flow_obj=flow,
        normalizer_data=normalizer_data,
    )