Building a Production-Ready Data CI/CD Pipeline: Versioning, Drift Detection, and Orchestration

A step-by-step guide to building data CI/CD in production ML systems on serverless architecture

Machine Learning | Deep Learning | Python

By Kuriko IWAI


Table of Contents

Introduction
What is Data CI/CD Pipeline
Core Components of Data CI/CD Pipeline
The Data CI/CD Architecture
Step 1. Creating the DVC Pipeline
What is DVC
DVC’s Hashing Algorithms
Workflow in Action
The etl_pipeline Stage
The preprocess Stage
Testing Locally
Deploying the DVC Pipeline
Step 2. Configure Data Drift Detection
What is Data Drift
Drift Detection in Action
Testing Locally
Step 3. Updating the Infrastructure
Testing Locally
Step 4. Configuring Scheduled Run with Prefect
What is Prefect
Workflow as Docker Image
Configure Prefect Tasks and Flows
Testing Locally
Wrapping Up

Introduction

Building robust Machine Learning (ML) applications demands meticulous version control for all components: code, models, and the data that powers them.

For data specifically, it's critical to maintain data quality and detect destructive data drift once the application is running in production.

In this article, I’ll walk through a step-by-step guide to deploying a comprehensive Data CI/CD pipeline leveraging open-source frameworks:

  • DVC for data versioning and pipeline automation,

  • Evidently for crucial data drift detection, and

  • Prefect for reliable scheduling.

What is Data CI/CD Pipeline

A Data CI/CD pipeline is an extension of the Continuous Integration/Continuous Delivery practices, specifically tailored to automate the entire process of getting data into the model lifecycle.

This concept is essential in Machine Learning Operations (MLOps) because the system is defined by:

  • The code and infrastructure,

  • The model, and

  • The data used for training and inference.

In this article, I’ll focus on the data part. For the code and infrastructure, you can find a step-by-step guide here.

Core Components of Data CI/CD Pipeline

The data CI/CD pipeline ensures quality, consistency, and availability of the data throughout the model lifecycle.

Continuous Integration (CI) automates the testing and validation of data processing code and the data itself. The automation process involves:

  • Running the ETL pipeline to extract, transform, and load updated data into the feature store, and

  • Validating data by checking the incoming data for quality issues and detecting schema changes, missing values, outliers, and distribution shifts.

Continuous Delivery/Deployment (CD) automates the delivery of validated, production-ready data to downstream systems such as the model training process. The automation process involves:

  • Versioning the data to ensure reproducibility where the data and model artifacts trained on the data are linked together, and

  • Deploying the data (features) to a feature store for training and serving, ensuring consistency of the data.
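Schematically, the CI and CD responsibilities above can be sketched as plain Python functions. This is an illustrative toy only (the run_etl, validate, version, and deploy helpers are hypothetical names); in the real pipeline these steps are delegated to DVC, Evidently, and the S3 feature store:

```python
# Toy sketch of the data CI/CD stages (illustrative only; not the real pipeline).
import hashlib
import json

def run_etl(raw_rows):
    """CI: extract/transform -- here, simply drop rows with missing values."""
    return [r for r in raw_rows if all(v is not None for v in r.values())]

def validate(rows, required_cols):
    """CI: basic schema check on the incoming data."""
    return all(required_cols <= set(r) for r in rows)

def version(rows):
    """CD: derive a content hash so data and trained model artifacts can be linked."""
    payload = json.dumps(rows, sort_keys=True).encode()
    return hashlib.md5(payload).hexdigest()

def deploy(rows, store):
    """CD: publish validated features to a (toy) feature store."""
    store["features"] = rows
    return store

raw = [{"price": 10.0, "quantity": 3}, {"price": None, "quantity": 1}]
clean = run_etl(raw)                      # the row with a missing price is dropped
assert validate(clean, {"price", "quantity"})
data_version = version(clean)             # link this hash to the trained model
store = deploy(clean, {})
```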

The Data CI/CD Architecture

Now, I’ll build the data CI/CD pipeline for the price prediction system with DVC, an open-source version control system for data, and Prefect, a workflow orchestration engine.

The diagram below shows the entire system architecture (grey) and the data pipeline I’ll build (orange and pink):

Figure A. The system architecture with data pipeline (Created by Kuriko IWAI)


The system leverages:

  • DVC (an open-source version control system for data) for constructing the entire data CI/CD pipeline,

  • Evidently AI (an open-source ML and LLM observability framework) for detecting data drift, and

  • Prefect (a workflow orchestration engine) for scheduling the DVC pipeline.

In the system, the DVC pipeline (orange box) first checks if the code or data has changed and runs ETL scripts only when it detects any changes.

After running the ETL scripts, the pipeline automatically runs a data drift detection test using Evidently AI and, if the test passes, pushes the data to the feature store on AWS S3.

On the other hand, Prefect (pink box) manages the weekly scheduled execution of the DVC pipeline.

When triggered, DVC automatically checks for data and ETL script updates, ensuring data quality and consistency.

The building process involves four main steps:

  1. Create the DVC pipeline,

  2. Configure data drift detection,

  3. Update the infrastructure, and

  4. Define the schedule run with Prefect.

Let us take a look.

Step 1. Creating the DVC Pipeline

The first step is to create the DVC pipeline, where the ETL scripts, data validation, and versioning are automated.

What is DVC

DVC is an open-source tool for versioning data and models in ML projects, acting as Git for models and data.

DVC operates by separating large data files from the Git repository using the following strategies:

  • Tracking data with lightweight metadata: DVC creates a DVC metadata file ( .dvc extension file) that contains a hash of the large original file and a link to its location to track data versioning.

  • Caching actual data: The large original files are cached and moved into a dedicated DVC cache directory at .dvc/cache. This cache directory is automatically added to the .gitignore file.

  • Git integration: DVC pushes only the small metadata files to the remote Git repository.

This way, DVC keeps the repository fast and lightweight.

DVC’s Hashing Algorithms

A hashing algorithm is a mathematical function that takes an input and converts it into a fixed-size string of characters called hash value, hash code, or message digest.

DVC supports three hashing algorithms for its cache:

Default: MD5 (Message-Digest Algorithm 5)

  • Hashing algorithm that takes an input like a file and produces a fixed-size, 128-bit (16-byte) hash value no matter how large the input data is.

  • One-way function where regenerating the original data from the hash is computationally infeasible.

  • Collision vulnerability: the algorithm can produce the same hash value from different inputs (insecure for applications like SSL certificates or password storage).

SHA-256 (Secure Hash Algorithm 256-bit)

  • Cryptographically secure hashing algorithm which produces a fixed-size, 256-bit hash value.

  • Highly resistant to both pre-image attacks (finding the original input from the hash) and collisions (finding two different inputs that produce the same hash).

  • Though slow to compute, suitable for projects that require security and strict compliance standards.

SHA-1

  • A legacy hash algorithm that DVC supports for legacy projects; not recommended for new projects.

In this project, I’ll use the default MD5 algorithm for the hash.
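The fixed output size is easy to verify with Python’s built-in hashlib module, regardless of how large the input is:

```python
import hashlib

data = b"some arbitrarily large input"

md5_digest = hashlib.md5(data).hexdigest()        # always 128 bits -> 32 hex chars
sha256_digest = hashlib.sha256(data).hexdigest()  # always 256 bits -> 64 hex chars
sha1_digest = hashlib.sha1(data).hexdigest()      # legacy: 160 bits -> 40 hex chars

assert len(md5_digest) == 32
assert len(sha256_digest) == 64
assert len(sha1_digest) == 40

# hashing is deterministic: the same bytes always produce the same digest,
# which is exactly what lets DVC detect whether a file has changed
assert hashlib.md5(data).hexdigest() == md5_digest
```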

Workflow in Action

First, I’ll initiate the DVC project by running the command:

$ dvc init

This command automatically creates a .dvc directory at the root of the project folder with the following components:

.dvc/
├── cache/         # [.gitignore] stores dvc caches (cached actual data files)
├── tmp/           # [.gitignore]
├── .gitignore     # gitignores cache, tmp, and config.local
├── config         # dvc config for production
└── config.local   # [.gitignore] dvc config for local

The .dvc/.gitignore file automatically lists the cache and tmp folders and config.local.

Then, I’ll configure the DVC pipeline by adding two stages:

  • The etl_pipeline stage: Raw data is extracted, transformed, and loaded to the S3 bucket, and

  • The preprocess stage: Preprocess the transformed data.

The preprocess stage is separated from the etl_pipeline stage because the preprocessing steps must be tailored to the model type.

For instance, LightGBM, one of the backup models in the project, does not require scaling numerical columns, whereas the multi-layered feed-forward network, the primary model in the project, requires both scaling and encoding.

Each stage requires configuring:

  • The Python script that DVC runs when its dependencies are updated, and

  • The dvc.yaml file, a blueprint for the DVC pipeline.

The etl_pipeline Stage

The Python script in this stage is to extract, transform, and load the processed data:

src/data_handling/etl_pipeline.py

import os
import argparse

import src.data_handling.scripts as scripts

def etl_pipeline():
    # extract the entire data
    df = scripts.extract_original_dataframe()
    ORIGINAL_DF_PATH = os.path.join('data', 'original_df.parquet')
    df.to_parquet(ORIGINAL_DF_PATH, index=False) # tracked by dvc

    # transform
    df = scripts.structure_missing_values(df=df)
    df = scripts.handle_feature_engineering(df=df)

    # load
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    df.to_parquet(PROCESSED_DF_PATH, index=False) # tracked by dvc
    return df


# this block is necessary for dvc to execute the script
if __name__ == '__main__':
    # fetch values from the command line args
    parser = argparse.ArgumentParser(description="run etl pipeline")
    args = parser.parse_args()

    # execute
    etl_pipeline()

Then, the dvc.yaml file tracks both the original and processed data output by the script.

dvc.yaml

stages:
  etl_pipeline:
    # run the script
    cmd: python src/data_handling/etl_pipeline.py

    # dependencies necessary to run the script
    deps:
      - src/data_handling/etl_pipeline.py # the main script to run
      - src/data_handling/ # all data_handling scripts as dependencies
      - src/_utils # all utility functions as dependencies

    # output files that dvc tracks
    outs:
      - data/original_df.parquet # output from the first part of the script
      - data/processed_df.parquet # processed data

The dvc.yaml file defines a sequence of steps as stages using:

  • cmd: The shell command to be executed for that stage,

  • deps: Dependencies needed to run the cmd,

  • params: Default parameters for the cmd defined in the params.yaml file, and

  • outs: The output files produced by the cmd, which DVC will track.

The configuration helps DVC to:

  • Ensure reproducibility by explicitly listing all dependencies, outputs, and the exact commands for each stage, and

  • Manage data lineage by establishing a Directed Acyclic Graph (DAG) of the workflow, linking each stage to the next.
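The DAG emerges because one stage’s outs can appear in another stage’s deps. As a toy illustration of that linking (not DVC’s actual implementation, and assuming the preprocess stage also listed data/processed_df.parquet as a dependency):

```python
# Toy resolver: stage B depends on stage A whenever one of A's outs
# appears among B's deps (this is what defines the pipeline DAG).
stages = {
    "etl_pipeline": {"deps": ["src/data_handling/etl_pipeline.py"],
                     "outs": ["data/processed_df.parquet"]},
    "preprocess":   {"deps": ["src/data_handling/preprocess.py",
                              "data/processed_df.parquet"],  # assumed dep
                     "outs": ["data/x_train_df.parquet"]},
}

def upstream_of(stage_name):
    """Return the stages whose outputs this stage consumes."""
    deps = set(stages[stage_name]["deps"])
    return sorted(name for name, s in stages.items()
                  if name != stage_name and deps & set(s["outs"]))

assert upstream_of("preprocess") == ["etl_pipeline"]  # preprocess runs after etl
assert upstream_of("etl_pipeline") == []              # etl is the root of the DAG
```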

The preprocess Stage

Next, I’ll add the Python script for the preprocess stage where the transformed data is scaled and encoded:

src/data_handling/preprocess.py

import os
import argparse
import joblib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

import src.data_handling.scripts as scripts
from src._utils import main_logger


def preprocess(stockcode: str = '', target_col: str = 'quantity', should_scale: bool = True):
    # file paths
    PROCESSED_DF_PATH = os.path.join('data', 'processed_df.parquet')
    PREPROCESSOR_PATH = os.path.join('preprocessors', 'column_transformer.pkl')

    # extract the processed df from the dvc cache
    df = pd.read_parquet(PROCESSED_DF_PATH)

    # categorize num and cat columns
    num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
    if cat_cols:
        for col in cat_cols: df[col] = df[col].astype('string')

    # create train, val, test datasets
    y = df[target_col]
    X = df.copy().drop(target_col, axis='columns')

    test_size, random_state = 50000, 42
    X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=test_size, random_state=random_state, shuffle=False)

    # store train, val, test data for model training and inference. tracked by dvc
    X_train.to_parquet('data/x_train_df.parquet', index=False)
    X_val.to_parquet('data/x_val_df.parquet', index=False)
    X_test.to_parquet('data/x_test_df.parquet', index=False)
    y_train.to_frame(name=target_col).to_parquet('data/y_train_df.parquet', index=False)
    y_val.to_frame(name=target_col).to_parquet('data/y_val_df.parquet', index=False)
    y_test.to_frame(name=target_col).to_parquet('data/y_test_df.parquet', index=False)

    # preprocess (skip scaling numerical columns when should_scale is False)
    X_train, X_val, X_test, preprocessor = scripts.transform_input(
        X_train, X_val, X_test,
        num_cols=num_cols if should_scale else [],
        cat_cols=cat_cols
    )

    # store the trained preprocessor
    preprocessor.fit(X)
    joblib.dump(preprocessor, PREPROCESSOR_PATH) # tracked by dvc

    return X_train, X_val, X_test, y_train, y_val, y_test, preprocessor


# execute the script
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='run data preprocessing')
    parser.add_argument('--stockcode', type=str, default='', help='specific stockcode')
    parser.add_argument('--target_col', type=str, default='quantity', help='the target column name')
    parser.add_argument('--should_scale', type=lambda v: v.lower() == 'true', default=True, help='whether to scale numerical columns')
    args = parser.parse_args()

    X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = preprocess(
        target_col=args.target_col,
        stockcode=args.stockcode,
        should_scale=args.should_scale,
    )

Then, add the stage to the dvc.yaml file:

dvc.yaml

stages:
  etl_pipeline:
    # run the script
    cmd: python src/data_handling/etl_pipeline.py

    # dependencies necessary to run the script
    deps:
      - src/data_handling/etl_pipeline.py # the main script to run
      - src/data_handling/ # all data_handling scripts as dependencies
      - src/_utils # all utility functions as dependencies

    # output files that dvc tracks
    outs:
      - data/original_df.parquet # output from the first part of the script
      - data/processed_df.parquet # processed data

  ## ADDED
  preprocess:
    cmd: python src/data_handling/preprocess.py --target_col ${params.target_col}

    deps:
      - src/data_handling/preprocess.py
      - src/data_handling/
      - src/_utils

    # params from params.yaml
    params:
      - params.target_col
      - params.should_scale
      - params.verbose

    outs:
      # train, validation, test datasets
      - data/x_train_df.parquet
      - data/x_val_df.parquet
      - data/x_test_df.parquet
      - data/y_train_df.parquet
      - data/y_val_df.parquet
      - data/y_test_df.parquet

      # fitted preprocessor
      - preprocessors/column_transformer.pkl

Now, DVC tracks the following data:

etl_pipeline:

  • data/original_df.parquet

  • data/processed_df.parquet

preprocess:

  • data/x_train_df.parquet

  • data/x_val_df.parquet

  • data/x_test_df.parquet

  • data/y_train_df.parquet

  • data/y_val_df.parquet

  • data/y_test_df.parquet

  • preprocessors/column_transformer.pkl

Lastly, I’ll configure the params.yaml file to define the default parameters for the DVC pipeline:

params.yaml

params:
  target_col: "quantity"
  should_scale: True
  verbose: False

Testing Locally

I’ll test the pipeline locally by running the command:

$ dvc repro

The dvc repro command triggers the following process:

  • Reading the dvc.yaml file to determine the pipeline,

  • Checking the current state of all dependencies,

  • Only running the stages that have changed from the previous run, and

  • Generating the artifacts and the dvc.lock file upon the successful run.
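The “only run what changed” check boils down to comparing each dependency’s current content hash against the one recorded in dvc.lock. A minimal sketch of that idea:

```python
import hashlib
import os
import tempfile

def md5_of(path):
    """Content hash of a file, as dvc.lock records for each dependency."""
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

def stage_is_stale(dep_path, recorded_md5):
    """Rerun the stage only if the dependency's content hash changed."""
    return md5_of(dep_path) != recorded_md5

# simulate a tracked dependency (e.g. the etl script)
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
    f.write("v1 of the etl script")

recorded = md5_of(path)                    # what dvc.lock would store
assert not stage_is_stale(path, recorded)  # unchanged -> skip the stage

with open(path, "w") as f:
    f.write("v2 of the etl script")
assert stage_is_stale(path, recorded)      # changed -> rerun the stage
os.remove(path)
```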

The Artifacts (Pipeline Outputs)

The output files defined in the outs section in the dvc.yaml file are generated in the data folder.

Then, DVC automatically moves the content of these outputs into the DVC cache (.dvc/cache/), and then creates a link in the workspace.

.
├── data/                      # contains all data (tracked by dvc)
│   ├── .gitignore             # original parquet files are gitignored
│   ├── original_df.parquet    # output files generated by dvc pipeline
│   ├── processed_df.parquet
│   └── x_train_df.parquet
│       ...
└── .dvc/
    └── cache/                 # stores the original data (gitignored)
        └── files/
            └── md5/           # selected hash algorithm
                ├── 01/
                ├── 1c/
                └── ...
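The two-character directories (01/, 1c/, …) come from the content hash itself: the first two hex characters of a file’s MD5 digest name the subdirectory, and the remaining thirty name the cached file. A small sketch of that content-addressed layout:

```python
import hashlib
import os

def cache_path_for(content: bytes, cache_root=".dvc/cache/files/md5"):
    """Map file content to its content-addressed cache location."""
    digest = hashlib.md5(content).hexdigest()
    # first two hex chars name the subdirectory, the remaining 30 name the file
    return os.path.join(cache_root, digest[:2], digest[2:])

p = cache_path_for(b"example parquet bytes")
# p looks like .dvc/cache/files/md5/<2 hex chars>/<30 hex chars>
```

Two files with identical content map to the same path, so the cache automatically deduplicates data across versions.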

The dvc.lock File

This process also generates the dvc.lock file, which records the state of all outputs by storing their unique identifiers using the selected MD5 hash algorithm:

dvc.lock

schema: '2.0'
stages:
  etl_pipeline:
    cmd: python src/data_handling/etl_pipeline.py
    deps:
    - path: src/_utils
      hash: md5
      md5: 7a0c87652406dac2d1dfa2b31ce3abf4.dir
      size: 15816
      nfiles: 10
    - path: src/data_handling/
      hash: md5
      md5: 5c7503f116d719e73d7598068782e3c4.dir
      size: 96986
      nfiles: 31
    - path: src/data_handling/etl_pipeline.py
      hash: md5
      md5: e7ae635652a4c6f98eae3f5dab570da9
      size: 1862
    outs:
    - path: data/original_df.parquet
      hash: md5
      md5: 01451de363c83fb489bad09276ad4a1b
      size: 3631450
    - path: data/processed_df.parquet
      hash: md5
      md5: 5605f2648e91221d258d52380e0aa83b
      size: 7019782
  ...

To fully version the pipeline state, the dvc.lock file is pushed to Git along with the code and dvc.yaml.

Deploying the DVC Pipeline

After the local test passes, I’ll configure the DVC remote to deploy the pipeline.

Among the storage types that DVC supports, I’ll choose an AWS S3 bucket for the DVC remote; the right choice depends on the project ecosystem and familiarity.

First, create a new S3 bucket in the AWS region of choice:

$ aws s3 mb s3://<PROJECT NAME>/<BUCKET NAME> --region <AWS REGION>

*Make sure the IAM role has the following permissions: s3:ListBucket, s3:GetObject, s3:PutObject, and s3:DeleteObject.

Then, add the S3 bucket URI to the DVC remote:

$ dvc remote add -d <DVC REMOTE NAME> s3://<PROJECT NAME>/<BUCKET NAME>

Finally, I’ll push the cached files stored in the .dvc/cache folder to the DVC remote:

$ dvc push

Now, all cache files are stored in the S3 bucket:

Figure B. DVC remote on AWS S3


This final step is necessary for AWS Lambda to access the cached data and preprocessor in production.

Step 2. Configure Data Drift Detection

Next, I'll configure the data drift detection using the Evidently AI library.

What is Data Drift

Data drift is a change in the statistical properties (such as the mean, variance, or distribution) of production data relative to the data the model was trained on.

Data drift is categorized into three main types:

  • Covariate Drift (Feature Drift): A change in the input feature distribution.
    e.g., The monthly average income of customers in the region (input feature) suddenly increases by 20% due to a new local industry opening.

  • Prior Probability Drift (Label Drift): A change in the target variable distribution.
    e.g., A new regulation on the product bans its common use, which shrinks its market size. Now, regardless of the price or other input features, the quantity sold (target variable) of the product declines.

  • Concept Drift: A change in the relationship between the input data and the target variable.
    e.g., A new competitor entering the market makes customers more price-sensitive, changing the relationship between the price (input feature) and the quantity sold (target variable).

All types of data drift lead to poor generalization capabilities of the model over time.

So, detecting it early is critical.
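Drift detectors work by comparing a reference distribution with the current one using a statistical distance. Evidently’s DataDriftPreset chooses per-column tests automatically; as a hand-rolled illustration of the underlying idea (this is not Evidently’s algorithm), a population stability index (PSI) compares binned distributions of a reference sample and a current sample:

```python
import math
import random

def psi(reference, current, bins=10):
    """Population Stability Index between two numeric samples (toy version)."""
    lo = min(min(reference), min(current))
    hi = max(max(reference), max(current))
    width = (hi - lo) / bins or 1.0

    def shares(sample):
        counts = [0] * bins
        for x in sample:
            idx = min(int((x - lo) / width), bins - 1)
            counts[idx] += 1
        # small epsilon avoids log(0) for empty bins
        return [max(c / len(sample), 1e-6) for c in counts]

    ref, cur = shares(reference), shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))

random.seed(0)
reference = [random.gauss(100, 10) for _ in range(5000)]
same = [random.gauss(100, 10) for _ in range(5000)]
shifted = [random.gauss(120, 10) for _ in range(5000)]  # covariate drift

assert psi(reference, same) < 0.1       # common rule of thumb: no drift
assert psi(reference, shifted) > 0.25   # significant shift -> investigate
```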

Drift Detection in Action

After retrieving an API token from the Evently AI workplace, draft the report_data_drift script:

src/data_handling/report_data_drift.py

import os
import sys
import json
import pandas as pd
from dotenv import load_dotenv

from evidently import Dataset, DataDefinition, Report
from evidently.presets import DataDriftPreset
from evidently.ui.workspace import CloudWorkspace

import src.data_handling.scripts as scripts
from src._utils import main_logger


if __name__ == '__main__':
    # initiate the evidently cloud workspace
    load_dotenv(override=True)
    ws = CloudWorkspace(token=os.getenv('EVENTLY_API_TOKEN'), url='https://app.evidently.cloud')

    # retrieve the evidently project
    project = ws.get_project('01998e47-e3d7-7ee7-ae5e-b6fdf3f80fff')

    # retrieve paths from the command line args
    REFERENCE_DATA_PATH = sys.argv[1]
    CURRENT_DATA_PATH = sys.argv[2]
    REPORT_OUTPUT_PATH = sys.argv[3]
    METRICS_OUTPUT_PATH = sys.argv[4]
    STOCKCODE = sys.argv[5]

    # extract datasets
    reference_data_full = pd.read_csv(REFERENCE_DATA_PATH)
    reference_data_stockcode = reference_data_full[reference_data_full['stockcode'] == STOCKCODE]
    current_data_stockcode = pd.read_parquet(CURRENT_DATA_PATH)

    # define data schema
    nums, cats = scripts.categorize_num_cat_cols(df=reference_data_stockcode)
    for col in nums: current_data_stockcode[col] = pd.to_numeric(current_data_stockcode[col], errors='coerce')

    schema = DataDefinition(numerical_columns=nums, categorical_columns=cats)

    # define evidently datasets w/ the data schema
    eval_data_1 = Dataset.from_pandas(reference_data_stockcode, data_definition=schema)
    eval_data_2 = Dataset.from_pandas(current_data_stockcode, data_definition=schema)

    # run
    report = Report(metrics=[DataDriftPreset()])
    data_eval = report.run(reference_data=eval_data_1, current_data=eval_data_2)
    data_eval.save_html(REPORT_OUTPUT_PATH)
    main_logger.info(f'... drift report saved to {REPORT_OUTPUT_PATH} ...')

    # extract metrics from the report
    report_dict = json.loads(data_eval.json())
    num_drifts = report_dict['metrics'][0]['value']['count']
    shared_drifts = report_dict['metrics'][0]['value']['share']
    metrics = dict(drift_detected=bool(num_drifts > 0.0), num_drifts=num_drifts, shared_drifts=shared_drifts)

    # store the metrics in json for dvc to track
    with open(METRICS_OUTPUT_PATH, 'w') as f:
        json.dump(metrics, f, indent=4)
        main_logger.info(f'... drift metrics saved to {METRICS_OUTPUT_PATH} ...')

    # add the eval to the project
    if project is not None: ws.add_run(project_id=project.id, run=data_eval, include_data=False, name=STOCKCODE)

    # raise an error if drift is detected and stop the dvc pipeline
    if num_drifts > 0.0: sys.exit('❌ FATAL: data drift detected. stopping pipeline')

This script stores the detection results in an HTML file, a JSON file, and the Evidently workspace.

To ensure tracking, only the JSON file is cached by DVC, and when drift is detected, the DVC pipeline is immediately halted by the final sys.exit call.
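Passing a string to sys.exit prints it to stderr and exits with status 1; that non-zero exit code is what makes DVC mark the stage as failed and stop the downstream stages. A quick check of this behavior:

```python
import subprocess
import sys

# run a child interpreter that exits the same way the drift script does
result = subprocess.run(
    [sys.executable, "-c", "import sys; sys.exit('drift detected')"],
    capture_output=True, text=True,
)

assert result.returncode == 1             # non-zero -> dvc marks the stage failed
assert "drift detected" in result.stderr  # the message lands on stderr
```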

To make this work, the data_drift_check stage is added to the DVC pipeline before the preprocess stage:

dvc.yaml

stages:
    etl_pipeline:
        ### (same as Step 1)

    data_drift_check: # run by stockcode
        cmd: >
          python src/data_handling/report_data_drift.py
          data/processed/processed_df.csv
          data/processed_df_${params.stockcode}.parquet
          reports/drift_report_${params.stockcode}.html
          metrics/drift_metrics_${params.stockcode}.json
          ${params.stockcode}

        params:
          - params.stockcode

        deps:
          - src/data_handling/report_data_drift.py
          - src/data_handling/scripts/
          - src/_utils/
          - data/

        plots:
          - reports/drift_report_${params.stockcode}.html: # dvc plots the results
              cache: false # html report is large - skip caching

        metrics:
          - metrics/drift_metrics_${params.stockcode}.json: # dvc tracks and versions the metrics
              type: json

    preprocess:
        ### (same as Step 1)

The data_drift_check stage does not have any outputs, but the metrics in a JSON format are tracked by DVC.

Testing Locally

Lastly, I’ll test the full DVC pipeline:

$ dvc repro
$ dvc push

Upon a successful run, the detection results appear on the dashboard of the Evidently workspace:

Figure C. Evidently workspace dashboard


And the metrics are automatically stored in a JSON file for DVC to track:

metrics/data_drift.json

{
    "drift_detected": false,
    "num_drifts": 0.0,
    "shared_drifts": 0.0
}

DVC skips the execution of the etl_pipeline stage because neither the data nor the source code has changed since Step 1.

That’s all for the DVC configuration.

I’ll move on to the infrastructure updates related to the DVC integration.

Step 3. Updating the Infrastructure

After the DVC pipeline integration, the Dockerfiles and the Flask app script must be updated.

The specific process in this step depends on the infrastructure.

But the common point is that DVC eliminates the need to store the large Parquet or CSV files directly in the feature store because it caches them as lightweight hashed files.

First, I’ll simplify the loading logic of the Flask app script by using the dvc.api module:

app.py

### ... the rest of the components remain the same ...

import dvc.api

DVC_REMOTE_NAME = '<REMOTE NAME IN .dvc/config FILE>'

def configure_dvc_for_lambda():
    # set dvc directories to /tmp (the only writable path in lambda)
    os.environ.update({
        'DVC_CACHE_DIR': '/tmp/dvc-cache',
        'DVC_DATA_DIR': '/tmp/dvc-data',
        'DVC_CONFIG_DIR': '/tmp/dvc-config',
        'DVC_GLOBAL_CONFIG_DIR': '/tmp/dvc-global-config',
        'DVC_SITE_CACHE_DIR': '/tmp/dvc-site-cache'
    })
    for dir_path in ['/tmp/dvc-cache', '/tmp/dvc-data', '/tmp/dvc-config']:
        os.makedirs(dir_path, exist_ok=True)

def load_x_test():
    global X_test
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading x_test ...")

        # config dvc directories
        configure_dvc_for_lambda()

        try:
            with dvc.api.open(X_TEST_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                X_test = pd.read_parquet(fd)
                main_logger.info('✅ successfully loaded x_test via dvc api')

        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)

def load_preprocessor():
    global preprocessor
    if not os.environ.get('PYTEST_RUN', False):
        main_logger.info("... loading preprocessor ...")
        configure_dvc_for_lambda()
        try:
            with dvc.api.open(PREPROCESSOR_PATH, remote=DVC_REMOTE_NAME, mode='rb') as fd:
                preprocessor = joblib.load(fd)
                main_logger.info('✅ successfully loaded preprocessor via dvc api')

        except Exception as e:
            main_logger.error(f'❌ general loading error: {e}', exc_info=True)

### ... the rest of the components remain the same ...

Then, update the Dockerfile so that Docker refers to the DVC components:

Dockerfile.lambda.production

# use an official python runtime
FROM public.ecr.aws/lambda/python:3.12

# set environment variables (adding dvc related env variables)
ENV JOBLIB_MULTIPROCESSING=0
ENV DVC_HOME="/tmp/.dvc"
ENV DVC_CACHE_DIR="/tmp/.dvc/cache"
ENV DVC_REMOTE_NAME="storage"
ENV DVC_GLOBAL_SITE_CACHE_DIR="/tmp/dvc_global"

# copy requirements file and install dependencies
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN python -m pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir dvc dvc-s3

# setup dvc
RUN dvc init --no-scm
RUN dvc config core.no_scm true

# copy the code to the lambda task root
COPY . ${LAMBDA_TASK_ROOT}

CMD [ "app.handler" ]

For the local Dockerfile configuration, make sure the .dvc config folder is copied into the working directory:

Dockerfile.lambda.local

FROM public.ecr.aws/lambda/python:3.12

ENV JOBLIB_MULTIPROCESSING=0

# set the working directory in the container
WORKDIR /app

# copy dvc config and dvc.yaml
COPY .dvc/config /app/.dvc/
COPY dvc.yaml /app/

# copy the entire repository
COPY . /app/

# install deps
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir dvc dvc-s3

# setup dvc (disabling git remote check)
RUN dvc config core.no_scm true

ENTRYPOINT [ "python" ]

CMD [ "-m", "awslambdaric", "app.handler" ]

Lastly, ensure the large data files are excluded from the Docker container image:

.dockerignore

### ... the rest of the components remain the same ...

# dvc cache contains large files
.dvc/cache
.dvcignore

# large original files of data, preprocessor, and models
data/*.parquet
data/*.csv

preprocessors/*.pkl

models/production/*.pth
models/production/*.pkl

Testing Locally

Finally, build the Docker image and run it locally:

$ docker build -t my-app -f Dockerfile.lambda.local .
$ docker run -p 5002:5002 -e ENV=local my-app app.py

If everything is fine, the Waitress server will serve the Flask application.

Step 4. Configuring Scheduled Run with Prefect

The final step is to configure the scheduled run of the DVC pipeline with Prefect.

What is Prefect

Prefect is an open-source workflow orchestration tool designed to build, schedule, and monitor data pipelines.

Prefect uses a work pool to decouple the orchestration logic from the infrastructure.

The work pool acts as a base configuration that all Prefect workflows must use, standardizing the deployment environment.

In the work pool, a Docker container image is run to further guarantee the standardized execution environment for the flow.

Workflow as Docker Image

First, I’ll configure the Docker image registry:

  • A container registry in the Docker Hub for local development, and

  • AWS ECR for production deployment.

So, the process starts by creating a new ECR repository:

$ aws ecr create-repository --repository-name <REPOSITORY NAME> --region <AWS REGION>

(Make sure the IAM role has access to this new ECR URI.)

Then, for local deployment, authenticate the Docker client:

$ docker login

And grant the user permission to run Docker commands without sudo (the command below is for macOS):

$ sudo dscl . -append /Groups/docker GroupMembership $USER

Configure Prefect Tasks and Flows

Next, I’ll add the Prefect task and flow:

  • The Prefect task executes the dvc repro and dvc push commands to run the DVC pipeline.

  • The Prefect flow (Weekly Data Pipeline) executes the Prefect task on a weekly schedule.

src/data_handling/prefect_flows.py

import os
import sys
import subprocess
from datetime import timedelta, datetime
from dotenv import load_dotenv
from prefect import flow, task
from prefect.schedules import Schedule
from prefect_aws import AwsCredentials

from src._utils import main_logger

# add the project root to the python path - enabling prefect to find the script
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))


# define the prefect task
@task(retries=3, retry_delay_seconds=30)
def run_dvc_etl_pipeline():
    # execute the dvc pipeline
    result = subprocess.run(["dvc", "repro"], capture_output=True, text=True, check=True)
    main_logger.info(result.stdout)  # log the dvc output

    # push the updated data
    subprocess.run(["dvc", "push"], check=True)


@flow(name="Weekly Data Pipeline")
def weekly_data_flow():
    run_dvc_etl_pipeline()


if __name__ == '__main__':
    # docker image registry (either docker hub or aws ecr)
    load_dotenv(override=True)
    ENV = os.getenv('ENV', 'production')
    DOCKER_HUB_REPO = os.getenv('DOCKER_HUB_REPO')
    ECR_FOR_PREFECT_PATH = os.getenv('S3_BUCKET_FOR_PREFECT_PATH')
    image_repo = f'{DOCKER_HUB_REPO}:ml-sales-pred-data-latest' if ENV == 'local' else f'{ECR_FOR_PREFECT_PATH}:latest'

    # define the weekly schedule
    weekly_schedule = Schedule(
        interval=timedelta(weeks=1),
        anchor_date=datetime(2025, 9, 29, 9, 0, 0),
        active=True,
    )

    # aws credentials to access ecr
    AwsCredentials(
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'), # type: ignore
        region_name=os.getenv('AWS_REGION_NAME'),
    ).save('aws', overwrite=True)

    # deploy the prefect flow
    weekly_data_flow.deploy(
        name='weekly-data-flow',
        schedule=weekly_schedule, # schedule
        work_pool_name="wp-ml-sales-pred", # work pool where the docker image (flow) runs
        image=image_repo, # create a docker image at docker hub (local) or ecr (production)
        concurrency_limit=3,
        push=True # push the docker image to the image_repo
    )

Testing Locally

Next, I’ll test the workflow by running the Prefect server locally:

$ uv run prefect server start

$ export PREFECT_API_URL="http://127.0.0.1:4200/api"

Then, run the prefect_flows.py script:

$ uv run src/data_handling/prefect_flows.py

Upon successful execution, the Prefect dashboard indicates the workflow is scheduled to run three times:

Figure D. Prefect dashboard


And that’s all for the data CI/CD pipeline deployment.

Wrapping Up

Building robust ML applications requires comprehensive data versioning, tracking, and workflow orchestration to ensure reliability and detect failures early.

In this article, I demonstrated how to establish a strong Data CI/CD pipeline by integrating open-source services like DVC, Evidently, and Prefect.

Other options, such as MLflow, Airflow, or cloud-native services like AWS Step Functions or Google Cloud Composer, can be explored depending on the existing infrastructure and specific system architecture requirements.

Moving forward, the focus should shift to enhancing the system with automated model drift detection and implementing real-time data quality monitoring within the Prefect workflows.

This ensures continued model performance and data integrity in the production environment.


Related Books for Further Understanding

These books cover a wide range of theories and practices, from fundamentals to PhD level.

Linear Algebra Done Right

Foundations of Machine Learning, second edition (Adaptive Computation and Machine Learning series)

Designing Machine Learning Systems: An Iterative Process for Production-Ready Applications

Machine Learning Design Patterns: Solutions to Common Challenges in Data Preparation, Model Building, and MLOps


Written by Kuriko IWAI. All images, unless otherwise noted, are by the author. All experimentations on this blog utilize synthetic or licensed data.