import numpy as np
import pandas as pd
from astropy.table import Table
from citation_compass import cite_function
from lightcurvelynx import _LIGHTCURVELYNX_BASE_DATA_DIR
from lightcurvelynx.astro_utils.mag_flux import mag2flux
from lightcurvelynx.noise_models.base_noise_models import PoissonFluxNoiseModel
from lightcurvelynx.noise_models.noise_utils import poisson_bandflux_std
from lightcurvelynx.obstable.obs_table import ObsTable
[docs]
ROMAN_PIXEL_SCALE = 0.11
"""The pixel scale for Roman in arcseconds per pixel.
https://roman-docs.stsci.edu/roman-instruments-home/wfi-imaging-mode-user-guide/introduction-to-the-wfi/wfi-quick-reference
"""
_roman_dark_current = 0.0
"""The dark current for Roman in electrons per second per pixel.
< 0.005 e-/s/pix
https://roman-docs.stsci.edu/roman-instruments-home/wfi-imaging-mode-user-guide/wfi-detectors
"""
_roman_fov = 0.28
"""The Roman field of view in square degree
https://roman-docs.stsci.edu/roman-instruments-home/wfi-imaging-mode-user-guide/introduction-to-the-wfi/wfi-quick-reference
"""
_roman_zodi_level_factor = 1.5
"""
Factor for calculating Zodiacal light intensity.
According to the WFI technical documentation:
https://roman.gsfc.nasa.gov/science/WFI_technical.html
- At high galactic latitudes, the Zodi intensity is typically ~1.5× the
minimum.
- For observation into the galactic bulge, the Zodi intensity is typically 2.5-7x the minimum.
"""
_psf_url = "https://raw.githubusercontent.com/RomanSpaceTelescope/roman-technical-information/refs/heads/main/data/WideFieldInstrument/Imaging/PointSpreadFunctions/SummaryPSFstats_center.ecsv"
_zp_url = "https://raw.githubusercontent.com/RomanSpaceTelescope/roman-technical-information/refs/heads/main/data/WideFieldInstrument/Imaging/ZeroPoints/Roman_zeropoints_20240301.ecsv"
_thermal_url = "https://raw.githubusercontent.com/RomanSpaceTelescope/roman-technical-information/refs/heads/main/data/WideFieldInstrument/Imaging/Backgrounds/internal_thermal_backgrounds.ecsv"
_zodiacal_url = "https://raw.githubusercontent.com/RomanSpaceTelescope/roman-technical-information/refs/heads/main/data/WideFieldInstrument/Imaging/ZodiacalLight/zodiacal_light.ecsv"
[docs]
hltds_pass_map = {
"PC": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
"EC1": [17, 18, 19, 20, 21, 22],
"EC2": [31, 32, 33, 34, 35, 36, 37, 38, 39, 40],
}
def _get_roman_char():
psf_table = Table.read(_psf_url, format="csv", comment="#").to_pandas()
zp_table = Table.read(_zp_url, format="csv", comment="#", delimiter=" ").to_pandas()
thermal_table = Table.read(_thermal_url, format="csv", comment="#").to_pandas()
zodiacal_table = Table.read(_zodiacal_url, format="csv", comment="#").to_pandas()
return {
"psf_table": psf_table,
"zp_table": zp_table,
"thermal_table": thermal_table,
"zodiacal_min_table": zodiacal_table,
}
def _get_ma_table(ma_table_path=None):
if ma_table_path is None:
ma_table_file = (
_LIGHTCURVELYNX_BASE_DATA_DIR
/ "roman_characterization/roman_wfi_imaging_multiaccum_tables_with_exptime.csv"
)
ma_table = pd.read_csv(ma_table_file)
return ma_table
def _assign_survey_component(row, pass_map):
"""
Assign survey component name based on pass_map.
"""
if pass_map is None:
return "NA"
else:
pass_number = row["PASS"]
survey_component = "CC"
for key in pass_map:
survey_component = key if pass_number in pass_map[key] else survey_component
return survey_component
[docs]
class RomanPoissonFluxNoiseModel(PoissonFluxNoiseModel):
"""A subclass of PoissonFluxNoiseModel for Roman survey data."""
def __init__(self):
super().__init__()
[docs]
def compute_flux_error(self, bandflux, obs_table, indices):
"""Compute the flux error for the given bandflux and observation parameters.
Parameters
----------
bandflux : array_like of float
Source bandflux in nJy.
obs_table : ObsTable
Table containing the observation parameters needed to compute the noise.
indices : array_like of int
Indices of the observations in the ObsTable for which to compute the noise.
Returns
-------
flux_err : array_like
The standard deviation of the bandflux measurement error (in nJy)
"""
exptime = obs_table["exptime"].iloc[indices].to_numpy()
sky = obs_table.calculate_skynoise(
exptime,
obs_table.safe_get_survey_value("zodi_level"),
obs_table["zodi_countrate_min"].iloc[indices].to_numpy(),
obs_table["thermal_countrate"].iloc[indices].to_numpy(),
)
return poisson_bandflux_std(
bandflux, # nJy
total_exposure_time=exptime, # seconds
exposure_count=1,
psf_footprint=obs_table["N_Eff_Pix"].iloc[indices].to_numpy(),
sky=sky,
zp=obs_table["zp"].iloc[indices].to_numpy() / exptime, # (nJy/s * s)^-1
readout_noise=obs_table.readnoise_func, # e-/pixel
dark_current=obs_table.safe_get_survey_value("dark_current"),
zp_err_mag=obs_table.safe_get_survey_value("zp_err_mag"),
)
[docs]
class RomanObsTable(ObsTable):
"""A subclass for Roman exposure table.
Parameters
----------
table : dict or pandas.core.frame.DataFrame
The table with all the observation information.
colmap : dict
A mapping of standard column names to a list of possible names in the input table.
Each value in the dictionary can be a string or a list of strings.
Defaults to the Roman APT column names, stored in _default_colnames.
ma_table_path : str or pathlib.Path, optional
The path to the Roman MultiAccum table CSV file. If not provided, a default table included
with LightCurveLynx will be used.
saturation_mags : dict, optional
A dictionary mapping filter names to their saturation thresholds in magnitudes. The filters
provided must match those in the table. If not provided, saturation effects will not be applied.
**kwargs : dict
Additional keyword arguments to pass to the ObsTable constructor. This includes overrides
for survey parameters such as:
- dark_current : The dark current for the camera in electrons per second per pixel.
- gain: The CCD gain (in e-/ADU).
- pixel_scale: The pixel scale for the camera in arcseconds per pixel.
- radius: The angular radius of the observations (in degrees).
- read_noise: The standard deviation of the count of readout electrons per pixel.
"""
# Default column names for the ZTF survey data.
_default_colnames = {
"dec": "DEC",
"filter": "BANDPASS",
"ra": "RA",
"zp": "zp_nJy",
"time": "mjd_start",
"exptime": "EXPOSURE_TIME",
}
# Default survey values.
_default_survey_values = {
"dark_current": _roman_dark_current,
"pixel_scale": ROMAN_PIXEL_SCALE,
"survey_name": "Roman",
"radius": np.sqrt(_roman_fov / np.pi),
"zodi_level": _roman_zodi_level_factor,
"zp_err_mag": 0.0,
}
_additional_survey_values = {
"survey_start_time": 61406.0, # Jan 1 2027
"cadence": {
"PC": 20.0,
"EC1": 120.0,
"EC2": 120.0,
"CC": 5.0,
},
"field_time_offset": [0.0, 0.5],
"after_component_delay_time": {
"PC": 100.0,
"EC1": 100.0,
"CC": 100.0,
},
"component_start_time": {},
}
def __init__(
self,
table,
colmap=None,
ma_table_path=None,
saturation_mags=None,
**kwargs,
):
colmap = self._default_colnames if colmap is None else colmap
[docs]
self.ma_table = _get_ma_table(ma_table_path)
roman_char = _get_roman_char()
[docs]
self.zp_table = roman_char["zp_table"]
[docs]
self.psf_table = roman_char["psf_table"]
[docs]
self.thermal_table = roman_char["thermal_table"]
[docs]
self.zodiacal_table = roman_char["zodiacal_min_table"]
[docs]
self.pass_map = hltds_pass_map
self._append_apt_table()
super().__init__(
self.apt_table,
colmap=colmap,
saturation_mags=saturation_mags,
**kwargs,
)
# assign time based on https://roman.gsfc.nasa.gov/science/High_Latitude_Time_Domain_Survey.html
if "time" not in self._table.columns:
self.survey_values.update(self._additional_survey_values)
if not self.survey_values["component_start_time"]:
self.infer_component_start_time(
field_time_offset=self.safe_get_survey_value("field_time_offset"),
after_component_delay_time=self.safe_get_survey_value("after_component_delay_time"),
)
self._assign_time()
[docs]
def update_time(self):
"""
Update the time column assignment.
"""
self._assign_time()
[docs]
def infer_component_start_time(self, field_time_offset=None, after_component_delay_time=None):
"""
Infer the start time of each survey commponent for Roman HLTDS.
Parameters
----------
field_time_offset: tuple or ndarray
The time offset for the north and south field.
after_component_delay_time: dict
The delay time after each survey component.
"""
if field_time_offset is None:
field_time_offset = self.safe_get_survey_value("field_time_offset")
if after_component_delay_time is None:
after_component_delay_time = self.safe_get_survey_value("after_component_delay_time")
##separate north and south
north_field = self._table.dec > 0
south_field = self._table.dec < 0
nepoch = {}
survey_start_time = self.safe_get_survey_value("survey_start_time")
cadence = self.safe_get_survey_value("cadence")
field_time_offset = self.safe_get_survey_value("field_time_offset")
component_start_time = self.safe_get_survey_value("component_start_time")
for field_name, field, time_offset in zip(
["North", "South"], [north_field, south_field], field_time_offset, strict=True
):
table = self._table.loc[field]
nepoch[field_name] = {}
for component in ["PC", "EC1", "CC", "EC2"]:
nepoch[field_name][component] = len(
table.loc[component == table.SURVEY_COMPONENT].PASS.unique()
)
component_start_time[field_name] = {}
t = survey_start_time + time_offset
component_start_time[field_name]["PC"] = t
t += nepoch[field_name]["PC"] * cadence["PC"] + after_component_delay_time["PC"]
component_start_time[field_name]["EC1"] = t
t += nepoch[field_name]["EC1"] * cadence["EC1"] + after_component_delay_time["EC1"]
component_start_time[field_name]["CC"] = t
t += nepoch[field_name]["CC"] * cadence["CC"] + after_component_delay_time["CC"]
component_start_time[field_name]["EC2"] = t
self.survey_values["component_start_time"] = component_start_time
self.survey_values["after_component_delay_time"] = after_component_delay_time
self.survey_values["field_time_offset"] = field_time_offset
def _assign_time(self):
self._table["time"] = 0.0
north_field = self._table.dec > 0
south_field = self._table.dec < 0
component_start_time = self.safe_get_survey_value("component_start_time")
cadence = self.safe_get_survey_value("cadence")
for field_name, field in zip(["North", "South"], [north_field, south_field], strict=True):
table = self._table.loc[field]
for component in ["PC", "EC1", "CC", "EC2"]:
for i, pass_number in enumerate(
np.sort(table.loc[component == table.SURVEY_COMPONENT].PASS.unique())
):
time = component_start_time[field_name][component] + cadence[component] * i
idx = pass_number == table.PASS
self._table.loc[field & idx, "time"] = time
def _append_apt_table(self):
self.apt_table["zp_abmag"] = 0.0
self.apt_table["N_Eff_Pix"] = 0.0
self.apt_table["zodi_countrate_min"] = 0.0
self.apt_table["thermal_countrate"] = 0.0
if "mjd_start" not in self.apt_table.columns:
self.apt_table["time"] = 0.0
for f in np.unique(self.apt_table.BANDPASS):
if f == "PRISM" or f == "GRISM":
continue
zp_abmag = self.zp_table.loc[self.zp_table.element == f, "ABMag"].values[0]
self.apt_table.loc[f == self.apt_table.BANDPASS, "zp_abmag"] = zp_abmag
n_eff_pix = self.psf_table.loc[self.psf_table["Filter"] == f, "N_Eff_Pix"].values[0]
self.apt_table.loc[f == self.apt_table.BANDPASS, "N_Eff_Pix"] = n_eff_pix
sigma_zodi_min = self.zodiacal_table.loc[self.zodiacal_table["filter"] == f, "rate"].values[0]
self.apt_table.loc[f == self.apt_table.BANDPASS, "zodi_countrate_min"] = sigma_zodi_min
sigma_thermal = self.thermal_table.loc[self.thermal_table["filter"] == f, "rate"].values[0]
self.apt_table.loc[f == self.apt_table.BANDPASS, "thermal_countrate"] = sigma_thermal
## We don't need to map exposure time since new APT files includes exposure time column.
## Keeping these for now for the record.
# self.apt_table["exptime"] = 0.0
# for mat_number in np.unique(self.apt_table.MA_TABLE_NUMBER):
# exptime = self.ma_table.loc[self.ma_table["MATableNumber"] == mat_number, "Exptime"].values[0]
# self.apt_table.loc[mat_number == self.apt_table.MA_TABLE_NUMBER, "exptime"] = exptime
# map survey name for HLTDS
self.apt_table["SURVEY_COMPONENT"] = self.apt_table.apply(
lambda row: _assign_survey_component(row, self.pass_map), axis=1
)
self.apt_table["zp_nJy"] = mag2flux(self.apt_table["zp_abmag"])
@cite_function
[docs]
def readnoise_func(self, exptime):
"""
Readout noise function for Roman.
References
----------
Eq. 9 of Rose et al 2025 - https://ui.adsabs.harvard.edu/abs/2025ApJ...988...65R/abstract
Parameters
----------
exptime: float or npt.ArrayLike
Exposure time in seconds.
Returns
-------
sigma_read: float or npt.ArrayLike
Readout noise.
"""
exptime = np.asarray(exptime)
sigma_floor_sq = 25.0
sigma_sq = 3072.0
n = exptime / 3.04
nfactor = (n - 1) / n / (n + 1)
sigma_read = np.sqrt(sigma_floor_sq + sigma_sq * nfactor)
return sigma_read
@cite_function
[docs]
def calculate_skynoise(self, exptime, zodi_scale, zodi_countrate_min, thermal_countrate):
"""Calculate sky noise.
References
----------
Eq. 10 of Rose et al 2025 - https://ui.adsabs.harvard.edu/abs/2025ApJ...988...65R/abstract
Parameters
----------
exptime: float or npt.ArrayLike
Exposure time.
zodi_scale: float or npt.ArrayLike
Zodiacal light scale. The zodiacal light amount is zodi_scale * zodi_countrate_min
zodi_countrate_min: float or npt.ArrayLike
Minimum zodiacal count rate (e-/s/pixel).
thermal_countrate: float or npt.ArrayLike
Thermal count rate (e-/s/pixel).
Returns
-------
sky_variance: float or npt.ArrayLike
Total sky variance (e-^2/pixel).
"""
exptime = np.asarray(exptime)
zodi_scale = np.asarray(zodi_scale)
zodi_countrate_min = np.asarray(zodi_countrate_min)
thermal_countrate = np.asarray(thermal_countrate)
sky_variance = exptime * ((zodi_scale * zodi_countrate_min) + thermal_countrate)
return sky_variance