from typing import Union, Dict, Tuple
from pathlib import Path
import astropy.units as u
import numpy as np
from .units import *
import pandas as pd
import os
import logging
logger = logging.getLogger("pyEDITH")
[docs]
def parse_parameters(parameters: dict, nlambda: int = None) -> dict:
"""
Parse and process input parameters for simulation.
This function handles various parameter types including wavelength-dependent parameters,
target-specific parameters, and scalar values. It converts parameters to appropriate
data types and ensures arrays have the correct dimensions based on the number of
wavelength points.
Parameters
----------
parameters : dict
A dictionary of input parameters
nlambda : int, optional
Number of wavelength points, if not specified in parameters
Returns
-------
dict
A dictionary of parsed and processed parameters, including: arrays of length
nlambda (wavelength-dependent parameters), Scalar parameters, Coronagraph
specifications.
Raises
------
ValueError
If wavelength information is missing and nlambda is not provided,
or if array parameters have incorrect dimensions
Note
-----
The function assumes one target (ntargs = 1) for now.
nmeananom and norbits are defaulted to 1.
"""
def parse_list_param(key, default_len):
value = parameters[key]
# Function to convert to float array, preserving Quantity if present
def to_float_array(v):
if isinstance(v, u.Quantity):
return u.Quantity(np.array(v.value, dtype=np.float64), v.unit)
else:
return np.array(v, dtype=np.float64)
if default_len > 1:
# Case 1 & 1a: default_len > 1 but value is a pure scalar (including Quantity scalar)
if np.isscalar(value) or (isinstance(value, u.Quantity) and value.isscalar):
logger.warning(
f"{key} should be a list of length {default_len}. pyEDITH will create one assuming the input value for all the elements of the list."
)
if isinstance(value, u.Quantity):
return u.Quantity(np.full(default_len, value.value), value.unit)
else:
return np.full(default_len, value)
# Case 2: default_len > 1 but value has a length > 1 and != default_len
elif (
isinstance(value, (list, np.ndarray, u.Quantity))
and len(value) != default_len
):
raise ValueError(
f"{key} should be a list of length {default_len}, but it has length {len(value)}."
)
# If value is already correct length, just convert to float array
return to_float_array(value)
else:
# Case 3: default_len == 1, return a single element array
if isinstance(value, u.Quantity):
if value.size > 1:
logger.warning(
f"{key} should be a list of length 1 but you assigned multiple values. pyEDITH will create a list assuming only the first input value."
)
return u.Quantity([value[0].value], value.unit)
else:
return u.Quantity([value.value], value.unit)
elif isinstance(value, (list, np.ndarray)) and len(value) > 1:
logger.warning(
f"{key} should be a list of length 1 but you assigned multiple values. pyEDITH will create a list assuming only the first input value."
)
return to_float_array([value[0]])
else:
return to_float_array([value])
parsed_params = {}
# NLAMBDA
if "wavelength" in parameters.keys():
if np.isscalar(parameters["wavelength"]) or (
isinstance(parameters["wavelength"], u.Quantity)
and np.isscalar(parameters["wavelength"].value)
):
parsed_params["nlambda"] = 1
else:
parsed_params["nlambda"] = len(parameters["wavelength"])
parsed_params["wavelength"] = parse_list_param(
"wavelength", parsed_params["nlambda"]
)
elif nlambda is not None:
parsed_params["nlambda"] = nlambda
else:
raise ValueError(
"pyEDITH does not have access to wavelength here, you should provide nlambda as an argument to this function."
)
# Use the determined or provided nlambda for array standardization
nlambda = parsed_params["nlambda"]
# ------ ARRAYS OF LENGTH NLAMBDA ------
wavelength_params = [
"snr",
"T_optical",
"epswarmTrcold",
"npix_multiplier",
"DC",
"RN",
"tread",
"CIC",
"QE",
"dQE",
"IFS_eff",
"mag", # used to be [ntargs x nlambda], now just [nlambda]
"Fstar_10pc",
"Fp/Fs",
"delta_mag", # used to be [nmeananom x norbits x ntargs]
"F0", # for validation purposes, the calculation of F0 is different in AYO
"det_npix_input", # for validation purposes
]
parsed_params.update(
{
key: parse_list_param(key, nlambda)
for key in list(set(wavelength_params) & set(parameters.keys()))
}
)
# ------ SCALARS (USED TO BE ARRAYS IN v. 0.2 and earlier) ------
target_params = [
"distance", # used to be [ntargs]
"magV", # used to be [ntargs]
"FstarV_10pc",
"stellar_radius", # used to be [ntargs]
"nzodis", # used to be [ntargs]
"ra", # used to be [ntargs]
"dec", # used to be [ntargs]
"delta_mag_min", # used to be [ntargs]
"Fp_min/Fs",
"semimajor_axis",
"separation", # used to be ARRAYS OF LENGTH nmeananom x norbits x ntargs (but nmeananom and norbits are defaulted to 1
]
for key in list(set(target_params) & set(parameters.keys())):
parsed_params[key] = float(parameters[key])
# ----- SCALARS ----
scalar_params = [
"photometric_aperture_radius",
"psf_trunc_ratio",
"diameter",
"toverhead_fixed",
"toverhead_multi",
"minimum_IWA",
"maximum_OWA",
"contrast",
"noisefloor_factor",
"noisefloor_PPF",
"ez_PPF",
"bandwidth",
"Tcore",
"TLyot",
"temperature",
"T_contamination",
"CRb_multiplier",
"t_photon_count_input", # only for ETC validation
]
for key in list(set(scalar_params) & set(parameters.keys())):
parsed_params[key] = float(parameters[key])
# ---- INTEGERS ---
if "nrolls" in parameters.keys():
parsed_params["nrolls"] = int(parameters["nrolls"])
# ----- OBSERVATORY SPECS ---
for key in [
"observatory_preset",
"telescope_type",
"coronagraph_type",
"detector_type",
"observing_mode",
]:
if key in parameters.keys():
parsed_params[key] = parameters[key]
# Handle boolean parameter
if "regrid_wavelength" in parameters.keys():
value = parameters["regrid_wavelength"]
if isinstance(value, str):
parsed_params["regrid_wavelength"] = value.lower() in ("true", "1", "yes")
else:
parsed_params["regrid_wavelength"] = bool(value)
return parsed_params
[docs]
def read_configuration(
input_file: Union[Path, str], secondary_flag: bool = False
) -> Tuple[Dict, Dict]:
"""
Read and parse configuration from an input file.
This function reads the input file, extracts parameters, and then parses both
the primary and secondary parameters. It serves as a high-level wrapper around
parse_input_file() and parse_parameters().
Parameters
----------
input_file : Union[Path, str]
Path to the input configuration file
secondary_flag : bool, optional
Flag indicating whether secondary variables should be processed, default is False
Returns
-------
Tuple[Dict, Dict]
A tuple containing two dictionaries:
parsed_parameters: dict
Parsed primary parameters
parsed_secondary_parameters: dict
Parsed secondary parameters (empty if secondary_flag is False)
"""
parameters, secondary_parameters = parse_input_file(input_file, secondary_flag)
parsed_parameters = parse_parameters(parameters)
if secondary_flag:
# Parse secondary parameters
parsed_secondary_parameters = parse_parameters(secondary_parameters)
else:
parsed_secondary_parameters = {}
return parsed_parameters, parsed_secondary_parameters
[docs]
def get_observatory_config(parameters: Dict[str, str]) -> Union[str, Dict[str, str]]:
"""
Generate observatory configuration from parameters.
This function extracts observatory configuration information from the parameters
dictionary. It either returns a preset name if specified or constructs a custom
configuration dictionary with telescope, coronagraph, and detector components.
Parameters
----------
parameters : Dict[str, str]
Dictionary containing configuration parameters
Returns
-------
Union[str, Dict[str, str]]
Either a string (if using a preset) or a dictionary (for custom configurations)
Raises
------
ValueError
If any required component type is not specified
"""
if "observatory_preset" in parameters:
config = parameters["observatory_preset"]
else:
config = {}
for component in ["telescope", "coronagraph", "detector"]:
component_type = parameters.get(f"{component}_type")
if component_type is None:
raise ValueError(
f"{component.capitalize()} type not specified. Please provide a '{component}_type' parameter or use a preset."
)
config[component] = component_type
print_observatory_config(config)
return config
[docs]
def print_observatory_config(config: Union[str, Dict[str, str]]) -> None:
"""
Print the observatory configuration to the terminal.
This function formats and displays the observatory configuration in a
human-readable format, showing either the preset name or the individual
component selections.
Parameters
----------
config : Union[str, Dict[str, str]]
The observatory configuration, either as a string (preset) or
a dictionary (custom configuration)
"""
logger.info("Observatory Configuration:")
if isinstance(config, str):
logger.info(f" Using preset: {config}")
else:
logger.info(f" Telescope: {config['telescope']}")
logger.info(f" Coronagraph: {config['coronagraph']}")
logger.info(f" Detector: {config['detector']}")
logger.info("") # Add a blank line for better readability