Source code for zendag.core

import logging
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import hydra
import hydra_zen
from omegaconf import OmegaConf

_log = logging.getLogger(__name__)

ARTIFACTS_ROOT = os.getenv("ARTIFACTS_DIR", "artifacts")  # Default to "artifacts"


# Default stage dir function, can be overridden
[docs] def default_stage_dir_fn(stage: str, name: str) -> str: """Generates the default path for a stage's output directory.""" return f"{ARTIFACTS_ROOT}/{stage}/{name}"
# Default config dir function, can be overridden
[docs] def default_configs_dir_fn(stage: str) -> str: """Generates the default path for storing composed stage configs.""" return f"{ARTIFACTS_ROOT}/{stage}"
[docs] def configure_pipeline( store: hydra_zen.ZenStore, stage_groups: List[str], stage_dir_fn: Callable[[str, str], str] = default_stage_dir_fn, configs_dir_fn: Callable[[str], str] = default_configs_dir_fn, dvc_filename: str = "dvc.yaml", run_script: str = "zendag.run", # Changed from xp_workflow.run config_root: Optional[str] = None, # Optional root for hydra initialization manual_dvc: Optional[dict] = None, ) -> None: """ Configures the DVC pipeline based on Hydra-Zen stored configurations. Generates composed Hydra configs for each stage instance and creates a dvc.yaml file defining the pipeline stages, dependencies, and outputs. Dependencies and outputs are automatically discovered during Hydra config resolution via specially registered resolvers for 'deps' and 'outs'. Args: store: The Hydra-Zen store containing the configured stage components. stage_groups: A list of stage group names (e.g., 'training', 'data_prep') present in the store. Stages within these groups will be processed. stage_dir_fn: A function `fn(stage_name, config_name) -> str` that returns the base output directory path for a given stage instance. Defaults to `artifacts/<stage_name>/<config_name>`. configs_dir_fn: A function `fn(stage_name) -> str` that returns the directory path where composed Hydra configs for a stage group will be stored. Defaults to `artifacts/<stage_name>`. dvc_filename: The name of the DVC pipeline file to generate. Defaults to 'dvc.yaml'. run_script: The Python module path to execute for running a stage (e.g., 'my_project.run'). Defaults to 'zendag.run'. config_root: The path relative to which Hydra should initialize (defaults to cwd). Needed if configs are stored outside the cwd. """ dvc_stages: Dict[str, Dict[str, Any]] = {} all_deps: Dict[Tuple[str, str], List[str]] = {} all_outs: Dict[Tuple[str, str], List[str]] = {} _log.info("Initializing Hydra (version_base=1.3) for configuration composition.") # Initialize Hydra once if needed, respecting config_root if hydra.core.global_hydra.GlobalHydra.instance().is_initialized(): hydra.core.global_hydra.GlobalHydra.instance().clear() if config_root: hydra.initialize(version_base="1.3", config_path=config_root) else: hydra.initialize(version_base="1.3") try: store.add_to_hydra_store(overwrite_ok=True) _log.debug(" Successfully added store configurations to hydra") except Exception as e: _log.error( f" Failed add store configurations to hydra'. Error: {e}", exc_info=True, ) raise e _log.info(f"Processing stage groups: {stage_groups}") for stage in stage_groups: cfg_dir = Path(configs_dir_fn(stage)) cfg_dir.mkdir(exist_ok=True, parents=True) _log.debug(f"Ensured configuration directory exists: {cfg_dir}") stage_items = list(store[stage]) # Get all items (configs) for this stage group if not stage_items: _log.warning(f"No configurations found in store for stage group: '{stage}'") continue _log.info(f"Processing stage group '{stage}' with {len(stage_items)} configuration(s)...") for _, name in stage_items: stage_key = (stage, name) _log.info(f" Processing configuration: '{name}'") # 1. Compose the configuration try: if stage is None: cfg = hydra.compose(name) else: cfg = hydra.compose(overrides=[f"+{stage}={name}"]) _log.debug(f" Successfully composed configuration for '{stage}/{name}'.") except Exception as e: _log.error( f" Failed to compose configuration for '{stage}/{name}'. Error: {e}", exc_info=True, ) continue # 2. Write the composed config (for DVC params tracking) composed_config_path = cfg_dir / f"{name}.yaml" try: composed_config_path.write_text(hydra_zen.to_yaml(cfg)) _log.debug(f" Wrote composed configuration to: {composed_config_path}") except Exception as e: _log.error( f" Failed to write composed configuration to {composed_config_path}. Error: {e}", exc_info=True, ) continue # 3. Resolve config to discover deps/outs via side-effects current_deps: List[str] = [] current_outs: List[str] = [] # IMPORTANT: Register resolvers *before* resolve call # These resolvers have side-effects (appending to lists) OmegaConf.register_new_resolver("outs", lambda k: current_outs.append(k) or k, replace=True) OmegaConf.register_new_resolver("deps", lambda k: current_deps.append(k) or k, replace=True) # Hydra resolver needs the runtime context for the *specific* stage instance OmegaConf.register_new_resolver( "hydra", lambda k, _parent_, _root_: OmegaConf.select( OmegaConf.create({"runtime": {"output_dir": stage_dir_fn(stage, name)}}), k, throw_on_missing=True, ), replace=True, use_cache=False, # Ensure it recalculates for each stage ) # Allow deps/outs to resolve stage_dir_fn if needed OmegaConf.register_new_resolver( "stage_dir", lambda s_name, c_name: stage_dir_fn(s_name, c_name), replace=True, ) _log.debug(f" Resolving configuration for '{stage}/{name}' to discover dependencies and outputs...") try: OmegaConf.resolve(cfg) # Make unique and sort for consistency unique_deps = sorted(list(set(current_deps))) unique_outs = sorted(list(set(current_outs))) all_deps[stage_key] = unique_deps all_outs[stage_key] = unique_outs _log.info(f" Discovered Deps: {unique_deps}") _log.info(f" Discovered Outs: {unique_outs}") except Exception as e: _log.error( f" Failed during config resolution for '{stage}/{name}'. Check interpolations (esp. deps/outs). Error: {e}", exc_info=True, ) # Store empty lists to avoid crashing later, but log error all_deps[stage_key] = [] all_outs[stage_key] = [] # 4. Define DVC stage entry dvc_stage_name = f"{stage}/{name}" # Ensure the output directory path uses the function, not hardcoded 'artifacts' hydra_run_dir = stage_dir_fn(stage, name) dvc_stages[dvc_stage_name] = dict( cmd=( f"python -m {run_script} " f"-cd {configs_dir_fn(stage)} -cn {name} " f"hydra.run.dir='{hydra_run_dir}'" # Use quotes for safety ), deps=all_deps[stage_key], outs=all_outs[stage_key], params=[{f"{composed_config_path.as_posix()}": None}], # Use as_posix for consistency ) _log.debug(f" Defined DVC stage '{dvc_stage_name}'.") # 5. Write dvc.yaml dvc_file = Path(dvc_filename) try: # Use OmegaConf to dump YAML for consistency and potentially better handling dvc_data = {"stages": dvc_stages} if manual_dvc is not None: OmegaConf.merge(dvc_data, manual_dvc) # Convert DictConfig back to primitive types suitable for pyyaml dump if needed # or use OmegaConf.save if hydra_zen.to_yaml doesn't handle it well (it should) dvc_file.write_text(hydra_zen.to_yaml(dvc_data)) # hydra-zen's yaml dump is generally good _log.info(f"Successfully wrote DVC pipeline configuration to: {dvc_file}") except Exception as e: _log.error(f"Failed to write DVC pipeline file {dvc_file}. Error: {e}", exc_info=True)