Building a Serverless ML Lineage: AWS Lambda, DVC, and Prefect
A practical guide to ML lineage fundamentals and MLOps workflow implementation for a serverless ML system
By Kuriko IWAI

Table of Contents
Introduction
What is Machine Learning (ML) Lineage
What We’ll Build
Introduction
Machine learning (ML) lineage is critical in any robust ML system. It lets you track data and model versions, ensuring reproducibility, auditability, and compliance.
While many services for tracking ML lineage exist, building a lineage that is both comprehensive and manageable often proves complicated.
In this article, I’ll walk you through integrating a comprehensive ML lineage solution for an ML application deployed on serverless AWS Lambda, covering the end-to-end pipeline stages:
ETL pipeline
Data drift detection
Preprocessing
Model tuning
Risk and fairness evaluation.
What is Machine Learning (ML) Lineage
Machine learning (ML) lineage is a framework for tracking and understanding the complete lifecycle of a machine learning model.
It contains information at different levels such as:
Code: The scripts, libraries, and configurations for model training.
Data: The original data, transformations, and features.
Experiments: Training runs, hyperparameter tuning results.
Models: The trained models and their versions.
Predictions: The outputs of deployed models.
ML lineage is essential for multiple reasons:
Reproducibility: Recreate the same model and prediction for validation.
Root cause analysis: Trace back to the data, code, or configuration change when a model fails in production.
Compliance: Some regulated industries require proof of model training to ensure fairness, transparency, and adherence to laws like GDPR and the EU AI Act.
What We’ll Build
In this project, I’ll integrate ML lineage into a price prediction system built on AWS Lambda, using DVC, an open-source version control system for ML applications.
The diagram below illustrates the system architecture and the ML lineage I’ll integrate:

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
Figure A: A comprehensive ML lineage for an ML application on serverless Lambda (Created by Kuriko IWAI)
◼ The System Architecture - AI Pricing for Retailers
The system operates as a containerized, serverless microservice designed to provide optimal price recommendations to maximize retailer sales.
Its core intelligence comes from AI models trained on historical purchase data. The models predict the quantity of a product sold at various prices, which allows the system to determine the best price.
For consistent deployment, the prediction logic and its dependencies are packaged into a Docker container image and stored in AWS ECR (Elastic Container Registry).
The prediction is then served by an AWS Lambda function, which retrieves and runs the container from ECR and exposes the result via AWS API Gateway for the Flask application to consume.
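To make the serving path concrete, here is a minimal sketch of what a Lambda entry point behind API Gateway could look like. It is not the application's actual handler: the predict_quantity function is a toy stand-in for the trained model, and the prices query parameter (a comma-separated list of candidate prices) is an assumed request format.

```python
import json


def predict_quantity(stockcode: str, unit_price: float) -> float:
    """Hypothetical stand-in for the trained model: a toy linear demand curve."""
    return max(0.0, 100.0 - 8.0 * unit_price)


def handler(event, context):
    """Lambda entry point for an API Gateway proxy-style event (assumed shape)."""
    params = event.get("queryStringParameters") or {}
    stockcode = params.get("stockcode", "")

    # candidate prices arrive as a comma-separated string (assumed format)
    prices = [float(p) for p in params.get("prices", "").split(",") if p]
    if not prices:
        return {"statusCode": 400, "body": json.dumps({"error": "no candidate prices"})}

    # pick the candidate price that maximizes predicted sales (price * quantity)
    best_price = max(prices, key=lambda p: p * predict_quantity(stockcode, p))
    body = {"stockcode": stockcode, "optimal_price": best_price}
    return {"statusCode": 200, "body": json.dumps(body)}
```

The handler only returns a JSON body; a client such as the Flask application would read optimal_price from that response.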
◼ The ML Lineage
In the system, GitHub handles the code lineage, while DVC captures the lineage of:
Data (blue boxes): ETL and preprocessing.
Experiments (light orange): Hyperparameter tuning and validation.
Models and Prediction (dark orange): Final model artifacts and prediction results.
DVC tracks the lineage through separate stages, from data extraction to fairness testing (yellow rows in Figure A).
For each stage, DVC uses an MD5 or SHA256 hash to track and push metadata like artifacts, metrics, and reports to its remote on AWS S3.
The pipeline incorporates Evidently AI to handle data drift tests, which are essential for identifying shifts in data distributions that could compromise the model's generalization capabilities in production.
Only models that successfully pass both the data drift and fairness tests can serve predictions via AWS API Gateway (red box in Figure A).
Lastly, this entire lineage process is triggered weekly by the open-source workflow scheduler, Prefect.
Prefect prompts DVC to check for updates in data and scripts, and executes the full lineage process if changes are detected.
▫ Tools Used
Here is a summary of the tools used to track the ML lineage:
DVC: An open-source version control system for data. Used to track the ML lineage.
AWS S3: A secure object storage service from AWS. Used as remote storage.
Evidently AI: An open-source ML and LLM observability framework. Used to detect data drift.
Prefect: A workflow orchestration engine. Used to manage the scheduled run of the lineage.
Workflow in Action
The building process involves five main steps:
Initiate a DVC project
Define the lineage stages with the DVC script dvc.yaml and corresponding Python script
Deploy the DVC project
Configure scheduled run with Prefect
Deploy the application
Let’s walk through each step together.
Step 1: Initiating a DVC Project
The first step is to initiate a DVC project:
$ dvc init
This command automatically creates a .dvc directory at the root of the project folder:
.
.dvc/
│
├── cache/          # [.gitignore] store dvc caches (cached actual data files)
├── tmp/            # [.gitignore]
├── .gitignore      # gitignore cache, tmp, and config.local
├── config          # dvc config for production
└── config.local    # [.gitignore] dvc config for local
DVC maintains a fast, lightweight Git repository by separating the original data in large files from the repository.
The process involves caching the original data in the local .dvc/cache directory, creating a small .dvc metadata file which contains an MD5 hash and a link to the original data file path, pushing only the small metadata files to Git, and pushing the original data to the DVC remote.
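The idea behind the hash-based metadata can be sketched in a few lines. This is an illustration of content addressing rather than DVC's own code: the cache_path_for helper and its directory layout are assumptions for the example, and the real cache layout depends on the DVC version.

```python
import hashlib
import tempfile
from pathlib import Path


def md5_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large data files never need to fit in memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def cache_path_for(md5_hex: str) -> Path:
    """Illustrative layout only: first two hex chars as a folder, the rest as the file name."""
    return Path(md5_hex[:2]) / md5_hex[2:]


with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "original_df.parquet"
    data_file.write_bytes(b"hello")  # stand-in content for a large data file

    file_hash = md5_of_file(data_file)
    cached_at = cache_path_for(file_hash)

    # the small metadata record is what would be committed to Git
    metadata = {"md5": file_hash, "path": data_file.name}
```

Because the hash depends only on the file's bytes, an unchanged file always maps to the same cache entry, while any change to the data yields a new hash and therefore a new version.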
Step 2: The ML Lineage
Next, I’ll configure the ML lineage with the following stages:
etl_pipeline: Extract, clean, and impute the original data, then perform feature engineering.
data_drift_check: Run data drift tests. If they fail, the system exits.
preprocess: Create training, validation, and test datasets.
tune_primary_model: Tune hyperparameters and train the model.
inference_primary_model: Perform inference on the test dataset.
assess_model_risk: Run risk and fairness tests.
Each stage requires defining the DVC command and its corresponding Python script.
Let’s get started.
◼ Stage 1: The ETL Pipeline
The first stage extracts, cleans, and imputes the original data, then performs feature engineering.
▫ DVC Configuration
I’ll create the dvc.yaml file at the root of the project directory and add the etl_pipeline stage:
dvc.yaml
stages:
  etl_pipeline:
    # the main command dvc will run in this stage
    cmd: python src/data_handling/etl_pipeline.py

    # dependencies necessary to run the main command
    deps:
      - src/data_handling/etl_pipeline.py
      - src/data_handling/
      - src/_utils/

    # output paths for dvc to track
    outs:
      - data/original_df.parquet
      - data/processed_df.parquet
The dvc.yaml file defines a sequence of steps (stages) using sections like:
cmd: The shell command to be executed for that stage
deps: Dependencies needed to run the cmd
params: Default parameters for the cmd defined in the params.yaml file
metrics: The metrics files to track
reports: The report files to track
plots: The DVC plot files for visualization
outs: The output files produced by the cmd, which DVC will track
The configuration helps DVC ensure reproducibility by explicitly listing dependencies, outputs, and the commands of each stage. It also helps it manage the lineage by establishing a Directed Acyclic Graph (DAG) of the workflow, linking each stage to the next.
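DVC links two stages when one stage's deps include a path listed in another stage's outs. The sketch below shows how such a graph yields an execution order. The stage definitions are simplified, hypothetical versions of the ones in this article, and the code uses Python's standard graphlib rather than anything from DVC itself.

```python
from graphlib import TopologicalSorter

# simplified, hypothetical deps/outs per stage (not the full dvc.yaml)
stages = {
    "etl_pipeline": {
        "deps": ["src/data_handling/etl_pipeline.py"],
        "outs": ["data/processed_df.parquet"],
    },
    "preprocess": {
        "deps": ["data/processed_df.parquet"],
        "outs": ["data/x_train_processed.parquet"],
    },
    "tune_primary_model": {
        "deps": ["data/x_train_processed.parquet"],
        "outs": ["models/production/dfn_best.pth"],
    },
}


def build_graph(stage_defs: dict) -> dict:
    """Map each stage to the set of stages that produce one of its dependencies."""
    producer = {out: name for name, s in stage_defs.items() for out in s["outs"]}
    graph = {name: set() for name in stage_defs}
    for name, s in stage_defs.items():
        for dep in s["deps"]:
            if dep in producer and producer[dep] != name:
                graph[name].add(producer[dep])
    return graph


# upstream stages come first in the resulting order
order = list(TopologicalSorter(build_graph(stages)).static_order())
print(order)
```

A dependency that no stage produces, such as a source file, adds no edge. It still matters for reproducibility, since a change to it marks the stage as outdated.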
▫ Python Scripts
Next, let’s add Python scripts, ensuring the data is stored using the file paths specified in the outs section of the dvc.yaml file:
src/data_handling/etl_pipeline.py:
import os
import argparse

import src.data_handling.scripts as scripts
from src._utils import main_logger

# stockcode and impute_stockcode mirror the CLI arguments parsed below;
# how the helper scripts consume them is not shown in this listing
def etl_pipeline(stockcode: str = '', impute_stockcode: bool = False):
    # extract the entire data
    df = scripts.extract_original_dataframe()

    # save the original data as a parquet file
    ORIGINAL_DF_PATH = os.path.join('data', 'original_df.parquet')
    df.to_parquet(ORIGINAL_DF_PATH, index=False) # dvc tracked

    # transform
    df = scripts.structure_missing_values(df=df)
    df = scripts.handle_feature_engineering(df=df)

    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df.to_parquet(PROCESSED_DF_PATH, index=False) # dvc tracked
    return df

# for dvc execution
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="run etl pipeline")
    parser.add_argument('--stockcode', type=str, default='', help="specific stockcode to process. empty runs full pipeline.")
    parser.add_argument('--impute', action='store_true', help="flag to create imputation values")
    args = parser.parse_args()

    etl_pipeline(stockcode=args.stockcode, impute_stockcode=args.impute)
▫ Outputs
The original and structured data in Pandas’ DataFrames are stored in the DVC cache:
data/original_df.parquet
data/processed_df.parquet
◼ Stage 2: The Data Drift Check
Before jumping into preprocessing, I’ll run data drift tests to check whether any notable drift exists in the data, using Evidently AI, an open-source ML and LLM observability framework.
▫ The Data Drift
Data drift refers to a change in the statistical properties of the data, such as the mean, variance, or distribution, relative to the data the model was trained on.
Its major categories include:
Covariate Drift (Feature Drift): A change in the input feature distribution.
Prior Probability Drift (Label Drift): A change in the target variable distribution.
Concept Drift: A change in the relationship between the input data and the target variable.
Data drift compromises the model's generalization capabilities over time, making its detection after deployment crucial.
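To give a feel for how covariate drift can be quantified for a single numerical feature, here is a minimal sketch of the two-sample Kolmogorov-Smirnov statistic in plain Python. It is not how Evidently computes drift (the library selects its own statistical tests per column), and the price samples are made up for the example.

```python
from bisect import bisect_right


def ks_statistic(reference: list, current: list) -> float:
    """Largest gap between the empirical CDFs of two samples (0 = identical, 1 = disjoint)."""
    ref, cur = sorted(reference), sorted(current)
    max_gap = 0.0
    for x in set(ref) | set(cur):
        ecdf_ref = bisect_right(ref, x) / len(ref)  # share of reference values <= x
        ecdf_cur = bisect_right(cur, x) / len(cur)  # share of current values <= x
        max_gap = max(max_gap, abs(ecdf_ref - ecdf_cur))
    return max_gap


# made-up unit prices: the current batch has shifted upward
reference_prices = [2.1, 2.5, 2.55, 2.95, 3.25, 3.39]
current_prices = [2.95, 3.25, 3.39, 4.15, 4.95, 5.45]

drift_score = ks_statistic(reference_prices, current_prices)
print(f"KS statistic: {drift_score:.2f}")
```

A drift test would compare such a statistic (or its p-value) against a threshold for each column and then report the count and share of drifted columns.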
▫ DVC Configuration
I’ll add the data_drift_check stage right after the etl_pipeline stage:
dvc.yaml:
stages:
  etl_pipeline:
    ###
  data_drift_check:
    # the main command dvc will run in this stage
    cmd: >
      python src/data_handling/report_data_drift.py
      data/processed/processed_df.csv
      data/processed_df_${params.stockcode}.parquet
      reports/data_drift_report_${params.stockcode}.html
      metrics/data_drift_${params.stockcode}.json
      ${params.stockcode}

    # default values for the parameters (defined in the params.yaml file)
    params:
      - params.stockcode

    # dependencies necessary to run the main command
    deps:
      - src/data_handling/report_data_drift.py
      - src/

    # output file paths for dvc to track
    plots:
      - reports/data_drift_report_${params.stockcode}.html:

    metrics:
      - metrics/data_drift_${params.stockcode}.json:
          type: json
Then, add default values to the parameters passed to the DVC command:
params.yaml:
params:
  stockcode: <STOCKCODE OF CHOICE>
▫ Python Scripts
After generating an API token from the Evidently AI workspace, I’ll add a Python script that detects data drift and stores the results in the metrics variable:
src/data_handling/report_data_drift.py:
import os
import sys
import json
import pandas as pd
import datetime
from dotenv import load_dotenv

from evidently import Dataset, DataDefinition, Report
from evidently.presets import DataDriftPreset
from evidently.ui.workspace import CloudWorkspace

import src.data_handling.scripts as scripts
from src._utils import main_logger


if __name__ == '__main__':
    # initiate evidently cloud workspace
    load_dotenv(override=True)
    ws = CloudWorkspace(token=os.getenv('EVENTLY_API_TOKEN'), url='https://app.evidently.cloud')

    # retrieve evidently project
    project = ws.get_project('EVENTLY AI PROJECT ID')

    # retrieve paths from the command line args
    REFERENCE_DATA_PATH = sys.argv[1]
    CURRENT_DATA_PATH = sys.argv[2]
    REPORT_OUTPUT_PATH = sys.argv[3]
    METRICS_OUTPUT_PATH = sys.argv[4]
    STOCKCODE = sys.argv[5]

    # create folders if they do not exist
    os.makedirs(os.path.dirname(REPORT_OUTPUT_PATH), exist_ok=True)
    os.makedirs(os.path.dirname(METRICS_OUTPUT_PATH), exist_ok=True)

    # extract datasets
    reference_data_full = pd.read_csv(REFERENCE_DATA_PATH)
    reference_data_stockcode = reference_data_full[reference_data_full['stockcode'] == STOCKCODE]
    current_data_stockcode = pd.read_parquet(CURRENT_DATA_PATH)

    # define data schema
    nums, cats = scripts.categorize_num_cat_cols(df=reference_data_stockcode)
    for col in nums:
        current_data_stockcode[col] = pd.to_numeric(current_data_stockcode[col], errors='coerce')

    schema = DataDefinition(numerical_columns=nums, categorical_columns=cats)

    # define evidently datasets w/ the data schema
    eval_data_1 = Dataset.from_pandas(reference_data_stockcode, data_definition=schema)
    eval_data_2 = Dataset.from_pandas(current_data_stockcode, data_definition=schema)

    # execute drift detection
    report = Report(metrics=[DataDriftPreset()])
    data_eval = report.run(reference_data=eval_data_1, current_data=eval_data_2)
    data_eval.save_html(REPORT_OUTPUT_PATH)

    # create metrics for dvc tracking
    report_dict = json.loads(data_eval.json())
    num_drifts = report_dict['metrics'][0]['value']['count']
    shared_drifts = report_dict['metrics'][0]['value']['share']
    metrics = dict(
        drift_detected=bool(num_drifts > 0.0),
        num_drifts=num_drifts,
        shared_drifts=shared_drifts,
        num_cols=nums,
        cat_cols=cats,
        stockcode=STOCKCODE,
        timestamp=datetime.datetime.now().isoformat(),
    )

    # write the metrics file
    with open(METRICS_OUTPUT_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)
        main_logger.info(f'... drift metrics saved to {METRICS_OUTPUT_PATH}... ')

    # stop the system if data drift is found
    if num_drifts > 0.0:
        sys.exit('❌ FATAL: data drift detected. stopping pipeline')
If data drift is found, the script immediately exits using the final sys.exit command.
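The reason this halts the pipeline is that sys.exit called with a string prints the message to stderr and ends the process with exit status 1, and a non-zero status marks the stage command as failed. The small check below demonstrates the exit behavior in a child process; the message text is arbitrary.

```python
import subprocess
import sys

# run a tiny child process that exits the same way the drift script does
result = subprocess.run(
    [sys.executable, "-c", "import sys; sys.exit('data drift detected')"],
    capture_output=True,
    text=True,
)

exit_code = result.returncode          # non-zero signals failure to the caller
stderr_text = result.stderr.strip()    # the message passed to sys.exit
print(exit_code, stderr_text)
```

Since the metrics file is written before the exit call, the drift results remain available for inspection even when the stage fails.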
▫ Outputs
The script generates two files that DVC will track:
reports/data_drift_report.html: The data drift report in an HTML file.
metrics/data_drift.json: The data drift metrics in a JSON file, including the drift results along with the feature columns and a timestamp:
metrics/data_drift.json:
{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0,
    "num_cols": [
        "invoiceno",
        "invoicedate",
        "unitprice",
        "product_avg_quantity_last_month",
        "product_max_price_all_time",
        "unitprice_vs_max",
        "unitprice_to_avg",
        "unitprice_squared",
        "unitprice_log"
    ],
    "cat_cols": [
        "stockcode",
        "customerid",
        "country",
        "year",
        "year_month",
        "day_of_week",
        "is_registered"
    ],
    "timestamp": "2025-10-07T00:24:29.899495"
}
The drift test results are also available on the Evidently workspace dashboard for further analysis:

Figure B. Screenshot of the Evidently workspace dashboard
◼ Stage 3: Preprocessing
If no data drift is detected, the lineage moves on to the preprocessing stage.
▫ DVC Configuration
I’ll add the preprocess stage right after the data_drift_check stage:
dvc.yaml:
stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    cmd: >
      python src/data_handling/preprocess.py --target_col ${params.target_col} --should_scale ${params.should_scale} --verbose ${params.verbose}

    deps:
      - src/data_handling/preprocess.py
      - src/data_handling/
      - src/_utils

    # params from params.yaml
    params:
      - params.target_col
      - params.should_scale
      - params.verbose

    outs:
      # train, val, test datasets
      - data/x_train_df.parquet
      - data/x_val_df.parquet
      - data/x_test_df.parquet
      - data/y_train_df.parquet
      - data/y_val_df.parquet
      - data/y_test_df.parquet

      # preprocessed input datasets
      - data/x_train_processed.parquet
      - data/x_val_processed.parquet
      - data/x_test_processed.parquet

      # trained preprocessor and human readable feature names for shap analysis
      - preprocessors/column_transformer.pkl
      - preprocessors/feature_names.json
And add default values of the parameters used in the cmd:
params.yaml:
params:
  target_col: "quantity"
  should_scale: True
  verbose: False
▫ Python Scripts
Next, I’ll add a Python script to create the training, validation, and test datasets and preprocess the input data:
src/data_handling/preprocess.py:
import os
import argparse
import json
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

import src.data_handling.scripts as scripts
from src._utils import main_logger

def preprocess(stockcode: str = '', target_col: str = 'quantity', should_scale: bool = True, verbose: bool = False):
    # initiate metrics to track (dvc)
    DATA_DRIFT_METRICS_PATH = os.path.join('metrics', f'data_drift_{stockcode}.json')

    if os.path.exists(DATA_DRIFT_METRICS_PATH):
        with open(DATA_DRIFT_METRICS_PATH, 'r') as f:
            metrics = json.load(f)
    else:
        metrics = dict()

    # load processed df from dvc cache
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df = pd.read_parquet(PROCESSED_DF_PATH)

    # categorize num and cat columns
    num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
    if verbose:
        main_logger.info(f'num_cols: {num_cols} \ncat_cols: {cat_cols}')

    # structure cat cols
    if cat_cols:
        for col in cat_cols:
            df[col] = df[col].astype('string')

    # initiate preprocessor (either load from the dvc cache or create from scratch)
    PREPROCESSOR_PATH = os.path.join('preprocessors', 'column_transformer.pkl')
    try:
        preprocessor = joblib.load(PREPROCESSOR_PATH)
    except Exception:
        preprocessor = scripts.create_preprocessor(num_cols=num_cols if should_scale else [], cat_cols=cat_cols)

    # create train, val, test datasets
    y = df[target_col]
    X = df.copy().drop(target_col, axis='columns')

    # split in row order (shuffle=False): the last rows become the test set
    test_size, random_state = 50000, 42
    X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=test_size, random_state=random_state, shuffle=False)

    # store train, val, test datasets (dvc track)
    X_train.to_parquet('data/x_train_df.parquet', index=False)
    X_val.to_parquet('data/x_val_df.parquet', index=False)
    X_test.to_parquet('data/x_test_df.parquet', index=False)
    y_train.to_frame(name=target_col).to_parquet('data/y_train_df.parquet', index=False)
    y_val.to_frame(name=target_col).to_parquet('data/y_val_df.parquet', index=False)
    y_test.to_frame(name=target_col).to_parquet('data/y_test_df.parquet', index=False)

    # preprocess
    X_train = preprocessor.fit_transform(X_train)
    X_val = preprocessor.transform(X_val)
    X_test = preprocessor.transform(X_test)

    # store preprocessed input data (dvc track)
    pd.DataFrame(X_train).to_parquet('data/x_train_processed.parquet', index=False)
    pd.DataFrame(X_val).to_parquet('data/x_val_processed.parquet', index=False)
    pd.DataFrame(X_test).to_parquet('data/x_test_processed.parquet', index=False)

    # store the fitted preprocessor (dvc track), as listed in the stage outs
    os.makedirs(os.path.dirname(PREPROCESSOR_PATH), exist_ok=True)
    joblib.dump(preprocessor, PREPROCESSOR_PATH)

    # save feature names (dvc track) for shap
    with open('preprocessors/feature_names.json', 'w') as f:
        feature_names = preprocessor.get_feature_names_out()
        json.dump(feature_names.tolist(), f)

    return X_train, X_val, X_test, y_train, y_val, y_test, preprocessor


if __name__ == '__main__':
    # argparse's type=bool treats any non-empty string (including 'False') as True,
    # so the boolean flags are parsed from their text explicitly
    to_bool = lambda v: str(v).strip().lower() == 'true'

    parser = argparse.ArgumentParser(description='run data preprocessing')
    parser.add_argument('--stockcode', type=str, default='', help='specific stockcode')
    parser.add_argument('--target_col', type=str, default='quantity', help='the target column name')
    parser.add_argument('--should_scale', type=to_bool, default=True, help='flag to scale numerical features')
    parser.add_argument('--verbose', type=to_bool, default=False, help='flag for verbose logging')
    args = parser.parse_args()

    X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = preprocess(
        target_col=args.target_col,
        should_scale=args.should_scale,
        verbose=args.verbose,
        stockcode=args.stockcode,
    )
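One detail worth checking when DVC passes values such as should_scale and verbose on the command line: they arrive as the strings "True" and "False", and argparse's type=bool converts any non-empty string to True. The sketch below shows the pitfall next to one possible converter; str2bool is a hypothetical helper name, not part of argparse.

```python
import argparse


def str2bool(value: str) -> bool:
    """Parse common textual booleans; reject anything else instead of guessing."""
    lowered = value.strip().lower()
    if lowered in {"true", "1", "yes"}:
        return True
    if lowered in {"false", "0", "no"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


# naive parser: bool("False") is True because the string is non-empty
naive = argparse.ArgumentParser()
naive.add_argument("--should_scale", type=bool, default=True)
naive_value = naive.parse_args(["--should_scale", "False"]).should_scale

# safer parser: the text is interpreted explicitly
safe = argparse.ArgumentParser()
safe.add_argument("--should_scale", type=str2bool, default=True)
safe_value = safe.parse_args(["--should_scale", "False"]).should_scale

print(naive_value, safe_value)
```

Raising ArgumentTypeError makes argparse report a clear usage error for unexpected input rather than silently falling back to a default.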
▫ Outputs
This stage generates the necessary datasets for both model training and inference:
Input features:
data/x_train_df.parquet
data/x_val_df.parquet
data/x_test_df.parquet
Preprocessed input features:
data/x_train_processed.parquet
data/x_val_processed.parquet
data/x_test_processed.parquet
Target variables:
data/y_train_df.parquet
data/y_val_df.parquet
data/y_test_df.parquet
The preprocessor and human-readable feature names are also stored in cache for inference and SHAP feature impact analysis later:
preprocessors/column_transformer.pkl
preprocessors/feature_names.json
Lastly, DVC adds the preprocess_status, x_train_processed_path, and preprocessor_path to the data summary metrics file data.json created in Stage 2 to track the end-to-end process of Stages 2 and 3:
metrics/data.json:
{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0,
    "num_cols": [
        "invoiceno",
        "invoicedate",
        "unitprice",
        "product_avg_quantity_last_month",
        "product_max_price_all_time",
        "unitprice_vs_max",
        "unitprice_to_avg",
        "unitprice_squared",
        "unitprice_log"
    ],
    "cat_cols": [
        "stockcode",
        "customerid",
        "country",
        "year",
        "year_month",
        "day_of_week",
        "is_registered"
    ],
    "timestamp": "2025-10-07T00:24:29.899495",

    # updates
    "preprocess_status": "completed",
    "x_train_processed_path": "data/x_train_processed_85123A.parquet",
    "preprocessor_path": "preprocessors/column_transformer.pkl"
}
Next, let’s move on to the model/experiment lineage.
◼ Stage 4: Tuning the Model
After creating the datasets, I’ll tune and train the primary model. It’s a multi-layered feedforward network built with PyTorch, using the training and validation datasets created in the preprocess stage.
▫ DVC Configuration
First, I’ll add the tune_primary_model stage right after the preprocess stage:
dvc.yaml:
stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    cmd: >
      python src/model/torch_model/main.py
      data/x_train_processed_${params.stockcode}.parquet
      data/x_val_processed_${params.stockcode}.parquet
      data/y_train_df_${params.stockcode}.parquet
      data/y_val_df_${params.stockcode}.parquet
      ${tuning.should_local_save}
      ${tuning.grid}
      ${tuning.n_trials}
      ${tuning.num_epochs}
      ${params.stockcode}

    deps:
      - src/model/torch_model/main.py
      - src/data_handling/
      - src/model/
      - src/_utils/

    params:
      - params.stockcode
      - tuning.n_trials
      - tuning.grid
      - tuning.should_local_save

    outs:
      - models/production/dfn_best_${params.stockcode}.pth # dvc track

    metrics:
      - metrics/dfn_val_${params.stockcode}.json: # dvc track
Then, add default values to the parameters:
params.yaml:
params:
  target_col: "quantity"
  should_scale: True
  verbose: False

tuning:
  n_trials: 100
  num_epochs: 3000
  should_local_save: False
  grid: False
▫ Python Scripts
Next, I’ll add the Python scripts to tune the model using Bayesian optimization and then train the optimal model on the complete X_train and y_train datasets created in the preprocess stage.
src/model/torch_model/main.py:
import os
import sys
import json
import datetime
import pandas as pd
import torch
import torch.nn as nn

import src.model.torch_model.scripts as scripts
from src._utils import main_logger


def tune_and_train(
        X_train, X_val, y_train, y_val,
        stockcode: str = '',
        should_local_save: bool = True,
        grid: bool = False,
        n_trials: int = 50,
        num_epochs: int = 3000
    ) -> tuple[nn.Module, dict]:

    # perform bayesian optimization
    best_dfn, best_optimizer, best_batch_size, best_checkpoint = scripts.bayesian_optimization(
        X_train, X_val, y_train, y_val, n_trials=n_trials, num_epochs=num_epochs
    )

    # save the model artifact (dvc track)
    DFN_FILE_PATH = os.path.join('models', 'production', f'dfn_best_{stockcode}.pth' if stockcode else 'dfn_best.pth')
    os.makedirs(os.path.dirname(DFN_FILE_PATH), exist_ok=True)
    torch.save(best_checkpoint, DFN_FILE_PATH)

    return best_dfn, best_checkpoint


def track_metrics_by_stockcode(X_val, y_val, best_model, checkpoint: dict, stockcode: str):
    MODEL_VAL_METRICS_PATH = os.path.join('metrics', f'dfn_val_{stockcode}.json')
    os.makedirs(os.path.dirname(MODEL_VAL_METRICS_PATH), exist_ok=True)

    # validate the tuned model
    _, mse, exp_mae, rmsle = scripts.perform_inference(model=best_model, X=X_val, y=y_val)
    model_version = f"dfn_{stockcode}_{os.getpid()}"
    metrics = dict(
        stockcode=stockcode,
        mse_val=mse,
        mae_val=exp_mae,
        rmsle_val=rmsle,
        model_version=model_version,
        hparams=checkpoint['hparams'],
        optimizer=checkpoint['optimizer_name'],
        batch_size=checkpoint['batch_size'],
        lr=checkpoint['lr'],
        timestamp=datetime.datetime.now().isoformat()
    )
    # store the validation results (dvc track)
    with open(MODEL_VAL_METRICS_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)
        main_logger.info(f'... validation metrics saved to {MODEL_VAL_METRICS_PATH} ...')


if __name__ == '__main__':
    # fetch command arg values
    X_TRAIN_PATH = sys.argv[1]
    X_VAL_PATH = sys.argv[2]
    Y_TRAIN_PATH = sys.argv[3]
    Y_VAL_PATH = sys.argv[4]
    SHOULD_LOCAL_SAVE = sys.argv[5] == 'True'
    GRID = sys.argv[6] == 'True'
    N_TRIALS = int(sys.argv[7])
    NUM_EPOCHS = int(sys.argv[8])
    STOCKCODE = str(sys.argv[9])

    # extract training and validation datasets from dvc cache
    X_train, X_val = pd.read_parquet(X_TRAIN_PATH), pd.read_parquet(X_VAL_PATH)
    y_train, y_val = pd.read_parquet(Y_TRAIN_PATH), pd.read_parquet(Y_VAL_PATH)

    # tuning
    best_model, checkpoint = tune_and_train(
        X_train, X_val, y_train, y_val,
        stockcode=STOCKCODE, should_local_save=SHOULD_LOCAL_SAVE, grid=GRID, n_trials=N_TRIALS, num_epochs=NUM_EPOCHS
    )

    # metrics tracking
    track_metrics_by_stockcode(X_val, y_val, best_model=best_model, checkpoint=checkpoint, stockcode=STOCKCODE)
▫ Outputs
The stage generates two files:
models/production/dfn_best.pth: Includes model artifacts and checkpoint like the optimal hyperparameter set.
metrics/dfn_val.json: Contains tuning results, model version, timestamp, and validation results for MSE, MAE, and RMSLE:
metrics/dfn_val.json:
{
    "stockcode": "85123A",
    "mse_val": 0.6137686967849731,
    "mae_val": 9.092489242553711,
    "rmsle_val": 0.6953299045562744,
    "model_version": "dfn_85123A_35604",
    "hparams": {
        "num_layers": 4,
        "batch_norm": false,
        "dropout_rate_layer_0": 0.13765888061300502,
        "n_units_layer_0": 184,
        "dropout_rate_layer_1": 0.5509872409359128,
        "n_units_layer_1": 122,
        "dropout_rate_layer_2": 0.2408753527744403,
        "n_units_layer_2": 35,
        "dropout_rate_layer_3": 0.03451842588822594,
        "n_units_layer_3": 224,
        "learning_rate": 0.026240673135104406,
        "optimizer": "adamax",
        "batch_size": 64
    },
    "optimizer": "adamax",
    "batch_size": 64,
    "lr": 0.026240673135104406,
    "timestamp": "2025-10-07T00:31:08.700294"
}
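For reference, the three validation metrics can be written out with their standard definitions, as in the sketch below. How scripts.perform_inference computes them is not shown in this article (for example, which scale each metric is measured on), so treat these functions as textbook definitions rather than the project's implementation. The sample values are made up.

```python
import math


def mse(y_true: list, y_pred: list) -> float:
    """Mean squared error."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def mae(y_true: list, y_pred: list) -> float:
    """Mean absolute error."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def rmsle(y_true: list, y_pred: list) -> float:
    """Root mean squared logarithmic error; assumes non-negative values."""
    squared_log_errors = [
        (math.log1p(p) - math.log1p(t)) ** 2 for t, p in zip(y_true, y_pred)
    ]
    return math.sqrt(sum(squared_log_errors) / len(y_true))


# made-up quantities sold vs. predicted
y_true = [1.0, 3.0]
y_pred = [1.0, 1.0]

print(mse(y_true, y_pred), mae(y_true, y_pred), rmsle(y_true, y_pred))
```

RMSLE penalizes relative rather than absolute differences, which suits targets like sales quantity that span several orders of magnitude.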
◼ Stage 5: Performing Inference
After the model tuning phase is complete, I’ll configure the test inference for a final evaluation.
The final evaluation uses the MSE, MAE, and RMSLE metrics, as well as SHAP for feature impact and interpretability analysis.
SHAP (SHapley Additive exPlanations) is a framework for quantifying how much each feature contributes to a model’s prediction by using the concept of Shapley values from game theory.
The SHAP values are leveraged for future EDA and feature engineering.
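To show what a Shapley value measures, the sketch below computes exact values for a toy two-feature function by averaging each feature's marginal contribution over every feature ordering. The toy_model function and its inputs are invented for the example. This brute-force enumeration is only feasible for a handful of features, which is why libraries such as shap rely on approximations for deep networks.

```python
from itertools import permutations


def shapley_values(predict, x: list, baseline: list) -> list:
    """Exact Shapley values of x relative to a baseline input."""
    n = len(x)
    totals = [0.0] * n
    orderings = list(permutations(range(n)))
    for order in orderings:
        current = list(baseline)
        previous_output = predict(current)
        for i in order:
            current[i] = x[i]  # switch feature i from its baseline to its actual value
            new_output = predict(current)
            totals[i] += new_output - previous_output
            previous_output = new_output
    return [total / len(orderings) for total in totals]


def toy_model(features: list) -> float:
    """Invented demand function with an interaction term."""
    price, avg_quantity = features
    return 50.0 - 3.0 * price + 0.5 * avg_quantity + 0.1 * price * avg_quantity


x = [4.0, 20.0]          # the instance to explain
baseline = [2.0, 10.0]   # reference input, e.g. feature averages

values = shapley_values(toy_model, x, baseline)
print(values)
```

The contributions sum to the difference between the prediction for x and the prediction for the baseline, which is the additivity property that gives SHAP its name.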
▫ DVC Configuration
First, I’ll add the inference_primary_model stage to the DVC configuration.
This stage has a plots section, where DVC tracks and versions the generated visualization files of the SHAP values.
dvc.yaml:
stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    ###
  inference_primary_model:
    cmd: >
      python src/model/torch_model/inference.py
      data/x_test_processed_${params.stockcode}.parquet
      data/y_test_df_${params.stockcode}.parquet
      models/production/dfn_best_${params.stockcode}.pth
      ${params.stockcode}
      ${tracking.sensitive_feature_col}
      ${tracking.privileged_group}

    deps:
      - src/model/torch_model/inference.py
      - models/production/
      - src/

    params:
      - params.stockcode
      - tracking.sensitive_feature_col
      - tracking.privileged_group

    metrics:
      - metrics/dfn_inf_${params.stockcode}.json: # dvc track
          type: json

    plots:
      # shap summary / beeswarm plot for global interpretability
      - reports/dfn_shap_summary_${params.stockcode}.json:
          template: simple
          x: shap_value
          y: feature_name
          title: SHAP Beeswarm Plot

      # shap mean absolute vals - feature importance bar plot
      - reports/dfn_shap_mean_abs_${params.stockcode}.json:
          template: bar
          x: mean_abs_shap
          y: feature_name
          title: Mean Absolute SHAP Importance

    outs:
      - data/dfn_inference_results_${params.stockcode}.parquet
      - reports/dfn_raw_shap_values_${params.stockcode}.parquet # save raw shap vals for detailed analysis later
▫ Python Scripts
Next, I’ll add scripts where the trained model performs inference:
src/model/torch_model/inference.py:
1import os
2import sys
3import json
4import datetime
5import numpy as np
6import pandas as pd
7import torch
8import shap
9
10import src.model.torch_model.scripts as scripts
11from src._utils import main_logger
12
13
14if __name__ == '__main__':
15 # load test dataset
16 X_TEST_PATH = sys.argv[1]
17 Y_TEST_PATH = sys.argv[2]
18 X_test, y_test = pd.read_parquet(X_TEST_PATH), pd.read_parquet(Y_TEST_PATH)
19
20 # create X_test w/ column names for shap analysis and sensitive feature tracking
21 X_test_with_col_names = X_test.copy()
22 FEATURE_NAMES_PATH = os.path.join('preprocessors', 'feature_names.json')
23 try:
24 with open(FEATURE_NAMES_PATH, 'r') as f: feature_names = json.load(f)
25 except FileNotFoundError: feature_names = X_test.columns.tolist()
26 if len(X_test_with_col_names.columns) == len(feature_names): X_test_with_col_names.columns = feature_names
27
28 # reconstruct the optimal model tuned in the previous stage
29 MODEL_PATH = sys.argv[3]
30 checkpoint = torch.load(MODEL_PATH)
31 model = scripts.load_model(checkpoint=checkpoint)
32
33 # perform inference
34 y_pred, mse, exp_mae, rmsle = scripts.perform_inference(model=model, X=X_test, y=y_test, batch_size=checkpoint['batch_size'])
35
36 # create result df w/ y_pred, y_true, and sensitive features
37 STOCKCODE = sys.argv[4]
38 SENSITIVE_FEATURE = sys.argv[5]
39 PRIVILEGED_GROUP = sys.argv[6]
40 inference_df = pd.DataFrame(y_pred.cpu().numpy().flatten(), columns=['y_pred'])
41 inference_df['y_true'] = y_test
42 inference_df[SENSITIVE_FEATURE] = X_test_with_col_names[f'cat__{SENSITIVE_FEATURE}_{str(PRIVILEGED_GROUP)}'].astype(bool)
43 inference_df.to_parquet(path=os.path.join('data', f'dfn_inference_results_{STOCKCODE}.parquet'))
44
45 # record inference metrics
46 MODEL_INF_METRICS_PATH = os.path.join('metrics', f'dfn_inf_{STOCKCODE}.json')
47 os.makedirs(os.path.dirname(MODEL_INF_METRICS_PATH), exist_ok=True)
48 model_version = f"dfn_{STOCKCODE}_{os.getpid()}"
49 inf_metrics = dict(
50 stockcode=STOCKCODE,
51 mse_inf=mse,
52 mae_inf=exp_mae,
53 rmsle_inf=rmsle,
54 model_version=model_version,
55 hparams=checkpoint['hparams'],
56 optimizer=checkpoint['optimizer_name'],
57 batch_size=checkpoint['batch_size'],
58 lr=checkpoint['lr'],
59 timestamp=datetime.datetime.now().isoformat()
60 )
61 with open(MODEL_INF_METRICS_PATH, 'w') as f: # dvc track
62 json.dump(inf_metrics, f, indent=4)
63 main_logger.info(f'... inference metrics saved to {MODEL_INF_METRICS_PATH} ...')
64
65
66 ## shap analysis
67 # compute shap vals
68 model.eval()
69
70 # prepare backgdound data
71 X_test_tensor = torch.from_numpy(X_test.values.astype(np.float32)).to(device_type)
72
73 # take the small samples from x_test as background
74 background = X_test_tensor[np.random.choice(X_test_tensor.shape[0], 100, replace=False)].to(device_type)
75
76 # define deepexplainer
77 explainer = shap.DeepExplainer(model, background)
78
79 # compute shap vals
80 shap_values = explainer.shap_values(X_test_tensor) # outputs = numpy array or tensor
81
82 # convert shap array to pandas df
83 if isinstance(shap_values, list): shap_values = shap_values[0]
84 if isinstance(shap_values, torch.Tensor): shap_values = shap_values.cpu().numpy()
85 shap_values = shap_values.squeeze(axis=-1) # type: ignore
86 shap_df = pd.DataFrame(shap_values, columns=feature_names)
87
88 # shap raw data (dvc track)
89 RAW_SHAP_OUT_PATH = os.path.join('reports', f'dfn_raw_shap_values_{STOCKCODE}.parquet')
90 os.makedirs(os.path.dirname(RAW_SHAP_OUT_PATH), exist_ok=True)
91 shap_df.to_parquet(RAW_SHAP_OUT_PATH, index=False)
92 main_logger.info(f'... shap values saved to {RAW_SHAP_OUT_PATH} ...')
93
94 # bar plot of mean abs shap vals (dvc report)
95 mean_abs_shap = shap_df.abs().mean().sort_values(ascending=False)
96 shap_mean_abs_df = pd.DataFrame({'feature_name': mean_abs_shap.index, 'mean_abs_shap': mean_abs_shap.values})  # use the sorted index so names stay aligned with values
97 MEAN_ABS_SHAP_PATH = os.path.join('reports', f'dfn_shap_mean_abs_{STOCKCODE}.json')
98 shap_mean_abs_df.to_json(MEAN_ABS_SHAP_PATH, orient='records', indent=4)
99
▫ Outputs
This stage generates five output files:
data/dfn_inference_results_${params.stockcode}.parquet: Stores the prediction results, the true target values, and any sensitive feature columns (such as gender, age, or income). I’ll use this file for the fairness test in the last stage.
metrics/dfn_inf.json: Stores evaluation metrics and tuning results:
{
    "stockcode": "85123A",
    "mse_inf": 0.6841545701026917,
    "mae_inf": 11.5866117477417,
    "rmsle_inf": 0.7423332333564758,
    "model_version": "dfn_85123A_35834",
    "hparams": {
        "num_layers": 4,
        "batch_norm": false,
        "dropout_rate_layer_0": 0.13765888061300502,
        "n_units_layer_0": 184,
        "dropout_rate_layer_1": 0.5509872409359128,
        "n_units_layer_1": 122,
        "dropout_rate_layer_2": 0.2408753527744403,
        "n_units_layer_2": 35,
        "dropout_rate_layer_3": 0.03451842588822594,
        "n_units_layer_3": 224,
        "learning_rate": 0.026240673135104406,
        "optimizer": "adamax",
        "batch_size": 64
    },
    "optimizer": "adamax",
    "batch_size": 64,
    "lr": 0.026240673135104406,
    "timestamp": "2025-10-07T00:31:12.946405"
}
reports/dfn_shap_mean_abs.json: Stores the mean absolute SHAP value of each feature:
[
    {
        "feature_name": "num__invoicedate",
        "mean_abs_shap": 0.219255722
    },
    {
        "feature_name": "num__unitprice",
        "mean_abs_shap": 0.1069829418
    },
    {
        "feature_name": "num__product_avg_quantity_last_month",
        "mean_abs_shap": 0.1021453096
    },
    {
        "feature_name": "num__product_max_price_all_time",
        "mean_abs_shap": 0.0855356899
    },
...
]
reports/dfn_shap_summary.json: Contains the data points necessary to draw the beeswarm/bar plots.
reports/dfn_raw_shap_values.parquet: Stores raw SHAP values.
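As a quick way to inspect the report, the sketch below ranks features by mean absolute SHAP value and computes each feature’s share of the total. It assumes the records follow the feature_name / mean_abs_shap layout shown above. The helper name top_features is hypothetical, and the inline sample reuses only the four values displayed above rather than the full file.

```python
import json

# Sample records copied from the excerpt above; in practice, read them from
# reports/dfn_shap_mean_abs_<stockcode>.json with json.load().
SAMPLE = """
[
  {"feature_name": "num__invoicedate", "mean_abs_shap": 0.219255722},
  {"feature_name": "num__unitprice", "mean_abs_shap": 0.1069829418},
  {"feature_name": "num__product_avg_quantity_last_month", "mean_abs_shap": 0.1021453096},
  {"feature_name": "num__product_max_price_all_time", "mean_abs_shap": 0.0855356899}
]
"""


def top_features(records, k=3):
    """Return the k features with the largest mean |SHAP| and their share of the total."""
    total = sum(r["mean_abs_shap"] for r in records)
    ranked = sorted(records, key=lambda r: r["mean_abs_shap"], reverse=True)
    return [
        (r["feature_name"], r["mean_abs_shap"] / total if total else 0.0)
        for r in ranked[:k]
    ]


records = json.loads(SAMPLE)
for name, share in top_features(records):
    print(f"{name}: {share:.1%} of total mean |SHAP|")
```

The shares here are relative to the four sampled features only, so they will shrink once the full feature list is loaded.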
◼ Stage 6: Assessing Model Risk and Fairness
The last stage is to assess risk and fairness of the final inference results.
▫ The Fairness Testing
Fairness testing in ML is the process of systematically evaluating a model’s predictions to ensure they are not unfairly biased toward specific groups defined by sensitive attributes like race and gender.
In this project, I’ll use the registration status (the is_registered column) as the sensitive feature and check that the Mean Outcome Difference (MOD) stays within the specified threshold of 0.1.
The MOD is the absolute difference between the mean predicted values of the privileged (registered) and unprivileged (unregistered) groups.
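To make the definition concrete, here is a minimal sketch of the MOD computation on a toy set of predictions. The numbers and the helper name mean_outcome_difference are made up for illustration; only the 0.1 threshold comes from this project.

```python
from statistics import mean


def mean_outcome_difference(y_pred, is_privileged):
    """Absolute gap between the mean predictions of the two groups."""
    privileged = [p for p, flag in zip(y_pred, is_privileged) if flag]
    unprivileged = [p for p, flag in zip(y_pred, is_privileged) if not flag]
    if not privileged or not unprivileged:
        # without both groups there is nothing to compare
        raise ValueError("both groups need at least one prediction")
    return abs(mean(privileged) - mean(unprivileged))


# toy predictions (illustrative values only)
y_pred = [2.0, 2.5, 1.5, 2.0]
is_registered = [True, True, False, False]

mod = mean_outcome_difference(y_pred, is_registered)
print(f"MOD = {mod:.2f}, acceptable = {mod <= 0.1}")
```

Raising an error when one group is empty is a design choice of this sketch: it avoids silently comparing a group mean against zero.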
▫ DVC Configuration
First, I’ll add the assess_model_risk stage right after the inference_primary_model stage:
dvc.yaml:
stages:
  etl_pipeline:
    ###
  data_drift_check:
    ###
  preprocess:
    ###
  tune_primary_model:
    ###
  inference_primary_model:
    ###
  assess_model_risk:
    cmd: >
      python src/model/torch_model/assess_risk_and_fairness.py
      data/dfn_inference_results_${params.stockcode}.parquet
      metrics/dfn_risk_fairness_${params.stockcode}.json
      ${tracking.sensitive_feature_col}
      ${params.stockcode}
      ${tracking.privileged_group}
      ${tracking.mod_threshold}

    deps:
      - src/model/torch_model/assess_risk_and_fairness.py
      - src/_utils/
      - data/dfn_inference_results_${params.stockcode}.parquet # ensure the result df as dependency

    params:
      - params.stockcode
      - tracking.sensitive_feature_col
      - tracking.privileged_group
      - tracking.mod_threshold

    metrics:
      - metrics/dfn_risk_fairness_${params.stockcode}.json:
          type: json
Then, add default values to the parameters:
params.yaml:
params:
  target_col: "quantity"
  should_scale: True
  verbose: False

tuning:
  n_trials: 100
  num_epochs: 3000
  should_local_save: False
  grid: False

# adding default values to the tracking metrics
tracking:
  sensitive_feature_col: "is_registered"
  privileged_group: 1 # member
  mod_threshold: 0.1
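DVC fills the ${...} placeholders in dvc.yaml with values from the parameters file before it runs each stage. The sketch below only imitates that substitution for dotted keys, to show what the final command looks like. It is not DVC’s implementation: the resolve helper is hypothetical, and the stockcode value is an assumption because it does not appear in the excerpt above.

```python
import re

# Values mirroring params.yaml; "stockcode" is assumed for illustration.
params = {
    "params": {"stockcode": "85123A"},
    "tracking": {
        "sensitive_feature_col": "is_registered",
        "privileged_group": 1,
        "mod_threshold": 0.1,
    },
}


def resolve(template: str, values: dict) -> str:
    """Replace each ${a.b} placeholder with values["a"]["b"]."""
    def lookup(match: re.Match) -> str:
        node = values
        for key in match.group(1).split("."):
            node = node[key]  # raises KeyError if the parameter is missing
        return str(node)
    return re.sub(r"\$\{([\w.]+)\}", lookup, template)


cmd = (
    "python src/model/torch_model/assess_risk_and_fairness.py "
    "data/dfn_inference_results_${params.stockcode}.parquet "
    "metrics/dfn_risk_fairness_${params.stockcode}.json "
    "${tracking.sensitive_feature_col} ${params.stockcode} "
    "${tracking.privileged_group} ${tracking.mod_threshold}"
)
print(resolve(cmd, params))
```

The resolved arguments appear in the same order as the positional arguments the assessment script expects.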
▫ Python Script
The corresponding Python script contains the calculate_fairness_metrics function which performs the risk and fairness assessment:
src/model/torch_model/assess_risk_and_fairness.py:
import os
import sys
import json
import datetime
import argparse
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, root_mean_squared_log_error

from src._utils import main_logger


def calculate_fairness_metrics(
        df: pd.DataFrame,
        sensitive_feature_col: str,
        label_col: str = 'y_true',
        prediction_col: str = 'y_pred',
        privileged_group: int = 1,
        mod_threshold: float = 0.1,
    ) -> dict:

    metrics = dict()
    unprivileged_group = 0 if privileged_group == 1 else 1

    ## 1. risk assessment - predictive performance metrics by group
    for group, name in zip([unprivileged_group, privileged_group], ['unprivileged', 'privileged']):
        subset = df[df[sensitive_feature_col] == group]
        if len(subset) == 0: continue

        y_true = subset[label_col].values
        y_pred = subset[prediction_col].values

        metrics[f'mse_{name}'] = float(mean_squared_error(y_true, y_pred)) # type: ignore
        metrics[f'mae_{name}'] = float(mean_absolute_error(y_true, y_pred)) # type: ignore
        metrics[f'rmsle_{name}'] = float(root_mean_squared_log_error(y_true, y_pred)) # type: ignore

        # mean prediction (outcome disparity component)
        metrics[f'mean_prediction_{name}'] = float(y_pred.mean()) # type: ignore

    ## 2. bias assessment - fairness metrics
    # signed difference in mean absolute error (unprivileged - privileged); a missing group counts as 0
    mae_diff = metrics.get('mae_unprivileged', 0) - metrics.get('mae_privileged', 0)
    metrics['mae_diff'] = float(mae_diff)

    # mean outcome difference (signed); the threshold check uses its absolute value
    mod = metrics.get('mean_prediction_unprivileged', 0) - metrics.get('mean_prediction_privileged', 0)
    metrics['mean_outcome_difference'] = float(mod)
    metrics['is_mod_acceptable'] = 1 if abs(mod) <= mod_threshold else 0

    return metrics


def main():
    parser = argparse.ArgumentParser(description='assess bias and fairness metrics on model inference results.')
    parser.add_argument('inference_file_path', type=str, help='parquet file path to the inference results w/ y_true, y_pred, and sensitive feature cols.')
    parser.add_argument('metrics_output_path', type=str, help='json file path to save the metrics output.')
    parser.add_argument('sensitive_feature_col', type=str, help='column name of sensitive features')
    parser.add_argument('stockcode', type=str)
    # nargs='?' makes these positionals optional so the defaults actually apply
    parser.add_argument('privileged_group', type=int, nargs='?', default=1)
    parser.add_argument('mod_threshold', type=float, nargs='?', default=.1)
    args = parser.parse_args()

    try:
        # load inf df
        df_inference = pd.read_parquet(args.inference_file_path)
        LABEL_COL = 'y_true'
        PREDICTION_COL = 'y_pred'
        SENSITIVE_COL = args.sensitive_feature_col

        # compute fairness metrics
        metrics = calculate_fairness_metrics(
            df=df_inference,
            sensitive_feature_col=SENSITIVE_COL,
            label_col=LABEL_COL,
            prediction_col=PREDICTION_COL,
            privileged_group=args.privileged_group,
            mod_threshold=args.mod_threshold,
        )

        # add items to metrics
        metrics['model_version'] = f'dfn_{args.stockcode}_{os.getpid()}'
        metrics['sensitive_feature'] = args.sensitive_feature_col
        metrics['privileged_group'] = args.privileged_group
        metrics['mod_threshold'] = args.mod_threshold
        metrics['stockcode'] = args.stockcode
        metrics['timestamp'] = datetime.datetime.now().isoformat()

        # save metrics (dvc track)
        with open(args.metrics_output_path, 'w') as f:
            json_metrics = { k: (v if pd.notna(v) else None) for k, v in metrics.items() }
            json.dump(json_metrics, f, indent=4)

    except Exception as e:
        main_logger.error(f'... an error occurred during risk and fairness assessment: {e} ...')
        sys.exit(1)


if __name__ == '__main__':
    main()
▫ Outputs
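The final stage generates a metrics file that contains the test results and the model version. In the sample below, only the unprivileged group has metrics, so mae_diff and mean_outcome_difference equal the unprivileged values: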
metrics/dfn_risk_fairness.json:
{
    "mse_unprivileged": 3.5370739412593575,
    "mae_unprivileged": 1.48263614013523,
    "rmsle_unprivileged": 0.6080000224747837,
    "mean_prediction_unprivileged": 1.8507767915725708,
    "mae_diff": 1.48263614013523,
    "mean_outcome_difference": 1.8507767915725708,
    "is_mod_acceptable": 1,
    "model_version": "dfn_85123A_35971",
    "sensitive_feature": "is_registered",
    "privileged_group": 1,
    "mod_threshold": 0.1,
    "timestamp": "2025-10-07T00:31:15.998590"
}
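Because the metrics file is plain JSON, a downstream check can read it and stop a release when the fairness test fails. The sketch below assumes the key names shown above. The fairness_gate helper, and the idea of using it as a gate, are illustrative and not part of the project code.

```python
import json
import tempfile
from pathlib import Path


def fairness_gate(metrics_path) -> bool:
    """Return True only when the metrics file reports an acceptable MOD."""
    metrics = json.loads(Path(metrics_path).read_text())
    # recompute the check from the raw values instead of trusting the flag alone
    mod = abs(metrics["mean_outcome_difference"])
    return mod <= metrics["mod_threshold"]


# demo with a throwaway file (values are illustrative)
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "dfn_risk_fairness_demo.json"
    path.write_text(json.dumps({"mean_outcome_difference": -0.04, "mod_threshold": 0.1}))
    print("passes gate:", fairness_gate(path))
```

Recomputing the comparison from mean_outcome_difference and mod_threshold keeps the gate correct even if the stored is_mod_acceptable flag was written by an older version of the script.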
That’s all for the lineage configuration. Now, I’ll test it locally.
◼ Test in Local
I’ll run the entire ML lineage with this command:
$ dvc repro -f
The -f flag forces DVC to rerun every stage, whether or not anything has changed.
The command will automatically create the dvc.lock file at the root of the project directory:
schema: '2.0'
stages:
  etl_pipeline_full:
    cmd: python src/data_handling/etl_pipeline.py
    deps:
    - path: src/_utils/
      hash: md5
      md5: ae41392532188d290395495f6827ed00.dir
      size: 15870
      nfiles: 10
    - path: src/data_handling/
      hash: md5
      md5: a8a61a4b270581a7c387d51e416f4e86.dir
      size: 95715
...
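DVC decides whether a stage is up to date by comparing the hashes recorded in dvc.lock with the current state of each dependency and output. The sketch below illustrates the idea for a single file using hashlib. It is a simplification (DVC also hashes whole directories, which get the .dir suffix seen above), and the file_md5 and is_stale helpers are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path, chunk_size=1 << 20) -> str:
    """Compute the md5 hex digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def is_stale(path, recorded_md5: str) -> bool:
    """True when the file content no longer matches the recorded hash."""
    return file_md5(path) != recorded_md5


with tempfile.TemporaryDirectory() as tmp:
    dep = Path(tmp) / "etl_pipeline.py"
    dep.write_text("print('v1')\n")
    recorded = file_md5(dep)          # what a lock file would store
    print(is_stale(dep, recorded))    # unchanged file -> False
    dep.write_text("print('v2')\n")
    print(is_stale(dep, recorded))    # edited file -> True
```

This is also why dvc repro without -f skips stages whose inputs have not changed.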
The dvc.lock file must be committed to Git so that DVC loads the latest versions of the tracked files:
$ git add dvc.lock .dvc dvc.yaml params.yaml
$ git commit -m 'updated dvc config'
$ git push
Step 3: Deploying the DVC Project
Next, I’ll deploy the DVC project to ensure the AWS Lambda function can access the cached files in production.
I’ll start by configuring the DVC remote where the cached files are stored.
DVC supports various storage types such as AWS S3 and Google Cloud Storage. I’ll use AWS S3 for this project, but the right choice depends on the project ecosystem, your familiarity with the tool, and any resource constraints.
First, I’ll create a new S3 bucket in the selected AWS region:
$ aws s3 mb s3://<PROJECT NAME>/<BUCKET NAME> --region <AWS REGION>
*Make sure the IAM role has the following permissions: s3:ListBucket, s3:GetObject, s3:PutObject, and s3:DeleteObject.
Then, add the URI of the S3 bucket to the DVC remote:
$ dvc remote add -d <DVC REMOTE NAME> s3://<PROJECT NAME>/<BUCKET NAME>
Next, push the cache files to the DVC remote:
$ dvc push
Now, all cache files are stored in the S3 bucket:

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
Figure C. Screenshot of the DVC remote in AWS S3 bucket
As shown in Figure A, this deployment step is necessary for the AWS Lambda function to access the DVC cache in production.
Step 4: Configuring Scheduled Run with Prefect
The next step is to configure the scheduled run of the entire lineage with Prefect.
Prefect is an open-source workflow orchestration tool for building, scheduling, and monitoring pipelines. It uses a concept called a work pool to decouple the orchestration logic from the execution infrastructure.
The work pool serves as a standardized base configuration: it runs each flow in a Docker container image, which guarantees a consistent execution environment for all flows.
◼ Configuring the Docker Image Registry
The first step is to configure the Docker image registry for the Prefect work pool:
For local deployment: A container registry on Docker Hub.
For production deployment: AWS ECR.
For local deployment, I’ll first authenticate the Docker client:
$ docker login
And grant the user permission to run Docker commands without sudo (this command is for macOS):
$ sudo dscl . -append /Groups/docker GroupMembership $USER
For production deployment, I’ll create a new ECR repository:
$ aws ecr create-repository --repository-name <REPOSITORY NAME> --region <AWS REGION>
(Make sure the IAM role has access to this new ECR URI.)
◼ Configure Prefect Tasks and Flows
Next, I’ll configure the Prefect task and flow in the project:
The Prefect task executes the dvc repro and dvc push commands.
The Prefect flow executes the Prefect task weekly.
src/prefect_flows.py:
import os
import sys
import subprocess
from datetime import timedelta, datetime
from dotenv import load_dotenv
from prefect import flow, task
from prefect.schedules import Schedule
from prefect_aws import AwsCredentials

# add project root to the python path - enabling prefect to find the script
# (this must run before importing from src)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from src._utils import main_logger


# define the prefect task
@task(retries=3, retry_delay_seconds=30)
def run_dvc_pipeline():
    # execute the dvc pipeline
    result = subprocess.run(["dvc", "repro"], capture_output=True, text=True, check=True)
    main_logger.info(result.stdout)

    # push the updated data
    subprocess.run(["dvc", "push"], check=True)


# define the prefect flow
@flow(name="Weekly Data Pipeline")
def weekly_data_flow():
    run_dvc_pipeline()


if __name__ == '__main__':
    # docker image registry (either docker hub or aws ecr)
    load_dotenv(override=True)
    ENV = os.getenv('ENV', 'production')
    DOCKER_HUB_REPO = os.getenv('DOCKER_HUB_REPO')
    ECR_FOR_PREFECT_PATH = os.getenv('S3_BUCKET_FOR_PREFECT_PATH')
    image_repo = f'{DOCKER_HUB_REPO}:ml-sales-pred-data-latest' if ENV == 'local' else f'{ECR_FOR_PREFECT_PATH}:latest'

    # define weekly schedule
    weekly_schedule = Schedule(
        interval=timedelta(weeks=1),
        anchor_date=datetime(2025, 9, 29, 9, 0, 0),
        active=True,
    )

    # aws credentials to access ecr
    AwsCredentials(
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        region_name=os.getenv('AWS_REGION_NAME'),
    ).save('aws', overwrite=True)

    # deploy the prefect flow
    weekly_data_flow.deploy(
        name='weekly-data-flow',
        schedule=weekly_schedule, # schedule
        work_pool_name="wp-ml-sales-pred", # work pool where the docker image (flow) runs
        image=image_repo, # create a docker image at docker hub (local) or ecr (production)
        concurrency_limit=3,
        push=True # push the docker image to the image_repo
    )
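An interval schedule with an anchor date fires at the anchor plus whole multiples of the interval. The sketch below reproduces that arithmetic with the standard library to show when the weekly flow would run next. The next_run function is a hypothetical helper, not a Prefect API, and time zones are ignored for simplicity.

```python
from datetime import datetime, timedelta


def next_run(anchor: datetime, interval: timedelta, now: datetime) -> datetime:
    """First scheduled time strictly after `now` on the grid anchor + n * interval."""
    if now < anchor:
        return anchor
    elapsed_intervals = (now - anchor) // interval  # completed intervals since anchor
    return anchor + (elapsed_intervals + 1) * interval


anchor = datetime(2025, 9, 29, 9, 0, 0)   # same anchor as in the flow above
weekly = timedelta(weeks=1)

print(next_run(anchor, weekly, datetime(2025, 10, 7, 0, 31)))  # -> 2025-10-13 09:00:00
```

Anchoring the schedule keeps the run time fixed at 09:00 on the same weekday regardless of when the deployment was created.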
◼ Test in Local
Next, I’ll test the workflow locally with the Prefect server:
$ uv run prefect server start

$ export PREFECT_API_URL="http://127.0.0.1:4200/api"
Run the prefect_flows.py script:
$ uv run src/prefect_flows.py
Upon successful execution, the Prefect dashboard shows that the workflow is scheduled to run:

Figure D. Screenshot of the Prefect dashboard
Step 5: Deploying the Application
The final step is to deploy the entire application as a containerized Lambda function by configuring the Dockerfile and the Flask application scripts.
The specific process in this final deployment step depends on the infrastructure.
What stays the same is that DVC eliminates the need to store the large Parquet or CSV files directly in the feature store or model store, because it keeps them as content-addressed (hashed) cache files in the DVC remote.
So, first, I’ll simplify the loading logic of the Flask application script by using the dvc.api module:
app.py:
### ... the remaining components stay the same ...

import dvc.api

DVC_REMOTE_NAME = '<REMOTE NAME IN .dvc/config file>'


def configure_dvc_for_lambda():
    # set dvc directories to /tmp (the only writable path on lambda)
    os.environ.update({
        'DVC_CACHE_DIR': '/tmp/dvc-cache',
        'DVC_DATA_DIR': '/tmp/dvc-data',
        'DVC_CONFIG_DIR': '/tmp/dvc-config',
        'DVC_GLOBAL_CONFIG_DIR': '/tmp/dvc-global-config',
        'DVC_SITE_CACHE_DIR': '/tmp/dvc-site-cache'
    })
    for dir_path in ['/tmp/dvc-cache', '/tmp/dvc-data', '/tmp/dvc-config']:
        os.makedirs(dir_path, exist_ok=True)


def load_x_test():
    global X_test
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading x_test ...")

        # config dvc directories
        configure_dvc_for_lambda()
        try:
            with dvc.api.open(X_TEST_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                X_test = pd.read_parquet(fd)
                main_logger.info('✅ successfully loaded x_test via dvc api')
        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)


def load_preprocessor():
    global preprocessor
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading preprocessor ...")
        configure_dvc_for_lambda()
        try:
            with dvc.api.open(PREPROCESSOR_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                preprocessor = joblib.load(fd)
                main_logger.info('✅ successfully loaded preprocessor via dvc api')

        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)

### ... the remaining components stay the same ...
Then, update the Dockerfile to enable Docker to correctly reference the DVC components:
Dockerfile.lambda.production:
# use an official python runtime
FROM public.ecr.aws/lambda/python:3.12

# set environment variables (adding dvc related env variables)
ENV JOBLIB_MULTIPROCESSING=0
ENV DVC_HOME="/tmp/.dvc"
ENV DVC_CACHE_DIR="/tmp/.dvc/cache"
ENV DVC_REMOTE_NAME="storage"
ENV DVC_GLOBAL_SITE_CACHE_DIR="/tmp/dvc_global"

# copy requirements file and install dependencies
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN python -m pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir dvc dvc-s3

# setup dvc
RUN dvc init --no-scm
RUN dvc config core.no_scm true

# copy the code to the lambda task root
COPY . ${LAMBDA_TASK_ROOT}
CMD [ "app.handler" ]
Lastly, ensure the large files are excluded from the Docker container image:
.dockerignore:
### ... the remaining components stay the same ...

# dvc cache contains large files
.dvc/cache
.dvcignore

# add all folders that DVC will track
data/
preprocessors/
models/
reports/
metrics/
◼ Test in Local
Finally, I’ll build and test the Docker image:
$ docker build -t my-app -f Dockerfile.lambda.local .
$ docker run -p 5002:5002 -e ENV=local my-app app.py
Upon successful configuration, the waitress server runs the Flask application.
After confirming the changes, push the code to Git:
$ git add .
$ git commit -m 'updated dockerfiles and flask app scripts'
$ git push
This push command triggers the CI/CD pipeline via GitHub Actions, which generates a Docker container image and pushes it to AWS ECR.
After the pipeline succeeds and the image is verified, we can manually run the deployment workflow in GitHub Actions.
Wrapping Up
Building robust ML applications requires comprehensive ML lineage to ensure reliability and traceability.
In this article, you learned how to build an ML lineage by integrating open-source services like DVC and Prefect.
In practice, initial planning matters. Specifically, defining how metrics are tracked and at which stages leads directly to a cleaner, more maintainable code structure that is easier to extend later.
Moving forward, we can consider adding more stages to the lineage and integrating advanced logic for data drift detection or fairness tests.
This will further ensure continued model performance and data integrity in the production environment.
Continue Your Learning
If you enjoyed this blog, these related entries will complete the picture:
Architecting Production ML: A Deep Dive into Deployment and Scalability
Data Pipeline Architecture: From Traditional DWH to Modern Lakehouse
Building an Automated CI/CD Pipeline for Serverless Machine Learning on AWS
Related Books for Further Understanding
These books cover a wide range of theory and practice, from fundamentals to PhD level.

Linear Algebra Done Right

Foundations of Machine Learning, second edition (Adaptive Computation and Machine Learning series)

Designing Machine Learning Systems: An Iterative Process for Production-Ready Applications

Machine Learning Design Patterns: Solutions to Common Challenges in Data Preparation, Model Building, and MLOps
Share What You Learned
Kuriko IWAI, "Building a Serverless ML Lineage: AWS Lambda, DVC, and Prefect" in Kernel Labs
https://kuriko-iwai.com/ml-lineage-for-serverless-production
Written by Kuriko IWAI. All images, unless otherwise noted, are by the author. All experimentations on this blog utilize synthetic or licensed data.


