Source code for pyEDITH.parse_input

from typing import Union, Dict, Tuple
from pathlib import Path
import astropy.units as u
import numpy as np
from .units import *
import pandas as pd
import os
import logging

logger = logging.getLogger("pyEDITH")


def _parse_value(raw: str):
    """
    Convert the text found to the right of ``=`` into a Python value.

    Parameters
    ----------
    raw : str
        Already-stripped right-hand side of a ``key = value`` line

    Returns
    -------
    list of float, str, int or float
        ``[a, b, ...]`` becomes a list of floats; text wrapped in matching
        single or double quotes is returned without the quotes; numeric text
        becomes an int when it is integer-valued and a float otherwise;
        anything else is returned unchanged as a string.

    Raises
    ------
    ValueError
        If a bracketed array contains an element that is not a number
    """
    if raw.startswith("[") and raw.endswith("]"):
        return [float(item.strip()) for item in raw[1:-1].split(",")]

    if (raw.startswith("'") and raw.endswith("'")) or (
        raw.startswith('"') and raw.endswith('"')
    ):
        return raw[1:-1]

    try:
        number = float(raw)
    except ValueError:
        return raw  # Keep as string if it's not a number
    return int(number) if number.is_integer() else number


def _resolve_ifs_spectrum(variables: Dict) -> None:
    """
    Fill in the wavelength-dependent IFS entries of ``variables`` in place.

    The three columns 'wavelength', 'Fstar_10pc' and 'Fp/Fs' are taken from
    the input file when all of them were given as lists; otherwise they are
    read from the CSV file named by 'spectrum_file'. 'nlambda' is set to the
    number of wavelength points in either case.

    Raises
    ------
    FileNotFoundError
        If 'spectrum_file' does not point to an existing file
    ValueError
        If the lists have different lengths, the spectrum file does not have
        exactly three numeric columns, or neither source is available
    """
    required_columns = ["wavelength", "Fstar_10pc", "Fp/Fs"]

    # Preferred source: all three columns written explicitly as lists
    if all(isinstance(variables.get(col), list) for col in required_columns):
        lengths = [len(variables[col]) for col in required_columns]
        if len(set(lengths)) != 1:
            raise ValueError(
                f"All of {', '.join(required_columns)} must have the same length"
            )
        variables["nlambda"] = lengths[0]
        return

    if "spectrum_file" not in variables:
        raise ValueError(
            "Required parameters 'wavelength', 'Fstar_10pc', and 'Fp/Fs' are not provided. Please write them explicitly or provide a spectrum_file path."
        )

    # Fallback source: a three-column CSV spectrum file
    spectrum_file = variables["spectrum_file"]
    if not os.path.isfile(spectrum_file):
        raise FileNotFoundError(f"Spectrum file not found: {spectrum_file}")

    spectrum_df = pd.read_csv(spectrum_file)
    if len(spectrum_df.columns) != 3:
        raise ValueError(
            f"Spectrum file must contain exactly 3 columns (wavelength, stellar flux, planet contrast), but it has {len(spectrum_df.columns)}"
        )

    # Columns are identified by position, so whatever header the file has is
    # replaced by the canonical names.
    spectrum_df.columns = required_columns

    for column in spectrum_df.columns:
        try:
            spectrum_df[column] = spectrum_df[column].astype(float)
        except ValueError as err:
            raise ValueError(
                f"Column '{column}' contains non-numeric values"
            ) from err

    for column in required_columns:
        variables[column] = spectrum_df[column].tolist()
    variables["nlambda"] = len(variables["wavelength"])


def parse_input_file(
    file_path: Union[Path, str], secondary_flag: bool
) -> Tuple[Dict, Dict]:
    """
    Parse an input file and extract variables and secondary variables.

    The file is read line by line. Lines that are empty or start with ';'
    are skipped, and anything after a ';' on a line is treated as a comment.
    Every remaining ``key = value`` line is converted to a list of floats,
    a string, an int or a float. Keys that start with ``secondary_`` are
    stored (without the prefix) in the secondary dictionary when
    ``secondary_flag`` is True and are ignored otherwise. In IFS observing
    mode the wavelength-dependent columns are validated or loaded from a
    spectrum file.

    Parameters
    ----------
    file_path : Union[Path, str]
        Path to the input file
    secondary_flag : bool
        Flag indicating whether secondary variables are expected

    Returns
    -------
    Tuple[Dict, Dict]
        A tuple containing two dictionaries:
        variables: dict
            Primary variables extracted from the file
        secondary_variables: dict
            Only the variables given with the ``secondary_`` prefix; it is
            empty when ``secondary_flag`` is False

    Raises
    ------
    KeyError
        If secondary flag is True but no secondary variables are found in
        the input file, or if IMAGER mode is specified with multiple
        wavelengths
    FileNotFoundError
        If a specified spectrum file cannot be found
    ValueError
        If required parameters are missing or if there are issues with the
        spectrum file
    """
    secondary_prefix = "secondary_"

    variables = {}
    secondary_variables = {}
    has_secondary = False

    with open(file_path, "r") as file:
        for raw_line in file:
            stripped = raw_line.strip()
            # Skip blank lines and full-line comments
            if not stripped or stripped.startswith(";"):
                continue

            # Drop a trailing comment, then keep only assignments
            line = stripped.split(";")[0].strip()
            if "=" not in line:
                continue

            key, value = line.split("=", 1)
            key = key.strip()
            value = _parse_value(value.strip())

            # Only a real prefix marks a secondary variable; a key that merely
            # contains the word "secondary" is an ordinary primary variable.
            if key.startswith(secondary_prefix):
                has_secondary = True
                if secondary_flag:
                    # it replaces the default value
                    secondary_variables[key[len(secondary_prefix):]] = value
            else:
                variables[key] = value

    if secondary_flag and not has_secondary:
        raise KeyError(
            "Secondary flag is True but no secondary variables found in the input file."
        )

    # Handle IFS mode
    if variables.get("observing_mode") == "IFS":
        _resolve_ifs_spectrum(variables)

    # .get() so that a file without any wavelength entry does not fail here
    # with an unrelated KeyError('wavelength').
    if variables.get("observing_mode") == "IMAGER" and isinstance(
        variables.get("wavelength"), list
    ):
        raise KeyError(
            "In IMAGER mode you can only use one wavelength at a time. If you are simulating photometry, please run every single wavelength separately. If you want to model a spectrum, please use IFS mode."
        )

    return variables, secondary_variables
def parse_parameters(parameters: dict, nlambda: Union[int, None] = None) -> dict:
    """
    Parse and process input parameters for simulation.

    Wavelength-dependent parameters are standardized to one-dimensional
    float64 arrays (or Quantities, when the input is a Quantity) of length
    nlambda; target and scalar parameters are converted to float; 'nrolls'
    to int; 'regrid_wavelength' to bool; observatory specification entries
    are copied unchanged. Keys that are not recognized are dropped. The
    output keys follow the order of the lists defined in this function.

    Parameters
    ----------
    parameters : dict
        A dictionary of input parameters
    nlambda : int, optional
        Number of wavelength points. Only used when 'wavelength' is not
        present in ``parameters``; otherwise the length of 'wavelength' wins.

    Returns
    -------
    dict
        A dictionary of parsed and processed parameters, including:
        'nlambda', arrays of length nlambda (wavelength-dependent
        parameters), scalar parameters, coronagraph specifications.

    Raises
    ------
    ValueError
        If wavelength information is missing and nlambda is not provided,
        or if array parameters have incorrect dimensions

    Note
    -----
    The function assumes one target (ntargs = 1) for now.
    nmeananom and norbits are defaulted to 1.
    """

    def to_float_array(v):
        # Convert to a float64 array, preserving the unit of a Quantity
        if isinstance(v, u.Quantity):
            return u.Quantity(np.array(v.value, dtype=np.float64), v.unit)
        return np.array(v, dtype=np.float64)

    def parse_list_param(key, default_len):
        value = parameters[key]
        is_quantity = isinstance(value, u.Quantity)

        if default_len > 1:
            # Case 1: a pure scalar (plain or Quantity) is broadcast to the
            # full length. dtype is forced so the result is float64 like
            # every other path, even when the input is an int.
            if np.isscalar(value) or (is_quantity and value.isscalar):
                logger.warning(
                    f"{key} should be a list of length {default_len}. pyEDITH will create one assuming the input value for all the elements of the list."
                )
                if is_quantity:
                    return u.Quantity(
                        np.full(default_len, value.value, dtype=np.float64),
                        value.unit,
                    )
                return np.full(default_len, value, dtype=np.float64)

            # Case 2: a sequence whose length does not match
            if (
                isinstance(value, (list, tuple, np.ndarray, u.Quantity))
                and len(value) != default_len
            ):
                raise ValueError(
                    f"{key} should be a list of length {default_len}, but it has length {len(value)}."
                )

            # Already the correct length: just convert to float array
            return to_float_array(value)

        # Case 3: a single value is expected. Flatten first so that a scalar
        # and a one-element sequence both give shape (1,) instead of the
        # sequence being wrapped a second time into shape (1, 1).
        flat = to_float_array(value).reshape(-1)
        if flat.size > 1:
            logger.warning(
                f"{key} should be a list of length 1 but you assigned multiple values. pyEDITH will create a list assuming only the first input value."
            )
            flat = flat[:1]
        return flat

    parsed_params = {}

    # NLAMBDA
    if "wavelength" in parameters:
        wavelength = parameters["wavelength"]
        is_single = np.isscalar(wavelength) or (
            isinstance(wavelength, u.Quantity) and np.isscalar(wavelength.value)
        )
        parsed_params["nlambda"] = 1 if is_single else len(wavelength)
        parsed_params["wavelength"] = parse_list_param(
            "wavelength", parsed_params["nlambda"]
        )
    elif nlambda is not None:
        parsed_params["nlambda"] = nlambda
    else:
        raise ValueError(
            "pyEDITH does not have access to wavelength here, you should provide nlambda as an argument to this function."
        )

    # Use the determined or provided nlambda for array standardization
    nlambda = parsed_params["nlambda"]

    # ------ ARRAYS OF LENGTH NLAMBDA ------
    wavelength_params = [
        "snr",
        "T_optical",
        "epswarmTrcold",
        "npix_multiplier",
        "DC",
        "RN",
        "tread",
        "CIC",
        "QE",
        "dQE",
        "IFS_eff",
        "mag",  # used to be [ntargs x nlambda], now just [nlambda]
        "Fstar_10pc",
        "Fp/Fs",
        "delta_mag",  # used to be [nmeananom x norbits x ntargs]
        "F0",  # for validation purposes, the calculation of F0 is different in AYO
        "det_npix_input",  # for validation purposes
    ]
    # Iterate the list (not a set intersection) so that the order of the
    # output keys and of any warnings is the same on every run.
    for key in wavelength_params:
        if key in parameters:
            parsed_params[key] = parse_list_param(key, nlambda)

    # ------ SCALARS (USED TO BE ARRAYS IN v. 0.2 and earlier) ------
    target_params = [
        "distance",  # used to be [ntargs]
        "magV",  # used to be [ntargs]
        "FstarV_10pc",
        "stellar_radius",  # used to be [ntargs]
        "nzodis",  # used to be [ntargs]
        "ra",  # used to be [ntargs]
        "dec",  # used to be [ntargs]
        "delta_mag_min",  # used to be [ntargs]
        "Fp_min/Fs",
        "semimajor_axis",
        "separation",  # used to be ARRAYS OF LENGTH nmeananom x norbits x ntargs (but nmeananom and norbits are defaulted to 1
    ]

    # ----- SCALARS ----
    scalar_params = [
        "photometric_aperture_radius",
        "psf_trunc_ratio",
        "diameter",
        "toverhead_fixed",
        "toverhead_multi",
        "minimum_IWA",
        "maximum_OWA",
        "contrast",
        "noisefloor_factor",
        "noisefloor_PPF",
        "ez_PPF",
        "bandwidth",
        "Tcore",
        "TLyot",
        "temperature",
        "T_contamination",
        "CRb_multiplier",
        "t_photon_count_input",  # only for ETC validation
    ]
    for key in target_params + scalar_params:
        if key in parameters:
            parsed_params[key] = float(parameters[key])

    # ---- INTEGERS ---
    if "nrolls" in parameters:
        parsed_params["nrolls"] = int(parameters["nrolls"])

    # ----- OBSERVATORY SPECS ---
    for key in (
        "observatory_preset",
        "telescope_type",
        "coronagraph_type",
        "detector_type",
        "observing_mode",
    ):
        if key in parameters:
            parsed_params[key] = parameters[key]

    # Handle boolean parameter: strings are true only for "true"/"1"/"yes"
    if "regrid_wavelength" in parameters:
        value = parameters["regrid_wavelength"]
        if isinstance(value, str):
            parsed_params["regrid_wavelength"] = value.lower() in ("true", "1", "yes")
        else:
            parsed_params["regrid_wavelength"] = bool(value)

    return parsed_params
def read_configuration(
    input_file: Union[Path, str], secondary_flag: bool = False
) -> Tuple[Dict, Dict]:
    """
    Read and parse configuration from an input file.

    High-level wrapper: parse_input_file() extracts the raw primary and
    secondary parameters, and parse_parameters() standardizes each set.

    Parameters
    ----------
    input_file : Union[Path, str]
        Path to the input configuration file
    secondary_flag : bool, optional
        Flag indicating whether secondary variables should be processed,
        default is False

    Returns
    -------
    Tuple[Dict, Dict]
        A tuple containing two dictionaries:
        parsed_parameters: dict
            Parsed primary parameters
        parsed_secondary_parameters: dict
            Parsed secondary parameters (empty if secondary_flag is False)
    """
    parameters, secondary_parameters = parse_input_file(input_file, secondary_flag)
    parsed_parameters = parse_parameters(parameters)

    if not secondary_flag:
        return parsed_parameters, {}

    # The secondary set usually overrides only a few values and may not
    # repeat 'wavelength'. Forward the primary nlambda so it can still be
    # parsed; parse_parameters ignores it when the secondary set carries its
    # own 'wavelength'.
    parsed_secondary_parameters = parse_parameters(
        secondary_parameters, nlambda=parsed_parameters["nlambda"]
    )
    return parsed_parameters, parsed_secondary_parameters
def get_observatory_config(parameters: Dict[str, str]) -> Union[str, Dict[str, str]]:
    """
    Generate observatory configuration from parameters.

    When 'observatory_preset' is present its value is used as the
    configuration. Otherwise a dictionary is assembled from the
    'telescope_type', 'coronagraph_type' and 'detector_type' entries, keyed
    by 'telescope', 'coronagraph' and 'detector'. The resulting configuration
    is passed to print_observatory_config() before being returned.

    Parameters
    ----------
    parameters : Dict[str, str]
        Dictionary containing configuration parameters

    Returns
    -------
    Union[str, Dict[str, str]]
        Either the preset value or a dictionary for custom configurations

    Raises
    ------
    ValueError
        If no preset is given and any required component type is missing
    """
    if "observatory_preset" not in parameters:
        # Custom observatory: every component must name its type
        config = {}
        for part in ("telescope", "coronagraph", "detector"):
            chosen_type = parameters.get(f"{part}_type")
            if chosen_type is None:
                raise ValueError(
                    f"{part.capitalize()} type not specified. Please provide a '{part}_type' parameter or use a preset."
                )
            config[part] = chosen_type
    else:
        config = parameters["observatory_preset"]

    print_observatory_config(config)
    return config