From Notebook to Production: Building a Resilient ML Pipeline on AWS Lambda
A practical step-by-step guide to launching the full-stack machine learning system for price prediction
By Kuriko IWAI

Table of Contents
Introduction
What We’re Building
The System Architecture
The Deployment Workflow in Action

Introduction
We’ve built a fantastic machine learning (ML) model that performs perfectly in notebooks.
But shipping ML models requires more than just code; it demands a robust infrastructure and continuous adaptation to real-world data.
In this article, I’ll walk through a step-by-step guide to shipping a production-ready ML system on a serverless platform.
What We’re Building
◼ “AI Pricing for Retailers”
This project aims to help a mid-sized retailer compete with large players like Amazon.
Unable to afford significant price discounts, the business struggles to find optimal price points as it expands its product lines.
Our goal is to leverage AI models to recommend the best price for a selected product to maximize sales for the retailer, and display it on a client-side user interface (UI):

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
Figure A. UI
◼ The Models
I’ll train and tune multiple models so that if the primary model fails, a backup model is loaded to serve predictions.
Primary Model: Multi-layered feedforward network (on the PyTorch library)
Backup Models (Backups): LightGBM, SVR, and Elastic Net (on the Scikit-Learn library)
The backup models are prioritized based on their learning capacity.
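To make the failover concrete, here is a minimal sketch of how a prioritized fallback might work. The priority list and loader names below are illustrative, not taken from the repository:

```python
# Hypothetical failover order, from highest to lowest priority
BACKUP_PRIORITY = ['lightgbm', 'svr', 'elastic_net']

def load_first_available(loaders: dict, priority: list):
    """Return (name, model) for the first backup that loads successfully.

    `loaders` maps a model name to a zero-argument callable that loads it;
    a loader that raises is skipped and the next candidate is tried.
    """
    for name in priority:
        try:
            return name, loaders[name]()
        except Exception:
            continue
    return None, None
```

If every loader fails, the function returns `(None, None)`, which the serving layer can translate into an error response.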
◼ Tuning & Training
The primary model was trained on a dataset of around 500,000 samples (source) and fine-tuned using Optuna's Bayesian Optimization, with grid search available for further refinement.
The backups are also trained on the same samples and tuned using the Scikit-Optimize framework.
◼ The Prediction
All models serve predictions on logged quantity values.
Logarithmic transformations of the quantity data make the distribution denser, which helps models learn patterns more effectively.
This is because logarithms reduce the impact of extreme values, or outliers, and can help normalize skewed data.
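A tiny numeric sketch (with made-up quantity values) shows the effect — the log transform compresses the outlier while `np.exp` recovers the original scale exactly:

```python
import numpy as np

# Hypothetical quantity values with one extreme outlier
quantities = np.array([1.0, 2.0, 3.0, 5.0, 8.0, 500.0])

# the log transform compresses the outlier, making the distribution denser
logged = np.log(quantities)

# np.exp inverts the transform, recovering the original scale
restored = np.exp(logged)
```

The spread of `logged` is a small fraction of the spread of `quantities`, which is exactly what helps the models learn.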
◼ Performance Validation
Model performance will be evaluated using different metrics for the transformed and original data, with a lower value always indicating better performance.
Logged values: Mean Squared Error (MSE)
Actual values: Root Mean Squared Log Error (RMSLE) and Mean Absolute Error (MAE)
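With Scikit-Learn’s metric helpers, the three scores can be computed as below; the target and prediction arrays are hypothetical stand-ins:

```python
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error

# hypothetical actual quantities and predicted quantities
y_true = np.array([10.0, 20.0, 30.0])
y_pred = np.array([12.0, 18.0, 33.0])

# MSE on the logged values (the scale the models are trained on)
mse_logged = mean_squared_error(np.log(y_true), np.log(y_pred))

# RMSLE and MAE on the actual values
rmsle = np.sqrt(mean_squared_error(np.log1p(y_true), np.log1p(y_pred)))
mae = mean_absolute_error(y_true, y_pred)
```

For all three metrics, lower is better.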
The System Architecture
I’ll build a complete ecosystem around an AWS Lambda function to create a scalable ML system:

Figure B. The system architecture (Created by Kuriko IWAI)
AWS Lambda is a serverless compute service: the provider runs the application without the developer managing servers. Once the code is uploaded, AWS takes responsibility for the underlying infrastructure.
In this serverless model, the code is deployed as a stateless function that runs only when triggered by an event such as an HTTP request or a scheduled task.
This event-driven nature makes serverless computing extremely efficient in resource allocation:
No Server Management: The cloud provider takes care of operational tasks.
Automatic Scaling: Serverless applications automatically scale up or down based on demand.
Pay-per-Use Billing: Charged for the exact amount of compute resources the application consumes.
Notably, other cloud ecosystems like Google Cloud Platform (GCP) and Microsoft Azure offer comprehensive alternatives to AWS. The choice depends on budget, project type, and our familiarity with each ecosystem.
The system architecture focuses on the following points:
The application is fully containerized on Docker for universal accessibility.
The container image is stored in AWS Elastic Container Registry (ECR).
The API Gateway’s REST API endpoints trigger an event to invoke the Lambda function.
The Lambda function loads the container image from ECR and performs inference.
Trained models, processors, and input features are stored in AWS S3 buckets.
A Redis client serves cached analytical data and past predictions stored in ElastiCache.
◼ Core AWS Resources
To build the system, the following AWS resources are used:
Lambda: Serves the function that performs inference.
API Gateway: Routes API calls to the Lambda function.
S3 Storage: Serves as the feature store and model store.
ElastiCache: Stores cached predictions and analytical data.
ECR: Stores Docker container images to allow Lambda to pull the image.
Each resource requires configuration. I’ll explore details in the next section.
The Deployment Workflow in Action
The deployment workflow involves the following steps:
Draft data preparation, model training & serialization scripts,
Configure designated feature store and model store in S3,
Create a Flask application with API endpoints,
Publish a Docker image to ECR,
Create a Lambda function, and
Configure related AWS resources.
Let us take a look.
* For your reference, here is the repository structure:
.
├── .venv/ [.gitignore]        # stores uv venv
│
├── data/ [.gitignore]
│   ├── raw/                   # stores raw data
│   └── preprocessed/          # stores processed data after imputation and engineering
│
├── models/ [.gitignore]       # stores serialized models after training and tuning
│   ├── dfn/                   # deep feedforward network
│   ├── gbm/                   # light gbm
│   ├── en/                    # elastic net
│   └── production/            # models to be stored in S3 for production use
│
├── notebooks/                 # stores experimentation notebooks
│
├── src/                       # core functions
│   ├── _utils/                # utility functions
│   ├── data_handling/         # functions to engineer features
│   ├── model/                 # functions to train, tune, validate models
│   │   ├── sklearn_model/
│   │   ├── torch_model/
│   │   └── ...
│   └── main.py                # main script to run the inference locally
│
├── app.py                     # Flask application (API endpoints)
├── pyproject.toml             # project configuration
├── .env [.gitignore]          # environment variables
├── uv.lock                    # dependency locking
├── Dockerfile                 # for Docker container image
├── .dockerignore
├── requirements.txt
└── .python-version            # python version locking (3.12)
Step 1. Draft Python Scripts
The first step is to draft Python scripts for data preparation, model training and tuning.
I’ll run these scripts in a batch process because these are resource-intensive and stateful tasks, unsuitable for serverless functions optimized for short-lived, stateless, and event-driven tasks.
Serverless functions can also experience cold starts; with heavy tasks inside the function, the API Gateway would time out before a prediction is served.
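This is also why, later in the Flask app, model artifacts are loaded at module scope rather than inside the handler. A simplified sketch (the loader and artifact names are illustrative):

```python
import time

# Module scope runs once per cold start; warm invocations reuse the result.
# _load_artifacts is a stand-in for pulling models/preprocessors from S3.
def _load_artifacts():
    time.sleep(0.01)  # simulate a slow load
    return {'model': 'dfn', 'preprocessor': 'fitted'}

ARTIFACTS = _load_artifacts()

def handler(event, context):
    # warm invocations skip the expensive load above, keeping the
    # request path fast enough for the API Gateway timeout
    return {'statusCode': 200, 'body': ARTIFACTS['model']}
```

Training, by contrast, cannot be amortized this way, so it stays in a batch process.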
src/main.py
import os
import torch
import warnings
import pickle
import joblib
import numpy as np
import lightgbm as lgb
from sklearn.linear_model import ElasticNet
from sklearn.svm import SVR
from skopt.space import Real, Integer, Categorical
from dotenv import load_dotenv

import src.data_handling as data_handling
import src.model.torch_model as t
import src.model.sklearn_model as sk

# path constants (PRODUCTION_MODEL_FOLDER_PATH, PREPROCESSOR_PATH, DFN_FILE_PATH,
# SVR_FILE_PATH, EN_FILE_PATH, GBM_FILE_PATH) and the sklearn_models config
# are defined elsewhere in the module.

if __name__ == '__main__':
    load_dotenv(override=True)
    os.makedirs(PRODUCTION_MODEL_FOLDER_PATH, exist_ok=True)

    # create train, validation, test datasets
    X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = data_handling.main_script()

    # store the trained preprocessor in local storage
    joblib.dump(preprocessor, PREPROCESSOR_PATH)

    # model tuning and training (primary: deep feedforward network)
    best_dfn_full_trained, checkpoint = t.main_script(X_train, X_val, y_train, y_val)

    # serialize the trained model
    torch.save(checkpoint, DFN_FILE_PATH)

    # svr
    best_svr_trained, best_hparams_svr = sk.main_script(
        X_train, X_val, y_train, y_val, **sklearn_models[1]
    )
    if best_svr_trained is not None:
        with open(SVR_FILE_PATH, 'wb') as f:
            pickle.dump({ 'best_model': best_svr_trained, 'best_hparams': best_hparams_svr }, f)

    # elastic net
    best_en_trained, best_hparams_en = sk.main_script(
        X_train, X_val, y_train, y_val, **sklearn_models[0]
    )
    if best_en_trained is not None:
        with open(EN_FILE_PATH, 'wb') as f:
            pickle.dump({ 'best_model': best_en_trained, 'best_hparams': best_hparams_en }, f)

    # light gbm
    best_gbm_trained, best_hparams_gbm = sk.main_script(
        X_train, X_val, y_train, y_val, **sklearn_models[2]
    )
    if best_gbm_trained is not None:
        with open(GBM_FILE_PATH, 'wb') as f:
            pickle.dump({'best_model': best_gbm_trained, 'best_hparams': best_hparams_gbm }, f)
Running the script to train and serialize the models using the uv package management:
$ uv venv
$ source .venv/bin/activate
$ uv run src/main.py
The main.py script includes several key components.
◼ Data Handling Scripts
The data handling scripts involve loading original data, structuring missing values, and engineering necessary features for the prediction.
src/data_handling/main.py
import os
import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import src.data_handling.scripts as scripts
from src._utils import main_logger


# load and save the original data frame in parquet
df = scripts.load_original_dataframe()
df.to_parquet(ORIGINAL_DF_PATH, index=False)

# imputation
df = scripts.structure_missing_values(df=df)

# feature engineering
df = scripts.handle_feature_engineering(df=df)

# save the processed df in csv and parquet
scripts.save_df_to_csv(df=df)
df.to_parquet(PROCESSED_DF_PATH, index=False)

# categorize numerical and categorical columns for preprocessing
num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
if cat_cols:
    for col in cat_cols: df[col] = df[col].astype('string')

# create training, validation, and test datasets (the test dataset is for inference only)
y = df[target_col]
X = df.copy().drop(target_col, axis='columns')
test_size, random_state = 50000, 42
X_tv, X_test, y_tv, y_test = train_test_split(
    X, y, test_size=test_size, random_state=random_state
)
X_train, X_val, y_train, y_val = train_test_split(
    X_tv, y_tv, test_size=test_size, random_state=random_state
)

# transform the input datasets
X_train, X_val, X_test, preprocessor = scripts.transform_input(
    X_train, X_val, X_test, num_cols=num_cols, cat_cols=cat_cols
)

# refit and serialize the preprocessor
if preprocessor is not None: preprocessor.fit(X)
joblib.dump(preprocessor, PREPROCESSOR_PATH)
◼ Model Training & Tuning Scripts
The scripts involve initiating the model, searching for the optimal neural architecture and hyperparameters, and serializing the fully trained model so that the system can load it when performing inference.
Because the primary model is built on PyTorch and the backups on Scikit-Learn, I drafted the scripts separately.
◼ 1. PyTorch Models
The training script trains the model while validating on a subset of the training data.
It includes early-stopping logic that halts training when the validation loss has not improved for a given number of consecutive epochs (e.g., 10 epochs).
src/model/torch_model/scripts/training.py
import torch
import torch.nn as nn
import optuna
from sklearn.model_selection import train_test_split

from src._utils import main_logger

# device (falls back from CUDA to Apple MPS to CPU)
device_type = device_type if device_type else 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
device = torch.device(device_type)

# gradient scaler for stability (only applicable to CUDA)
scaler = torch.GradScaler(device=device_type) if device_type == 'cuda' else None

# start training
best_val_loss = float('inf')
epochs_no_improve = 0
for epoch in range(num_epochs):
    model.train()
    for batch_X, batch_y in train_data_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        optimizer.zero_grad()

        try:
            # pytorch's AMP system automatically handles the casting of tensors to Float16 or Float32
            with torch.autocast(device_type=device_type):
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)

            # break the training loop when the model returns nan or inf
            if torch.any(torch.isnan(outputs)) or torch.any(torch.isinf(outputs)):
                main_logger.error(
                    'pytorch model returns nan or inf. break the training loop.'
                )
                break

            # backpropagate with scaled gradients when AMP is available
            if scaler is not None:
                scaler.scale(loss).backward()
                scaler.unscale_(optimizer)  # unscale gradients before clipping
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # clip gradients
                scaler.step(optimizer)  # optimizer step (skipped if gradients contain inf/nan)
                scaler.update()         # update the scale factor for the next iteration

            else:
                loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # clip gradients
                optimizer.step()

        except Exception:
            # fall back to plain full-precision training when AMP is unavailable
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()


    # run validation on a subset of the training dataset
    model.eval()
    val_loss = 0.0

    # switch to inference mode
    with torch.inference_mode():
        for batch_X_val, batch_y_val in val_data_loader:
            batch_X_val, batch_y_val = batch_X_val.to(device), batch_y_val.to(device)
            outputs_val = model(batch_X_val)
            val_loss += criterion(outputs_val, batch_y_val).item()

    val_loss /= len(val_data_loader)

    # early stopping check
    if val_loss < best_val_loss - min_delta:
        best_val_loss = val_loss
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            main_logger.info(f'early stopping at epoch {epoch + 1}')
            break
The tuning script uses the study component from the Optuna library to run Bayesian Optimization.
The study chooses a neural architecture and hyperparameter set to test from the global search space.
Then it builds, trains, and validates the model to find the optimal neural architecture that minimizes the loss (MSE).
src/model/torch_model/scripts/tuning.py
import itertools
import pandas as pd
import numpy as np
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

from src.model.torch_model.scripts.pretrained_base import DFN
from src.model.torch_model.scripts.training import train_model
from src._utils import main_logger


device_type = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
device = torch.device(device_type)

# loss function
criterion = nn.MSELoss()

# define the objective function for optuna
def objective(trial):
    # search space for the model
    num_layers = trial.suggest_int('num_layers', 1, 20)
    batch_norm = trial.suggest_categorical('batch_norm', [True, False])
    dropout_rates = []
    hidden_units_per_layer = []
    for i in range(num_layers):
        dropout_rates.append(trial.suggest_float(f'dropout_rate_layer_{i}', 0.0, 0.6))
        hidden_units_per_layer.append(trial.suggest_int(f'n_units_layer_{i}', 8, 256)) # hidden units per layer

    model = DFN(
        input_dim=X_train.shape[1],
        num_layers=num_layers,
        dropout_rates=dropout_rates,
        batch_norm=batch_norm,
        hidden_units_per_layer=hidden_units_per_layer
    ).to(device)

    # search space for the optimizer
    learning_rate = trial.suggest_float('learning_rate', 1e-10, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['adam', 'rmsprop', 'sgd', 'adamw', 'adamax', 'adadelta', 'radam'])
    optimizer = _handle_optimizer(optimizer_name=optimizer_name, model=model, lr=learning_rate)

    # data loaders
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
    test_size = 10000 if len(X_train) > 15000 else int(len(X_train) * 0.2)
    X_train_search, X_val_search, y_train_search, y_val_search = train_test_split(X_train, y_train, test_size=test_size, random_state=42)
    train_data_loader = create_torch_data_loader(X=X_train_search, y=y_train_search, batch_size=batch_size)
    val_data_loader = create_torch_data_loader(X=X_val_search, y=y_val_search, batch_size=batch_size)

    # training
    num_epochs = 3000 # ensure enough epochs (early stopping halts the loop when overfitting)
    _, best_val_loss = train_model(
        train_data_loader=train_data_loader,
        val_data_loader=val_data_loader,
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        num_epochs=num_epochs,
        trial=trial,
    )
    return best_val_loss


# start optimizing hyperparameters and architecture
study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler())
study.optimize(objective, n_trials=50, timeout=600)

# best trial
best_trial = study.best_trial
best_hparams = best_trial.params

# construct the model based on the tuning results
best_lr = best_hparams['learning_rate']
best_batch_size = best_hparams['batch_size']
input_dim = X_train.shape[1]
best_model = DFN(
    input_dim=input_dim,
    num_layers=best_hparams['num_layers'],
    hidden_units_per_layer=[v for k, v in best_hparams.items() if 'n_units_layer_' in k],
    batch_norm=best_hparams['batch_norm'],
    dropout_rates=[v for k, v in best_hparams.items() if 'dropout_rate_layer_' in k],
).to(device)

# construct an optimizer based on the tuning results
best_optimizer_name = best_hparams['optimizer']
best_optimizer = _handle_optimizer(
    optimizer_name=best_optimizer_name, model=best_model, lr=best_lr
)

# create torch data loaders
train_data_loader = create_torch_data_loader(
    X=X_train, y=y_train, batch_size=best_batch_size
)
val_data_loader = create_torch_data_loader(
    X=X_val, y=y_val, batch_size=best_batch_size
)

# retrain the best model on the full training dataset with the optimal batch size and optimizer
best_model, _ = train_model(
    train_data_loader=train_data_loader,
    val_data_loader=val_data_loader,
    model=best_model,
    optimizer=best_optimizer,
    criterion=criterion,
    num_epochs=1000
)

# create a checkpoint for serialization (the model is reconstructed from this checkpoint)
checkpoint = {
    'state_dict': best_model.state_dict(),
    'hparams': best_hparams,
    'input_dim': X_train.shape[1],
    'optimizer': best_optimizer,
    'batch_size': best_batch_size
}

# serialize the model with the checkpoint
torch.save(checkpoint, FILE_PATH)
Performance Note:
The global search space should be set broadly in Bayesian Optimization to leverage its stochastic nature.
In the objective function, the search space is defined as:
Number of hidden layers: Any integer from 1 to 20
Hidden units per layer: Any integer from 8 to 256
Batch norm: False or True
Dropout rate: Any float from 0.0 to 0.6
Optimizer learning rate: Any float from 1e-10 to 1e-1
Optimizer: 'adam', 'rmsprop', 'sgd', 'adamw', 'adamax', 'adadelta', or 'radam'
Batch size: Either 32, 64, 128, or 256
◼ 2. Scikit-Learn Models (Backups)
For the Scikit-Learn models, I’ll run k-fold cross validation during training to prevent overfitting.
K-fold cross-validation is a technique for evaluating a machine learning model's performance by training and testing it on different subsets of training data.
I defined the run_kfold_validation function where the model is trained and validated using 5-fold cross-validation.
src/model/sklearn_model/scripts/tuning.py
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error

from src._utils import main_logger

def run_kfold_validation(
        X_train,
        y_train,
        base_model,
        hparams: dict,
        n_splits: int = 5, # the number of folds
        early_stopping_rounds: int = 10,
        max_iters: int = 200
    ) -> float:

    mses = 0.0

    # create the k-fold component
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    for fold, (train_index, val_index) in enumerate(kf.split(X_train)):
        # create a subset of training and validation datasets from the entire training data
        X_train_fold, X_val_fold = X_train.iloc[train_index], X_train.iloc[val_index]
        y_train_fold, y_val_fold = y_train.iloc[train_index], y_train.iloc[val_index]

        # reconstruct a model
        model = base_model(**hparams)

        # start the cross validation
        best_val_mse = float('inf')
        patience_counter = 0
        best_model_state = None
        best_iteration = 0

        for iteration in range(max_iters):
            # train on a subset of the training data
            try:
                model.train_one_step(X_train_fold, y_train_fold, iteration)
            except Exception:
                # models without iterative training fall back to a full fit
                model.fit(X_train_fold, y_train_fold)

            # make a prediction on the validation data
            y_pred_val_kf = model.predict(X_val_fold)

            # compute the validation loss (MSE)
            current_val_mse = mean_squared_error(y_val_fold, y_pred_val_kf)

            # check whether training should be stopped (early stopping)
            if current_val_mse < best_val_mse:
                best_val_mse = current_val_mse
                patience_counter = 0
                best_model_state = model.get_params()
                best_iteration = iteration
            else:
                patience_counter += 1

            # execute early stopping when patience_counter exceeds early_stopping_rounds
            if patience_counter >= early_stopping_rounds:
                main_logger.info(f"Fold {fold}: Early stopping triggered at iteration {iteration} (best at {best_iteration}). Best MSE: {best_val_mse:.4f}")
                break


        # after training, reconstruct the best performing model
        if best_model_state: model.set_params(**best_model_state)

        # make a prediction
        y_pred_val_kf = model.predict(X_val_fold)

        # add the fold's MSE
        mses += mean_squared_error(y_val_fold, y_pred_val_kf)

    # compute the final loss (average of MSEs across folds)
    ave_mse = mses / n_splits
    return ave_mse
Then, for the tuning script, I used the gp_minimize function from the Scikit-Optimize library.
The gp_minimize function is used to tune hyperparameters with Bayesian optimization.
This function searches for the hyperparameter set that minimizes the model's error, which is calculated using the run_kfold_validation function defined earlier.
The best-performing hyperparameters are then used to reconstruct and train the final model.
src/model/sklearn_model/scripts/tuning.py
from functools import partial
from skopt import gp_minimize


# define the objective function for Bayesian Optimization using Scikit-Optimize
def objective(params, X_train, y_train, base_model, hparam_names):
    hparams = {item: params[i] for i, item in enumerate(hparam_names)}
    ave_mse = run_kfold_validation(X_train=X_train, y_train=y_train, base_model=base_model, hparams=hparams)
    return ave_mse

# create the search space
hparam_names = [s.name for s in space]
objective_partial = partial(objective, X_train=X_train, y_train=y_train, base_model=base_model, hparam_names=hparam_names)

# search for the optimal hyperparameters
results = gp_minimize(
    func=objective_partial,
    dimensions=space,
    n_calls=n_calls,
    random_state=42,
    verbose=False,
    n_initial_points=10,
)
# results
best_hparams = dict(zip(hparam_names, results.x))
best_mse = results.fun

# reconstruct the model with the best hyperparameters
best_model = base_model(**best_hparams)

# retrain the model on the full training dataset
best_model.fit(X_train, y_train)
Step 2. Configure Feature/Model Stores on S3
The trained models and processed data are stored in S3 buckets, with the processed data saved as Parquet files.
I’ll draft the s3_upload function where the Boto3 client, a low-level interface to an AWS service, initiates connection to S3:
import os
import boto3
from dotenv import load_dotenv

from src._utils import main_logger

def s3_upload(file_path: str):
    # initiate the boto3 client
    load_dotenv(override=True)
    S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME') # the bucket created in s3
    s3_client = boto3.client('s3', region_name=os.environ.get('AWS_REGION_NAME')) # your default region

    if s3_client:
        # create the s3 key and upload the file to the bucket
        s3_key = file_path if file_path[0] != '/' else file_path[1:]
        s3_client.upload_file(file_path, S3_BUCKET_NAME, s3_key)
        main_logger.info(f"file uploaded to s3://{S3_BUCKET_NAME}/{s3_key}")
    else:
        main_logger.error('failed to create an S3 client.')
◼ Model Store
The trained PyTorch models are serialized (converted) into .pth files.
Then, these files are uploaded to the S3 bucket, enabling the system to load the trained model when it performs inference in production.
import torch

from src._utils import s3_upload

# model serialization, stored locally first
torch.save(trained_model.state_dict(), MODEL_FILE_PATH)

# upload to the s3 model store
s3_upload(file_path=MODEL_FILE_PATH)
◼ Feature Store
The processed data is converted into CSV and Parquet file formats.
Then, the Parquet files are uploaded to the S3 bucket, enabling the system to load the lightweight data when it creates prediction data to perform inference in production.
from src._utils import s3_upload

# store csv and parquet files locally
df.to_csv(file_path, index=False)
df.to_parquet(DATA_FILE_PATH, index=False)

# store in the s3 feature store
s3_upload(file_path=DATA_FILE_PATH)

# the trained preprocessor is also stored to transform the prediction data
s3_upload(file_path=PROCESSOR_PATH)
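Reading the artifacts back is symmetric. As a hypothetical sketch (the function name and paths are illustrative; in production the files would first be downloaded from S3 to local disk):

```python
import joblib
import pandas as pd

def load_feature_store(data_path: str, processor_path: str):
    """Load the processed features and the fitted preprocessor from local paths."""
    # Parquet (rather than the CSV copy) is read at inference time
    # because it preserves dtypes and is smaller and faster to load
    X_test = pd.read_parquet(data_path)
    preprocessor = joblib.load(processor_path)
    return X_test, preprocessor
```

The CSV copy remains useful for ad-hoc inspection, while the Parquet copy serves production.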
Step 3: Create a Flask Application with API Endpoints
Next, we’ll create a Flask application with API endpoints.
The Flask application is configured in the app.py file located at the root of the project repository.
As shown in the code snippet, the app.py file contains the following components, in order:
AWS Boto3 client setup,
Flask app configuration and API endpoint setup,
Loading the trained preprocessor, processed input data X_test, and trained models,
The Lambda handler invoked via API Gateway, and
The local test section.
Note that X_test should never be used during model training to avoid data leakage.
app.py
import os
import json

import awsgi
import boto3
import numpy as np
import pandas as pd
import torch
from flask import Flask, request, jsonify
from flask_cors import cross_origin
from waitress import serve
from dotenv import load_dotenv

from src._utils import main_logger

# global variables (will be loaded from the S3 buckets)
_redis_client = None
X_test = None
preprocessor = None
model = None
backup_model = None

# cache keys, CORS origins (origins, CLIENT_A), and the load_* / get_redis_client
# helpers are defined elsewhere in app.py.

# load env if local else skip (lambda refers to env in production)
AWS_LAMBDA_RUNTIME_API = os.environ.get('AWS_LAMBDA_RUNTIME_API', None)
if AWS_LAMBDA_RUNTIME_API is None: load_dotenv(override=True)


#### <---- 1. AWS BOTO3 CLIENT ---->
# boto3 client
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'ml-sales-pred')
s3_client = boto3.client('s3', region_name=os.environ.get('AWS_REGION_NAME', 'us-east-1'))
try:
    # test connection to boto3 client
    sts_client = boto3.client('sts')
    identity = sts_client.get_caller_identity()
    main_logger.info(f"Lambda is using role: {identity['Arn']}")
except Exception as e:
    main_logger.error(f"Lambda credentials/permissions error: {e}")

#### <---- 2. FLASK CONFIGURATION & API ENDPOINTS ---->
# configure the flask app
app = Flask(__name__)
app.config['CORS_HEADERS'] = 'Content-Type'

# add a simple API endpoint to serve the prediction by price point to test
@app.route('/v1/predict-price/<string:stockcode>', methods=['GET', 'OPTIONS'])
@cross_origin(origins=origins, methods=['GET', 'OPTIONS'], supports_credentials=True)
def predict_price(stockcode):
    df_stockcode = None

    # fetch request params
    data = request.args.to_dict()

    try:
        # fetch cache
        if _redis_client is not None:
            # return cached prediction results if any, without performing inference
            cached_prediction_result = _redis_client.get(cache_key_prediction_result_by_stockcode)
            if cached_prediction_result:
                return jsonify(json.loads(cached_prediction_result))

            # historical data of the selected product
            cached_df_stockcode = _redis_client.get(cache_key_df_stockcode)
            if cached_df_stockcode: df_stockcode = json.loads(cached_df_stockcode)


        # define the price range to make predictions. can be a request param, or historical min/max prices
        min_price = float(data.get('unitprice_min', df_stockcode['unitprice_min'][0]))
        max_price = float(data.get('unitprice_max', df_stockcode['unitprice_max'][0]))

        # create bins in the price range. more bins make the prediction curve smoother but require more compute
        NUM_PRICE_BINS = int(data.get('num_price_bins', 100))
        price_range = np.linspace(min_price, max_price, NUM_PRICE_BINS)

        # create a prediction dataset by merging X_test (never used in model training) and the price range
        price_range_df = pd.DataFrame({ 'unitprice': price_range })
        if X_test is not None:
            test_sample = X_test.sample(n=1000, random_state=42)
            test_sample_merged = test_sample.merge(price_range_df, how='cross')
            test_sample_merged.drop('unitprice_x', axis=1, inplace=True)
            test_sample_merged.rename(columns={'unitprice_y': 'unitprice'}, inplace=True)
        else:
            test_sample_merged = price_range_df

        # preprocess the dataset
        X = preprocessor.transform(test_sample_merged) if preprocessor else test_sample_merged

        # perform inference
        y_pred_actual = None
        epsilon = 0
        # try using the primary model
        if model:
            input_tensor = torch.tensor(X, dtype=torch.float32)
            model.eval()
            with torch.inference_mode():
                y_pred = model(input_tensor)
                y_pred = y_pred.cpu().numpy().flatten()
                y_pred_actual = np.exp(y_pred + epsilon)

        # if not, use backups
        elif backup_model:
            y_pred = backup_model.predict(X)
            y_pred_actual = np.exp(y_pred + epsilon)


        # finalize the outcome for the client app
        df_ = test_sample_merged.copy()
        df_['quantity'] = np.floor(y_pred_actual) # quantity must be an integer
        df_['sales'] = df_['quantity'] * df_['unitprice'] # compute sales
        df_ = df_.sort_values(by='unitprice')

        # aggregate the results by the unitprice in the price range
        df_results = df_.groupby('unitprice').agg(
            quantity=('quantity', 'median'),
            quantity_min=('quantity', 'min'),
            quantity_max=('quantity', 'max'),
            sales=('sales', 'median'),
        ).reset_index()

        # find the optimal price point
        optimal_row = df_results.loc[df_results['sales'].idxmax()]
        optimal_price = optimal_row['unitprice']
        optimal_quantity = optimal_row['quantity']
        best_sales = optimal_row['sales']

        all_outputs = []
        for _, row in df_results.iterrows():
            current_output = {
                "stockcode": stockcode,
                "unit_price": float(row['unitprice']),
                'quantity': int(row['quantity']),
                'quantity_min': int(row['quantity_min']),
                'quantity_max': int(row['quantity_max']),
                "predicted_sales": float(row['sales']),
            }
            all_outputs.append(current_output)

        # store the prediction results in cache
        if all_outputs and _redis_client is not None:
            serialized_data = json.dumps(all_outputs)
            _redis_client.set(
                cache_key_prediction_result_by_stockcode,
                serialized_data,
                ex=3600 # expire in an hour
            )

        # return a list of all outputs
        return jsonify(all_outputs)

    except Exception as e:
        main_logger.error(f'prediction failed: {e}')
        return jsonify([])


# request header management (for the process from API gateway to the Lambda)
@app.after_request
def add_header(response):
    response.headers['Cache-Control'] = 'public, max-age=0'
    response.headers['Access-Control-Allow-Origin'] = CLIENT_A
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Origin'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
    response.headers['Access-Control-Allow-Credentials'] = 'true'
    return response

#### <---- 3. LOADING PROCESSOR, DATASET, AND MODELS ---->
load_processor()
load_x_test()
load_model()

#### <---- 4. INVOKE LAMBDA ---->
def handler(event, context):
    main_logger.info("lambda handler invoked.")
    try:
        # connect the redis client after the lambda is invoked
        get_redis_client()
    except Exception as e:
        main_logger.critical(f"failed to establish initial Redis connection in handler: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Failed to initialize Redis client. Check environment variables and network config.'})
        }

    # use the awsgi package to convert the JSON event to a WSGI request
    return awsgi.response(app, event, context)


#### <---- 5. FOR LOCAL TEST ---->
# serve the application locally on the WSGI server, waitress
# lambda will ignore this section.
if __name__ == '__main__':
    if os.getenv('ENV') == 'local':
        main_logger.info("...start the operation (local)...")
        serve(app, host='0.0.0.0', port=5002)
    else:
        app.run(host='0.0.0.0', port=8080)
I’ll test the endpoint locally using the uv package manager:
$ uv run app.py --cache-clear

$ curl http://localhost:5002/v1/predict-price/{STOCKCODE}

The system provided a list of sales predictions for each price point:

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
Figure D. Local response of the Flask API
◼ Key Points on Flask App Configuration
There are various points you should take into consideration when configuring a Flask application with Lambda. Let’s go over them now:
1. A Few API Endpoints Per Container
Adding many API endpoints to a single serverless instance can lead to a "monolithic function" concern, where issues in one endpoint impact others.
In this project, we’ll focus on a single endpoint per container – and if needed, we can add separate Lambda functions to the system.
2. Understanding the handler Function and AWSGI
The handler function is invoked every time the Lambda function receives a client request from the API Gateway.
The function takes the event argument that includes the request details in a JSON dictionary and passes it to the Flask application.
AWSGI acts as an adapter, translating a Lambda event in JSON format into a WSGI request that a Flask application can understand, and converts the application’s response back into a JSON format that Lambda and API Gateway can process.
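To make that translation concrete, here is a simplified, standard-library-only sketch of the mapping AWSGI performs. The event shape and the selection of environ keys below are illustrative assumptions, not awsgi's actual implementation:

```python
import io

# a simplified sketch (not awsgi's real internals) of how an
# API Gateway REST proxy event maps onto WSGI environ keys
def event_to_environ(event):
    query = event.get('queryStringParameters') or {}
    return {
        'REQUEST_METHOD': event['httpMethod'],
        'PATH_INFO': event['path'],
        'QUERY_STRING': '&'.join(f'{k}={v}' for k, v in query.items()),
        'SERVER_NAME': 'lambda',
        'SERVER_PORT': '443',
        'wsgi.url_scheme': 'https',
        'wsgi.input': io.BytesIO((event.get('body') or '').encode()),
    }

# an assumed minimal proxy event, shaped like what API Gateway delivers
event = {
    'httpMethod': 'GET',
    'path': '/v1/predict-price/85123A',
    'queryStringParameters': None,
    'headers': {},
    'body': None,
}
environ = event_to_environ(event)
```

Flask then dispatches the resulting WSGI request exactly as it would for a request arriving over HTTP, which is why the application code needs no Lambda-specific changes.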
3. Connecting Cache Storage
The get_redis_client function is called once the handler function is called by the API Gateway. This allows the Flask application to store or fetch a cache from the Redis client:
import redis
import redis.cluster
from redis.cluster import ClusterNode

_redis_client = None

def get_redis_client():
    global _redis_client
    if _redis_client is None:
        REDIS_HOST = os.environ.get("REDIS_HOST")
        REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
        REDIS_TLS = os.environ.get("REDIS_TLS", "true").lower() == "true"
        try:
            startup_nodes = [ClusterNode(host=REDIS_HOST, port=REDIS_PORT)]
            _redis_client = redis.cluster.RedisCluster(
                startup_nodes=startup_nodes,
                decode_responses=True,
                skip_full_coverage_check=True,
                ssl=REDIS_TLS,  # ElastiCache has encryption in transit enabled -> must be true
                ssl_cert_reqs=None,
                socket_connect_timeout=5,
                socket_timeout=5,
                health_check_interval=30,
                retry_on_timeout=True,
                retry_on_error=[
                    redis.exceptions.ConnectionError,
                    redis.exceptions.TimeoutError
                ],
                max_connections=10,  # limit connections for Lambda
                max_connections_per_node=2  # limit per node
            )
            _redis_client.ping()
            main_logger.info("successfully connected to ElastiCache Redis Cluster (configuration endpoint)")
        except Exception as e:
            main_logger.error(f"an unexpected error occurred during Redis Cluster connection: {e}", exc_info=True)
            _redis_client = None
    return _redis_client

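The cache-aside pattern the endpoint follows (check the cache, compute on a miss, then store with a TTL) can be sketched independently of ElastiCache. The FakeRedis class below is an in-memory stand-in for illustration only, not the real client:

```python
import json
import time

class FakeRedis:
    """In-memory stand-in for the Redis client (illustration only)."""
    def __init__(self):
        self._store = {}

    def set(self, key, value, ex=None):
        # mimic Redis SET with an optional expiry in seconds
        self._store[key] = (value, None if ex is None else time.time() + ex)

    def get(self, key):
        item = self._store.get(key)
        if item is None:
            return None
        value, expires = item
        if expires is not None and time.time() > expires:
            del self._store[key]  # expired entry behaves like a miss
            return None
        return value

def get_or_compute(client, key, compute, ttl=3600):
    """Cache-aside: return the cached value, or compute and cache it."""
    cached = client.get(key)
    if cached is not None:
        return json.loads(cached)
    result = compute()
    client.set(key, json.dumps(result), ex=ttl)
    return result

client = FakeRedis()
predictions = get_or_compute(
    client, 'pred:85123A',
    lambda: [{'unit_price': 2.5, 'predicted_sales': 120.0}]
)
```

A second call with the same key returns the cached JSON without recomputing, which is what spares the Lambda from re-running inference on repeated requests within the hour.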
4. Handling Heavy Tasks Outside of the handler Function
Serverless functions can experience a cold start duration.
While a Lambda function can run for up to 15 minutes, its associated API Gateway has a timeout of 29 seconds (29,000 ms) for a RESTful API.
So, any heavy tasks like loading preprocessors, input data, or models should be performed once outside of the handler function, ensuring they are ready before the API endpoint is called.
Here are the loading functions called in app.py.
app.py
import joblib

from src._utils import s3_load, s3_load_to_temp_file

preprocessor = None
X_test = None
model = None
backup_model = None


# load the preprocessor
def load_preprocessor():
    global preprocessor
    preprocessor_tempfile_path = s3_load_to_temp_file(PREPROCESSOR_PATH)
    preprocessor = joblib.load(preprocessor_tempfile_path)
    os.remove(preprocessor_tempfile_path)


# load the input data
def load_x_test():
    global X_test
    x_test_io = s3_load(file_path=X_TEST_PATH)
    X_test = pd.read_parquet(x_test_io)


# load the models
def load_model():
    global model, backup_model
    # try loading & reconstructing the primary model
    try:
        # first, load the io file from the S3 bucket
        model_data_bytes_io_ = s3_load(file_path=DFN_FILE_PATH)
        # convert to a checkpoint dictionary (containing the hyperparameter set)
        checkpoint_ = torch.load(
            model_data_bytes_io_,
            weights_only=False,
            map_location=device
        )
        # reconstruct the model
        model = t.scripts.load_model(checkpoint=checkpoint_, file_path=DFN_FILE_PATH)
        # set the model to evaluation mode
        model.eval()

    # else, fall back to a backup model
    except Exception:
        load_artifacts_backup_model()

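The try/except above can be generalized into a prioritized fallback chain, mirroring how the backup models are ordered by learning capability. This is a sketch with hypothetical loaders, not the project's actual loading code:

```python
def load_first_available(loaders):
    """Try each loader in priority order and return the first model that loads."""
    for name, loader in loaders:
        try:
            return name, loader()
        except Exception:
            continue  # fall through to the next backup
    raise RuntimeError('no model could be loaded')

def failing_primary():
    # stands in for a torch.load that raises (e.g., a missing S3 object)
    raise IOError('primary checkpoint unavailable')

# hypothetical loaders standing in for the real S3-backed ones,
# ordered by priority: primary network first, then the backups
loaders = [
    ('dfn', failing_primary),
    ('lightgbm', lambda: 'lightgbm-model'),
    ('svr', lambda: 'svr-model'),
]
name, loaded = load_first_available(loaders)
```

Here the primary loader fails, so the chain silently falls through to the LightGBM backup, which is exactly the resilience behavior the serving layer needs.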
Step 4. Publish a Docker Image to ECR
After configuring the Flask application, I’ll containerize the entire application on Docker.
Containerization packages the application (in a machine learning context, the models, their dependencies, and the configuration) into a single portable container.
Docker creates a container image based on the instructions defined in a Dockerfile, and the Docker engine uses the image to run the isolated container.
In this project, the Docker container image is uploaded to ECR, so the Lambda function can access it in production.
The process starts by defining a .dockerignore file to keep the container image lean:
.dockerignore
# any irrelevant data
__pycache__/
.ruff_cache/
.DS_Store/
.venv/
dist/
.vscode
*.psd
*.pdf
[a-f]*.log
tmp/
awscli-bundle/

# add any experimental models, unnecessary data
dfn_bayesian/
dfn_grid/
data/
notebooks/

Dockerfile
# use the AWS Lambda Python base image from public ECR
FROM public.ecr.aws/lambda/python:3.12

# define a working directory in the container
WORKDIR /app

# copy the entire repository (minus .dockerignore entries) into the container at /app
COPY . /app/

# install dependencies defined in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# define commands: run the Lambda runtime interface client against app.handler
ENTRYPOINT [ "python" ]
CMD [ "-m", "awslambdaric", "app.handler" ]

◼ Test in Local
I’ll test the Docker image by building the container named my-app locally:
$ docker build -t my-app -f Dockerfile .

Then, run the container with the waitress server in local:
$ docker run -p 5002:5002 -e ENV=local my-app app.py

The -e ENV=local flag sets the environment variable inside the container, which will trigger the waitress.serve() call in the app.py.
In terminal, you’ll find a message saying:

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
You can also call the endpoint created to see the results returned:
$ curl http://localhost:5002/v1/predict-price/{STOCKCODE}

▫ Publish the Docker Image to ECR
To publish the Docker image, we first need to configure the default AWS credentials and region:
From the AWS account console, issue an access token and check the default region.
Store them in the ~/.aws/credentials and ~/.aws/config files:
~/.aws/credentials
[default]
aws_secret_access_key=
aws_access_key_id=

~/.aws/config
[default]
region=

After the configuration, I’ll publish the Docker image to ECR.
# authenticate the docker client to ECR
$ aws ecr get-login-password --region <your-aws-region> | docker login --username AWS --password-stdin <your-aws-account-id>.dkr.ecr.<your-aws-region>.amazonaws.com

# create the repository
$ aws ecr create-repository --repository-name <your-repo-name> --region <your-aws-region>

# tag the docker image
$ docker tag <your-repo-name>:<your-app-version> <your-aws-account-id>.dkr.ecr.<your-aws-region>.amazonaws.com/<your-repo-name>:<your-app-version>

# push
$ docker push <your-aws-account-id>.dkr.ecr.<your-aws-region>.amazonaws.com/<your-repo-name>:<your-app-version>

<your-aws-region>: Your default AWS region (e.g., us-east-1).
<your-aws-account-id>: 12-digit AWS account ID.
<your-repo-name>: Your desired repository name.
<your-app-version>: Your desired tag name (e.g., v1.0).
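Stitched together, the push sequence can be scripted. The values below are illustrative placeholders (not real account IDs), and the AWS/Docker commands are commented out so the sketch runs without credentials:

```shell
#!/usr/bin/env sh
# example placeholder values -- substitute your own
AWS_REGION=us-east-1
AWS_ACCOUNT_ID=123456789012
REPO_NAME=my-app
APP_VERSION=v1.0

# the fully qualified image URI that the tag and push commands expect
ECR_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${REPO_NAME}:${APP_VERSION}"
echo "${ECR_URI}"

# the push sequence then becomes:
# aws ecr get-login-password --region "$AWS_REGION" | \
#   docker login --username AWS --password-stdin "${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com"
# docker tag "${REPO_NAME}:${APP_VERSION}" "$ECR_URI"
# docker push "$ECR_URI"
```

Keeping the region, account ID, and repository name in variables avoids the most common push failure: a tag that does not exactly match the registry URI.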
Step 5. Create a Lambda Function
Next, I’ll create a Lambda function.
From the Lambda console, choose:
The Container Image option,
The container image URL from the pull down list,
A function name of our choice, and
An architecture type (arm64 is recommended for better price-performance).

Figure E. AWS Lambda function configuration
The Lambda function my-app is successfully launched.
◼ Connect the Lambda function to API Gateway
Next, I’ll add API Gateway as an event trigger to the Lambda function.
First, visit the API Gateway console and create REST API methods using the ARN of the Lambda function:

Fig. Screenshot of the AWS API Gateway configuration
Then, add resources to the created API gateway to create an endpoint:
API Gateway > APIs > Resources > Create Resource
Align the resource endpoint with the API endpoint defined in the app.py.
Configure CORS (e.g., accept specific origins).
Deploy the resource to the stage.
Going back to the Lambda console, you’ll find the API Gateway is connected as an event trigger:
Lambda > Function > my-app (your function name)

Figure F. Screenshot of the AWS Lambda dashboard
Step 6. Configure AWS Resources
Lastly, I’ll configure related AWS resources to make the system work in production.
This process involves the following steps:
◼ The IAM Role: Controls Who to Access Resources
AWS requires IAM roles to grant temporary, secure permissions to users, mitigating security risks related to long-term credentials like passwords.
The IAM role leverages policies to grant access to selected services. Policies can be issued by AWS or customized by the user as inline policies.
It is important to avoid overly permissive access rights for the IAM role.
In the Lambda function console, check the execution role:
Lambda > Function > <FUNCTION> > Permission > The execution role
Set up the following policies to allow the Lambda’s IAM role to handle necessary operations:
Lambda: AWSLambdaExecute, which allows executing the function.
EC2: an inline policy, which allows controlling the security group and the VPC of the Lambda function.
ECR: AmazonElasticContainerRegistryPublicFullAccess plus an inline policy, which allows storing and pulling the Docker image.
ElastiCache: AmazonElastiCacheFullAccess plus an inline policy, which allows storing and pulling caches.
S3: AmazonS3ReadOnlyAccess plus an inline policy, which allows reading and storing contents.
Now, the IAM role can access these resources and perform the allowed actions.
◼ The Security Group: Controls Network Traffic
A security group is a virtual firewall that controls inbound and outbound network traffic for AWS resources.
It uses stateful (allowing return traffic automatically) “allow-only” rules based on protocol, port, and IP address, where it denies all traffic by default.
Create a new security group for the Lambda function:
EC2 > Security Groups > <YOUR SECURITY GROUP>
Set up inbound / outbound traffic rules:
The inbound rules:
S3 → Lambda: Type: HTTPS / Protocol: TCP / Port range: 443 / Source: Custom*
ElastiCache → Lambda: Type: Custom TCP / Port range: 6379 / Source: Custom*
*Choose the created security group for the Lambda function as the custom source.
The outbound rules:
Lambda → Internet: Type: HTTPS / Protocol: TCP / Port range: 443 / Destination: 0.0.0.0/0
ElastiCache → Internet: Type: All Traffic / Destination: 0.0.0.0/0
◼ The Virtual Private Cloud (VPC)
A Virtual Private Cloud (VPC) provides a logically isolated private network for the AWS resources, acting as our own private data center within AWS.
AWS can create a Hyperplane ENI (Elastic Network Interface) for the Lambda function and its connected resources in the subnets of the VPC.
Though it is optional, I’ll use the VPC to connect the Lambda function to the S3 storage and ElastiCache.
This process involves:
1. Create a VPC from the VPC console: VPC > Create VPC.
2. Create an STS (Security Token Service) endpoint:
VPC > PrivateLink and Lattice > Endpoints > Create Endpoint
Type: AWS Service
Service name: com.amazonaws.<YOUR REGION>.sts
Type: Interface
VPC: Select the VPC created earlier.
Subnets: Select all subnets.
Security groups: Select the security group of the Lambda function.
Policy: Full access
Enable DNS names.
The VPC must have a dedicated endpoint for STS to receive temporary credentials from STS.
3. Create an S3 endpoint in the VPC:
VPC > PrivateLink and Lattice > Endpoints > Create Endpoint
Type: AWS Service
Service name: com.amazonaws.<YOUR REGION>.s3
Type: Gateway
VPC: Select the VPC created earlier.
Route tables: Select the route tables of the subnets (a gateway endpoint attaches to route tables rather than to subnets or security groups).
Policy: Full access
Lastly, check the security group of the Lambda function and ensure that its VPC ID points to the VPC created: EC2 > Security Group > <YOUR SECURITY GROUP FOR THE LAMBDA FUNCTION> > VPC ID.
That’s all for the deployment flow.
We can test the API endpoint in production:
Copy the Invoke URL of the deployed API endpoint: API Gateway > APIs > Stages > Invoke URL
Call the API endpoint and check that it responds with predictions:
$ curl -H 'Authorization: Bearer YOUR_API_TOKEN' -H 'Accept: application/json' \
  '<INVOKE URL>/<ENDPOINT>'

For logging and debugging, I’ll use CloudWatch Live Tail: CloudWatch > Live Tail.
Wrapping Up
Building a machine learning system requires thoughtful project scoping and architecture design.
In this article, we built a dynamic pricing system as a simple single interface on the containerized serverless architecture.
Moving forward, we need to consider potential drawbacks of this minimal architecture:
Increased cold start duration: the awsgi WSGI adapter adds a small overhead, and a larger container image takes longer to load.
Monolithic function: Adding endpoints to the Lambda function can lead to a monolithic function where an issue in one endpoint impacts others.
Less granular observability: AWS CloudWatch cannot provide individual invocation/error metrics per API endpoint without custom instrumentation.
To scale the application effectively, extracting functionalities into separate microservices is a good next step.
Continue Your Learning
If you enjoyed this blog, these related entries will complete the picture:
Architecting Production ML: A Deep Dive into Deployment and Scalability
Data Pipeline Architecture: From Traditional DWH to Modern Lakehouse
Building an Automated CI/CD Pipeline for Serverless Machine Learning on AWS
Related Books for Further Understanding
These books cover a wide range of theories and practices, from fundamentals to the PhD level.

Linear Algebra Done Right

Foundations of Machine Learning, second edition (Adaptive Computation and Machine Learning series)

Designing Machine Learning Systems: An Iterative Process for Production-Ready Applications

Machine Learning Design Patterns: Solutions to Common Challenges in Data Preparation, Model Building, and MLOps
Share What You Learned
Kuriko IWAI, "From Notebook to Production: Building a Resilient ML Pipeline on AWS Lambda" in Kernel Labs
https://kuriko-iwai.com/building-dynamic-pricing-system
Looking for Solutions?
- Deploying ML Systems 👉 Book a briefing session
- Hiring an ML Engineer 👉 Drop an email
- Learn by Doing 👉 Enroll in the AI Engineering Masterclass
Written by Kuriko IWAI. All images, unless otherwise noted, are by the author. All experimentations on this blog utilize synthetic or licensed data.


