Building a Serverless ML Lineage: AWS Lambda, DVC, and Prefect

A practical guide to ML lineage fundamentals and MLOps workflow implementation for a serverless ML system

Machine Learning | Deep Learning | Python

By Kuriko IWAI

Table of Contents

Introduction
What is Machine Learning (ML) Lineage
What We’ll Build
The System Architecture - AI Pricing for Retailers
The ML Lineage
Workflow in Action
Step 1: Initiating a DVC Project
Step 2: The ML Lineage
Stage 1: The ETL Pipeline
Stage 2: The Data Drift Check
Stage 3: Preprocessing
Stage 4: Tuning the Model
Stage 5: Performing Inference
Stage 6: Assessing Model Risk and Fairness
Testing Locally
Step 3: Deploying the DVC Project
Step 4: Configuring Scheduled Run with Prefect
Configuring the Docker Image Registry
Configuring Prefect Tasks and Flows
Testing Locally
Step 5: Deploying the Application
Testing Locally
Wrapping Up

Introduction

Machine learning (ML) lineage is critical in any robust ML system. It lets you track data and model versions, ensuring reproducibility, auditability, and compliance.

While many services for tracking ML lineage exist, creating a comprehensive yet manageable lineage often proves complicated.

In this article, I’ll walk you through integrating a comprehensive ML lineage solution for an ML application deployed on serverless AWS Lambda, covering the end-to-end pipeline stages:

  • ETL pipeline

  • Data drift detection

  • Preprocessing

  • Model tuning

  • Risk and fairness evaluation.

What is Machine Learning (ML) Lineage

Machine learning (ML) lineage is a framework for tracking and understanding the complete lifecycle of a machine learning model.

It contains information at different levels such as:

  • Code: The scripts, libraries, and configurations for model training.

  • Data: The original data, transformations, and features.

  • Experiments: Training runs, hyperparameter tuning results.

  • Models: The trained models and their versions.

  • Predictions: The outputs of deployed models.

ML lineage is essential for multiple reasons:

  • Reproducibility: Recreate the same model and prediction for validation.

  • Root cause analysis: Trace back to the data, code, or configuration change when a model fails in production.

  • Compliance: Some regulated industries require proof of model training to ensure fairness, transparency, and adherence to laws like GDPR and the EU AI Act.

What We’ll Build

In this project, I’ll integrate an ML lineage into a price prediction system running on AWS Lambda, using DVC, an open-source version control system for ML applications.

The below diagram illustrates the system architecture and the ML lineage I’ll integrate:

Figure A: A comprehensive ML lineage for an ML application on serverless Lambda (Created by Kuriko IWAI)

The System Architecture - AI Pricing for Retailers

The system operates as a containerized, serverless microservice designed to provide optimal price recommendations to maximize retailer sales.

Its core intelligence comes from AI models trained on historical purchase data to predict the quantity of a product sold at various prices, allowing the best price to be determined.

For consistent deployment, the prediction logic and its dependencies are packaged into a Docker container image and stored in AWS ECR (Elastic Container Registry).

The prediction is then served by an AWS Lambda function, which retrieves and runs the container from ECR and exposes the result via AWS API Gateway for the Flask application to consume.

The ML Lineage

In the system, GitHub handles the code lineage, while DVC captures the lineage of:

  • Data (blue boxes): ETL and preprocessing.

  • Experiments (light orange): Hyperparameter tuning and validation.

  • Models and Prediction (dark orange): Final model artifacts and prediction results.

DVC tracks the lineage through separate stages, from data extraction to fairness testing (yellow rows in Figure A).

For each stage, DVC uses an MD5 or SHA256 hash to track and push metadata like artifacts, metrics, and reports to its remote on AWS S3.

The pipeline incorporates Evidently AI to handle data drift tests, which are essential for identifying shifts in data distributions that could compromise the model's generalization capabilities in production.

Only models that successfully pass both the data drift and fairness tests can serve predictions via AWS API Gateway (red box in Figure A).

Lastly, this entire lineage process is triggered weekly by the open-source workflow scheduler, Prefect.

Prefect prompts DVC to check for updates in data and scripts, and executes the full lineage process if changes are detected.

Tools Used

Here is a summary of the tools used to track the ML lineage:

  • DVC: An open-source version control system for data. Used to track the ML lineage.

  • AWS S3: A secure object storage service from AWS. Used as a remote storage.

  • Evidently AI: An open-source ML and LLM observability framework. Used to detect data drift.

  • Prefect: A workflow orchestration engine. Used to manage the scheduled runs of the lineage.

Workflow in Action

The building process involves five main steps:

  1. Initiate a DVC project

  2. Define the lineage stages in the dvc.yaml configuration file and the corresponding Python scripts

  3. Deploy the DVC project

  4. Configure scheduled run with Prefect

  5. Deploy the application

Let’s walk through each step together.

Step 1: Initiating a DVC Project

The first step is to initiate a DVC project:

$dvc init

This command automatically creates a .dvc directory at the root of the project folder:

.
└── .dvc/
    ├── cache/         # [.gitignore] stores dvc caches (the cached actual data files)
    ├── tmp/           # [.gitignore]
    ├── .gitignore     # gitignores cache, tmp, and config.local
    ├── config         # dvc config for production
    └── config.local   # [.gitignore] dvc config for local

DVC keeps the Git repository fast and lightweight by separating the large original data files from the repository.

The process involves caching the original data in the local .dvc/cache directory, creating a small .dvc metadata file which contains an MD5 hash and a link to the original data file path, pushing only the small metadata files to Git, and pushing the original data to the DVC remote.
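For context, when a file is tracked directly with dvc add, the generated .dvc metadata file looks roughly like this (an illustrative sketch; the file name, hash, and size are placeholders, not values from this project):

# example_data.parquet.dvc -- illustrative only; hash and size are placeholders
outs:
- md5: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d
  size: 10485760
  path: example_data.parquet

In this project, the pipeline outputs are recorded with the same hash-based metadata in the dvc.lock file shown later.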

Step 2: The ML Lineage

Next, I’ll configure the ML lineage with the following stages:

  1. etl_pipeline: Extract, clean, impute the original data and perform feature engineering.

  2. data_drift_check: Run data drift tests. If they fail, the system exits.

  3. preprocess: Create training, validation, and test datasets.

  4. tune_primary_model: Tune hyperparameters and train the model.

  5. inference_primary_model: Perform inference on the test dataset.

  6. assess_model_risk: Run risk and fairness tests.

Each stage requires defining the DVC command and its corresponding Python script.

Let’s get started.

Stage 1: The ETL Pipeline

The first stage is to extract, clean, impute the original data, and perform feature engineering.

DVC Configuration

I’ll create the dvc.yaml file at the root of the project directory and add the etl_pipeline stage:

dvc.yaml

stages:
  etl_pipeline:
    # the main command dvc will run in this stage
    cmd: python src/data_handling/etl_pipeline.py

    # dependencies necessary to run the main command
    deps:
      - src/data_handling/etl_pipeline.py
      - src/data_handling/
      - src/_utils/

    # output paths for dvc to track
    outs:
      - data/original_df.parquet
      - data/processed_df.parquet

The dvc.yaml file defines a sequence of steps (stages) using sections like:

  • cmd: The shell command to be executed for that stage

  • deps: Dependencies needed to run the cmd

  • params: Default parameters for the cmd, defined in the params.yaml file

  • metrics: The metrics files to track

  • reports: The report files to track

  • plots: The DVC plot files for visualization

  • outs: The output files produced by the cmd, which DVC will track

The configuration helps DVC ensure reproducibility by explicitly listing dependencies, outputs, and the commands of each stage. It also helps it manage the lineage by establishing a Directed Acyclic Graph (DAG) of the workflow, linking each stage to the next.
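Once the stages are defined, the resulting DAG can be printed at any time to verify how they are linked:

$dvc dag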

Python Scripts

Next, let’s add Python scripts, ensuring the data is stored using the file paths specified in the outs section of the dvc.yaml file:

src/data_handling/etl_pipeline.py:

import os
import argparse

import src.data_handling.scripts as scripts
from src._utils import main_logger

def etl_pipeline(stockcode: str = '', impute_stockcode: bool = False):
    # extract the entire data
    df = scripts.extract_original_dataframe()

    # store the original data as a parquet file
    ORIGINAL_DF_PATH = os.path.join('data', 'original_df.parquet')
    df.to_parquet(ORIGINAL_DF_PATH, index=False) # dvc tracked

    # transform
    df = scripts.structure_missing_values(df=df)
    df = scripts.handle_feature_engineering(df=df)

    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df.to_parquet(PROCESSED_DF_PATH, index=False) # dvc tracked
    return df

# for dvc execution
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="run etl pipeline")
    parser.add_argument('--stockcode', type=str, default='', help="specific stockcode to process. empty runs full pipeline.")
    parser.add_argument('--impute', action='store_true', help="flag to create imputation values")
    args = parser.parse_args()

    etl_pipeline(stockcode=args.stockcode, impute_stockcode=args.impute)

Outputs

The original and structured data in Pandas’ DataFrames are stored in the DVC cache:

  • data/original_df.parquet

  • data/processed_df.parquet

Stage 2: The Data Drift Check

Before jumping into preprocessing, I’ll run data drift tests to check whether any notable drift exists in the data, using Evidently AI, an open-source ML and LLM observability framework.

The Data Drift

Data drift refers to any changes in the statistical properties like the mean, variance, or distribution of the data that the model is trained on.

Its major categories include:

  • Covariate Drift (Feature Drift): A change in the input feature distribution.

  • Prior Probability Drift (Label Drift): A change in the target variable distribution.

  • Concept Drift: A change in the relationship between the input data and the target variable.

Data drift compromises the model's generalization capabilities over time, making its detection after deployment crucial.
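For intuition, covariate drift in a single numeric feature can be checked with a two-sample Kolmogorov-Smirnov test. The snippet below is purely illustrative and not part of the pipeline (the synthetic values and the 0.05 significance threshold are assumptions); Evidently's DataDriftPreset used in this stage runs per-column tests like this automatically:

# illustrative only: a two-sample ks test on one numeric feature
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
reference = rng.normal(loc=10.0, scale=2.0, size=1000)  # e.g. unitprice at training time
current = rng.normal(loc=11.5, scale=2.0, size=1000)    # e.g. unitprice in the latest batch

statistic, p_value = ks_2samp(reference, current)
drift_detected = p_value < 0.05  # assumed significance threshold
print(f'ks statistic={statistic:.3f}, p-value={p_value:.4f}, drift={drift_detected}')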

DVC Configuration

I’ll add the data_drift_check stage right after the etl_pipeline stage:

dvc.yaml:

stages:
  etl_pipeline:
    ###
  data_drift_check:
    # the main command dvc will run in this stage
    cmd: >
      python src/data_handling/report_data_drift.py
      data/processed/processed_df.csv
      data/processed_df_${params.stockcode}.parquet
      reports/data_drift_report_${params.stockcode}.html
      metrics/data_drift_${params.stockcode}.json
      ${params.stockcode}

    # default values for the parameters (defined in the params.yaml file)
    params:
      - params.stockcode

    # dependencies necessary to run the main command
    deps:
      - src/data_handling/report_data_drift.py
      - src/

    # output file paths for dvc to track
    plots:
      - reports/data_drift_report_${params.stockcode}.html:

    metrics:
      - metrics/data_drift_${params.stockcode}.json:
          type: json

Then, add default values to the parameters passed to the DVC command:

params.yaml:

params:
  stockcode: <STOCKCODE OF CHOICE>

Python Scripts

After generating an API token from the Evidently AI workspace, I’ll add a Python script to detect data drift and store the results in a metrics dictionary:

src/data_handling/report_data_drift.py:

import os
import sys
import json
import pandas as pd
import datetime
from dotenv import load_dotenv

from evidently import Dataset, DataDefinition, Report
from evidently.presets import DataDriftPreset
from evidently.ui.workspace import CloudWorkspace

import src.data_handling.scripts as scripts
from src._utils import main_logger


if __name__ == '__main__':
    # initiate the evidently cloud workspace
    load_dotenv(override=True)
    ws = CloudWorkspace(token=os.getenv('EVENTLY_API_TOKEN'), url='https://app.evidently.cloud')

    # retrieve the evidently project
    project = ws.get_project('EVENTLY AI PROJECT ID')

    # retrieve paths from the command line args
    REFERENCE_DATA_PATH = sys.argv[1]
    CURRENT_DATA_PATH = sys.argv[2]
    REPORT_OUTPUT_PATH = sys.argv[3]
    METRICS_OUTPUT_PATH = sys.argv[4]
    STOCKCODE = sys.argv[5]

    # create folders if they don't exist
    os.makedirs(os.path.dirname(REPORT_OUTPUT_PATH), exist_ok=True)
    os.makedirs(os.path.dirname(METRICS_OUTPUT_PATH), exist_ok=True)

    # extract datasets
    reference_data_full = pd.read_csv(REFERENCE_DATA_PATH)
    reference_data_stockcode = reference_data_full[reference_data_full['stockcode'] == STOCKCODE]
    current_data_stockcode = pd.read_parquet(CURRENT_DATA_PATH)

    # define the data schema
    nums, cats = scripts.categorize_num_cat_cols(df=reference_data_stockcode)
    for col in nums: current_data_stockcode[col] = pd.to_numeric(current_data_stockcode[col], errors='coerce')

    schema = DataDefinition(numerical_columns=nums, categorical_columns=cats)

    # define evidently datasets w/ the data schema
    eval_data_1 = Dataset.from_pandas(reference_data_stockcode, data_definition=schema)
    eval_data_2 = Dataset.from_pandas(current_data_stockcode, data_definition=schema)

    # execute drift detection
    report = Report(metrics=[DataDriftPreset()])
    data_eval = report.run(reference_data=eval_data_1, current_data=eval_data_2)
    data_eval.save_html(REPORT_OUTPUT_PATH)

    # create metrics for dvc tracking
    report_dict = json.loads(data_eval.json())
    num_drifts = report_dict['metrics'][0]['value']['count']
    shared_drifts = report_dict['metrics'][0]['value']['share']
    metrics = dict(
        drift_detected=bool(num_drifts > 0.0), num_drifts=num_drifts, shared_drifts=shared_drifts,
        num_cols=nums,
        cat_cols=cats,
        stockcode=STOCKCODE,
        timestamp=datetime.datetime.now().isoformat(),
    )

    # save the metrics file
    with open(METRICS_OUTPUT_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)
        main_logger.info(f'... drift metrics saved to {METRICS_OUTPUT_PATH}... ')

    # stop the system if data drift is found
    if num_drifts > 0.0: sys.exit('❌ FATAL: data drift detected. stopping pipeline')

If data drift is found, the script immediately exits via the final sys.exit call. Because sys.exit() is given a message, the process returns a non-zero exit code, so DVC marks the stage as failed and halts the downstream stages.

Outputs

The script generates two files that DVC will track:

  • reports/data_drift_report.html: The data drift report in an HTML file.

  • metrics/data_drift.json: The data drift metrics in a JSON file, including drift results along with feature columns and a timestamp:

metrics/data_drift.json:

{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0,
    "num_cols": [
        "invoiceno",
        "invoicedate",
        "unitprice",
        "product_avg_quantity_last_month",
        "product_max_price_all_time",
        "unitprice_vs_max",
        "unitprice_to_avg",
        "unitprice_squared",
        "unitprice_log"
    ],
    "cat_cols": [
        "stockcode",
        "customerid",
        "country",
        "year",
        "year_month",
        "day_of_week",
        "is_registered"
    ],
    "timestamp": "2025-10-07T00:24:29.899495"
}

The drift test results are also available on the Evidently workspace dashboard for further analysis:

Figure B. Screenshot of the Evidently workspace dashboard

Stage 3: Preprocessing

If no data drift is detected, the lineage moves on to the preprocessing stage.

DVC Configuration

I’ll add the preprocess stage right after the data_drift_check stage:

dvc.yaml:

stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    cmd: >
      python src/data_handling/preprocess.py --target_col ${params.target_col} --should_scale ${params.should_scale} --verbose ${params.verbose}

    deps:
      - src/data_handling/preprocess.py
      - src/data_handling/
      - src/_utils

    # params from params.yaml
    params:
      - params.target_col
      - params.should_scale
      - params.verbose

    outs:
      # train, val, test datasets
      - data/x_train_df.parquet
      - data/x_val_df.parquet
      - data/x_test_df.parquet
      - data/y_train_df.parquet
      - data/y_val_df.parquet
      - data/y_test_df.parquet

      # preprocessed input datasets
      - data/x_train_processed.parquet
      - data/x_val_processed.parquet
      - data/x_test_processed.parquet

      # trained preprocessor and human readable feature names for shap analysis
      - preprocessors/column_transformer.pkl
      - preprocessors/feature_names.json

And add default values for the parameters used in the cmd:

params.yaml:

params:
  target_col: "quantity"
  should_scale: True
  verbose: False

Python Scripts

Next, I’ll add a Python script to create the training, validation, and test datasets and preprocess the input data:

src/data_handling/preprocess.py:

import os
import argparse
import json
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

import src.data_handling.scripts as scripts
from src._utils import main_logger

def preprocess(stockcode: str = '', target_col: str = 'quantity', should_scale: bool = True, verbose: bool = False):
    # initiate metrics to track (dvc)
    DATA_DRIFT_METRICS_PATH = os.path.join('metrics', f'data_drift_{stockcode}.json')

    if os.path.exists(DATA_DRIFT_METRICS_PATH):
        with open(DATA_DRIFT_METRICS_PATH, 'r') as f:
            metrics = json.load(f)
    else: metrics = dict()

    # load processed df from dvc cache
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df = pd.read_parquet(PROCESSED_DF_PATH)

    # categorize num and cat columns
    num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
    if verbose: main_logger.info(f'num_cols: {num_cols} \ncat_cols: {cat_cols}')

    # structure cat cols
    if cat_cols:
        for col in cat_cols: df[col] = df[col].astype('string')

    # initiate preprocessor (either load from the dvc cache or create from scratch)
    PREPROCESSOR_PATH = os.path.join('preprocessors', 'column_transformer.pkl')
    try:
        preprocessor = joblib.load(PREPROCESSOR_PATH)
    except:
        preprocessor = scripts.create_preprocessor(num_cols=num_cols if should_scale else [], cat_cols=cat_cols)

    # create train, val, test datasets
    y = df[target_col]
    X = df.copy().drop(target_col, axis='columns')

    # split
    test_size, random_state = 50000, 42
    X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=test_size, random_state=random_state, shuffle=False)

    # store train, val, test datasets (dvc track)
    X_train.to_parquet('data/x_train_df.parquet', index=False)
    X_val.to_parquet('data/x_val_df.parquet', index=False)
    X_test.to_parquet('data/x_test_df.parquet', index=False)
    y_train.to_frame(name=target_col).to_parquet('data/y_train_df.parquet', index=False)
    y_val.to_frame(name=target_col).to_parquet('data/y_val_df.parquet', index=False)
    y_test.to_frame(name=target_col).to_parquet('data/y_test_df.parquet', index=False)

    # preprocess
    X_train = preprocessor.fit_transform(X_train)
    X_val = preprocessor.transform(X_val)
    X_test = preprocessor.transform(X_test)

    # store preprocessed input data (dvc track)
    pd.DataFrame(X_train).to_parquet('data/x_train_processed.parquet', index=False)
    pd.DataFrame(X_val).to_parquet('data/x_val_processed.parquet', index=False)
    pd.DataFrame(X_test).to_parquet('data/x_test_processed.parquet', index=False)

    # save feature names (dvc track) for shap
    with open('preprocessors/feature_names.json', 'w') as f:
        feature_names = preprocessor.get_feature_names_out()
        json.dump(feature_names.tolist(), f)

    return X_train, X_val, X_test, y_train, y_val, y_test, preprocessor


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='run data preprocessing')
    parser.add_argument('--stockcode', type=str, default='', help='specific stockcode')
    parser.add_argument('--target_col', type=str, default='quantity', help='the target column name')
    parser.add_argument('--should_scale', type=bool, default=True, help='flag to scale numerical features')
    parser.add_argument('--verbose', type=bool, default=False, help='flag for verbose logging')
    args = parser.parse_args()

    X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = preprocess(
        target_col=args.target_col,
        should_scale=args.should_scale,
        verbose=args.verbose,
        stockcode=args.stockcode,
    )

Outputs

This stage generates the necessary datasets for both model training and inference:

Input features:

  • data/x_train_df.parquet

  • data/x_val_df.parquet

  • data/x_test_df.parquet

Preprocessed input features:

  • data/x_train_processed.parquet

  • data/x_val_processed.parquet

  • data/x_test_processed.parquet

Target variables:

  • data/y_train_df.parquet

  • data/y_val_df.parquet

  • data/y_test_df.parquet

The preprocessor and human-readable feature names are also stored in cache for inference and SHAP feature impact analysis later:

  • preprocessors/column_transformer.pkl

  • preprocessors/feature_names.json

Lastly, DVC adds the preprocess_status, x_train_processed_path, and preprocessor_path entries to the data summary metrics file data.json created in Stage 2, tracking the end-to-end process of Stages 2 and 3:

metrics/data.json:

{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0,
    "num_cols": [
        "invoiceno",
        "invoicedate",
        "unitprice",
        "product_avg_quantity_last_month",
        "product_max_price_all_time",
        "unitprice_vs_max",
        "unitprice_to_avg",
        "unitprice_squared",
        "unitprice_log"
    ],
    "cat_cols": [
        "stockcode",
        "customerid",
        "country",
        "year",
        "year_month",
        "day_of_week",
        "is_registered"
    ],
    "timestamp": "2025-10-07T00:24:29.899495",

    # updates
    "preprocess_status": "completed",
    "x_train_processed_path": "data/x_train_processed_85123A.parquet",
    "preprocessor_path": "preprocessors/column_transformer.pkl"
}

Next, let’s move on to the model/experiment lineage.

Stage 4: Tuning the Model

After creating the datasets, I’ll tune and train the primary model: a multi-layer feedforward network built with PyTorch, using the training and validation datasets created in the preprocess stage.

DVC Configuration

First, I’ll add the tune_primary_model stage right after the preprocess stage:

dvc.yaml:

stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    cmd: >
      python src/model/torch_model/main.py
      data/x_train_processed_${params.stockcode}.parquet
      data/x_val_processed_${params.stockcode}.parquet
      data/y_train_df_${params.stockcode}.parquet
      data/y_val_df_${params.stockcode}.parquet
      ${tuning.should_local_save}
      ${tuning.grid}
      ${tuning.n_trials}
      ${tuning.num_epochs}
      ${params.stockcode}

    deps:
      - src/model/torch_model/main.py
      - src/data_handling/
      - src/model/
      - src/_utils/

    params:
      - params.stockcode
      - tuning.n_trials
      - tuning.grid
      - tuning.should_local_save

    outs:
      - models/production/dfn_best_${params.stockcode}.pth # dvc track

    metrics:
      - metrics/dfn_val_${params.stockcode}.json: # dvc track

Then, add default values to the parameters:

params.yaml:

params:
  target_col: "quantity"
  should_scale: True
  verbose: False

tuning:
  n_trials: 100
  num_epochs: 3000
  should_local_save: False
  grid: False

Python Scripts

Next, I’ll add the Python scripts to tune the model using Bayesian optimization and then train the optimal model on the complete X_train and y_train datasets created in the preprocess stage.

src/model/torch_model/main.py:

import os
import sys
import json
import datetime
import pandas as pd
import torch
import torch.nn as nn

import src.model.torch_model.scripts as scripts
from src._utils import main_logger


def tune_and_train(
        X_train, X_val, y_train, y_val,
        stockcode: str = '',
        should_local_save: bool = True,
        grid: bool = False,
        n_trials: int = 50,
        num_epochs: int = 3000
    ) -> tuple[nn.Module, dict]:

    # perform bayesian optimization
    best_dfn, best_optimizer, best_batch_size, best_checkpoint = scripts.bayesian_optimization(
        X_train, X_val, y_train, y_val, n_trials=n_trials, num_epochs=num_epochs
    )

    # save the model artifact (dvc track)
    DFN_FILE_PATH = os.path.join('models', 'production', f'dfn_best_{stockcode}.pth' if stockcode else 'dfn_best.pth')
    os.makedirs(os.path.dirname(DFN_FILE_PATH), exist_ok=True)
    torch.save(best_checkpoint, DFN_FILE_PATH)

    return best_dfn, best_checkpoint


def track_metrics_by_stockcode(X_val, y_val, best_model, checkpoint: dict, stockcode: str):
    MODEL_VAL_METRICS_PATH = os.path.join('metrics', f'dfn_val_{stockcode}.json')
    os.makedirs(os.path.dirname(MODEL_VAL_METRICS_PATH), exist_ok=True)

    # validate the tuned model
    _, mse, exp_mae, rmsle = scripts.perform_inference(model=best_model, X=X_val, y=y_val)
    model_version = f"dfn_{stockcode}_{os.getpid()}"
    metrics = dict(
        stockcode=stockcode,
        mse_val=mse,
        mae_val=exp_mae,
        rmsle_val=rmsle,
        model_version=model_version,
        hparams=checkpoint['hparams'],
        optimizer=checkpoint['optimizer_name'],
        batch_size=checkpoint['batch_size'],
        lr=checkpoint['lr'],
        timestamp=datetime.datetime.now().isoformat()
    )
    # store the validation results (dvc track)
    with open(MODEL_VAL_METRICS_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)
        main_logger.info(f'... validation metrics saved to {MODEL_VAL_METRICS_PATH} ...')


if __name__ == '__main__':
    # fetch command arg values
    X_TRAIN_PATH = sys.argv[1]
    X_VAL_PATH = sys.argv[2]
    Y_TRAIN_PATH = sys.argv[3]
    Y_VAL_PATH = sys.argv[4]
    SHOULD_LOCAL_SAVE = sys.argv[5] == 'True'
    GRID = sys.argv[6] == 'True'
    N_TRIALS = int(sys.argv[7])
    NUM_EPOCHS = int(sys.argv[8])
    STOCKCODE = str(sys.argv[9])

    # extract training and validation datasets from dvc cache
    X_train, X_val = pd.read_parquet(X_TRAIN_PATH), pd.read_parquet(X_VAL_PATH)
    y_train, y_val = pd.read_parquet(Y_TRAIN_PATH), pd.read_parquet(Y_VAL_PATH)

    # tuning
    best_model, checkpoint = tune_and_train(
        X_train, X_val, y_train, y_val,
        stockcode=STOCKCODE, should_local_save=SHOULD_LOCAL_SAVE, grid=GRID, n_trials=N_TRIALS, num_epochs=NUM_EPOCHS
    )

    # metrics tracking
    track_metrics_by_stockcode(X_val, y_val, best_model=best_model, checkpoint=checkpoint, stockcode=STOCKCODE)
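The scripts.bayesian_optimization helper itself isn’t shown here. As a rough, self-contained sketch of what such a search could look like, the snippet below assumes Optuna as the optimization library and uses synthetic data in place of the real training set; the search space, epoch count, and trial count are assumptions, and only the hyperparameter names mirror those that appear in the validation metrics below:

# a minimal, illustrative sketch of bayesian hyperparameter search with optuna;
# the data, search space, and training loop are assumptions for this example
import numpy as np
import optuna
import torch
import torch.nn as nn

# synthetic regression data standing in for the preprocessed training/validation sets
rng = np.random.default_rng(42)
X = rng.normal(size=(500, 8)).astype(np.float32)
y = (2.0 * X[:, :1] + rng.normal(scale=0.1, size=(500, 1))).astype(np.float32)
X_train, X_val = torch.tensor(X[:400]), torch.tensor(X[400:])
y_train, y_val = torch.tensor(y[:400]), torch.tensor(y[400:])

def objective(trial: optuna.Trial) -> float:
    # sample hyperparameters
    n_units = trial.suggest_int('n_units_layer_0', 16, 128)
    dropout = trial.suggest_float('dropout_rate_layer_0', 0.0, 0.5)
    lr = trial.suggest_float('learning_rate', 1e-4, 1e-1, log=True)

    model = nn.Sequential(nn.Linear(8, n_units), nn.ReLU(), nn.Dropout(dropout), nn.Linear(n_units, 1))
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    # short full-batch training loop for illustration
    model.train()
    for _ in range(200):
        optimizer.zero_grad()
        loss = loss_fn(model(X_train), y_train)
        loss.backward()
        optimizer.step()

    # return the validation loss for optuna to minimize
    model.eval()
    with torch.no_grad():
        return loss_fn(model(X_val), y_val).item()

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)
print(study.best_params)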

Outputs

The stage generates two files:

  • models/production/dfn_best.pth: Includes model artifacts and checkpoint like the optimal hyperparameter set.

  • metrics/dfn_val.json: Contains tuning results, model version, timestamp, and validation results for MSE, MAE, and RMSLE:

metrics/dfn_val.json:

1{
2    "stockcode": "85123A",
3    "mse_val": 0.6137686967849731,
4    "mae_val": 9.092489242553711,
5    "rmsle_val": 0.6953299045562744,
6    "model_version": "dfn_85123A_35604",
7    "hparams": {
8        "num_layers": 4,
9        "batch_norm": false,
10        "dropout_rate_layer_0": 0.13765888061300502,
11        "n_units_layer_0": 184,
12        "dropout_rate_layer_1": 0.5509872409359128,
13        "n_units_layer_1": 122,
14        "dropout_rate_layer_2": 0.2408753527744403,
15        "n_units_layer_2": 35,
16        "dropout_rate_layer_3": 0.03451842588822594,
17        "n_units_layer_3": 224,
18        "learning_rate": 0.026240673135104406,
19        "optimizer": "adamax",
20        "batch_size": 64
21    },
22    "optimizer": "adamax",
23    "batch_size": 64,
24    "lr": 0.026240673135104406,
25    "timestamp": "2025-10-07T00:31:08.700294"
26}
27

Stage 5: Performing Inference

After the model tuning phase is complete, I’ll configure the test inference for a final evaluation.

The final evaluation uses the MSE, MAE, and RMSLE metrics, as well as SHAP for feature impact and interpretability analysis.

SHAP (SHapley Additive exPlanations) is a framework for quantifying how much each feature contributes to a model’s prediction by using the concept of Shapley values from game theory.

The SHAP values are leveraged for future EDA and feature engineering.

DVC Configuration

First, I’ll add the inference_primary_model stage to the DVC configuration.

This stage includes a plots section, where DVC tracks and versions the visualization files generated for the SHAP values.

dvc.yaml:

stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    ###
  inference_primary_model:
    cmd: >
      python src/model/torch_model/inference.py
      data/x_test_processed_${params.stockcode}.parquet
      data/y_test_df_${params.stockcode}.parquet
      models/production/dfn_best_${params.stockcode}.pth
      ${params.stockcode}
      ${tracking.sensitive_feature_col}
      ${tracking.privileged_group}

    deps:
      - src/model/torch_model/inference.py
      - models/production/
      - src/

    params:
      - params.stockcode
      - tracking.sensitive_feature_col
      - tracking.privileged_group

    metrics:
      - metrics/dfn_inf_${params.stockcode}.json: # dvc track
          type: json

    plots:
      # shap summary / beeswarm plot for global interpretability
      - reports/dfn_shap_summary_${params.stockcode}.json:
          template: simple
          x: shap_value
          y: feature_name
          title: SHAP Beeswarm Plot

      # shap mean absolute vals - feature importance bar plot
      - reports/dfn_shap_mean_abs_${params.stockcode}.json:
          template: bar
          x: mean_abs_shap
          y: feature_name
          title: Mean Absolute SHAP Importance

    outs:
      - data/dfn_inference_results_${params.stockcode}.parquet
      - reports/dfn_raw_shap_values_${params.stockcode}.parquet # save raw shap vals for detailed analysis later

Python Scripts

Next, I’ll add scripts where the trained model performs inference:

src/model/torch_model/inference.py:

import os
import sys
import json
import datetime
import numpy as np
import pandas as pd
import torch
import shap

import src.model.torch_model.scripts as scripts
from src._utils import main_logger


if __name__ == '__main__':
    # load test dataset
    X_TEST_PATH = sys.argv[1]
    Y_TEST_PATH = sys.argv[2]
    X_test, y_test = pd.read_parquet(X_TEST_PATH), pd.read_parquet(Y_TEST_PATH)

    # create X_test w/ column names for shap analysis and sensitive feature tracking
    X_test_with_col_names = X_test.copy()
    FEATURE_NAMES_PATH = os.path.join('preprocessors', 'feature_names.json')
    try:
        with open(FEATURE_NAMES_PATH, 'r') as f: feature_names = json.load(f)
    except FileNotFoundError: feature_names = X_test.columns.tolist()
    if len(X_test_with_col_names.columns) == len(feature_names): X_test_with_col_names.columns = feature_names

    # reconstruct the optimal model tuned in the previous stage
    MODEL_PATH = sys.argv[3]
    checkpoint = torch.load(MODEL_PATH)
    model = scripts.load_model(checkpoint=checkpoint)

    # perform inference
    y_pred, mse, exp_mae, rmsle = scripts.perform_inference(model=model, X=X_test, y=y_test, batch_size=checkpoint['batch_size'])

    # create result df w/ y_pred, y_true, and sensitive features
    STOCKCODE = sys.argv[4]
    SENSITIVE_FEATURE = sys.argv[5]
    PRIVILEGED_GROUP = sys.argv[6]
    inference_df = pd.DataFrame(y_pred.cpu().numpy().flatten(), columns=['y_pred'])
    inference_df['y_true'] = y_test
    inference_df[SENSITIVE_FEATURE] = X_test_with_col_names[f'cat__{SENSITIVE_FEATURE}_{str(PRIVILEGED_GROUP)}'].astype(bool)
    inference_df.to_parquet(path=os.path.join('data', f'dfn_inference_results_{STOCKCODE}.parquet'))

    # record inference metrics
    MODEL_INF_METRICS_PATH = os.path.join('metrics', f'dfn_inf_{STOCKCODE}.json')
    os.makedirs(os.path.dirname(MODEL_INF_METRICS_PATH), exist_ok=True)
    model_version = f"dfn_{STOCKCODE}_{os.getpid()}"
    inf_metrics = dict(
        stockcode=STOCKCODE,
        mse_inf=mse,
        mae_inf=exp_mae,
        rmsle_inf=rmsle,
        model_version=model_version,
        hparams=checkpoint['hparams'],
        optimizer=checkpoint['optimizer_name'],
        batch_size=checkpoint['batch_size'],
        lr=checkpoint['lr'],
        timestamp=datetime.datetime.now().isoformat()
    )
    with open(MODEL_INF_METRICS_PATH, 'w') as f: # dvc track
        json.dump(inf_metrics, f, indent=4)
        main_logger.info(f'... inference metrics saved to {MODEL_INF_METRICS_PATH} ...')


    ## shap analysis
    # compute shap vals
    model.eval()
    device_type = next(model.parameters()).device # match the device of the loaded model

    # prepare background data
    X_test_tensor = torch.from_numpy(X_test.values.astype(np.float32)).to(device_type)

    # take small samples from x_test as background
    background = X_test_tensor[np.random.choice(X_test_tensor.shape[0], 100, replace=False)].to(device_type)

    # define deepexplainer
    explainer = shap.DeepExplainer(model, background)

    # compute shap vals
    shap_values = explainer.shap_values(X_test_tensor) # outputs = numpy array or tensor

    # convert shap array to pandas df
    if isinstance(shap_values, list): shap_values = shap_values[0]
    if isinstance(shap_values, torch.Tensor): shap_values = shap_values.cpu().numpy()
    shap_values = shap_values.squeeze(axis=-1) # type: ignore
    shap_df = pd.DataFrame(shap_values, columns=feature_names)

    # shap raw data (dvc track)
    RAW_SHAP_OUT_PATH = os.path.join('reports', f'dfn_raw_shap_values_{STOCKCODE}.parquet')
    os.makedirs(os.path.dirname(RAW_SHAP_OUT_PATH), exist_ok=True)
    shap_df.to_parquet(RAW_SHAP_OUT_PATH, index=False)
    main_logger.info(f'... shap values saved to {RAW_SHAP_OUT_PATH} ...')

    # bar plot of mean abs shap vals (dvc report)
    mean_abs_shap = shap_df.abs().mean().sort_values(ascending=False)
    shap_mean_abs_df = pd.DataFrame({'feature_name': mean_abs_shap.index, 'mean_abs_shap': mean_abs_shap.values}) # pair each feature with its sorted mean |shap|
    MEAN_ABS_SHAP_PATH = os.path.join('reports', f'dfn_shap_mean_abs_{STOCKCODE}.json')
    shap_mean_abs_df.to_json(MEAN_ABS_SHAP_PATH, orient='records', indent=4)

Outputs

This stage generates five output files:

  • data/dfn_inference_results_${params.stockcode}.parquet: Stores prediction results, true labels, and any columns with sensitive features like gender, age, or income. I’ll use this file for the fairness test in the last stage.

  • metrics/dfn_inf.json: Stores evaluation metrics and tuning results:

{
    "stockcode": "85123A",
    "mse_inf": 0.6841545701026917,
    "mae_inf": 11.5866117477417,
    "rmsle_inf": 0.7423332333564758,
    "model_version": "dfn_85123A_35834",
    "hparams": {
        "num_layers": 4,
        "batch_norm": false,
        "dropout_rate_layer_0": 0.13765888061300502,
        "n_units_layer_0": 184,
        "dropout_rate_layer_1": 0.5509872409359128,
        "n_units_layer_1": 122,
        "dropout_rate_layer_2": 0.2408753527744403,
        "n_units_layer_2": 35,
        "dropout_rate_layer_3": 0.03451842588822594,
        "n_units_layer_3": 224,
        "learning_rate": 0.026240673135104406,
        "optimizer": "adamax",
        "batch_size": 64
    },
    "optimizer": "adamax",
    "batch_size": 64,
    "lr": 0.026240673135104406,
    "timestamp": "2025-10-07T00:31:12.946405"
}
  • reports/dfn_shap_mean_abs.json: Stores the mean absolute SHAP values:
[
    {
        "feature_name": "num__invoicedate",
        "mean_abs_shap": 0.219255722
    },
    {
        "feature_name": "num__unitprice",
        "mean_abs_shap": 0.1069829418
    },
    {
        "feature_name": "num__product_avg_quantity_last_month",
        "mean_abs_shap": 0.1021453096
    },
    {
        "feature_name": "num__product_max_price_all_time",
        "mean_abs_shap": 0.0855356899
    },
...
]
  • reports/dfn_shap_summary.json: Contains the data points necessary to draw the beeswarm/bar plots.

  • reports/dfn_raw_shap_values.parquet: Stores raw SHAP values.

Stage 6: Assessing Model Risk and Fairness

The last stage is to assess risk and fairness of the final inference results.

The Fairness Testing

Fairness testing in ML is the process of systematically evaluating a model’s predictions to ensure they are not unfairly biased toward specific groups defined by sensitive attributes like race and gender.

In this project, I’ll use the registration status is_registered column as a sensitive feature and make sure the Mean Outcome Difference (MOD) is within the specified threshold of 0.1.

The MOD is calculated as the absolute difference between the mean prediction values of the privileged (registered) and unprivileged (unregistered) groups.
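As a small, hypothetical illustration of the check (the prediction values below are made up; the column names match the inference frame produced in Stage 5):

# hypothetical numbers to illustrate the mean outcome difference (mod) check
import pandas as pd

df = pd.DataFrame({
    'y_pred':        [10.2, 9.8, 10.1, 10.0, 9.9],
    'is_registered': [True, True, True, False, False],
})
mean_privileged = df.loc[df['is_registered'], 'y_pred'].mean()      # registered group
mean_unprivileged = df.loc[~df['is_registered'], 'y_pred'].mean()   # unregistered group
mod = abs(mean_unprivileged - mean_privileged)
print(f'mod={mod:.3f}, acceptable={mod <= 0.1}')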

DVC Configuration

First, I’ll add the assess_model_risk stage right after the inference_primary_model stage:

dvc.yaml:

stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    ###
  inference_primary_model:
    ###
  assess_model_risk:
    cmd: >
      python src/model/torch_model/assess_risk_and_fairness.py
      data/dfn_inference_results_${params.stockcode}.parquet
      metrics/dfn_risk_fairness_${params.stockcode}.json
      ${tracking.sensitive_feature_col}
      ${params.stockcode}
      ${tracking.privileged_group}
      ${tracking.mod_threshold}

    deps:
      - src/model/torch_model/assess_risk_and_fairness.py
      - src/_utils/
      - data/dfn_inference_results_${params.stockcode}.parquet # ensure the result df as dependency

    params:
      - params.stockcode
      - tracking.sensitive_feature_col
      - tracking.privileged_group
      - tracking.mod_threshold

    metrics:
      - metrics/dfn_risk_fairness_${params.stockcode}.json:
          type: json

Then, add default values to the parameters:

params.yaml:

params:
  target_col: "quantity"
  should_scale: True
  verbose: False

tuning:
  n_trials: 100
  num_epochs: 3000
  should_local_save: False
  grid: False

# adding default values to the tracking metrics
tracking:
  sensitive_feature_col: "is_registered"
  privileged_group: 1 # member
  mod_threshold: 0.1

Python Script

The corresponding Python script contains the calculate_fairness_metrics function which performs the risk and fairness assessment:

src/model/torch_model/assess_risk_and_fairness.py:

import os
import json
import datetime
import argparse
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, root_mean_squared_log_error

from src._utils import main_logger


def calculate_fairness_metrics(
        df: pd.DataFrame,
        sensitive_feature_col: str,
        label_col: str = 'y_true',
        prediction_col: str = 'y_pred',
        privileged_group: int = 1,
        mod_threshold: float = 0.1,
    ) -> dict:

    metrics = dict()
    unprivileged_group = 0 if privileged_group == 1 else 1

    ## 1. risk assessment - predictive performance metrics by group
    for group, name in zip([unprivileged_group, privileged_group], ['unprivileged', 'privileged']):
        subset = df[df[sensitive_feature_col] == group]
        if len(subset) == 0: continue

        y_true = subset[label_col].values
        y_pred = subset[prediction_col].values

        metrics[f'mse_{name}'] = float(mean_squared_error(y_true, y_pred)) # type: ignore
        metrics[f'mae_{name}'] = float(mean_absolute_error(y_true, y_pred)) # type: ignore
        metrics[f'rmsle_{name}'] = float(root_mean_squared_log_error(y_true, y_pred)) # type: ignore

        # mean prediction (outcome disparity component)
        metrics[f'mean_prediction_{name}'] = float(y_pred.mean()) # type: ignore

    ## 2. bias assessment - fairness metrics
    # difference in mae between the groups
    mae_diff = metrics.get('mae_unprivileged', 0) - metrics.get('mae_privileged', 0)
    metrics['mae_diff'] = float(mae_diff)

    # mean outcome difference
    mod = metrics.get('mean_prediction_unprivileged', 0) - metrics.get('mean_prediction_privileged', 0)
    metrics['mean_outcome_difference'] = float(mod)
    metrics['is_mod_acceptable'] = 1 if abs(mod) <= mod_threshold else 0

    return metrics


def main():
    parser = argparse.ArgumentParser(description='assess bias and fairness metrics on model inference results.')
    parser.add_argument('inference_file_path', type=str, help='parquet file path to the inference results w/ y_true, y_pred, and sensitive feature cols.')
    parser.add_argument('metrics_output_path', type=str, help='json file path to save the metrics output.')
    parser.add_argument('sensitive_feature_col', type=str, help='column name of sensitive features')
    parser.add_argument('stockcode', type=str)
    parser.add_argument('privileged_group', type=int, default=1)
    parser.add_argument('mod_threshold', type=float, default=.1)
    args = parser.parse_args()

    try:
        # load inference df
        df_inference = pd.read_parquet(args.inference_file_path)
        LABEL_COL = 'y_true'
        PREDICTION_COL = 'y_pred'
        SENSITIVE_COL = args.sensitive_feature_col

        # compute fairness metrics
        metrics = calculate_fairness_metrics(
            df=df_inference,
            sensitive_feature_col=SENSITIVE_COL,
            label_col=LABEL_COL,
            prediction_col=PREDICTION_COL,
            privileged_group=args.privileged_group,
            mod_threshold=args.mod_threshold,
        )

        # add items to metrics
        metrics['model_version'] = f'dfn_{args.stockcode}_{os.getpid()}'
        metrics['sensitive_feature'] = args.sensitive_feature_col
        metrics['privileged_group'] = args.privileged_group
        metrics['mod_threshold'] = args.mod_threshold
        metrics['stockcode'] = args.stockcode
        metrics['timestamp'] = datetime.datetime.now().isoformat()

        # save metrics (dvc track)
        with open(args.metrics_output_path, 'w') as f:
            json_metrics = { k: (v if pd.notna(v) else None) for k, v in metrics.items() }
            json.dump(json_metrics, f, indent=4)

    except Exception as e:
        main_logger.error(f'... an error occurred during risk and fairness assessment: {e} ...')
        exit(1)

if __name__ == '__main__':
    main()

Outputs

The final stage generates a metrics file which contains test results and model version:

metrics/dfn_risk_fairness.json:

{
    "mse_unprivileged": 3.5370739412593575,
    "mae_unprivileged": 1.48263614013523,
    "rmsle_unprivileged": 0.6080000224747837,
    "mean_prediction_unprivileged": 1.8507767915725708,
    "mae_diff": 1.48263614013523,
    "mean_outcome_difference": 1.8507767915725708,
    "is_mod_acceptable": 1,
    "model_version": "dfn_85123A_35971",
    "sensitive_feature": "is_registered",
    "privileged_group": 1,
    "mod_threshold": 0.1,
    "timestamp": "2025-10-07T00:31:15.998590"
}
15

That’s all for the lineage configuration. Now, I’ll test it locally.

Testing Locally

I’ll run the entire ML lineage with this command:

$dvc repro -f

The -f flag forces DVC to rerun all the stages, regardless of whether any dependencies have changed.

The command will automatically create the dvc.lock file at the root of the project directory:

schema: '2.0'
stages:
  etl_pipeline_full:
    cmd: python src/data_handling/etl_pipeline.py
    deps:
    - path: src/_utils/
      hash: md5
      md5: ae41392532188d290395495f6827ed00.dir
      size: 15870
      nfiles: 10
    - path: src/data_handling/
      hash: md5
      md5: a8a61a4b270581a7c387d51e416f4e86.dir
      size: 95715
...

The dvc.lock file must be committed to Git to make sure DVC loads the latest files:

$git add dvc.lock .dvc dvc.yaml params.yaml
$git commit -m'updated dvc config'
$git push

Step 3: Deploying the DVC Project

Next, I’ll deploy the DVC project to ensure the AWS Lambda function can access the cached files in production.

I’ll start by configuring the DVC remote where the cached files are stored.

DVC supports various remote storage types such as AWS S3 and Google Cloud Storage. I’ll use AWS S3 for this project, but your choice depends on the project ecosystem, your familiarity with the tool, and any resource constraints.

First, I’ll create a new S3 bucket in the selected AWS region:

$aws s3 mb s3://<PROJECT NAME>/<BUCKET NAME> --region <AWS REGION>

*Make sure the IAM role has the following permissions: s3:ListBucket, s3:GetObject, s3:PutObject, and s3:DeleteObject.
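For reference, a minimal policy statement granting those permissions could look like this (a sketch; the bucket name is a placeholder):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::<BUCKET NAME>"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": "arn:aws:s3:::<BUCKET NAME>/*"
        }
    ]
}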

Then, add the URI of the S3 bucket as the DVC remote:

$dvc remote add -d <DVC REMOTE NAME> s3://<PROJECT NAME>/<BUCKET NAME>

Next, push the cache files to the DVC remote:

$dvc push

Now, all cache files are stored in the S3 bucket:

Figure C. Screenshot of the DVC remote in AWS S3 bucket

As shown in Figure A, this deployment step is necessary for the AWS Lambda function to access the DVC cache in production.

Step 4: Configuring Scheduled Run with Prefect

The next step is to configure the scheduled run of the entire lineage with Prefect.

Prefect is an open-source workflow orchestration tool for building, scheduling, and monitoring pipelines. It uses a concept called a work pool to effectively decouple the orchestration logic from the execution infrastructure.

The work pool then serves as a standardized base configuration, running a Docker container image to guarantee a consistent execution environment for all flows.
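If the work pool doesn’t exist yet, it can be created and served by a worker with commands along these lines (a sketch; running a Docker-type pool also requires the prefect-docker package):

$prefect work-pool create wp-ml-sales-pred --type docker
$prefect worker start --pool wp-ml-sales-pred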

Configuring the Docker Image Registry

The first step is to configure the Docker image registry for the Prefect work pool:

  • For local deployment: A container registry in the Docker Hub.

  • For production deployment: AWS ECR.

For local deployment, I’ll first authenticate the Docker client:

$docker login

And grant the user permission to run Docker commands without sudo (this example uses macOS's dscl utility):

$sudo dscl . -append /Groups/docker GroupMembership $USER

For production deployment, I’ll create a new ECR repository:

$aws ecr create-repository --repository-name <REPOSITORY NAME> --region <AWS REGION>

(Make sure the IAM role has access to this new ECR URI.)

Configuring Prefect Tasks and Flows

Next, I’ll configure the Prefect task and flow in the project:

  • The Prefect task executes the dvc repro and dvc push commands

  • The Prefect flow executes the Prefect task on a weekly schedule.

src/prefect_flows.py:

import os
import sys
import subprocess
from datetime import timedelta, datetime
from dotenv import load_dotenv
from prefect import flow, task
from prefect.schedules import Schedule
from prefect_aws import AwsCredentials

from src._utils import main_logger

# add project root to the python path - enabling prefect to find the script
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# define the prefect task
@task(retries=3, retry_delay_seconds=30)
def run_dvc_pipeline():
    # execute the dvc pipeline
    result = subprocess.run(["dvc", "repro"], capture_output=True, text=True, check=True)

    # push the updated data
    subprocess.run(["dvc", "push"], check=True)


# define the prefect flow
@flow(name="Weekly Data Pipeline")
def weekly_data_flow():
    run_dvc_pipeline()

if __name__ == '__main__':
    # docker image registry (either docker hub or aws ecr)
    load_dotenv(override=True)
    ENV = os.getenv('ENV', 'production')
    DOCKER_HUB_REPO = os.getenv('DOCKER_HUB_REPO')
    ECR_FOR_PREFECT_PATH = os.getenv('S3_BUCKET_FOR_PREFECT_PATH')
    image_repo = f'{DOCKER_HUB_REPO}:ml-sales-pred-data-latest' if ENV == 'local' else f'{ECR_FOR_PREFECT_PATH}:latest'

    # define weekly schedule
    weekly_schedule = Schedule(
        interval=timedelta(weeks=1),
        anchor_date=datetime(2025, 9, 29, 9, 0, 0),
        active=True,
    )

    # aws credentials to access ecr
    AwsCredentials(
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        region_name=os.getenv('AWS_REGION_NAME'),
    ).save('aws', overwrite=True)

    # deploy the prefect flow
    weekly_data_flow.deploy(
        name='weekly-data-flow',
        schedule=weekly_schedule, # schedule
        work_pool_name="wp-ml-sales-pred", # work pool where the docker image (flow) runs
        image=image_repo, # create a docker image at docker hub (local) or ecr (production)
        concurrency_limit=3,
        push=True # push the docker image to the image_repo
    )

Testing Locally

Next, I’ll test the workflow locally with the Prefect server:

$uv run prefect server start

$export PREFECT_API_URL="http://127.0.0.1:4200/api"

Run the prefect_flows.py script:

$uv run src/prefect_flows.py

Upon successful execution, the Prefect dashboard shows that the workflow is scheduled to run:

Figure D. Screenshot of the Prefect dashboard

Step 5: Deploying the Application

The final step is to deploy the entire application as a containerized Lambda by configuring the Dockerfile and the Flask application scripts.

The specific process in this final deployment step depends on the infrastructure.

But the common point is that DVC eliminates the need to store the large Parquet or CSV files directly in the feature store or model store, because it tracks them with lightweight hash-based metadata and loads the actual data from the remote at runtime.

So, first, I’ll simplify the loading logic of the Flask application script by using the dvc.api framework:

app.py:

### ... the rest components remain the same  ...

import dvc.api

DVC_REMOTE_NAME = '<REMOTE NAME IN .dvc/config FILE>'


def configure_dvc_for_lambda():
    # set dvc directories to /tmp (the only writable path on lambda)
    os.environ.update({
        'DVC_CACHE_DIR': '/tmp/dvc-cache',
        'DVC_DATA_DIR': '/tmp/dvc-data',
        'DVC_CONFIG_DIR': '/tmp/dvc-config',
        'DVC_GLOBAL_CONFIG_DIR': '/tmp/dvc-global-config',
        'DVC_SITE_CACHE_DIR': '/tmp/dvc-site-cache'
    })
    for dir_path in ['/tmp/dvc-cache', '/tmp/dvc-data', '/tmp/dvc-config']:
        os.makedirs(dir_path, exist_ok=True)


def load_x_test():
    global X_test
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading x_test ...")

        # config dvc directories
        configure_dvc_for_lambda()
        try:
            with dvc.api.open(X_TEST_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                X_test = pd.read_parquet(fd)
                main_logger.info('✅ successfully loaded x_test via dvc api')
        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)


def load_preprocessor():
    global preprocessor
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading preprocessor ...")
        configure_dvc_for_lambda()
        try:
            with dvc.api.open(PREPROCESSOR_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                preprocessor = joblib.load(fd)
                main_logger.info('✅ successfully loaded preprocessor via dvc api')

        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)

### ... the rest components remain the same  ...

Then, update the Dockerfile to enable Docker to correctly reference the DVC components:

Dockerfile.lambda.production:

# use an official python runtime
FROM public.ecr.aws/lambda/python:3.12

# set environment variables (adding dvc related env variables)
ENV JOBLIB_MULTIPROCESSING=0
ENV DVC_HOME="/tmp/.dvc"
ENV DVC_CACHE_DIR="/tmp/.dvc/cache"
ENV DVC_REMOTE_NAME="storage"
ENV DVC_GLOBAL_SITE_CACHE_DIR="/tmp/dvc_global"

# copy requirements file and install dependencies
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN python -m pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir dvc dvc-s3

# setup dvc
RUN dvc init --no-scm
RUN dvc config core.no_scm true

# copy the code to the lambda task root
COPY . ${LAMBDA_TASK_ROOT}
CMD [ "app.handler" ]

Lastly, ensure the large files are excluded from the Docker container image:

.dockerignore:

### ... the rest components remain the same  ...

# dvc cache contains large files
.dvc/cache
.dvcignore

# add all folders that DVC will track
data/
preprocessors/
models/
reports/
metrics/
13

Testing Locally

Finally, I’ll build and test the Docker image:

$docker build -t my-app -f Dockerfile.lambda.local .
$docker run -p 5002:5002 -e ENV=local my-app app.py

Upon successful configuration, the Waitress server runs the Flask application.

After confirming the changes, push the code to Git:

$git add .
$git commit -m'updated dockerfiles and flask app scripts'
$git push

This push command triggers the CI/CD pipeline via GitHub Actions, which generates a Docker container image and pushes it to AWS ECR.

Then, after a successful pipeline run and verification, we can manually trigger the deployment workflow from GitHub Actions.
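The workflow itself isn’t shown here; one possible shape for the build-and-push job is sketched below (the file name, action versions, secret names, and placeholders are assumptions rather than this project’s actual configuration):

# .github/workflows/build-and-push.yml -- illustrative sketch only
name: build-and-push

on:
  push:
    branches: [main]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: <AWS REGION>

      - name: Log in to Amazon ECR
        id: ecr-login
        uses: aws-actions/amazon-ecr-login@v2

      - name: Build and push the Lambda image
        run: |
          docker build -t ${{ steps.ecr-login.outputs.registry }}/<REPOSITORY NAME>:latest -f Dockerfile.lambda.production .
          docker push ${{ steps.ecr-login.outputs.registry }}/<REPOSITORY NAME>:latest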

Wrapping Up

Building robust ML applications requires comprehensive ML lineage to ensure reliability and traceability.

In this article, you learned how to build an ML lineage by integrating open-source services like DVC and Prefect.

In practice, initial planning matters. Specifically, defining how metrics are tracked and at which stages leads directly to a cleaner, more maintainable code structure and better extensibility in the future.

Moving forward, we can consider adding more stages to the lineage and integrating advanced logic for data drift detection or fairness tests.

This will further ensure continued model performance and data integrity in the production environment.

Continue Your Learning


Related Books for Further Understanding

These books cover a wide range of theories and practices, from fundamentals to PhD level.

Linear Algebra Done Right

Foundations of Machine Learning, second edition (Adaptive Computation and Machine Learning series)

Designing Machine Learning Systems: An Iterative Process for Production-Ready Applications

Machine Learning Design Patterns: Solutions to Common Challenges in Data Preparation, Model Building, and MLOps


Written by Kuriko IWAI. All images, unless otherwise noted, are by the author. All experimentations on this blog utilize synthetic or licensed data.