ZenDag Quickstart: Your First Pipeline
Welcome to ZenDag! This guide will walk you through creating a simple, single-stage ML pipeline. We’ll cover:
Writing a Python function for a pipeline stage.
Defining its configuration using Hydra-Zen.
Using ZenDag to generate a DVC pipeline.
Running the pipeline with DVC.
Seeing basic data versioning and MLflow logging in action.
ZenDag aims to simplify MLOps by integrating Hydra for configuration, DVC for data/pipeline versioning, and MLflow for experiment tracking.
Prerequisites
Before you start, make sure you have:
A Python environment with zendag, pandas, hydra-core, hydra-zen, dvc, and mlflow installed. If you used the ZenDag Cookiecutter template, pixi install should set this up.
An MLflow tracking server running (or be prepared for MLflow to use local file storage). Running mlflow ui in a separate terminal can start a local server.
Step 1: Write Your Python Stage Function
With ZenDag, you write Python functions as you normally would. The main constraint is:
All file paths your function reads from or writes to must be arguments to that function.
Let’s create a simple function that reads a CSV, scales a column, and writes a new CSV. We’ll also use the @mlflow_run decorator from ZenDag to automatically handle MLflow setup.
Create a file src/my_project/stages/simple_transform.py (assuming your project is named my_project):
# src/my_project/stages/simple_transform.py
import pandas as pd
from pathlib import Path
import logging
import mlflow  # We can use mlflow directly for custom logging

# Make sure zendag is importable
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)


@mlflow_run  # ZenDag decorator for MLflow integration
def transform_data(input_csv_path: str, output_csv_path: str, scale_factor: float = 2.0):
    """
    Reads data from input_csv_path, multiplies the 'value' column by scale_factor,
    and saves the result to output_csv_path.
    """
    log.info("Starting data transformation...")
    log.info(f"Reading data from: {input_csv_path}")
    df = pd.read_csv(input_csv_path)

    df['scaled_value'] = df['value'] * scale_factor

    output_dir = Path(output_csv_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"Writing {len(df)} rows to: {output_csv_path}")
    df.to_csv(output_csv_path, index=False)

    # Custom MLflow logging (parameters from the config are logged automatically by @mlflow_run)
    mlflow.log_param("stage_specific_scale_factor", scale_factor)
    mlflow.log_metric("num_rows_processed", len(df))
    mlflow.log_metric("sum_scaled_value", df['scaled_value'].sum())

    log.info("Data transformation complete.")
    return {"output_file": output_csv_path, "rows_processed": len(df)}
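Before wiring this into a pipeline, you can smoke-test the function on a throwaway CSV. This is purely illustrative and not part of the DVC pipeline; it assumes @mlflow_run works as a plain wrapper when the function is called directly (it may create a local mlruns/ directory or log to your tracking server):

# Hypothetical quick smoke test, run from the project root
import pandas as pd
from my_project.stages.simple_transform import transform_data

pd.DataFrame({"value": [1.0, 2.0, 3.0]}).to_csv("/tmp/input.csv", index=False)
result = transform_data("/tmp/input.csv", "/tmp/output.csv", scale_factor=3.0)
print(result)  # {'output_file': '/tmp/output.csv', 'rows_processed': 3}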
Step 2: Define Function Call as Configuration (Hydra-Zen)
Next, we’ll use Hydra-Zen to define the call to our transform_data function as a configuration. This is where we link the function arguments (our file paths) to DVC’s dependency and output tracking using ZenDag utilities.

Crucial: Path arguments in your Hydra-Zen config must use zendag.config_utils.deps_path("path/to/input") for inputs and zendag.config_utils.outs_path("path/to/output") for outputs.
Create configs/transform_config.py:
# configs/transform_config.py
from hydra_zen import builds, store

from zendag.config_utils import deps_path, outs_path
from my_project.stages.simple_transform import transform_data

# Define the configuration for calling transform_data
TransformConfig = builds(
    transform_data,  # The Python function this config represents
    populate_full_signature=True,  # Includes all args from transform_data
    # Map function arguments to DVC tracked paths:
    input_csv_path=deps_path("data/raw/input.csv"),  # DVC dependency
    output_csv_path=outs_path("data/processed/output.csv"),  # DVC output
    # Set other parameters for the function call
    scale_factor=1.5,
)

# Register this configuration with Hydra-Zen's store
# 'group' is the DVC stage group, 'name' is a specific config instance
store(TransformConfig, group="transform", name="default_transform")
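If builds() is new to you: it captures a function call as a structured config object, and hydra_zen.instantiate() later executes that call. A minimal illustration of the mechanics with a toy function (deliberately without the ZenDag path helpers):

# Toy example, not part of the pipeline: builds() records the call, instantiate() runs it
from hydra_zen import builds, instantiate

def scale(x: float, factor: float = 2.0) -> float:
    return x * factor

ScaleConfig = builds(scale, x=10.0, factor=1.5, populate_full_signature=True)
print(instantiate(ScaleConfig))  # -> 15.0

TransformConfig works the same way, except its arguments are the DVC-tracked paths and scale_factor.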
Step 3: Select Configs & Configure Pipeline (configure.py)
Now, we create a configure.py script in our project root. This script will:
Import our defined configurations (which registers them with Hydra-Zen’s global store).
Tell ZenDag which stage groups and config instances to include in our DVC pipeline.
Call zendag.core.configure_pipeline to generate dvc.yaml.
Here’s a minimal configure.py:
# configure.py (in project root)
import logging
from pathlib import Path

import hydra_zen
import pandas as pd  # For creating dummy data

from zendag.core import configure_pipeline

# Importing the config module registers its configs with Hydra-Zen's store
import configs.transform_config  # noqa: F401

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

store = hydra_zen.store

DVC_FILENAME = "dvc.yaml"

# List of DVC stage groups to include in the pipeline
STAGE_GROUPS = ["transform"]  # Corresponds to the group name in store()

if __name__ == "__main__":
    # Create some dummy input data so the pipeline has something to read
    raw_path = Path("data/raw/input.csv")
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    if not raw_path.exists():
        pd.DataFrame({"value": [1.0, 2.0, 3.0, 4.0]}).to_csv(raw_path, index=False)

    # Configure the ZenDag pipeline
    log.info(f"Configuring ZenDag pipeline to generate {DVC_FILENAME}...")
    configure_pipeline(
        store=store,
        stage_groups=STAGE_GROUPS,
        dvc_filename=DVC_FILENAME,
        run_script="my_project.run_hydra_stage",  # Assumed script for running stages
    )
    log.info("dvc.yaml generated successfully.")
Step 4: Run the Pipeline with DVC
Now we execute the workflow:
Run the configuration script: This generates dvc.yaml and the composed configs in artifacts/.

python configure.py

Inspect dvc.yaml: Open the generated dvc.yaml. You should see something like:

stages:
  transform/default_transform:
    cmd: python -m my_project.run_hydra_stage -cd artifacts/transform -cn default_transform hydra.run.dir='artifacts/transform/default_transform'
    deps:
      - data/raw/input.csv
    outs:
      - artifacts/transform/default_transform/data/processed/output.csv  # Path relative to artifacts root
    params:
      - artifacts/transform/default_transform.yaml

Notice how deps and outs match what we specified with deps_path and outs_path. The output path is automatically prefixed with the stage’s artifact directory.

Run the DVC pipeline:

dvc exp run

DVC will execute the cmd defined for the transform/default_transform stage. You’ll see output from your Python script and MLflow.

Check Outputs & Logs:
DVC output: Look for artifacts/transform/default_transform/data/processed/output.csv. DVC records this output and its hash in dvc.lock.
MLflow: If your MLflow server is running (or using local mlruns), you should find a new run with parameters like scale_factor and metrics like num_rows_processed.
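As a quick sanity check after the run, you can load the produced CSV in Python (the path matches the dvc.yaml above; with the default config, scaled_value should equal value * 1.5):

import pandas as pd

df = pd.read_csv("artifacts/transform/default_transform/data/processed/output.csv")
print(df.head())  # expect a 'scaled_value' column equal to 'value' * 1.5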
Data Versioning in Action
DVC tracks your data. Let’s see this:
Modify Input Data: Open data/raw/input.csv and change some values (or edit it programmatically; see the sketch after this list).
Check DVC Status:

dvc status

DVC will report that data/raw/input.csv has changed.
Re-run the Pipeline:

dvc exp run
# Or, more specifically for reproduction:
# dvc repro transform/default_transform

DVC detects the input change and re-executes the transform/default_transform stage.
Commit Data and Pipeline Changes: DVC works with Git. To save this version of your data and pipeline:

git add dvc.yaml dvc.lock
# You might also add the composed config: artifacts/transform/default_transform.yaml
git commit -m "Ran transform v1, updated input data"
# If you have a DVC remote configured:
# dvc push
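If you would rather change the input from Python than edit the CSV by hand, any edit that alters the file’s contents will do; for example (an illustrative tweak, not part of the pipeline):

import pandas as pd

df = pd.read_csv("data/raw/input.csv")
df["value"] = df["value"] + 1.0  # any change to the tracked file triggers re-execution
df.to_csv("data/raw/input.csv", index=False)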
Conclusion
You’ve successfully created and run your first ZenDag pipeline!
You wrote a standard Python function.
Used Hydra-Zen and ZenDag utilities (deps_path, outs_path) to define its configuration and link it to DVC.
ZenDag’s configure_pipeline automatically generated the dvc.yaml.
DVC executed the stage, and @mlflow_run handled the MLflow logging.
DVC tracked changes to your input data, enabling reproducible runs.
In the next notebook, we’ll explore how ZenDag helps build more complex, multi-stage pipelines (DAGs) automatically.