Source code for lightcurvelynx.obstable.argus_obstable

"""The ArgusObsTable stores observation information from the Argus survey."""

import logging

import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from cdshealpix import skycoord_to_healpix
from mocpy import MOC

from lightcurvelynx.astro_utils.zeropoint import calculate_zp_from_maglim
from lightcurvelynx.consts import GAUSS_EFF_AREA2FWHM_SQ
from lightcurvelynx.obstable.obs_table import ObsTable

_argus_view_radius = 52.0
"""The angular radius of the observation field (in degrees):
https://argus.unc.edu/about

This is not used in search, but provided for the `radius` getter.
"""

_argus_pixel_scale = 1.0
"""The pixel scale for the Argus survey in arcseconds per pixel:
https://argus.unc.edu/specifications
"""


[docs] class ArgusHealpixObsTable(ObsTable): """An ObsTable for observations from the Argus survey in healpix format. Unlike other ObsTable classes, the ArgusHealpixObsTable does not consist of a table of pointings, but rather is organized at the healpix level. Each row corresponds to a healpix pixel and time. Parameters ---------- table : dict or pandas.core.frame.DataFrame The table with all the survey information. colmap : dict A mapping of standard column names to a list of possible names in the input table. Each value in the dictionary can be a string or a list of strings. saturation_mags : dict, optional A dictionary mapping filter names to their saturation thresholds in magnitudes. The filters provided must match those in the table. If not provided, Argus-specific defaults will be used. **kwargs : dict Additional keyword arguments to pass to the constructor. This includes overrides for survey parameters such as: - dark_electrons : The dark current for the camera in electrons per second per pixel. - gain: The gain for the camera in electrons per ADU. - pixel_scale: The pixel scale for the camera in arcseconds per pixel. - radius: The angular radius of the observations (in degrees). - read_noise: The readout noise for the camera in electrons per pixel. """ # Column names from the Argus simulation files. _argus_sim_colmap = { "dark_current": "dark_current", # The electrons/pixel/exposure "dec": "dec", # degrees "exptime": "expTime", # seconds "maglim": "limmag", # magnitudes "ra": "ra", # degrees "seeing": "seeing", # arcseconds "skybrightness": "sky_brightness", # mag/arcsec^2 "sky_bg_e": "sky_electrons", # electrons/pixel/exposure "time": "epoch", # MJD } _default_colnames = _argus_sim_colmap # Default survey values: https://argus.unc.edu/specifications _default_survey_values = { "nexposure": 1, "pixel_scale": _argus_pixel_scale, "radius": _argus_view_radius, "read_noise": 1.4, # e-/pixel "zp_err_mag": 0.0, # Placeholder for now. "survey_name": "Argus", } def __init__( self, table, *, colmap=None, apply_saturation=True, saturation_mags=None, nside=None, **kwargs, ): # Set some default values. self._spatial_data = None self._healpix_nside = nside self._healpix_depth = None # If the input is a dictionary, convert it to a DataFrame. if isinstance(table, dict): table = pd.DataFrame(table) # The table uses the healpix IDs as the index. We bring this into its own column for easier access # and reset the indices. if "healpix" in table.columns: table = table.reset_index() # Just reset the index if healpix is already a column. elif table.index.name in ["healpix_id", "healpix"]: table = table.rename(columns={table.index.name: "healpix"}).reset_index() else: raise ValueError( "The input table must have healpix IDs as the index or in a column named 'healpix'." ) # Use the default column mapping if one is not provided. if colmap is None: colmap = self._default_colnames # If filter is not provided, use 'g' for all observations. if "filter" not in table.columns: table["filter"] = "g" # Check the unsupported terms in the kwargs and raise an error if they are provided. if "detector_footprint" in kwargs or "wcs" in kwargs: # pragma: no cover raise ValueError("ArgusObsTable does not support detector footprints.") super().__init__( table=table, colmap=colmap, apply_saturation=apply_saturation, saturation_mags=saturation_mags, **kwargs, ) @property
[docs] def healpix_nside(self): """Return the nside of the healpix pixels in the table.""" return self._healpix_nside
@property
[docs] def healpix_depth(self): """Return the depth of the healpix pixels in the table.""" return self._healpix_depth
[docs] def set_detector_footprint(self, detector_footprint, wcs=None): """Set the detector footprint, so footprint filtering is done. Parameters ---------- detector_footprint : astropy.regions.SkyRegion, Astropy.regions.PixelRegion, or DetectorFootprint The footprint object for the instrument's detector. wcs : astropy.wcs.WCS, optional The WCS for the footprint. Either this or pixel_scale must be provided if a footprint is provided as a Astropy region. """ raise NotImplementedError("ArgusObsTable does not support detector footprints.")
def _build_spatial_data(self): """Construct a mapping of healpix id to row number from the ObsTable.""" if self._healpix_nside is None: if "nside" in self._table.columns: self._healpix_nside = self._table["nside"].iloc[0] else: raise ValueError( "nside must be provided for ArgusHealpixObsTable construction or " "as a column in the table." ) # Check all nside values are the same. if "nside" in self._table.columns: nside = self._table["nside"].to_numpy() if not np.all(nside == self._healpix_nside): raise ValueError( "Inconsistent nside values found in the table. Expected all nside values" f"to be {self._healpix_nside}, but found values: {np.unique(nside)}" ) # Compute the depth. self._healpix_depth = int(np.log2(self._healpix_nside)) # Build a mapping of healpix id to row number. self._spatial_data = {} index = self._table["healpix"].to_numpy() for idx in np.unique(index): self._spatial_data[idx] = np.where(index == idx)[0]
[docs] def build_moc(self, max_depth=None, **kwargs): """Build a Multi-Order Coverage Map from the regions in the data set. These are built directly from the healpix pixels. Parameters ---------- max_depth : int, optional The maximum depth of the MOC. Default is the depth of the healpix pixels in the table. **kwargs : dict Additional keyword arguments to pass to the MOC construction. Not currently used, but accepted for consistency with the ObsTable interface. Returns ------- MOC The Multi-Order Coverage Map constructed from the data set. """ if max_depth is None: max_depth = self._healpix_depth logger = logging.getLogger(__name__) logger.debug(f"Building MOC from ArgusHealpixObsTable at depth={max_depth}.") moc = MOC.from_healpix_cells( np.array(list(self._spatial_data.keys())), depth=self._healpix_depth, max_depth=max_depth, ) return moc
def _derive_noise_columns(self): """Derive any missing noise-related columns (e.g. zero points) from the existing columns and survey values. """ # Compute the dark current in electrons/pixel/second. if "dark_current" not in self and "dark_electrons" in self and "exptime" in self: # Compute the dark current in electrons per pixel per second from the dark current # in electrons per pixel per exposure and the exposure time. dark_current = self["dark_electrons"] / self["exptime"] self.add_column("dark_current", dark_current) # Compute the zero points in nJy if not already present and if the necessary columns are available. if "zp" not in self: zp_deps = [ "dark_current", "exptime", "maglim", "seeing", "sky_bg_e", "read_noise", "nexposure", "pixel_scale", ] if all(col in self for col in zp_deps): # Compute the full-width at half-maximum of the PSF in pixels from the # seeing (in arcseconds) and the pixel scale (in arcseconds per pixel). fwhm_px = self["seeing"] / self["pixel_scale"] # Compute the zero points from the 5-sigma depth (and other parameters). zp_vals = calculate_zp_from_maglim( maglim=self["maglim"], sky_bg_electrons=self["sky_bg_e"], fwhm_px=fwhm_px, read_noise=self["read_noise"], dark_current=self["dark_current"], exptime=self["exptime"], nexposure=self["nexposure"], ) self.add_column("zp", zp_vals) # Compute the PSF footprint in pixels. if "psf_footprint" not in self and "seeing" in self and "pixel_scale" in self: psf_footprint = GAUSS_EFF_AREA2FWHM_SQ * (self["seeing"] / self["pixel_scale"]) ** 2 self.add_column("psf_footprint", psf_footprint)