Engineering a Fully-Automated Lakehouse: From Raw Data to Gold Tables
Architecting an end-to-end data pipeline for scalable machine learning system
By Kuriko IWAI

Table of Contents
Introduction
What is a Lakehouse Architecture

Introduction
The Lakehouse architecture combines the benefits of data lakes and data warehouses in a single platform.
But implementing it demands discipline in data governance and adds operational complexity.
In this article, I'll walk you through building a fully-automated lakehouse architecture, combining:
AWS S3 for storage,
Delta Lake for logging,
Apache Spark for computation, and
Apache Airflow for Directed Acyclic Graph (DAG) scheduling,
taking stock price prediction as an example.
What is a Lakehouse Architecture
A lakehouse architecture is a data pipeline that combines the best features of data lakes and data warehouses into a single, unified platform, overcoming the limitations of having separate systems.
The below diagram illustrates key components in the lakehouse architecture with a medallion structure:

Kernel Labs | Kuriko IWAI | kuriko-iwai.com
Figure A. Lakehouse architecture with a medallion structure (Created by Kuriko IWAI)
In the architecture, data is extracted from sources that vary by:
Data structure (structured, semi-structured, or unstructured),
Delivery (batch or streaming), and
Location (internal or external).
Then, the layers in the lakehouse perform data processing:
Bronze Layer: Stores all types of data in their raw, original formats. A low-cost, scalable storage layer acting as a permanent, immutable archive of the raw data.
Silver Layer: Performs data cleansing and initial transformations like imputation, standardizing data formats, and correcting data types.
Gold Layer: A final layer that engineers features relevant for predictions or BI reporting.
In the architecture, the Bronze Layer works as a data lake where raw data is stored without transformation.
Silver and Gold Layers act as a data warehouse where a metadata layer sits on top of the data lake (bronze layer), ensuring:
ACID (Atomicity, Consistency, Isolation, Durability) transaction: Ensures data reliability and integrity for concurrent read/write operations,
Schema enforcement: Enforce data consistency by conforming data to a predefined schema,
Data governance: Provide robust controls for data access, lineage, and compliance, and
Indexing: Optimize query performance via indexing and caching.
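Of these guarantees, schema enforcement is the easiest to illustrate without any infrastructure. Below is a minimal plain-Python sketch of what rejecting a non-conforming write looks like; it is not the Delta Lake API, and the names SCHEMA and enforce_schema are hypothetical:

```python
# Plain-Python illustration of schema enforcement (not the Delta Lake API):
# a write is rejected when a record does not conform to a predefined schema.
SCHEMA = {"dt": str, "close": float, "volume": int}

def enforce_schema(record: dict) -> dict:
    # reject records whose columns do not match the schema
    if set(record) != set(SCHEMA):
        raise ValueError(f"columns {sorted(record)} do not match schema {sorted(SCHEMA)}")
    # reject records whose value types do not match the schema
    for name, expected_type in SCHEMA.items():
        if not isinstance(record[name], expected_type):
            raise TypeError(f"column '{name}' must be {expected_type.__name__}")
    return record

# a conforming record passes through unchanged
accepted = enforce_schema({"dt": "2025-09-04", "close": 171.66, "volume": 141670144})
```

Delta Lake applies the same idea at write time, failing the transaction instead of silently storing malformed rows.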
◼ Why Lakehouse Architecture
Traditionally, organizations used a two-tiered architecture with:
A data lake for raw, unstructured data, and
A data warehouse for structured, cleansed data for fast BI queries.
This posed challenges like operational complexity in managing two separate systems and data duplication when transforming data from the data lake to the warehouse.
A lakehouse architecture addresses these challenges by creating a centralized platform.
Key benefits include:
Simplified architecture and lower overhead with a single platform for all data types,
Improved data quality through schema enforcement and ACID transactions,
Support for diverse workloads, from traditional BI to advanced AI applications, and
No vendor lock-in, because lakehouses use open file formats like Apache Parquet and open table formats like Delta Lake.
To fully leverage the benefits, I’ll build a Delta Lake Lakehouse where data is stored, versioned, and processed seamlessly in open-source frameworks.
Building Delta Lake Lakehouse
Delta Lake Lakehouse (DLL) is a type of lakehouse architecture that leverages open-source data storage and management frameworks.
The below diagram shows the DLL architecture I’ll build:

Figure B. DLL architecture (Created by Kuriko IWAI)
First, DLL handles the entire data processing pipeline from extraction to feature engineering, preparing data for model-specific preprocessing like standardization or encoding.
It has three horizontal layers: storage, transactions, and computation, each of which is managed by a service provider (I selected major players, but options for each layer vary):
Storage Layer: Amazon S3 stores raw data in the Bronze layer and Delta Lake tables in the Silver and Gold layers.
Transaction Layer: Delta Lake, an open-source storage layer, adds a transaction log, enabling ACID transactions, schema enforcement, and versioning.
Computation Layer: Apache Spark, a unified analytics engine, performs the computation needed to structure data into the Delta Lake tables.
In the orchestration layer, Apache Airflow schedules DAGs that continuously extract the latest stock price data from the API and proactively detect data drift.
A DAG is a fundamental concept representing a collection of tasks (nodes) executed in a specific order, without any loops; the name stands for:
Directed: The relationships between the nodes (tasks) have a clear, one-way direction, like an arrow,
Acyclic: “without cycles” or loops, and
Graph: A network structure consisting of nodes and edges (connections between nodes).
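The same idea can be sketched in a few lines of plain Python with the standard library's graphlib; this is illustrative only, not how Airflow schedules tasks, and the task names are hypothetical:

```python
from graphlib import TopologicalSorter  # stdlib since Python 3.9

# each task maps to the set of tasks it depends on (directed edges)
dependencies = {
    "load_bronze": {"extract"},
    "build_silver": {"load_bronze"},
    "build_gold": {"build_silver"},
}

# static_order() raises CycleError if the graph contains a loop,
# which is exactly the "acyclic" guarantee a scheduler needs
order = list(TopologicalSorter(dependencies).static_order())
```

For this linear chain there is only one valid execution order: extract, then load_bronze, build_silver, and build_gold.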
Now, let us take a look at the building process.
Workflow in Action
I’ll take the following steps to build the Delta Lake Lakehouse architecture:
Extraction: Fetch raw data from API endpoints.
Bronze layer: Load the raw data to the S3 bucket.
Setting Up Apache Spark.
Configure the SparkSession.
Silver layer: Transform and load the Bronze data into a Delta Lake table in S3.
Gold layer: Transform and load the Silver data into a Delta Lake table in S3.
Test run.
Scheduling: Add Apache Airflow DAGs.
CI/CD integration: Add PyTest and Github Workflow.
◼ Step 1. Extraction
The first step is to extract raw data from diverse data sources.
For demonstration, I’ll fetch stock price data from Alpha Vantage.
First, claim your API key from the official site and add the key to .env file:
$ touch .env
$ echo "API_KEY=<YOUR_API_KEY>" >> .env
Then, I’ll define the extract_daily_stock_data function that fetches stock price data of a selected ticker:
src/data_handling/extract.py
import os
import requests

def extract_daily_stock_data(ticker, function: str = 'TIME_SERIES_DAILY') -> dict:
    API_URL = 'https://www.alphavantage.co/query'
    api_key = os.environ.get('API_KEY')
    params = {
        'function': function,
        'symbol': ticker,
        'outputsize': 'full',
        'apikey': api_key
    }

    res = requests.get(API_URL, params=params)
    res.raise_for_status()
    data = res.json()
    stock_price_data = data['Time Series (Daily)']
    return stock_price_data
The dictionary stock_price_data stores daily stock data, with each key being the date and the value being a pair of price and transaction volume.
{
    "2025-09-04": {
        "1. open": "170.5700",
        "2. high": "171.8600",
        "3. low": "169.4100",
        "4. close": "171.6600",
        "5. volume": "141670144"
    },
    ...
}
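To preview what the Silver layer will later do with Spark, here is a plain-Python sketch that flattens this date-keyed structure into row records. The sample data is hard-coded (the live API needs a key), and the FIELD_NAMES mapping is a hypothetical helper:

```python
# hard-coded sample mirroring the API response shape above
raw = {
    "2025-09-04": {
        "1. open": "170.5700",
        "2. high": "171.8600",
        "3. low": "169.4100",
        "4. close": "171.6600",
        "5. volume": "141670144",
    },
}

# map the API's numbered keys to clean column names
FIELD_NAMES = {
    "1. open": "open", "2. high": "high", "3. low": "low",
    "4. close": "close", "5. volume": "volume",
}

rows = []
for dt, values in raw.items():
    row = {"dt": dt}
    for api_key, name in FIELD_NAMES.items():
        # prices become floats, volume becomes an integer
        row[name] = int(values[api_key]) if name == "volume" else float(values[api_key])
    rows.append(row)
```

Each row is one trading day with typed columns, which is the tall format the Silver layer produces at scale.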
◼ Step 2. The Bronze Layer
The Bronze layer loads the raw data to S3.
I’ll first configure AWS credentials and an S3 bucket:
Create an S3 bucket from the Amazon S3 console.
In the .env file, add AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION_NAME of the Role that creates the S3 bucket.
Learn More: How to configure AWS Resources
Then, I’ll define the load_to_s3 function, which loads the raw data in a JSON string format to the S3 bucket, using the s3a:// schema.
Here, the Boto3 client, a low-level interface for interacting with AWS services, calls the S3 PutObject API action directly to store the raw data in the S3.
src/data_handling/lakehouse/bronze.py
import os
import json
import shutil
import boto3

def load_to_s3(data, ticker: str = 'NVDA') -> str:
    # define the file path
    folder_path = os.path.join('data', 'bronze', ticker)
    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)  # clean up previous run's data for a fresh start

    file_name = f'{ticker}_bronze.json'
    file_path = os.path.join(folder_path, file_name)

    # s3 load using the s3a schema
    S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'ml-stockprice-pred')
    s3_key = file_path
    bronze_s3_path = f's3a://{S3_BUCKET_NAME}/{s3_key}'

    # convert to a json string
    json_string = json.dumps(data)

    # initiate the s3 boto3 client
    s3_client = boto3.client('s3')
    s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=s3_key, Body=json_string)
    return bronze_s3_path
◼ Step 3. Setting Up Apache Spark
Silver and Gold Layers run computation using Apache Spark.
First, check if Spark and Java exist in the local system:
$ echo $JAVA_HOME
$ echo $SPARK_HOME
If both exist, you can skip this step. If not, let us go through the following steps.
▫ 3-1. Package download
First, install packages:
# for macos
$ brew install openjdk@17
$ pip install pyspark

# for linux
$ sudo apt install openjdk-17-jdk
$ pip install pyspark
And download the Spark .tgz file spark-4.0.0-bin-hadoop3.tgz from the official Apache Spark downloads page.
▫ 3-2. Verifying Apache Release Signatures
Next, verify the signature of an Apache release before decompressing the spark-4.0.0-bin-hadoop3.tgz file.
This process relies on Pretty Good Privacy (PGP) or GNU Privacy Guard (GPG).
First, download the files from the Apache distribution directory.
The release file (e.g., httpd-2.4.65.tar.gz): Contains all the necessary files to compile and install the Apache web server version 2.4.65.
The detached signature file (e.g., httpd-2.4.65.tar.gz.asc): An ASCII-armored PGP or GPG signature, used to verify the authenticity and integrity of the corresponding .tar.gz file.
Run the gpg --verify command from the folder where the release file is downloaded:
$ gpg --verify httpd-2.4.65.tar.gz.asc httpd-2.4.65.tar.gz
- If gpg raises a warning like gpg: Can't check signature: public key not found, download the KEYS file from the Apache distribution directory and run:
$ gpg --import KEYS.txt
- Run the verification command again:
$ gpg --verify httpd-2.4.65.tar.gz.asc httpd-2.4.65.tar.gz
This time, the output should be gpg: Good signature.
A "good signature" means the file has not been altered since it was signed, and that no attacker used a fake key to sign a malicious file.
Decompress the spark-4.0.0-bin-hadoop3.tgz file and store it in a directory of your choice.
Add JAVA_HOME and SPARK_HOME to the .zshrc or .bash file:
~/.zshrc
# adding java_home
export JAVA_HOME="/opt/homebrew/opt/openjdk@17"
export PATH="$JAVA_HOME/bin:$PATH"

# adding spark_home
export SPARK_HOME="/Library/spark-4.0.0-bin-hadoop3"
export PATH="$SPARK_HOME/bin:$PATH"
- Activate the updates:
$ source ~/.zshrc
◼ Step 4. Configuring Spark Session
Next, I’ll configure the SparkSession.
To make the DLL work, Apache Spark must download the necessary packages:
Delta Lake,
the AWS SDK for S3, and
Hadoop (enables Apache Spark to read and write data from S3)
before the SparkSession starts, because the PyPI distribution (pip install pyspark) does not include these packages.
Without them, the gateway fails immediately because Apache Spark cannot find the required dependencies in the classpath during its Java process startup.
I'll define the spark_packages variable to list the packages and add it to the configuration setting.
src/data_handling/spark.py
import os
from pyspark.sql import SparkSession


def config_and_start_spark_session() -> SparkSession:
    os.environ['SPARK_HOME'] = '<DIRECTORY/TO/SPARK>'
    os.environ['JAVA_HOME'] = '<DIRECTORY/TO/OPENJDK>'
    os.environ['PYSPARK_PYTHON'] = 'python'

    # aws credentials
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
    AWS_REGION = os.environ.get('AWS_REGION', 'us-east-1')

    # define the necessary packages
    spark_packages = ','.join([
        'io.delta:delta-spark_2.13:4.0.0',  # delta lake package
        'org.apache.hadoop:hadoop-aws:3.4.0',  # hadoop for Spark to use the s3a filesystem
        'com.amazonaws:aws-java-sdk-bundle:1.12.262',  # aws sdk for java
    ])

    # config the spark session with the spark packages and aws credentials
    # (parentheses keep the chain syntactically valid with comments between calls)
    spark = (
        SparkSession.builder.appName('Silver')
        # add necessary packages
        .config('spark.jars.packages', spark_packages)
        # extend spark's sql capabilities to understand delta lake commands and syntax
        .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        # change spark's default catalog so spark can manage and read delta lake tables
        .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
        # increase driver memory
        .config('spark.driver.memory', '8g')
        # add aws credentials for s3
        .config('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
        .config('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
        # regional s3 endpoint
        .config('spark.hadoop.fs.s3a.endpoint', f's3.{AWS_REGION}.amazonaws.com')
        # credentials provider
        .config('spark.hadoop.fs.s3a.aws.credentials.provider',
                'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        .getOrCreate()
    )

    return spark
▫ Tackling Version Conflicts
Version conflicts are a common cause of errors.
I’ll identify the compatible versions from the official Maven repository:
Scala - Spark: Spark is compiled and released for specific versions of Scala. For instance, spark-4.0.0 is compiled for Scala 2.13 (Ref. official MVN repository).
Hadoop - Spark: The Hadoop version must match the version bundled with the Spark release in the $SPARK_HOME env (Ref. MVN repository).
AWS SDK: The AWS SDK version must match the Java version defined in the $JAVA_HOME env (Ref. MVN repository).
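As a quick sanity check, the Scala suffix baked into a Maven coordinate can be parsed and compared against the Scala version of your Spark build. The scala_suffix function below is a home-grown helper, not part of Spark or Maven:

```python
def scala_suffix(maven_coordinate: str) -> str:
    # 'io.delta:delta-spark_2.13:4.0.0' -> '2.13'
    # a maven coordinate is group:artifact:version; the scala build
    # is the suffix after the last underscore in the artifact name
    artifact = maven_coordinate.split(":")[1]
    return artifact.rsplit("_", 1)[1]

SPARK_SCALA_VERSION = "2.13"  # spark-4.0.0 is compiled against Scala 2.13
delta_coordinate = "io.delta:delta-spark_2.13:4.0.0"
compatible = scala_suffix(delta_coordinate) == SPARK_SCALA_VERSION
```

Running this check in CI before spark-submit catches a mismatched artifact (e.g. a _2.12 build) before Spark fails at startup with a cryptic classpath error.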
◼ Step 5. The Silver Layer
The Silver layer transforms and loads the Bronze data into a Delta Lake table stored in S3.
First, I’ll define the process function, which processes the raw data from the Bronze layer:
src/data_handling/lake_house/silver.py
import logging
from pyspark.sql.functions import col, expr, to_date
from pyspark.sql.types import StructType, StructField, DateType, FloatType, IntegerType

main_logger = logging.getLogger(__name__)

def process(delta_table, spark):
    # get all the date-like column names
    date_columns = delta_table.columns

    # build the expression for the stack function
    stack_expr = f'stack({len(date_columns)}, '
    for date_col in date_columns:
        stack_expr += f'"{date_col}", `{date_col}`, '

    stack_expr = stack_expr.strip(', ') + ')'

    # use stack to unpivot the data from wide to tall format
    _silver_df = delta_table.select(expr(stack_expr).alias('dt_string', 'values'))

    # process the unpivoted df to cast types and rename columns
    silver_df = _silver_df.select(
        to_date(col('dt_string'), 'yyyy-MM-dd').alias('dt'),
        col('values').getItem('1. open').cast('float').alias('open'),
        col('values').getItem('2. high').cast('float').alias('high'),
        col('values').getItem('3. low').cast('float').alias('low'),
        col('values').getItem('4. close').cast('float').alias('close'),
        col('values').getItem('5. volume').cast('integer').alias('volume')
    ).where(col('dt').isNotNull())

    # explicitly define the schema
    schema = StructType([
        StructField('dt', DateType(), False),  # explicitly set nullable = false
        StructField('open', FloatType(), False),
        StructField('high', FloatType(), False),
        StructField('low', FloatType(), False),
        StructField('close', FloatType(), False),
        StructField('volume', IntegerType(), False)
    ])

    # finalize the df
    silver_df = spark.createDataFrame(silver_df.collect(), schema=schema)

    main_logger.info('... transformed data schema in the silver layer:\n')
    silver_df.printSchema()
    return silver_df
The silver_df.printSchema() operation will return:
root
 |-- dt: date (nullable = false)
 |-- open: float (nullable = false)
 |-- high: float (nullable = false)
 |-- low: float (nullable = false)
 |-- close: float (nullable = false)
 |-- volume: integer (nullable = false)
This indicates that Delta Lake together with Apache Spark successfully creates the data schema.
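The stack() expression assembled in the loop is plain string building, so it can be checked without Spark; for two hypothetical date columns it expands as follows:

```python
# same string-building logic as the process function, minus Spark
date_columns = ["2025-09-04", "2025-09-05"]

stack_expr = f"stack({len(date_columns)}, "
for date_col in date_columns:
    # each date contributes a (label, column reference) pair:
    # "2025-09-04" is the literal label, `2025-09-04` references the column
    stack_expr += f'"{date_col}", `{date_col}`, '

# drop the trailing comma-space and close the call
stack_expr = stack_expr.strip(", ") + ")"
```

The resulting string is the SQL expression `stack(2, "2025-09-04", `2025-09-04`, "2025-09-05", `2025-09-05`)`, which unpivots two wide date columns into two tall rows of (dt_string, values).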
Then, I’ll define the load function that loads the processed Delta Table into the S3 bucket, using the s3a:// schema:
src/data_handling/lake_house/silver.py
import os
import shutil
import logging

main_logger = logging.getLogger(__name__)

def load(df, ticker: str = 'NVDA') -> str:
    # define the folder path
    silver_local_path = os.path.join('data', 'silver', ticker)

    # clean up previous run's data for a fresh start
    if os.path.exists(silver_local_path):
        main_logger.info(f'... cleaning up existing Silver layer data at {silver_local_path} ...')
        shutil.rmtree(silver_local_path)

    # write the df in the silver layer as a new delta table and store it in s3
    S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'ml-stockprice-pred')
    silver_s3_path = f's3a://{S3_BUCKET_NAME}/data/silver/{ticker}'
    df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(silver_s3_path)
    return silver_s3_path
◼ Step 6. The Gold Layer
The Gold layer performs feature engineering on the data stored in the Silver layer.
First, I’ll define the retrieve_delta_table function:
src/data_handling/delta.py
def retrieve_delta_table(spark, s3_path):
    # read the delta table stored in s3
    try:
        delta_table = spark.read.format('delta').load(s3_path)
        return delta_table

    except Exception:
        spark.stop()
        raise SystemExit(1)
▫ Leveraging Delta Lake’s Transactional Capabilities
The primary benefit of Delta Lake is that it automatically resolves the latest version of the table when you simply point to its base directory.
The spark.read.format("delta").load(s3_path) line lets Spark and the Delta Lake connector work together, reading the latest transaction log file from the _delta_log directory.
The _delta_log directory stores all transaction history in files like 00000000000000000012.json.
Based on that history, Delta Lake first identifies the correct data files, and Spark then reads only those files.
This way, Delta Lake provides an ACID-compliant layer on top of the raw data, ensuring data integrity and consistency, even with many concurrent write operations.
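The version lookup itself is straightforward: each commit file in _delta_log is named by a zero-padded version number, so the latest version is simply the maximum. Below is a plain-Python sketch of that selection with illustrative file names; this is not the Delta Lake implementation:

```python
# illustrative _delta_log listing: commit files plus a checkpoint
log_files = [
    "00000000000000000010.json",
    "00000000000000000011.json",
    "00000000000000000011.checkpoint.parquet",
    "00000000000000000012.json",
]

# commit files are exactly <zero-padded version>.json
commits = [f for f in log_files if f.endswith(".json") and f[:-5].isdigit()]

# the current table state is the highest committed version
latest_version = max(int(f[:-5]) for f in commits)
```

Reading the table at latest_version (12 here) is what gives every concurrent reader a consistent snapshot, even while new commits are being appended.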
In the S3 bucket, Delta Lake automatically creates the _delta_log directory:

Figure C. Bronze layer in the S3 bucket
Next, I’ll define the process function that computes a 30-day moving average (ma) and simple averages of the prices:
src/data_handling/lake_house/gold.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, year, month, dayofmonth
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, DateType, FloatType, IntegerType, LongType

def process(delta_table):
    # reuse the session started upstream
    spark = SparkSession.getActiveSession()

    # add averages
    _gold_df = delta_table.groupBy('dt').agg(
        avg(col('open')).alias('ave_open'),
        avg(col('high')).alias('ave_high'),
        avg(col('low')).alias('ave_low'),
        avg(col('close')).alias('ave_close'),
        sum(col('volume')).alias('total_volume')
    )

    # merge
    gold_df = delta_table.join(_gold_df, on='dt', how='inner')

    # add the moving average
    window_spec = Window.orderBy('dt').rowsBetween(-29, 0)
    gold_df = gold_df.withColumn('30_day_ma_close', avg(col('ave_close')).over(window_spec))

    # add year, month, date cols
    gold_df = gold_df.withColumn('year', year(gold_df['dt']))
    gold_df = gold_df.withColumn('month', month(gold_df['dt']))
    gold_df = gold_df.withColumn('date', dayofmonth(gold_df['dt']))

    # define the schema
    schema = StructType([
        StructField('dt', DateType(), False),  # explicitly set nullable = false
        StructField('open', FloatType(), False),
        StructField('high', FloatType(), False),
        StructField('low', FloatType(), False),
        StructField('close', FloatType(), False),
        StructField('volume', IntegerType(), False),
        StructField('ave_open', FloatType(), False),
        StructField('ave_high', FloatType(), False),
        StructField('ave_low', FloatType(), False),
        StructField('ave_close', FloatType(), False),
        StructField('total_volume', LongType(), False),
        StructField('30_day_ma_close', FloatType(), False),
        StructField('year', IntegerType(), False),
        StructField('month', IntegerType(), False),
        StructField('date', IntegerType(), False),
    ])

    # finalize the df
    gold_df = spark.createDataFrame(gold_df.collect(), schema=schema)

    # sort by dt
    gold_df = gold_df.orderBy(col('dt').asc())

    return gold_df
The gold_df.printSchema() operation will return:
root
 |-- dt: date (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume: integer (nullable = true)
 |-- ave_open: double (nullable = true)
 |-- ave_high: double (nullable = true)
 |-- ave_low: double (nullable = true)
 |-- ave_close: double (nullable = true)
 |-- total_volume: long (nullable = true)
 |-- 30_day_ma_close: double (nullable = true)
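The rowsBetween(-29, 0) window used above is a trailing average over the current row and the 29 preceding rows. Here is a plain-Python equivalent that matches the window semantics (not the Spark API), shown with a window of 3 for brevity:

```python
def trailing_mean(values, window=30):
    # row i averages rows max(0, i-window+1) .. i,
    # mirroring Window.rowsBetween(-(window-1), 0)
    out = []
    for i in range(len(values)):
        window_vals = values[max(0, i - window + 1): i + 1]
        out.append(sum(window_vals) / len(window_vals))
    return out

closes = [10.0, 20.0, 30.0, 40.0]
ma = trailing_mean(closes, window=3)
```

Note that the first window-1 rows average over fewer values, exactly as Spark's bounded window does at the start of the partition.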
Similar to the Silver layer, I’ll define the load function that loads the processed Delta Table into the S3 bucket, using the s3a:// schema:
src/data_handling/lake_house/gold.py
import os
import shutil

def load(df, ticker: str = 'NVDA') -> str:
    # define the folder path
    gold_local_path = os.path.join('data', 'gold', ticker)

    # clean up the previous file if any
    if os.path.exists(gold_local_path):
        shutil.rmtree(gold_local_path)

    # store in s3
    S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'ml-stockprice-pred')
    gold_s3_path = f's3a://{S3_BUCKET_NAME}/data/gold/{ticker}'
    df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(gold_s3_path)
    return gold_s3_path
◼ Step 7. Test Run
The entire execution flow:
src/main.py
import sys
import data_handling

def run_lakehouse(ticker: str = 'NVDA'):
    # extract
    stock_price_data = data_handling.extract_daily_stock_data(ticker=ticker)

    # bronze
    bronze_s3_path = data_handling.bronze.load_to_s3(data=stock_price_data, ticker=ticker)

    # start the spark session
    spark = data_handling.config_and_start_spark_session()

    # silver
    bronze_delta_table = spark.read.json(bronze_s3_path, multiLine=True)
    silver_df = data_handling.silver.process(delta_table=bronze_delta_table, spark=spark)
    silver_s3_path = data_handling.silver.load(df=silver_df, ticker=ticker)

    # gold
    silver_delta_table = data_handling.retrieve_delta_table(spark=spark, s3_path=silver_s3_path)
    gold_df = data_handling.gold.process(delta_table=silver_delta_table)
    gold_s3_path = data_handling.gold.load(df=gold_df, ticker=ticker)

    # terminate the spark session
    if gold_s3_path:
        spark.stop()

if __name__ == '__main__':
    # fetch the ticker
    TICKER = sys.argv[1] if len(sys.argv) > 1 and sys.argv[1] else 'NVDA'

    # run
    run_lakehouse(ticker=TICKER)
Run the uv spark-submit command, adding the necessary JAR files using the --packages or --jars option:
$ uv run spark-submit --packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.262 src/main.py {TICKER}
*Replace {TICKER} with a ticker of your choice.
Or, configure the packages and JARs by adding PYSPARK_SUBMIT_ARGS to the env file like .zshrc.
~/.zshrc
export PYSPARK_SUBMIT_ARGS="--packages io.delta:delta-spark_2.13:4.0.0,org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.262 pyspark-shell"
Now, the S3 bucket has three folders. The gold and silver folders contain Delta Lake tables, while the bronze folder contains raw JSON files.

Figure D. Lakehouse on Amazon S3
◼ Step 8. Scheduling
After a successful test run, I’ll create Apache Airflow DAGs to schedule the PySpark tasks in sequence:
Daily task run_pyspark_task: Extracts and transforms the latest stock price data.
Weekly task check_data_quality_task: Detects any data drift.
For the data drift detection, I’ll use the PyDeequ package to detect irregular data.
src/data_handling/dag/dag.py
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

from src.main import run_lakehouse
from src.data_handling.spark import config_and_start_spark_session


# define a function to detect data drift
def check_data_quality():
    # initialize a sparksession to use deequ
    spark = config_and_start_spark_session()

    # fetch the delta table in the gold layer
    gold_data_path = 's3a://ml-stockprice-pred/data/gold'
    gold_delta_table = spark.read.format('delta').load(gold_data_path)

    # use deequ to run the quality check
    check_result = (VerificationSuite(spark)
        .onData(gold_delta_table)
        .addCheck(
            Check(CheckLevel.Warning, 'Weekly Stock Data Check')
            .hasSize(lambda s: s > 0)  # check that the df is not empty
            .hasCompleteness('close', lambda s: s == 1.0)  # check for missing vals
            .hasCompleteness('volume', lambda s: s == 1.0)  # check for missing vals
            .hasMin('open', lambda x: x > 0)  # check that vals are positive
            .hasMax('volume', lambda x: x < 10000000000)  # check the max val
        )
        .run()
    )

    # convert the deequ result to a spark df for analysis and logging
    result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
    result_df.show(truncate=False)

    # stop the spark session
    spark.stop()


# define the airflow dag
default_args = {
    'owner': 'kuriko iwai',
    'start_date': datetime.datetime.now(),
    'retries': 1,
}

with DAG(
    dag_id='pyspark_data_handling',
    default_args=default_args,
    description='A daily ETL job to process stock data into the gold layer.',
    schedule='@daily',
    catchup=False,
    tags=['pyspark', 'elt', 'lakehouse'],
) as dag:

    # daily load of the latest stock price
    run_pyspark_task = PythonOperator(
        task_id='elt_lakehouse',
        python_callable=run_lakehouse,
    )

    # data quality check for data drift
    # (an individual task cannot carry its own schedule; a true weekly
    #  cadence needs a separate DAG with schedule='@weekly')
    data_quality_check_task = PythonOperator(
        task_id='data_quality_check',
        python_callable=check_data_quality,
        trigger_rule='all_success',
        depends_on_past=True,  # only runs after the previous successful run
    )

    # set task dependencies
    run_pyspark_task >> data_quality_check_task
The with DAG(...) as dag: block defines an Airflow DAG with key parameters like dag_id, default_args, and schedule.
Setting schedule to '@daily' tells Airflow to trigger the DAG once a day at midnight.
Then, the PythonOperator executes the run_lakehouse and check_data_quality callables.
The script is saved as a Python file in the dags folder, which enables Airflow to automatically discover and schedule the DAG.
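To make the Deequ constraints concrete, here is a plain-Python sketch of what each check asserts over row dictionaries; completeness and the checks dict are hypothetical helpers, not the PyDeequ API:

```python
# illustrative gold-layer rows
rows = [
    {"close": 171.66, "open": 170.57, "volume": 141670144},
    {"close": 172.10, "open": 171.20, "volume": 98000000},
]

def completeness(rows, col):
    # fraction of rows where the column is present and not None,
    # mirroring deequ's hasCompleteness metric
    return sum(r.get(col) is not None for r in rows) / len(rows)

checks = {
    "has_size": len(rows) > 0,                                       # hasSize(s > 0)
    "close_complete": completeness(rows, "close") == 1.0,            # hasCompleteness
    "volume_complete": completeness(rows, "volume") == 1.0,          # hasCompleteness
    "open_min_positive": min(r["open"] for r in rows) > 0,           # hasMin
    "volume_max_bounded": max(r["volume"] for r in rows) < 10_000_000_000,  # hasMax
}
passed = all(checks.values())
```

A sudden drop in completeness or a violated bound is the kind of signal the weekly task surfaces as potential data drift.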
◼ Step 9. CI/CD Integration
Lastly, I’ll add PyTest to the CI/CD pipeline to verify the DAG's setup, using a mock PySpark job.
tests/dag_test.py
import datetime
import pytest  # type: ignore
from airflow.models.dag import DAG  # type: ignore
from airflow.operators.python import PythonOperator  # type: ignore

# mock func called by the pythonoperator (avoids running an actual pyspark task)
def _run_pyspark_job():
    print('... [MOCK] this is a mock run of the PySpark job ...')

# create and return a dag instance
def create_test_dag():
    default_args = {
        'owner': 'kuriko iwai',
        'start_date': datetime.datetime(2025, 10, 9),  # use a fixed date for reproducible tests
        'retries': 1,
    }
    with DAG(
        dag_id='pyspark_gold_layer_elt',
        default_args=default_args,
        description='mock',
        schedule='@daily',
        catchup=False,
        tags=['pyspark', 'etl', 'gold-layer'],
    ) as dag:

        # define a task
        run_pyspark_task = PythonOperator(
            task_id='process_data_to_gold',
            python_callable=_run_pyspark_job,
        )
    return dag


def test_dag_properties():
    # test that the dag's basic properties are defined correctly
    dag = create_test_dag()
    assert dag is not None
    assert dag.dag_id == 'pyspark_gold_layer_elt'
    assert dag.owner == 'kuriko iwai'
    assert dag.description == 'mock'
    assert dag.schedule == '@daily'
    assert 'pyspark' in dag.tags
    assert 'etl' in dag.tags
    assert 'gold-layer' in dag.tags


def test_dag_contains_expected_task():
    dag = create_test_dag()

    # check that a task with the specific id exists in the dag
    assert 'process_data_to_gold' in dag.task_ids

    # get the task instance and check its type, properties, and callable
    task = dag.get_task('process_data_to_gold')
    assert isinstance(task, PythonOperator)
    assert task.task_id == 'process_data_to_gold'
    assert task.owner == 'kuriko iwai'
    assert task.python_callable.__name__ == '_run_pyspark_job'
$ uv run pytest --cache-clear
This returns "3 passed in xx s".
Then, I'll define a GitHub Action to automate the PyTest workflow on every pull request and push.
.github/workflows/run_tests.yml
name: Run Tests

on: [pull_request, push]

permissions:
  contents: write

env:
  # set up in the github repo -> Settings -> Secrets and variables -> Actions -> Repository secrets
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  AWS_REGION_NAME: ${{ secrets.AWS_REGION_NAME }}

jobs:
  run_test:
    runs-on: ubuntu-latest
    timeout-minutes: 15

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          pip install uv
          uv sync

      - name: Run tests with pytest
        run: uv run pytest --cache-clear
Finally, I'll push the change and see if the workflow runs:
$ git init
$ git add .
$ git commit -m 'init'
$ git branch -M main
$ git remote add origin <REPO>
$ git push origin main
That’s all for the workflow.
Conclusion
The Lakehouse architecture enables organizations to handle diverse data on a single platform, eliminating data silos and simplifying data management.
In our project, we built a Delta Lake Lakehouse where the warehouse layers are formed from Delta Lake tables, enabling versioning and transactional updates that support both batch and streaming workloads efficiently.
Moving forward, adding more data sources and complex transformations based on EDA can further refine the data pipeline.
Continue Your Learning
If you enjoyed this blog, these related entries will complete the picture:
Architecting Production ML: A Deep Dive into Deployment and Scalability
Data Pipeline Architecture: From Traditional DWH to Modern Lakehouse
Building a Production-Ready Data CI/CD Pipeline: Versioning, Drift Detection, and Orchestration
Related Books for Further Understanding
These books cover a wide range of theories and practices, from fundamentals to PhD level.

Linear Algebra Done Right

Foundations of Machine Learning, second edition (Adaptive Computation and Machine Learning series)

Designing Machine Learning Systems: An Iterative Process for Production-Ready Applications

Machine Learning Design Patterns: Solutions to Common Challenges in Data Preparation, Model Building, and MLOps
Share What You Learned
Kuriko IWAI, "Engineering a Fully-Automated Lakehouse: From Raw Data to Gold Tables" in Kernel Labs
https://kuriko-iwai.com/building-lakehouse-architectures
Looking for Solutions?
- Deploying ML Systems 👉 Book a briefing session
- Hiring an ML Engineer 👉 Drop an email
- Learn by Doing 👉 Enroll in the AI Engineering Masterclass
Written by Kuriko IWAI. All images, unless otherwise noted, are by the author. All experimentations on this blog utilize synthetic or licensed data.


