"""The ArgusObsTable stores observation information from the Argus survey."""
import logging
import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from cdshealpix import skycoord_to_healpix
from mocpy import MOC
from lightcurvelynx.astro_utils.zeropoint import calculate_zp_from_maglim
from lightcurvelynx.consts import GAUSS_EFF_AREA2FWHM_SQ
from lightcurvelynx.obstable.obs_table import ObsTable
_argus_view_radius = 52.0
"""The angular radius of the observation field (in degrees):
https://argus.unc.edu/about
This is not used in search, but provided for the `radius` getter.
"""
_argus_pixel_scale = 1.0
"""The pixel scale for the Argus survey in arcseconds per pixel:
https://argus.unc.edu/specifications
"""
[docs]
class ArgusHealpixObsTable(ObsTable):
"""An ObsTable for observations from the Argus survey in healpix format.
Unlike other ObsTable classes, the ArgusHealpixObsTable does not consist of a table of
pointings, but rather is organized at the healpix level. Each row corresponds
to a healpix pixel and time.
Parameters
----------
table : dict or pandas.core.frame.DataFrame
The table with all the survey information.
colmap : dict
A mapping of standard column names to a list of possible names in the input table.
Each value in the dictionary can be a string or a list of strings.
saturation_mags : dict, optional
A dictionary mapping filter names to their saturation thresholds in magnitudes. The filters
provided must match those in the table. If not provided, Argus-specific defaults will be
used.
**kwargs : dict
Additional keyword arguments to pass to the constructor. This includes overrides
for survey parameters such as:
- dark_electrons : The dark current for the camera in electrons per second per pixel.
- gain: The gain for the camera in electrons per ADU.
- pixel_scale: The pixel scale for the camera in arcseconds per pixel.
- radius: The angular radius of the observations (in degrees).
- read_noise: The readout noise for the camera in electrons per pixel.
"""
# Column names from the Argus simulation files.
_argus_sim_colmap = {
"dark_current": "dark_current", # The electrons/pixel/exposure
"dec": "dec", # degrees
"exptime": "expTime", # seconds
"maglim": "limmag", # magnitudes
"ra": "ra", # degrees
"seeing": "seeing", # arcseconds
"skybrightness": "sky_brightness", # mag/arcsec^2
"sky_bg_e": "sky_electrons", # electrons/pixel/exposure
"time": "epoch", # MJD
}
_default_colnames = _argus_sim_colmap
# Default survey values: https://argus.unc.edu/specifications
_default_survey_values = {
"nexposure": 1,
"pixel_scale": _argus_pixel_scale,
"radius": _argus_view_radius,
"read_noise": 1.4, # e-/pixel
"zp_err_mag": 0.0, # Placeholder for now.
"survey_name": "Argus",
}
def __init__(
self,
table,
*,
colmap=None,
apply_saturation=True,
saturation_mags=None,
nside=None,
**kwargs,
):
# Set some default values.
self._spatial_data = None
self._healpix_nside = nside
self._healpix_depth = None
# If the input is a dictionary, convert it to a DataFrame.
if isinstance(table, dict):
table = pd.DataFrame(table)
# The table uses the healpix IDs as the index. We bring this into its own column for easier access
# and reset the indices.
if "healpix" in table.columns:
table = table.reset_index() # Just reset the index if healpix is already a column.
elif table.index.name in ["healpix_id", "healpix"]:
table = table.rename(columns={table.index.name: "healpix"}).reset_index()
else:
raise ValueError(
"The input table must have healpix IDs as the index or in a column named 'healpix'."
)
# Use the default column mapping if one is not provided.
if colmap is None:
colmap = self._default_colnames
# If filter is not provided, use 'g' for all observations.
if "filter" not in table.columns:
table["filter"] = "g"
# Check the unsupported terms in the kwargs and raise an error if they are provided.
if "detector_footprint" in kwargs or "wcs" in kwargs: # pragma: no cover
raise ValueError("ArgusObsTable does not support detector footprints.")
super().__init__(
table=table,
colmap=colmap,
apply_saturation=apply_saturation,
saturation_mags=saturation_mags,
**kwargs,
)
@property
[docs]
def healpix_nside(self):
"""Return the nside of the healpix pixels in the table."""
return self._healpix_nside
@property
[docs]
def healpix_depth(self):
"""Return the depth of the healpix pixels in the table."""
return self._healpix_depth
def _build_spatial_data(self):
"""Construct a mapping of healpix id to row number from the ObsTable."""
if self._healpix_nside is None:
if "nside" in self._table.columns:
self._healpix_nside = self._table["nside"].iloc[0]
else:
raise ValueError(
"nside must be provided for ArgusHealpixObsTable construction or "
"as a column in the table."
)
# Check all nside values are the same.
if "nside" in self._table.columns:
nside = self._table["nside"].to_numpy()
if not np.all(nside == self._healpix_nside):
raise ValueError(
"Inconsistent nside values found in the table. Expected all nside values"
f"to be {self._healpix_nside}, but found values: {np.unique(nside)}"
)
# Compute the depth.
self._healpix_depth = int(np.log2(self._healpix_nside))
# Build a mapping of healpix id to row number.
self._spatial_data = {}
index = self._table["healpix"].to_numpy()
for idx in np.unique(index):
self._spatial_data[idx] = np.where(index == idx)[0]
[docs]
def build_moc(self, max_depth=None, **kwargs):
"""Build a Multi-Order Coverage Map from the regions in the data set.
These are built directly from the healpix pixels.
Parameters
----------
max_depth : int, optional
The maximum depth of the MOC. Default is the depth of the healpix pixels
in the table.
**kwargs : dict
Additional keyword arguments to pass to the MOC construction. Not currently used,
but accepted for consistency with the ObsTable interface.
Returns
-------
MOC
The Multi-Order Coverage Map constructed from the data set.
"""
if max_depth is None:
max_depth = self._healpix_depth
logger = logging.getLogger(__name__)
logger.debug(f"Building MOC from ArgusHealpixObsTable at depth={max_depth}.")
moc = MOC.from_healpix_cells(
np.array(list(self._spatial_data.keys())),
depth=self._healpix_depth,
max_depth=max_depth,
)
return moc
def _derive_noise_columns(self):
"""Derive any missing noise-related columns (e.g. zero points) from the existing columns
and survey values.
"""
# Compute the dark current in electrons/pixel/second.
if "dark_current" not in self and "dark_electrons" in self and "exptime" in self:
# Compute the dark current in electrons per pixel per second from the dark current
# in electrons per pixel per exposure and the exposure time.
dark_current = self["dark_electrons"] / self["exptime"]
self.add_column("dark_current", dark_current)
# Compute the zero points in nJy if not already present and if the necessary columns are available.
if "zp" not in self:
zp_deps = [
"dark_current",
"exptime",
"maglim",
"seeing",
"sky_bg_e",
"read_noise",
"nexposure",
"pixel_scale",
]
if all(col in self for col in zp_deps):
# Compute the full-width at half-maximum of the PSF in pixels from the
# seeing (in arcseconds) and the pixel scale (in arcseconds per pixel).
fwhm_px = self["seeing"] / self["pixel_scale"]
# Compute the zero points from the 5-sigma depth (and other parameters).
zp_vals = calculate_zp_from_maglim(
maglim=self["maglim"],
sky_bg_electrons=self["sky_bg_e"],
fwhm_px=fwhm_px,
read_noise=self["read_noise"],
dark_current=self["dark_current"],
exptime=self["exptime"],
nexposure=self["nexposure"],
)
self.add_column("zp", zp_vals)
# Compute the PSF footprint in pixels.
if "psf_footprint" not in self and "seeing" in self and "pixel_scale" in self:
psf_footprint = GAUSS_EFF_AREA2FWHM_SQ * (self["seeing"] / self["pixel_scale"]) ** 2
self.add_column("psf_footprint", psf_footprint)
[docs]
def range_search(self, query_ra, query_dec, *, radius=None, t_min=None, t_max=None):
"""Return the indices of the pointings that fall within the field
of view of the query point(s).
Note that radius is not used since everything is already stored at the healpix level,
but it is accepted as a parameter for consistency with the ObsTable interface.
Parameters
----------
query_ra : float or numpy.ndarray
The query right ascension (in degrees).
query_dec : float or numpy.ndarray
The query declination (in degrees).
radius : float or None, optional
Not used in this implementation since the data is already organized at the healpix level.
t_min : float, numpy.ndarray or None, optional
The minimum time (in MJD) for the observations to consider.
If None, no time filtering is applied.
t_max : float, numpy.ndarray or None, optional
The maximum time (in MJD) for the observations to consider.
If None, no time filtering is applied.
Returns
-------
inds : list[int] or list[numpy.ndarray]
Depending on the input, this is either a list of indices for a single query point
or a list of arrays (of indices) for an array of query points.
"""
# If the query RA and Dec are scalars, convert them to 1D arrays for consistent processing.
is_scalar = np.isscalar(query_ra) and np.isscalar(query_dec)
query_ra = np.atleast_1d(query_ra)
query_dec = np.atleast_1d(query_dec)
if len(query_ra) != len(query_dec):
raise ValueError("Query RA and Dec must have the same length.")
# Bulk compute the healpix ids for all query points.
coords = SkyCoord(query_ra * u.deg, query_dec * u.deg, frame="icrs")
healpix = skycoord_to_healpix(coords, self._healpix_depth)
# For each query point, get the rows and apply time filtering if specified.
inds = []
for hp in healpix:
if hp in self._spatial_data:
row_inds = self._spatial_data[hp]
# Apply time filtering if specified.
if t_min is not None or t_max is not None:
if t_min is not None:
times = self._table["time"].iloc[row_inds].to_numpy()
row_inds = row_inds[times >= t_min]
if t_max is not None:
times = self._table["time"].iloc[row_inds].to_numpy()
row_inds = row_inds[times <= t_max]
inds.append(row_inds)
else:
inds.append(np.array([], dtype=int))
# If the input was a single query point, return a single array of indices instead of a list of arrays.
if is_scalar:
inds = inds[0]
return inds