Automatic DAGs & DVC Caching with ZenDag
In the Quickstart Notebook, we saw how to create a single-stage pipeline. ZenDag truly shines when building multi-stage pipelines, as its deps_path and outs_path utilities automatically define the Directed Acyclic Graph (DAG) for DVC. We'll also see DVC's powerful caching mechanism in action.
Building a Multi-Stage Pipeline
Let's create a three-stage pipeline:
1. generate_data: Creates some raw data.
2. process_data: Takes the raw data and processes it.
3. summarize_data: Takes the processed data and creates a summary.
Stage 1: Generate Data
# src/my_project/stages/generate_data_stage.py
import pandas as pd
from pathlib import Path
import logging
import mlflow
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)


@mlflow_run
def generate_data(output_csv_path: str, num_rows: int = 100, base_value: int = 5):
    log.info(f"Generating {num_rows} rows of data to {output_csv_path}")
    df = pd.DataFrame({
        'id': range(num_rows),
        'value': [(i % 10) * base_value for i in range(num_rows)]
    })
    Path(output_csv_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_csv_path, index=False)
    mlflow.log_param("num_rows_generated", num_rows)
    mlflow.log_param("base_value", base_value)
    log.info("Data generation complete.")
    return {"generated_file": output_csv_path}
Config (configs/generate_data_config.py):
# configs/generate_data_config.py
from hydra_zen import builds, store
from zendag.config_utils import outs_path
from my_project.stages.generate_data_stage import generate_data

GenerateDataConfig = builds(
    generate_data,
    output_csv_path=outs_path("generated_data.csv"),  # This file is an output of this stage
    num_rows=50,
)

store(GenerateDataConfig, group="generate_data", name="default_gen")
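Before wiring up more stages, it can help to see what this config actually contains. hydra-zen can render it as YAML (a quick, optional sanity check; the exact interpolated path string produced by outs_path is ZenDag-internal, so the comment below is an assumption):

# Optional: inspect the composed config from a Python session
from hydra_zen import to_yaml

from configs.generate_data_config import GenerateDataConfig

print(to_yaml(GenerateDataConfig))
# output_csv_path should appear as an unresolved interpolation,
# e.g. something like "${stage_dir:...}/generated_data.csv",
# which ZenDag resolves when writing dvc.yaml.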
Stage 2: Process Data
This stage will take the output of generate_data as its input.
# src/my_project/stages/process_data_stage.py
import pandas as pd
from pathlib import Path
import logging
import mlflow
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)


@mlflow_run
def process_data(input_csv_path: str, processed_output_csv_path: str, scale_factor: float = 0.5):
    log.info(f"Processing data from {input_csv_path}")
    # Create a dummy input if generate_data wasn't run or its output isn't available
    if not Path(input_csv_path).exists():
        log.warning(f"Input {input_csv_path} not found for process_data. Creating dummy for example.")
        Path(input_csv_path).parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame({'id': range(10), 'value': range(0, 100, 10)}).to_csv(input_csv_path, index=False)
    df = pd.read_csv(input_csv_path)
    df['processed_value'] = df['value'] * scale_factor
    Path(processed_output_csv_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(processed_output_csv_path, index=False)
    mlflow.log_param("processing_scale_factor", scale_factor)
    mlflow.log_metric("processed_rows", len(df))
    log.info(f"Processed data saved to {processed_output_csv_path}")
    return {"processed_file": processed_output_csv_path}
Config (configs/process_data_config.py):
# configs/process_data_config.py
from hydra_zen import builds, store
from zendag.config_utils import deps_path, outs_path
from my_project.stages.process_data_stage import process_data

ProcessDataConfig = builds(
    process_data,
    populate_full_signature=True,
    # Crucial: input_csv_path depends on the output of the 'generate_data/default_gen' stage
    input_csv_path=deps_path("generated_data.csv", "generate_data", "default_gen"),
    processed_output_csv_path=outs_path("main_processed_data.csv"),
    scale_factor=3.0,
)

store(ProcessDataConfig, group="process_data", name="default_proc")
Note the deps_path call: deps_path("generated_data.csv", "generate_data", "default_gen") tells ZenDag/DVC that this input comes from the output_csv_path (which was generated_data.csv) of the stage instance named default_gen in the generate_data group. Under the hood, the stage_dir interpolation provides the base output directory of the dependency stage.
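If you are curious about the raw value deps_path produces, you can print it (illustrative only; the exact interpolation string is ZenDag-internal, so the shape shown in the comment is an assumption):

from zendag.config_utils import deps_path

ref = deps_path("generated_data.csv", "generate_data", "default_gen")
print(ref)
# Assumed shape: an unresolved interpolation along the lines of
# "${stage_dir:generate_data,default_gen}/generated_data.csv"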
Stage 3: Summarize Data
# src/my_project/stages/summarize_data_stage.py
import pandas as pd
from pathlib import Path
import logging
import mlflow
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)


@mlflow_run
def summarize_data(processed_input_csv_path: str, summary_output_txt_path: str):
    log.info(f"Summarizing data from {processed_input_csv_path}")
    # Create a dummy input if process_data wasn't run or its output isn't available
    if not Path(processed_input_csv_path).exists():
        log.warning(f"Input {processed_input_csv_path} not found for summarize_data. Creating dummy for example.")
        Path(processed_input_csv_path).parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame({'id': range(5), 'processed_value': range(0, 50, 10)}).to_csv(processed_input_csv_path, index=False)
    df = pd.read_csv(processed_input_csv_path)
    mean_val = df['processed_value'].mean()
    max_val = df['processed_value'].max()
    summary_content = f"Data Summary:\nMean Processed Value: {mean_val}\nMax Processed Value: {max_val}\nRows: {len(df)}"
    Path(summary_output_txt_path).parent.mkdir(parents=True, exist_ok=True)
    with open(summary_output_txt_path, 'w') as f:
        f.write(summary_content)
    mlflow.log_metric("mean_processed", mean_val)
    mlflow.log_metric("max_processed", max_val)
    log.info(f"Summary saved to {summary_output_txt_path}")
    return {"summary_file": summary_output_txt_path}
Config (configs/summarize_data_config.py):
# configs/summarize_data_config.py
from hydra_zen import builds, store
from zendag.config_utils import deps_path, outs_path
from my_project.stages.summarize_data_stage import summarize_data

SummarizeDataConfig = builds(
    summarize_data,
    populate_full_signature=True,
    processed_input_csv_path=deps_path("main_processed_data.csv", "process_data", "default_proc"),
    summary_output_txt_path=outs_path("summary_report.txt"),
)

store(SummarizeDataConfig, group="summarize_data", name="default_summary")
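With all three config modules in place, importing them registers every stage in the hydra-zen store. A quick check from a Python session (the store's repr lists its registered groups and entries):

import hydra_zen

# Importing the config modules runs their store(...) calls
import configs.generate_data_config
import configs.process_data_config
import configs.summarize_data_config

print(hydra_zen.store)
# Should list entries under generate_data, process_data, and summarize_data.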
Updated configure.py
Modify your configure.py to include these new stage groups and their config imports:
# configure.py (modified)
import logging

import hydra_zen

from zendag.core import configure_pipeline

# Import your config modules; importing them runs their store(...) calls
import configs.generate_data_config
import configs.process_data_config
import configs.summarize_data_config

store = hydra_zen.store

# Name of the pipeline file ZenDag generates (referenced in the log messages below)
DVC_FILENAME = "dvc.yaml"

STAGE_GROUPS = [
    "generate_data",
    "process_data",
    "summarize_data",
]

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Configure the ZenDag pipeline
    logging.info(f"Configuring ZenDag pipeline to generate {DVC_FILENAME}...")
    configure_pipeline(
        store=store,
        stage_groups=STAGE_GROUPS,
    )
    logging.info(f"{DVC_FILENAME} generated for multi-stage pipeline.")
Running configure and Inspecting the DAG
1. Run python configure.py (or pixi run configure).
2. Open dvc.yaml. You'll see all three stages. Notice how the deps of process_data/default_proc correctly point to the output path of generate_data/default_gen, and similarly for summarize_data. ZenDag resolved these paths for you.
3. Visualize the DAG:
dvc dag
You should see a graph like:
generate_data/default_gen -> process_data/default_proc -> summarize_data/default_summary
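Tip: dvc dag --outs draws the same graph keyed by output files instead of stage names, which can make the file-level dependencies between stages easier to see.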
Running the Pipeline & DVC Caching
1. First Run:
dvc exp run
All three stages will execute. Outputs will be generated in their respective artifacts subdirectories.
2. Second Run (Caching): Run it again immediately:
dvc exp run
DVC will report that all stages are "cached" or "up-to-date" and won't re-execute the Python code. This is because no inputs or parameters have changed.
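You can also ask DVC what it considers changed before running anything, using the standard dvc status command:
dvc status
When nothing has changed, it prints a message along the lines of "Data and pipelines are up to date."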
Modifying a Parameter & Selective Re-run
1. Open configs/process_data_config.py and change scale_factor in ProcessDataConfig (e.g., to 4.0).
2. Run python configure.py again. This updates artifacts/process_data/default_proc.yaml.
3. Now, run the pipeline:
dvc exp run
4. Observe the output:
- generate_data/default_gen will likely be "Pipeline-cached".
- process_data/default_proc will re-run because its parameter file (default_proc.yaml) changed.
- summarize_data/default_summary will also re-run because its input (the output of process_data) changed.
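To confirm exactly which parameter triggered the re-run, the standard dvc params diff command compares parameter values in the workspace against the last committed pipeline state; here it should show scale_factor changing from 3.0 to 4.0.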
Conclusion
ZenDag's use of deps_path and outs_path allows DVC to automatically understand the relationships between your pipeline stages, forming a DAG. DVC's caching mechanism then ensures that only necessary parts of your pipeline are re-executed when inputs or parameters change, saving significant time and computational resources. The ${stage_dir:...} interpolation is key for linking outputs of one stage to inputs of another in a clean way.