# ZenDag Quickstart: Your First Pipeline
Welcome to ZenDag! This guide walks you through creating a simple, single-stage ML pipeline. We’ll cover:

1. Writing a Python function for a pipeline stage.
2. Defining its configuration using Hydra-Zen.
3. Using ZenDag to generate a DVC pipeline.
4. Running the pipeline with DVC.
5. Seeing basic data versioning and MLflow logging in action.
ZenDag aims to simplify MLOps by integrating Hydra for configuration, DVC for data/pipeline versioning, and MLflow for experiment tracking.
## Prerequisites
Before you start, make sure you have:

- A Python environment with `zendag`, `pandas`, `hydra-core`, `hydra-zen`, `dvc`, and `mlflow` installed. If you used the ZenDag Cookiecutter template, `pixi install` should set this up.
- An MLflow tracking server running (or be prepared for MLflow to use local file storage). Running `mlflow ui` in a separate terminal starts a local server.
## Step 1: Write Your Python Stage Function
With ZenDag, you write Python functions as you normally would. The main constraint is:

> **All file paths your function reads from or writes to must be arguments to that function.**
Let’s create a simple function that reads a CSV, scales a column, and writes a new CSV. We’ll also use the `@mlflow_run` decorator from ZenDag to automatically handle MLflow setup.

Create a file `src/my_project/stages/simple_transform.py` (assuming your project is named `my_project`):
```python
# src/my_project/stages/simple_transform.py
import logging
from pathlib import Path

import mlflow  # We can use mlflow directly for custom logging
import pandas as pd

# Make sure zendag is importable
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)


@mlflow_run  # ZenDag decorator for MLflow integration
def transform_data(input_csv_path: str, output_csv_path: str, scale_factor: float = 2.0):
    """
    Reads data from input_csv_path, multiplies the 'value' column by
    scale_factor, and saves the result to output_csv_path.
    """
    log.info("Starting data transformation...")
    df = pd.read_csv(input_csv_path)
    df["scaled_value"] = df["value"] * scale_factor

    output_dir = Path(output_csv_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"Writing {len(df)} rows to: {output_csv_path}")
    df.to_csv(output_csv_path, index=False)

    # Custom MLflow logging (parameters from the config are logged
    # automatically by @mlflow_run)
    mlflow.log_param("stage_specific_scale_factor", scale_factor)
    mlflow.log_metric("num_rows_processed", len(df))
    mlflow.log_metric("sum_scaled_value", df["scaled_value"].sum())

    log.info("Data transformation complete.")
    return {"output_file": output_csv_path, "rows_processed": len(df)}
```
## Step 2: Define Function Call as Configuration (Hydra-Zen)
Next, we’ll use Hydra-Zen to define the call to our `transform_data` function as a configuration. This is where we link the function arguments (our file paths) to DVC’s dependency and output tracking using ZenDag utilities.

> **Crucial:** Path arguments in your Hydra-Zen config must use `zendag.config_utils.deps_path("path/to/input")` for inputs and `zendag.config_utils.outs_path("path/to/output")` for outputs.

Create `configs/transform_config.py`:
```python
# configs/transform_config.py
from hydra_zen import builds, store

from zendag.config_utils import deps_path, outs_path
from my_project.stages.simple_transform import transform_data

# Define the configuration for calling transform_data
TransformConfig = builds(
    transform_data,  # The Python function this config represents
    populate_full_signature=True,  # Include all args from transform_data
    # Map function arguments to DVC-tracked paths:
    input_csv_path=deps_path("data/raw/input.csv"),  # DVC dependency
    output_csv_path=outs_path("data/processed/output.csv"),  # DVC output
    # Set other parameters for the function call
    scale_factor=1.5,
)

# Register this configuration with Hydra-Zen's store.
# 'group' is the DVC stage group, 'name' is a specific config instance.
store(TransformConfig, group="transform", name="default_transform")
```
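
To sanity-check what you’ve built, you can render the config with Hydra-Zen’s standard `to_yaml` helper. A minimal sketch; the exact output depends on how `deps_path`/`outs_path` represent paths internally:

```python
# Inspect the composed config before generating the pipeline.
from hydra_zen import to_yaml

from configs.transform_config import TransformConfig

# Prints the structured config, including _target_ (transform_data)
# and the configured arguments.
print(to_yaml(TransformConfig))
```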
## Step 3: Select Configs & Configure Pipeline (configure.py)
Now we create a `configure.py` script in our project root. This script will:

1. Import our defined configurations (which registers them with Hydra-Zen’s global store).
2. Tell ZenDag which stage groups and config instances to include in our DVC pipeline.
3. Call `zendag.core.configure_pipeline` to generate `dvc.yaml`.

Here’s a minimal `configure.py`:
```python
# configure.py (in project root)
import logging
from pathlib import Path

import hydra_zen
import pandas as pd  # For creating dummy input data

from zendag.core import configure_pipeline

# Importing the config module registers its configs with the store
import configs.transform_config  # noqa: F401

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

store = hydra_zen.store
DVC_FILENAME = "dvc.yaml"

# List of DVC stage groups to include in the pipeline
STAGE_GROUPS = ["transform"]  # Corresponds to the group name in store()

if __name__ == "__main__":
    # Create dummy input data if it doesn't exist yet
    input_path = Path("data/raw/input.csv")
    if not input_path.exists():
        input_path.parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame({"value": [1.0, 2.0, 3.0]}).to_csv(input_path, index=False)
        log.info(f"Created dummy input data at {input_path}")

    # Configure the ZenDag pipeline
    log.info(f"Configuring ZenDag pipeline to generate {DVC_FILENAME}...")
    configure_pipeline(
        store=store,
        stage_groups=STAGE_GROUPS,
        dvc_filename=DVC_FILENAME,
        run_script="my_project.run_hydra_stage",  # Module that runs each stage
    )
    log.info("dvc.yaml generated successfully.")
```
## Step 4: Run the Pipeline with DVC
Now we execute the workflow:

1. **Run the configuration script.** This generates `dvc.yaml` and composed configs in `artifacts/`:

   ```bash
   python configure.py
   ```

2. **Inspect `dvc.yaml`.** Open the generated `dvc.yaml`. You should see something like:

   ```yaml
   stages:
     transform/default_transform:
       cmd: python -m my_project.run_hydra_stage -cd artifacts/transform -cn default_transform hydra.run.dir='artifacts/transform/default_transform'
       deps:
         - data/raw/input.csv
       outs:
         - artifacts/transform/default_transform/data/processed/output.csv # Path relative to artifacts root
       params:
         - artifacts/transform/default_transform.yaml
   ```

   Notice how `deps` and `outs` match what we specified with `deps_path` and `outs_path`. The output path is automatically prefixed with the stage’s artifact directory.

3. **Run the DVC pipeline:**

   ```bash
   dvc exp run
   ```

   DVC will execute the `cmd` defined for the `transform/default_transform` stage. You’ll see output from your Python script and MLflow.

4. **Check outputs and logs:**
   - **DVC output:** Look for `artifacts/transform/default_transform/data/processed/output.csv`. A corresponding `.dvc` file for this output will also be in that directory.
   - **MLflow:** If your MLflow server is running (or you’re using local `mlruns`), you should find a new run with parameters like `scale_factor` and metrics like `num_rows_processed` (see the sketch after this list for checking this from Python).
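
If you’d rather verify the run from Python than the UI, MLflow’s standard `mlflow.search_runs` API returns logged runs as a pandas DataFrame. A minimal sketch; which experiment `@mlflow_run` logs to is ZenDag-specific, so you may need to pass experiment names explicitly:

```python
# List recent MLflow runs and their logged params/metrics.
import mlflow

# With no arguments this searches the active/default experiment;
# adjust if @mlflow_run logged to a named experiment.
runs = mlflow.search_runs()
print(runs.filter(regex="run_id|params\\.|metrics\\.").head())
```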
## Data Versioning in Action
DVC tracks your data. Let’s see this:

1. **Modify the input data:** Open `data/raw/input.csv` and change some values.

2. **Check DVC status:**

   ```bash
   dvc status
   ```

   DVC will report that `data/raw/input.csv` has changed.

3. **Re-run the pipeline:**

   ```bash
   dvc exp run
   # Or, more specifically for reproduction:
   # dvc repro transform/default_transform
   ```

   DVC detects the input change and re-executes the `transform/default_transform` stage.

4. **Commit data and pipeline changes:** DVC works with Git. To save this version of your data and pipeline:

   ```bash
   git add dvc.yaml dvc.lock
   # You might also add the composed config:
   # git add artifacts/transform/default_transform.yaml
   git commit -m "Ran transform v1, updated input data"
   # If you have a DVC remote configured:
   # dvc push
   ```
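
You can also read any DVC-tracked artifact programmatically with `dvc.api`, which ships with DVC itself. A minimal sketch, assuming you run it from the repository root:

```python
# Read a DVC-tracked output as of the current Git/DVC revision.
import dvc.api

content = dvc.api.read(
    "artifacts/transform/default_transform/data/processed/output.csv"
)
print(content.splitlines()[0])  # CSV header row
```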
## Conclusion
You’ve successfully created and run your first ZenDag pipeline!

- You wrote a standard Python function.
- You used Hydra-Zen and ZenDag utilities (`deps_path`, `outs_path`) to define its configuration and link it to DVC.
- ZenDag’s `configure_pipeline` automatically generated the `dvc.yaml`.
- DVC executed the stage, and `@mlflow_run` handled MLflow logging.
- DVC tracked changes to your input data, enabling reproducible runs.
In the next notebook, we’ll explore how ZenDag helps build more complex, multi-stage pipelines (DAGs) automatically.