ZenDag Quickstart: Your First Pipeline

Welcome to ZenDag! This guide will walk you through creating a simple, single-stage ML pipeline. We’ll cover:

  1. Writing a Python function for a pipeline stage.

  2. Defining its configuration using Hydra-Zen.

  3. Using ZenDag to generate a DVC pipeline.

  4. Running the pipeline with DVC.

  5. Seeing basic data versioning and MLflow logging in action.

ZenDag aims to simplify MLOps by integrating Hydra for configuration, DVC for data/pipeline versioning, and MLflow for experiment tracking.

Prerequisites

Before you start, make sure you have:

  • A Python environment with zendag, pandas, hydra-core, hydra-zen, dvc, and mlflow installed. If you used the ZenDag Cookiecutter template, pixi install should set this up.

  • An MLflow tracking server running (or be prepared for MLflow to fall back to local file storage). Running mlflow ui in a separate terminal starts a local server.

Step 1: Write Your Python Stage Function

With ZenDag, you write Python functions as you normally would. The main constraint is:

All file paths your function reads from or writes to must be arguments to that function.

Let’s create a simple function that reads a CSV, scales a column, and writes a new CSV. We’ll also use the @mlflow_run decorator from ZenDag to automatically handle MLflow setup.

Create a file src/my_project/stages/simple_transform.py (assuming your project is my_project):

# src/my_project/stages/simple_transform.py
import pandas as pd
from pathlib import Path
import logging
import mlflow # We can use mlflow directly for custom logging

# Make sure zendag is importable
from zendag.mlflow_utils import mlflow_run

log = logging.getLogger(__name__)

@mlflow_run # ZenDag decorator for MLflow integration
def transform_data(input_csv_path: str, output_csv_path: str, scale_factor: float = 2.0):
    """
    Reads data from input_csv_path, multiplies 'value' column by scale_factor,
    and saves to output_csv_path.
    """
    log.info(f"Starting data transformation from: {input_csv_path}")

    df = pd.read_csv(input_csv_path)

    df['scaled_value'] = df['value'] * scale_factor

    output_dir = Path(output_csv_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    log.info(f"Writing {len(df)} rows to: {output_csv_path}")
    df.to_csv(output_csv_path, index=False)

    # Custom MLflow logging (parameters from config are logged automatically by @mlflow_run)
    mlflow.log_param("stage_specific_scale_factor", scale_factor)
    mlflow.log_metric("num_rows_processed", len(df))
    mlflow.log_metric("sum_scaled_value", df['scaled_value'].sum())

    log.info("Data transformation complete.")
    return {"output_file": output_csv_path, "rows_processed": len(df)}

Step 2: Define Function Call as Configuration (Hydra-Zen)

Next, we’ll use Hydra-Zen to define the call to our transform_data function as a configuration. This is where we link the function arguments (our file paths) to DVC’s dependency and output tracking using ZenDag utilities.

Crucial: Path arguments in your Hydra-Zen config must use zendag.config_utils.deps_path("path/to/input") for inputs and zendag.config_utils.outs_path("path/to/output") for outputs.

Create configs/transform_config.py:

# configs/transform_config.py
from hydra_zen import builds, store
from zendag.config_utils import deps_path, outs_path

from my_project.stages.simple_transform import transform_data

# Define the configuration for calling transform_data
TransformConfig = builds(
    transform_data,  # The Python function this config represents
    populate_full_signature=True,  # Includes all args from transform_data

    # Map function arguments to DVC tracked paths:
    input_csv_path=deps_path("data/raw/input.csv"),  # DVC dependency
    output_csv_path=outs_path("data/processed/output.csv"),  # DVC output

    # Set other parameters for the function call
    scale_factor=1.5
)

# Register this configuration with Hydra-Zen's store
# 'group' is the DVC stage group, 'name' is a specific config instance
store(TransformConfig, group="transform", name="default_transform")
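
The group/name pair passed to store() is what ZenDag turns into a DVC stage named group/name (here, transform/default_transform). To get a feel for how variants multiply, here is a hypothetical sketch of a second registration you could append to the same file. The scale_x3 name, output path, and factor are made up for illustration; re-running configure.py should then emit a second stage in the same group.

# Hypothetical second variant of the same stage (illustrative names and values)
TransformConfigX3 = builds(
    transform_data,
    populate_full_signature=True,
    input_csv_path=deps_path("data/raw/input.csv"),
    output_csv_path=outs_path("data/processed/output_x3.csv"),
    scale_factor=3.0,
)
store(TransformConfigX3, group="transform", name="scale_x3")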

Step 3: Select Configs & Configure Pipeline (configure.py)

Now, we create a configure.py script in our project root. This script will:

  1. Import our defined configurations (which registers them with Hydra-Zen’s global store).

  2. Tell ZenDag which stage groups and config instances to include in our DVC pipeline.

  3. Call zendag.core.configure_pipeline to generate dvc.yaml.

Here’s a minimal configure.py:

# configure.py (in project root)
import logging
from pathlib import Path

import hydra_zen
import pandas as pd  # For creating dummy input data

from zendag.core import configure_pipeline

# Importing the config module registers our stage configs with Hydra-Zen's store
import configs.transform_config  # noqa: F401

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

store = hydra_zen.store

# List of DVC stage groups to include in the pipeline
STAGE_GROUPS = ["transform"]  # Corresponds to the group name in store()
DVC_FILENAME = "dvc.yaml"

if __name__ == "__main__":
    # Create a small dummy input file so the first run has data to read
    raw_path = Path("data/raw/input.csv")
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    if not raw_path.exists():
        pd.DataFrame({"value": [1.0, 2.0, 3.0]}).to_csv(raw_path, index=False)
        log.info(f"Created dummy input data at {raw_path}")

    # Configure the ZenDag pipeline
    log.info(f"Configuring ZenDag pipeline to generate {DVC_FILENAME}...")
    configure_pipeline(
        store=store,
        stage_groups=STAGE_GROUPS,
        dvc_filename=DVC_FILENAME,
        run_script="my_project.run_hydra_stage",  # Assumed entry point that runs a stage from its composed config
    )
    log.info("dvc.yaml generated successfully.")

Step 4: Run the Pipeline with DVC

Now we execute the workflow:

  1. Run the configuration script: This generates dvc.yaml and composed configs in artifacts/.

    python configure.py
    
  2. Inspect dvc.yaml: Open the generated dvc.yaml. You should see something like:

    stages:
      transform/default_transform:
        cmd: python -m my_project.run_hydra_stage -cd artifacts/transform -cn default_transform hydra.run.dir='artifacts/transform/default_transform'
        deps:
        - data/raw/input.csv 
        outs:
        - artifacts/transform/default_transform/data/processed/output.csv # Path relative to artifacts root
        params:
        - artifacts/transform/default_transform.yaml
    

    Notice how deps and outs match what we specified with deps_path and outs_path. The output path is automatically prefixed with the stage’s artifact directory.

  3. Run the DVC pipeline:

    dvc exp run
    

    DVC will execute the cmd defined for the transform/default_transform stage. You’ll see output from your Python script and MLflow.

  4. Check Outputs & Logs:

    • DVC Output: Look for artifacts/transform/default_transform/data/processed/output.csv. Its hash is recorded in dvc.lock, so DVC can cache and reproduce it.

    • MLflow: If your MLflow server is running (or using local mlruns), you should find a new run with parameters like scale_factor and metrics like num_rows_processed.
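
    • Programmatic check (optional): If you would rather verify the MLflow run from Python than through the UI, here is a minimal sketch using MLflow's search API. It assumes a recent MLflow (2.x) and that the run is reachable from your current tracking URI (local ./mlruns by default); adjust to match your setup.

      import mlflow

      # Search across all experiments, newest runs first
      runs = mlflow.search_runs(
          search_all_experiments=True,
          order_by=["attributes.start_time DESC"],
          max_results=5,
      )
      # Keep only the params/metrics columns that were actually logged
      cols = [c for c in runs.columns if c.startswith(("params.", "metrics."))]
      print(runs[["run_id"] + cols])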

Data Versioning in Action

DVC tracks your data. Let’s see this:

  1. Modify Input Data: Open data/raw/input.csv and change some values.

  2. Check DVC Status:

    dvc status
    

    DVC will report that data/raw/input.csv has changed.

  3. Re-run the Pipeline:

    dvc exp run
    # Or, more specifically for reproduction:
    # dvc repro transform/default_transform
    

    DVC detects the input change and re-executes the transform/default_transform stage.

  4. Commit Data and Pipeline Changes: DVC works with Git. To save this version of your data and pipeline:

    git add dvc.yaml dvc.lock 
    # You might also add the composed config: artifacts/transform/default_transform.yaml
    git commit -m "Ran transform v1, updated input data"
    # If you have a DVC remote configured:
    # dvc push
    

Conclusion

You’ve successfully created and run your first ZenDag pipeline!

  • You wrote a standard Python function.

  • Used Hydra-Zen and ZenDag utilities (deps_path, outs_path) to define its configuration and link it to DVC.

  • ZenDag’s configure_pipeline automatically generated the dvc.yaml.

  • DVC executed the stage, and @mlflow_run handled MLflow logging.

  • DVC tracked changes to your input data, enabling reproducible runs.

In the next notebook, we’ll explore how ZenDag helps build more complex, multi-stage pipelines (DAGs) automatically.