Parallel Computation
This notebook describes how to use LightCurveLynx to perform parallel computation.
The core simulation function can take a concurrent.futures.Executor object and use it to distribute the computation over multiple workers. This object can be one of the standard library's built-in executors, such as ThreadPoolExecutor or ProcessPoolExecutor, or one provided by another library, such as Dask.
Note:
Each process loads a full copy of all the data, so parallel runs may be memory intensive.
In some cases, the default parallelization approach may fail with a PickleError or something like AttributeError: Can't get attribute 'XXX' on <module '__main__' (built-in)>. In such cases, we recommend using a third-party library such as Loky, Dask, or Ray; see the examples below.
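As a rough illustration of where such pickling failures come from, the sketch below uses only the standard library's pickle module. The can_pickle helper is a hypothetical name introduced here, not part of LightCurveLynx. It shows that a function importable by name can be pickled, while a lambda cannot, which is the kind of object that tends to trip up process-based executors.

```python
import math
import pickle


def can_pickle(obj):
    """Return True if the object survives pickle.dumps (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A function that can be looked up by module and name is pickled by reference.
importable_ok = can_pickle(math.sqrt)

# A lambda has no importable name, so pickle cannot serialize it.
square = lambda x: x * x  # noqa: E731
lambda_ok = can_pickle(square)

print(f"math.sqrt picklable: {importable_ok}")
print(f"lambda picklable: {lambda_ok}")
```

If a model depends on objects like the lambda above, an executor that uses a more flexible serializer may be a better fit than the default one.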
[1]:
from lightcurvelynx.astro_utils.passbands import PassbandGroup
from lightcurvelynx.models.basic_models import ConstantSEDModel
from lightcurvelynx.obstable.opsim import OpSim
from lightcurvelynx.simulate import simulate_lightcurves
from lightcurvelynx.survey_info import SurveyInfo
# Usually we would not hardcode the path to the passband files, but for this demo we will use a relative path
# to the test data directory so that we do not have to download the files.
table_dir = "../../tests/lightcurvelynx/data/passbands"
Prerequisite Data
We start by loading the standard information that we need for any simulation.
An ObsTable that includes the survey’s pointing and noise information.
A PassbandGroup for that survey.
We start by creating a toy survey that includes pointings at two locations (0.0, 10.0) and (180.0, -10.0) in the “g” and “r” bands and loading the passband group.
As of v0.4.0, we wrap these data structures in a SurveyInfo object.
[2]:
obsdata1 = {
"time": [0.0, 1.0, 2.0, 3.0],
"ra": [0.0, 0.0, 180.0, 180.0],
"dec": [10.0, 10.0, -10.0, -10.0],
"filter": ["g", "r", "g", "r"],
"zp": [5.0, 6.0, 7.0, 8.0],
"seeing": [1.12, 1.12, 1.12, 1.12],
"skybrightness": [20.0, 20.0, 20.0, 20.0],
"exptime": [29.2, 29.2, 29.2, 29.2],
"nexposure": [2, 2, 2, 2],
}
obstable1 = OpSim(obsdata1)
passband_group1 = PassbandGroup.from_preset(
preset="LSST",
table_dir=table_dir,
filters=["g", "r", "i"],
)
survey_info1 = SurveyInfo(obstable=obstable1, passbands=passband_group1)
Model Creation
Next we create a model from which to simulate observations. We define a model and its parameters as we would with any other simulation. Here we use a constant SED model (same value for all times and wavelengths). We place the object at (0.0, 10.0) so that it is observed by some of the survey's pointings.
[3]:
model = ConstantSEDModel(brightness=100.0, t0=0.0, ra=0.0, dec=10.0, redshift=0.0, node_label="my_star")
Simulation
The only change in running the simulation in parallel is that we create a ProcessPoolExecutor object and pass that to the simulation function:
[4]:
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    results = simulate_lightcurves(
        model=model,
        num_samples=10_000,
        survey_info=survey_info1,
        obstable_save_cols=["zp_nJy"],
        executor=executor,
        batch_size=100,
    )
print(f"Generated {len(results)} light curves")
print(results["lightcurve"][0])
Simulating: 100%|██████████| 10000/10000 [00:07<00:00, 1290.74obj/s]
Generated 10000 light curves
mjd filter flux fluxerr flux_perfect survey_idx obs_idx \
0 0.0 g -500.315291 895.605471 100.0 0 0
1 1.0 r 620.453877 1015.412583 100.0 0 1
is_saturated zp_nJy
0 False 5.0
1 False 6.0
If we do not provide an executor object, but rather a number of jobs, we automatically create and manage the ProcessPoolExecutor. Here we run the simulation on 4 processes.
[5]:
results = simulate_lightcurves(
model=model,
num_samples=10_000,
survey_info=survey_info1,
obstable_save_cols=["zp_nJy"],
num_jobs=4,
batch_size=100,
)
print(f"Generated {len(results)} light curves")
print(results["lightcurve"][0])
Simulating: 100%|██████████| 10000/10000 [00:07<00:00, 1292.46obj/s]
Generated 10000 light curves
mjd filter flux fluxerr flux_perfect survey_idx obs_idx \
0 0.0 g 557.855214 895.605471 100.0 0 0
1 1.0 r 1688.216270 1015.412583 100.0 0 1
is_saturated zp_nJy
0 False 5.0
1 False 6.0
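The two calling styles above (passing an executor, or passing a number of jobs) can be sketched in plain Python as follows. This is a minimal illustration, not the library's implementation: run_batches and simulate_batch are hypothetical names, and the sketch creates a ThreadPoolExecutor for the num_jobs case so that it runs in any environment, whereas the text above states that LightCurveLynx creates a ProcessPoolExecutor.

```python
import concurrent.futures


def simulate_batch(batch_size):
    """Hypothetical stand-in for simulating one batch of samples."""
    return [i * i for i in range(batch_size)]


def run_batches(batch_sizes, executor=None, num_jobs=None):
    """Run each batch on a caller-supplied executor, an owned pool, or serially."""
    if executor is not None:
        # The caller owns the executor; we only submit work to it.
        futures = [executor.submit(simulate_batch, size) for size in batch_sizes]
        return [future.result() for future in futures]
    if num_jobs is not None:
        # We create the pool and shut it down when the work is done.
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_jobs) as owned:
            return run_batches(batch_sizes, executor=owned)
    # No parallelism requested: run in the current thread.
    return [simulate_batch(size) for size in batch_sizes]


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    from_executor = run_batches([2, 3], executor=pool)
from_num_jobs = run_batches([2, 3], num_jobs=2)
serial = run_batches([2, 3])
print(from_executor == from_num_jobs == serial)
```

Results are collected in submission order, so all three paths return the same list regardless of which worker finishes first.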
Loky
Loky is a process-based parallelization library used by joblib. We recommend it if you encounter pickle errors with the default executor (ProcessPoolExecutor).
Note: Loky is not installed by default, so users will need to install loky (pip install 'loky') to run this cell.
[6]:
try:
    import loky

    executor = loky.get_reusable_executor(max_workers=4)
    results = simulate_lightcurves(
        model=model,
        num_samples=100,
        survey_info=survey_info1,
        obstable_save_cols=["zp_nJy"],
        executor=executor,
    )
    print(f"Generated {len(results)} light curves")
    print(results["lightcurve"][0])
except ImportError:
    print("Loky is not installed, skipping Loky example")
Loky is not installed, skipping Loky example
Dask
Dask is a framework for parallel task execution at scale. We recommend Dask if you want a web-based dashboard for monitoring resource utilization. Dask also supports distributed cluster runs via the dask-kubernetes and dask-jobqueue packages.
Here we provide an example for a simple local cluster.
Note: Dask is not installed by default, so users will need to install dask (pip install 'dask[distributed]') to run this cell. You will also need the bokeh package for the dashboard (pip install bokeh).
[7]:
try:
    import dask.distributed

    with dask.distributed.Client() as client:
        print(f"Dask dashboard link: {client.dashboard_link}")
        results = simulate_lightcurves(
            model=model,
            num_samples=100,
            survey_info=survey_info1,
            obstable_save_cols=["zp_nJy"],
            executor=client,
        )
    print(f"Generated {len(results)} light curves")
    print(results["lightcurve"][0])
except ImportError:
    print("Dask is not installed, skipping Dask example")
Dask is not installed, skipping Dask example
Ray
Ray is a framework for distributed application execution.
We can parallelize the computation via Ray by using ray.util.multiprocessing.Pool.
Note: Ray is not installed by default, so users will need to install ray (pip install -U "ray[default]") to run this cell.
[8]:
try:
    import ray
    from ray.util.multiprocessing import Pool

    with Pool(processes=4) as executor:
        results = simulate_lightcurves(
            model=model,
            num_samples=100,
            survey_info=survey_info1,
            obstable_save_cols=["zp_nJy"],
            executor=executor,
        )
    print(f"Generated {len(results)} light curves")
    print(results["lightcurve"][0])
    ray.shutdown()
except ImportError:
    print("Ray is not installed, skipping Ray example")
Ray is not installed, skipping Ray example
Saving to Files
Depending on the size of the simulated results, you might not want to load the full set into memory as a single table. The simulate_lightcurves function can save each shard (the result of each batch) to its own file. Instead of returning the NestedFrames, the function then returns the list of file paths containing the data. Users can analyze or load these files later.
[9]:
file_paths = simulate_lightcurves(
model=model,
num_samples=10_000,
survey_info=survey_info1,
num_jobs=4,
batch_size=1000,
obstable_save_cols=["zp_nJy"],
output_file_path="./scratch/nb_results.parquet",
)
print(file_paths)
Simulating: 100%|██████████| 10000/10000 [00:06<00:00, 1434.22obj/s]
[PosixPath('scratch/nb_results_part0.parquet'), PosixPath('scratch/nb_results_part1.parquet'), PosixPath('scratch/nb_results_part2.parquet'), PosixPath('scratch/nb_results_part3.parquet'), PosixPath('scratch/nb_results_part4.parquet'), PosixPath('scratch/nb_results_part5.parquet'), PosixPath('scratch/nb_results_part6.parquet'), PosixPath('scratch/nb_results_part7.parquet'), PosixPath('scratch/nb_results_part8.parquet'), PosixPath('scratch/nb_results_part9.parquet')]
As you can see, the results are split across ten files, one per batch of 1,000 samples.
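When coming back to such shard files in a later session, one practical detail is their ordering: a plain alphabetical sort would place part10 before part2 once there are more than ten shards. The sketch below assumes the _partN.parquet naming shown in the output above and uses a hypothetical helper, find_shards, built only on the standard library. It creates empty placeholder files in a temporary directory purely for demonstration.

```python
import re
import tempfile
from pathlib import Path


def find_shards(directory, stem="nb_results"):
    """Return shard paths sorted by their numeric part index (hypothetical helper)."""
    pattern = re.compile(rf"{re.escape(stem)}_part(\d+)\.parquet$")
    indexed = []
    for path in Path(directory).glob(f"{stem}_part*.parquet"):
        match = pattern.search(path.name)
        if match:
            indexed.append((int(match.group(1)), path))
    # Sort on the integer index so that part2 comes before part10.
    return [path for _, path in sorted(indexed)]


with tempfile.TemporaryDirectory() as tmp:
    # Empty placeholder files that mimic the naming pattern of the shards.
    for idx in range(12):
        (Path(tmp) / f"nb_results_part{idx}.parquet").touch()
    shard_names = [path.name for path in find_shards(tmp)]

print(shard_names)
```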
Handling Randomness
By default, LightCurveLynx creates a new random number generator (without a fixed seed) so that the parameters vary from run to run. However, it also allows users to control the randomness by passing in their own random number generator, which may have a fixed seed. In both cases, simulate_lightcurves() ensures the correct behavior during parallel runs.
The initial random number generator (provided or default) is used to create a new random seed for each processing shard. If the user provides a seeded random number generator, the resulting list of seeds is reproducible, so each shard gets a predetermined (but different) random number generator. If the initial random number generator is not seeded, the seeds for each shard will themselves vary from run to run.
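The seeding scheme described above can be illustrated with the standard library's random module. This is only a sketch of the general idea, with a hypothetical helper make_shard_seeds; it does not show the generator type or the seed derivation that LightCurveLynx actually uses.

```python
import random


def make_shard_seeds(parent_rng, num_shards):
    """Draw one seed per shard from a parent generator (hypothetical helper)."""
    return [parent_rng.getrandbits(32) for _ in range(num_shards)]


# A seeded parent generator yields the same list of shard seeds on every run.
seeds_run_a = make_shard_seeds(random.Random(42), 4)
seeds_run_b = make_shard_seeds(random.Random(42), 4)

# An unseeded parent generator yields shard seeds that vary from run to run.
seeds_unseeded = make_shard_seeds(random.Random(), 4)

# Each shard builds its own generator from its own seed.
shard_rngs = [random.Random(seed) for seed in seeds_run_a]

print(seeds_run_a == seeds_run_b)
```

Because every shard draws from its own generator, the samples differ between shards while the run as a whole stays reproducible when the parent generator is seeded.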
Working with Fixed Sets of Data
Several of the sampling nodes, such as TableSampler, GivenValueSampler, and GivenValueList, work with predefined lists of data. If the data is drawn randomly from the lists, as with GivenValueSampler, the randomness is handled as described above: each shard uses a different random seed, so it starts from a different part of the sampling space.
In contrast, a few nodes are designed to behave deterministically:
GivenValueList is meant for testing only and will fail if you try to use it in a parallel run.
TableSampler (with in_order=True) automatically handles the coordination so that each worker uses a disjoint range of rows. For example, if we are sampling 100 values using batches of size 10, the first shard will use rows [0, 9], the second shard will use rows [10, 19], and so forth.
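The disjoint row ranges in the example above follow from simple arithmetic, sketched here with a hypothetical helper, shard_row_ranges. It only illustrates the partitioning and is not taken from the TableSampler implementation.

```python
def shard_row_ranges(num_samples, batch_size):
    """Return inclusive (first_row, last_row) ranges, one per shard."""
    ranges = []
    for start in range(0, num_samples, batch_size):
        # The final shard may be shorter than batch_size.
        end = min(start + batch_size, num_samples) - 1
        ranges.append((start, end))
    return ranges


ranges_100 = shard_row_ranges(num_samples=100, batch_size=10)
print(ranges_100[:2])

ranges_5 = shard_row_ranges(num_samples=5, batch_size=2)
print(ranges_5)
```

With 5 samples and a batch size of 2, the last shard holds a single row, so the shards still cover every row exactly once.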
[10]:
from lightcurvelynx.math_nodes.given_sampler import TableSampler
table_data = {
"ra": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"dec": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
}
table_node = TableSampler(table_data, in_order=True, node_label="table_node")
model2 = ConstantSEDModel(
brightness=100.0,
t0=0.0,
ra=table_node.ra,
dec=table_node.dec,
redshift=0.0,
node_label="my_star",
)
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    results = simulate_lightcurves(
        model=model2,
        num_samples=5,
        survey_info=survey_info1,
        executor=executor,
        batch_size=2,
    )
for idx in range(5):
    print(f"{idx}: RA={results['ra'][idx]}, DEC={results['dec'][idx]}")
Simulating: 100%|██████████| 5/5 [00:00<00:00, 55.97obj/s]
0: RA=1, DEC=10
1: RA=2, DEC=9
2: RA=3, DEC=8
3: RA=4, DEC=7
4: RA=5, DEC=6
Overhead
As with any distributed computation, there is per-batch overhead. All of the input data (model, obstable, etc.) is pickled and sent to the new processes, and packing and unpacking this information takes time. Care must therefore be taken to ensure that the parallelization pays off.
The user can provide a batch_size parameter to control the target batch size for each process, so that each process receives enough work to amortize this overhead.
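To get a feel for this trade-off, the sketch below times one pickle round trip of a stand-in payload and counts how many batches a given batch_size produces. The payload and the num_batches helper are hypothetical and for illustration only; real costs depend on the actual model and observation table.

```python
import math
import pickle
import time


def num_batches(num_samples, batch_size):
    """Number of batches needed to cover num_samples (hypothetical helper)."""
    return math.ceil(num_samples / batch_size)


# Stand-in for the inputs that are sent to each worker.
payload = {"obstable": list(range(200_000))}

start = time.perf_counter()
blob = pickle.dumps(payload)
restored = pickle.loads(blob)
round_trip_s = time.perf_counter() - start

for batch_size in (100, 1000):
    batches = num_batches(10_000, batch_size)
    # If the inputs are shipped once per batch, the cost scales with the batch count.
    print(f"batch_size={batch_size}: {batches} batches, ~{batches * round_trip_s:.3f} s of pickling")
```

Under the assumption that the inputs are transferred once per batch, increasing batch_size from 100 to 1000 cuts the number of transfers by a factor of ten.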