MODULE 33 Data Quality Ecosystem
MODULE 33 — OVERVIEW

Data Quality Ecosystem

Bad data costs companies millions. Data Quality (DQ) is the practice of systematically detecting, measuring, and preventing data problems before they reach consumers. This module covers the full ecosystem — open-source tools, cloud-native solutions, and patterns every senior data engineer must know.

🧪
Great Expectations
The most popular open-source DQ framework. Define expectations, validate DataFrames, publish beautiful data docs.
⚖️
Deequ (AWS)
AWS open-source library built on Spark. Analyzers + Constraints for large-scale data quality at column level.
🥤
Soda
YAML-based checks with SodaCL. Simple to write, integrates with Spark, Snowflake, BigQuery, and Airflow.
🏔️
Monte Carlo
SaaS data observability. ML-powered anomaly detection for freshness, volume, distribution, and schema drift.
👁️
Data Observability
5 pillars: freshness, volume, distribution drift, schema drift, and lineage. Know when data is broken before users do.
🔧
Custom DQ Framework
Build a reusable rule engine, quality scoring, quarantine pattern, and alerting integration from scratch in PySpark.
MODULE 33 — WHAT YOU WILL LEARN
33.1 Great Expectations → Expectations, Data Context, Checkpoints, Data Docs, Spark integration
33.2 Deequ (AWS) → Analyzers, Constraints, Constraint verification, Metrics repository, Spark integration
33.3 Soda → Soda Core, SodaCL checks, Soda Cloud, Spark + Soda, writing checks
33.4 Monte Carlo → Data observability platform, integration patterns, anomaly detection
33.5 Data Observability → Freshness, Volume, Distribution Drift, Schema Drift (four of the five pillars, in depth)
33.6 Custom DQ Framework → Rule engine, threshold validation, business rule registry, quality scoring, alerting
Why this matters: Senior/Lead DE interviews almost always ask: "How do you ensure data quality in production?" Knowing one tool thoroughly and understanding the observability pillars is a large part of what separates a mid-level engineer from a senior one.
Tool choice in practice: Most companies use one of these, not all. GE is most common in open-source shops. Deequ is popular on AWS/EMR. Soda in Snowflake-centric orgs. Monte Carlo is enterprise SaaS. The patterns behind all of them are the same — learn those first.
33.1 — GREAT EXPECTATIONS

Great Expectations

Great Expectations (GE) is the most widely-used open-source data quality library. You define what good data looks like (expectations), then validate your DataFrames against those rules and publish human-readable reports called Data Docs.

🧪
Expectations
CORE CONCEPT
Concept
What is an Expectation?
An Expectation is a declarative assertion about your data — a testable statement of what your data should look like. Think of it as a unit test, but for data.

Examples: "Column user_id should never be null", "Column age should be between 0 and 120", "Column status should only contain ['active','inactive','pending']".
Expectation anatomy:
  expect_column_values_to_not_be_null("user_id")
    method name: expect_column_values_to_not_be_null
    column name: "user_id"
    optional params: e.g. mostly=0.95 (pass if at least 95% of values are non-null)
  Returns: ExpectationValidationResult
    success: True / False
    result: { "element_count": 1000, "unexpected_count": 0 }
    expectation_config: { "expectation_type": "...", "kwargs": {...} }
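To make the "unit test for data" idea concrete, here is a plain-Python sketch of what a not-null expectation does under the hood: count the offending values and return a result shaped like the one above. It is an illustration only, not GE's implementation; the result keys are borrowed from GE's result format for familiarity.

```python
from typing import Any, Dict, List, Optional


def expect_column_values_to_not_be_null(
    rows: List[Dict[str, Any]], column: str
) -> Dict[str, Any]:
    """Plain-Python stand-in for GE's not-null expectation (illustrative only)."""
    values: List[Optional[Any]] = [row.get(column) for row in rows]
    unexpected = [i for i, v in enumerate(values) if v is None]  # indexes of nulls
    return {
        "success": len(unexpected) == 0,
        "result": {
            "element_count": len(values),
            "unexpected_count": len(unexpected),
            "unexpected_index_list": unexpected,
        },
        "expectation_config": {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": column},
        },
    }


rows = [{"user_id": 1}, {"user_id": 2}, {"user_id": None}]
res = expect_column_values_to_not_be_null(rows, "user_id")
print(res["success"], res["result"]["unexpected_count"])  # False 1
```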
Most commonly used expectations:
Expectation | What it checks
expect_column_values_to_not_be_null | No nulls in column
expect_column_values_to_be_unique | All values unique
expect_column_values_to_be_in_set | Values in allowed list
expect_column_values_to_be_between | Numeric range check
expect_column_values_to_match_regex | Regex pattern check
expect_table_row_count_to_be_between | Row count range
expect_column_to_exist | Column presence check
expect_column_values_to_be_of_type | Data type check
python — defining expectations
import great_expectations as ge
import pandas as pd

# Wrap a pandas DataFrame in a GE dataset (legacy dataset API, not available in GX 1.x)
df = ge.from_pandas(pd.DataFrame({
    "user_id": [1, 2, 3, None],
    "age": [25, 200, 30, 22],
    "status": ["active", "inactive", "unknown", "active"]
}))

# Expectation 1: user_id should never be null
result = df.expect_column_values_to_not_be_null("user_id")
print(result["success"])  # False — one null found

# Expectation 2: age must be between 0 and 120
result = df.expect_column_values_to_be_between("age", 0, 120)
print(result["success"])  # False — 200 is out of range

# Expectation 3: status must be in allowed set
result = df.expect_column_values_to_be_in_set(
    "status", ["active", "inactive", "pending"]
)
print(result["success"])  # False — "unknown" is not allowed

# Expectation 4: row count should be between 100 and 10000
result = df.expect_table_row_count_to_be_between(100, 10000)
print(result["success"])  # False — only 4 rows
Concept
Expectation Suites
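An Expectation Suite is a named collection of expectations that belong together, like a test suite in pytest. You typically create one or more suites per dataset, often split by severity (e.g., orders.critical for rules that must block the pipeline, users.warning for rules that only alert). In a file-backed Data Context, suites are stored as JSON files.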
python — expectation suite
import great_expectations as ge

# Initialize a Data Context (project configuration)
context = ge.get_context()

# Create a new expectation suite
suite = context.create_expectation_suite(
    expectation_suite_name="orders.critical",
    overwrite_existing=True
)

# Get a validator to add expectations interactively
batch_request = {
    "datasource_name": "my_spark_datasource",
    "data_connector_name": "default_inferred_data_connector_name",
    "data_asset_name": "orders",
}
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders.critical"
)

# Add expectations to the suite
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("amount", 0, 100000)
validator.expect_column_values_to_be_in_set(
    "status", ["placed", "shipped", "delivered", "cancelled"]
)
validator.expect_table_row_count_to_be_between(1, 10_000_000)

# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)
print("Suite saved! All expectations stored to JSON.")
📁
Data Context
CONFIGURATION
Concept
What is a Data Context?
The Data Context is the entry point to GE. It is like a project configuration object that knows about your data sources, expectation suites, checkpoints, and where to store validation results. It reads from a great_expectations.yml config file.
great_expectations/
├── great_expectations.yml          ← main config (datasources, stores, plugins)
├── expectations/
│   └── orders.critical.json        ← expectation suite (your rules)
├── checkpoints/
│   └── orders_checkpoint.yml       ← when/how to run validations
└── uncommitted/
    └── validations/                ← validation run results (not committed to git)
        └── orders.critical/
            └── 2024-01-15/result.json
python — data context
import great_expectations as ge

# Get the Data Context (reads great_expectations.yml)
context = ge.get_context()

# List available expectation suites
print(context.list_expectation_suite_names())
# Output: ['orders.critical', 'users.warning']

# List datasources configured
print(context.list_datasources())

# Get a specific suite
suite = context.get_expectation_suite("orders.critical")
print(len(suite.expectations), "expectations in suite")

# Ephemeral context (no filesystem, good for notebooks/testing)
context = ge.get_context(mode="ephemeral")
🚦
Checkpoints
PIPELINE GATE
Concept
What is a Checkpoint?
A Checkpoint is the mechanism that ties data batches to expectation suites and runs them together. Think of it as a "validation job": "Validate THIS batch of data against THESE expectations, then run THESE actions (store the results, update Data Docs, alert on failure)."

Checkpoints are how you integrate GE into an Airflow DAG or Spark pipeline.
New data arrives → Checkpoint triggered → Data validated against Suite → Pass: continue pipeline / Fail: alert + stop
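The gate described above can be sketched in plain Python. The function `run_checkpoint` and the lambda-based suite are hypothetical stand-ins for GE objects; the point is the control flow: validate the batch, always run the actions, and stop the pipeline only when validation fails.

```python
from typing import Callable, Dict, List, Tuple

Batch = List[dict]
Expectation = Callable[[Batch], bool]  # True when the batch satisfies the rule
Action = Callable[[bool], None]        # receives the overall success flag


def run_checkpoint(batch: Batch, suite: Dict[str, Expectation],
                   actions: List[Action]) -> Tuple[bool, Dict[str, bool]]:
    """Hypothetical checkpoint: validate one batch against a suite, then run actions."""
    results = {name: check(batch) for name, check in suite.items()}
    success = all(results.values())
    for action in actions:
        action(success)  # e.g. store results, rebuild docs, send an alert
    return success, results


alerts: List[str] = []


def notify_on_failure(success: bool) -> None:
    """Like notify_on: "failure", this action only fires when validation fails."""
    if not success:
        alerts.append("orders.critical failed")


suite = {
    "order_id_not_null": lambda b: all(r.get("order_id") is not None for r in b),
    "amount_in_range": lambda b: all(0 <= r["amount"] <= 100_000 for r in b),
}
batch = [{"order_id": 1, "amount": 50.0}, {"order_id": None, "amount": 20.0}]

success, results = run_checkpoint(batch, suite, [notify_on_failure])
if not success:
    # In a real pipeline you would raise here to stop downstream tasks
    print("Stopping pipeline, failed:", [k for k, ok in results.items() if not ok])
```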
python — checkpoint
import great_expectations as ge

context = ge.get_context()

# Add (or update) a Checkpoint: which batch to validate, against which suite, and what to do afterwards
checkpoint = context.add_or_update_checkpoint(
    name="orders_daily_checkpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "my_spark_datasource",
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": "orders",
            },
            "expectation_suite_name": "orders.critical",
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
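        {
            "name": "send_slack_notification",  # fires only on failure
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK}",
                "notify_on": "failure",
                "renderer": {
                    "module_name": "great_expectations.render.renderer.slack_renderer",
                    "class_name": "SlackRenderer",
                },
            },
        },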
    ],
)

# Run the checkpoint
result = checkpoint.run()

# Check if validation passed
if not result["success"]:
    raise Exception("Data quality validation failed! Stopping pipeline.")
else:
    print("✅ All expectations passed. Continuing pipeline.")
📄
Data Docs
REPORTING
Concept
What are Data Docs?
Data Docs are auto-generated HTML reports that show your expectations, validation results, and data statistics in a human-readable format. They are designed so that non-engineers (analysts, data owners) can understand the state of data quality without reading code.
python — data docs
import great_expectations as ge

context = ge.get_context()

# Data Docs are rebuilt by checkpoint.run() when UpdateDataDocsAction is in its action_list.
# You can also build them manually:
context.build_data_docs()

# Open in browser (local)
context.open_data_docs()

# Publish to S3 for team access
# Configure in great_expectations.yml:
# data_docs_sites:
#   s3_site:
#     class_name: SiteBuilder
#     store_backend:
#       class_name: TupleS3StoreBackend
#       bucket: my-data-docs-bucket
#       prefix: data_docs/

# What Data Docs show:
#  ✅ Each expectation and whether it passed/failed
#  📊 Column statistics (mean, min, max, null %, unique %)
#  📋 Sample unexpected values (rows that failed)
#  📅 Historical validation results (trend over time)
Spark Integration
PYSPARK
Implementation
Validating Spark DataFrames with GE
GE's legacy dataset API integrates with PySpark via a SparkDFDataset wrapper (newer releases use a Spark datasource plus a Validator instead). You can validate a Spark DataFrame directly: GE expresses each expectation as Spark aggregations, so it scales to large datasets without collecting the data to the driver.
python — great expectations + pyspark
from pyspark.sql import SparkSession
import great_expectations as ge
from great_expectations.dataset import SparkDFDataset

spark = SparkSession.builder.appName("GE_Demo").getOrCreate()

# Load a Spark DataFrame
df = spark.read.parquet("s3://my-bucket/orders/2024/01/15/")

# Wrap in GE SparkDFDataset
ge_df = SparkDFDataset(df)

# Run expectations — they execute as Spark jobs!
results = []
results.append(ge_df.expect_column_values_to_not_be_null("order_id"))
results.append(ge_df.expect_column_values_to_not_be_null("customer_id"))
results.append(ge_df.expect_column_values_to_be_between("amount", 0, 100000))
results.append(ge_df.expect_column_values_to_match_regex(
    "email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
))
results.append(ge_df.expect_table_row_count_to_be_between(1000, 10_000_000))

# Check all results
failures = [r for r in results if not r["success"]]

if failures:
    for f in failures:
        print(f"❌ FAILED: {f['expectation_config']['expectation_type']}")
        print(f"   Unexpected: {f['result'].get('unexpected_count', 'N/A')}")
    raise Exception(f"{len(failures)} DQ checks failed!")
else:
    print("✅ All DQ checks passed!")
Production tip: Always run GE checks before writing to your Silver/Gold layer. If checks fail, write rejected records to a quarantine path and alert your team.
33.2 — DEEQU (AWS)

Deequ (AWS)

Deequ is an open-source library by AWS, built on top of Apache Spark. It is designed for large-scale data quality validation on massive datasets — think billions of rows on EMR. It uses a two-phase model: first compute metrics, then verify constraints.

🔬
Analyzers
METRICS
Concept
What are Analyzers?
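Analyzers are Deequ's metric computers. They scan your DataFrame and compute statistics such as completeness, distinctness, mean, standard deviation, min, and max. Deequ runs them efficiently: analyzers that can share a scan are computed together in a single Spark pass, and grouping-based metrics (such as Uniqueness and Distinctness) share one extra aggregation per set of columns, which avoids redundant scans.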
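To see what these metrics mean, the sketch below computes Size, Completeness, Distinctness, and Uniqueness for one column in a single pass over plain Python values. It follows the usual Deequ definitions (distinctness = distinct values / values; uniqueness = values occurring exactly once / values) but, as a simplifying assumption, excludes nulls from those denominators; Deequ's exact null handling may differ.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Optional


def profile_column(values: Iterable[Optional[Any]]) -> Dict[str, float]:
    """Compute several Deequ-style metrics in ONE pass over a column."""
    total = 0
    freq: Counter = Counter()
    for v in values:  # single scan: every metric is derived from these counters
        total += 1
        if v is not None:
            freq[v] += 1
    non_null = sum(freq.values())
    return {
        "Size": float(total),
        "Completeness": non_null / total if total else 0.0,
        "Distinctness": len(freq) / non_null if non_null else 0.0,
        "Uniqueness": (sum(1 for c in freq.values() if c == 1) / non_null
                       if non_null else 0.0),
    }


metrics = profile_column(["a", "a", "b", "c", None])
print(metrics)
# {'Size': 5.0, 'Completeness': 0.8, 'Distinctness': 0.75, 'Uniqueness': 0.5}
```

Distinctness and Uniqueness differ as soon as a value repeats: "a" counts once toward distinctness but never toward uniqueness.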
python (pydeequ) — analyzers
# pip install pydeequ
from pyspark.sql import SparkSession
from pydeequ.analyzers import (
    AnalysisRunner, AnalyzerContext,
    Completeness, Distinctness, Mean,
    StandardDeviation, Minimum, Maximum,
    Size, Compliance, Uniqueness
)

spark = SparkSession.builder \
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.3-spark-3.3") \
    .getOrCreate()

df = spark.read.parquet("s3://my-bucket/orders/")

# Run multiple analyzers in a single Spark scan
analysis_result = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("order_id")) \
    .addAnalyzer(Completeness("customer_id")) \
    .addAnalyzer(Distinctness("order_id")) \
    .addAnalyzer(Uniqueness(["order_id"])) \
    .addAnalyzer(Mean("amount")) \
    .addAnalyzer(StandardDeviation("amount")) \
    .addAnalyzer(Minimum("amount")) \
    .addAnalyzer(Maximum("amount")) \
    .addAnalyzer(Compliance("valid_status", "status IN ('placed','shipped','delivered','cancelled')")) \
    .run()

# Convert results to a DataFrame
metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
metrics_df.show(truncate=False)

# Output looks like:
# | entity  | instance     | name         | value  |
# | Dataset | *            | Size         | 125000 |
# | Column  | order_id     | Completeness | 1.0    |
# | Column  | amount       | Mean         | 87.42  |
# | Column  | valid_status | Compliance   | 0.987  |
📏
Constraints & Constraint Verification
VALIDATION
Concept
What are Constraints?
Constraints are rules you assert on top of the metrics computed by analyzers. You group them into a Check and run that Check with VerificationSuite. Constraints say things like: "Completeness of order_id must equal 1.0" or "Mean of amount must be between 50 and 200".
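The two-phase split is easy to see without Spark: phase one produces a table of metrics, and phase two only evaluates assertions against those numbers. The metric values and the `verify` helper below are illustrative assumptions, not pydeequ APIs.

```python
from typing import Callable, Dict, List, Tuple

MetricKey = Tuple[str, str]  # (metric name, column)

# Phase 1 output: computed metrics (illustrative values)
metrics: Dict[MetricKey, float] = {
    ("Completeness", "order_id"): 1.0,
    ("Completeness", "email"): 0.93,
    ("Mean", "amount"): 87.42,
}

# Phase 2 input: constraints = (metric to read, assertion, description)
Constraint = Tuple[MetricKey, Callable[[float], bool], str]
constraints: List[Constraint] = [
    (("Completeness", "order_id"), lambda c: c == 1.0, "order_id is complete"),
    (("Completeness", "email"), lambda c: c >= 0.95, "email is at least 95% complete"),
    (("Mean", "amount"), lambda m: 50 <= m <= 200, "mean amount between 50 and 200"),
]


def verify(metrics: Dict[MetricKey, float], constraints: List[Constraint]) -> List[str]:
    """Phase 2: evaluate assertions against already-computed metrics (no data scan)."""
    return [desc for key, assertion, desc in constraints if not assertion(metrics[key])]


failed = verify(metrics, constraints)
print("Success" if not failed else f"Error: {failed}")
# Error: ['email is at least 95% complete']
```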
python (pydeequ) — constraint verification
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Define a Check (named group of constraints)
check = Check(spark, CheckLevel.Error, "Orders DQ Check")

check_result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check
        .hasSize(lambda x: x > 0)                           # table not empty
        .isComplete("order_id")                              # no nulls
        .isComplete("customer_id")
        .isUnique("order_id")                               # primary key
        .isContainedIn("status", ["placed", "shipped", "delivered", "cancelled"])
        .isNonNegative("amount")                            # no negative amounts
        .satisfies("amount < 100000", "Amount below cap")  # business rule
        .hasCompleteness("email", lambda c: c >= 0.95)      # 95% not null
        .hasDistinctness(["order_id"], lambda d: d == 1.0) # all distinct
    ) \
    .run()

# Get results as DataFrame
result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
result_df.show(truncate=False)

# Fail pipeline if any constraint violated
if check_result.status != "Success":
    failures = result_df.filter(result_df.constraint_status != "Success")
    failures.show(truncate=False)
    raise Exception("Deequ constraints failed! Halting pipeline.")
🗃️
Metrics Repository
HISTORY
Concept
Storing DQ Metrics Over Time
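The Metrics Repository lets Deequ save computed metrics so you can track quality trends over time: either to a file-system path such as S3 or HDFS (FileSystemMetricsRepository) or to an in-memory repository for tests (InMemoryMetricsRepository). You can compare today's completeness score with yesterday's and alert if it drops.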
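Once metrics are stored per run, a trend alert is just a comparison between consecutive entries. This sketch assumes the history has already been loaded as (timestamp in ms, completeness) pairs; `completeness_dropped` and its `max_drop` tolerance are hypothetical.

```python
from typing import List, Tuple

# (result timestamp in ms, Completeness of order_id); illustrative values
history: List[Tuple[int, float]] = [
    (1705190400000, 1.000),
    (1705276800000, 0.999),
    (1705363200000, 0.950),
]


def completeness_dropped(history: List[Tuple[int, float]], max_drop: float = 0.01) -> bool:
    """True if the latest run is more than max_drop below the previous run."""
    ordered = sorted(history)  # oldest first (tuples sort by timestamp)
    if len(ordered) < 2:
        return False  # nothing to compare against yet
    previous, latest = ordered[-2][1], ordered[-1][1]
    return previous - latest > max_drop


if completeness_dropped(history):
    print("ALERT: order_id completeness dropped since the previous run")
```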
python (pydeequ) — metrics repository
from pydeequ.analyzers import AnalysisRunner, Completeness, Mean
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
import time

# Create a file-system backed metrics repository (metrics are written to this JSON file)
metrics_path = "s3://my-bucket/dq-metrics/orders/metrics.json"
repository = FileSystemMetricsRepository(spark, metrics_path)

# Tag this run with a timestamp (used to query historical metrics)
result_key = ResultKey(spark, int(time.time() * 1000), {
    "pipeline": "orders_daily",
    "environment": "production"
})

# Run analysis and save to repository
AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Completeness("order_id")) \
    .addAnalyzer(Mean("amount")) \
    .useRepository(repository) \
    .saveOrAppendResult(result_key) \
    .run()

# Load historical metrics
previous_metrics = repository.load() \
    .forAnalyzers([Completeness("order_id")]) \
    .getSuccessMetricsAsDataFrame()

previous_metrics.show()
# Shows completeness score for every day you've run this pipeline
33.3 — SODA

Soda

Soda makes data quality checks as simple as writing YAML. With SodaCL (Soda Check Language), non-engineers can read and write quality checks. Soda Core is the open-source library; Soda Cloud is the SaaS dashboard for teams.

🥤
Soda Core
OPEN SOURCE
Concept
What is Soda Core?
Soda Core is the open-source Python library that runs SodaCL checks against your data. It connects to many data sources: Spark, Snowflake, BigQuery, Redshift, PostgreSQL, and more. You write checks in YAML, and Soda translates them to SQL or Spark queries.
bash — install soda
# Install for Spark DataFrames (used with scan.add_spark_session)
pip install soda-core-spark-df

# Install for Snowflake
pip install soda-core-snowflake

# Install for BigQuery
pip install soda-core-bigquery
📝
SodaCL — Soda Check Language
YAML DSL
Concept
Writing Checks in SodaCL
SodaCL is a YAML-based domain-specific language for data quality. It is intentionally simple — you write checks in plain English-like syntax. Each check maps to a metric that Soda computes against your data.
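To make "each check maps to a metric" tangible, the sketch below hand-translates four of the checks in this file into SQL and runs them against an in-memory SQLite table. The SQL is written for illustration and is not the exact query Soda generates; in particular, `duplicate_count` is computed here as the number of values that appear more than once.

```python
import sqlite3
from typing import Callable, Dict, Tuple

# check text -> (SQL that computes the metric, pass condition on the metric)
CHECKS: Dict[str, Tuple[str, Callable[[int], bool]]] = {
    "row_count > 0": ("SELECT COUNT(*) FROM orders", lambda v: v > 0),
    "missing_count(order_id) = 0": (
        "SELECT COUNT(*) FROM orders WHERE order_id IS NULL", lambda v: v == 0),
    # counts values that appear more than once (illustrative definition)
    "duplicate_count(order_id) = 0": (
        "SELECT COUNT(*) FROM (SELECT order_id FROM orders "
        "WHERE order_id IS NOT NULL GROUP BY order_id HAVING COUNT(*) > 1)",
        lambda v: v == 0),
    # this sketch does not count missing values as invalid
    "invalid_count(status) = 0": (
        "SELECT COUNT(*) FROM orders WHERE status IS NOT NULL AND status NOT IN "
        "('placed', 'shipped', 'delivered', 'cancelled')", lambda v: v == 0),
}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "placed"), (2, "shipped"), (2, "lost"), (None, "placed")],
)

results = {}
for check, (sql, passes) in CHECKS.items():
    metric = conn.execute(sql).fetchone()[0]  # one metric value per check
    results[check] = (metric, passes(metric))
    print(f"{'PASS' if passes(metric) else 'FAIL'}  {check}  (metric = {metric})")
```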
yaml — checks.yml (sodacl)
# checks.yml — define all your DQ rules here

checks for orders:                  # checks for table/dataset named "orders"

  # Row count must be positive
  - row_count > 0

  # order_id must never be null
  - missing_count(order_id) = 0:
      name: Order ID must not be null

  # customer_id must not be null
  - missing_count(customer_id) = 0

  # order_id must be unique (primary key)
  - duplicate_count(order_id) = 0

  # amount must be between 0 and 100000, with a plausible average
  - min(amount) >= 0
  - max(amount) <= 100000
  - avg(amount) between 10 and 10000

  # Status must only have valid values
  - invalid_count(status) = 0:
      valid values: [placed, shipped, delivered, cancelled]

  # Email format validation (Soda's built-in email format)
  - invalid_count(email) = 0:
      valid format: email

  # Freshness check — data must be at most 1 day old
  - freshness(order_date) < 1d:
      name: Orders must arrive within 1 day

  # Schema check — these columns must exist
  - schema:
      name: Expected columns present
      fail:
        when required column missing: [order_id, customer_id, amount, status]
Implementation
Running Soda Checks in Python
python — running soda with spark
from pyspark.sql import SparkSession
from soda.scan import Scan

spark = SparkSession.builder.appName("SodaDemo").getOrCreate()

# Load DataFrame
df = spark.read.parquet("s3://my-bucket/orders/")
df.createOrReplaceTempView("orders")  # Soda queries via SQL on the view

# Create a Soda Scan
scan = Scan()
scan.set_scan_definition_name("orders_daily_check")
scan.set_data_source_name("spark_datasource")

# Register the Spark session under the same data source name
scan.add_spark_session(spark, data_source_name="spark_datasource")

# Add checks from file
scan.add_sodacl_yaml_file("checks.yml")

# Run the scan
scan.execute()

# Check results
print(scan.get_logs_text())

if scan.has_check_fails():
    print("❌ Soda checks FAILED:")
    print(scan.get_checks_fail())
    raise Exception("Data quality checks failed!")
else:
    print("✅ All Soda checks passed!")
☁️
Soda Cloud
SAAS DASHBOARD
Concept
What is Soda Cloud?
Soda Cloud is the SaaS platform that stores your check results, shows historical trends, and enables collaboration. You push scan results from Soda Core to Soda Cloud using an API key.
yaml — soda cloud config
# ~/.soda/configuration.yml
soda_cloud:
  host: cloud.soda.io
  api_key_id: ${SODA_API_KEY_ID}
  api_key_secret: ${SODA_API_KEY_SECRET}
Once connected, Soda Cloud gives you: check result history, anomaly detection on metrics, team notifications, incident management, and a no-code check builder for analysts.
33.4 — MONTE CARLO

Monte Carlo

Monte Carlo is an enterprise data observability platform. Unlike GE/Deequ/Soda where you write rules manually, Monte Carlo uses ML to automatically learn what "normal" looks like for your data and alerts you when anomalies occur — without you having to write explicit thresholds.

🏔️
Data Observability Platform
ML-POWERED
Concept
How Monte Carlo Works
Monte Carlo connects to your data warehouse or lake (Snowflake, BigQuery, Redshift, Databricks, S3) and continuously monitors your tables for the 5 pillars of data observability: freshness, volume, distribution, schema, and lineage.

It learns baselines automatically (e.g., "orders table usually has 50k–100k rows per day") and alerts you when something deviates — before downstream consumers notice.
Monte Carlo architecture:

  Your data sources                     Monte Carlo SaaS
  ┌─────────────────┐              ┌──────────────────────────────┐
  │ Snowflake       │──metadata──► │ ML Anomaly Detection Engine  │
  │ BigQuery        │──metadata──► │ (learns normal patterns)     │
  │ Redshift        │──metadata──► │                              │
  │ Databricks      │──metadata──► │ Monitors:                    │
  │ S3 + Glue       │──metadata──► │  ✓ Freshness                 │
  └─────────────────┘              │  ✓ Volume                    │
                                   │  ✓ Distribution              │
  Note: Monte Carlo reads          │  ✓ Schema changes            │
  only metadata & stats,           │  ✓ Field health              │
  NOT your actual data             └──────────────┬───────────────┘
                                                  │
                                   Alerts → Slack / PagerDuty
                                   Lineage graph → Data Portal
Integration
Integration Patterns
Monte Carlo integrates without code changes to your pipelines. You connect it to your warehouse, and it starts monitoring automatically. For custom events, you can use the Monte Carlo API.
python — monte carlo api (custom events)
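# pip install pycarlo
from pycarlo.core import Client, Query, Session

session = Session(mcd_id="YOUR_MCD_ID", mcd_token="YOUR_MCD_TOKEN")
client = Client(session=session)

# Sanity-check the connection: getUser, selecting only the email field
query = Query()
query.get_user.__fields__("email")
print(client(query).get_user.email)

# Register a custom lineage node (e.g., a table written by a job that
# Monte Carlo cannot observe directly). The client also accepts raw GraphQL;
# verify the mutation and argument names against the current API reference.
register_node = """
mutation {
  createOrUpdateLineageNode(
    objectType: "table"
    objectId: "orders"
    name: "orders"
    resourceName: "snowflake"
  ) {
    node { mcon }
  }
}
"""
client(register_node)

# Custom metrics are usually defined as monitors (in the UI or as
# monitors-as-code YAML) rather than reported from pipeline code.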
Concept
Anomaly Detection
Monte Carlo uses dynamic thresholds trained on your historical data. It detects:
  • Volume anomaly: Table had 80k rows yesterday, today only 200 — silent pipeline failure?
  • Freshness anomaly: Orders table usually updates every hour, but it's been 6 hours since the last update.
  • Distribution anomaly: Revenue column suddenly has negative values or extreme outliers.
  • Schema change: Column user_email was silently dropped from a table.
  • Field health: Null rate for shipping_address jumped from 2% to 45%.
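Monte Carlo's models are proprietary, so the sketch below swaps in a much simpler technique to show what a learned threshold means: derive the normal range from recent history (mean ± k standard deviations) instead of hard-coding it. `is_volume_anomaly` and `k = 3` are illustrative assumptions.

```python
import statistics
from typing import List


def is_volume_anomaly(history: List[int], today: int, k: float = 3.0) -> bool:
    """Flag today's row count if it lies outside mean ± k * stdev of history.

    A deliberately simple stand-in for a learned, dynamic threshold.
    """
    if len(history) < 2:
        return False  # not enough history to learn a baseline yet
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    # floor the stdev so a perfectly flat history does not flag every tiny change
    return abs(today - mean) > k * max(stdev, 1.0)


daily_rows = [80_000, 82_500, 79_000, 81_200, 80_300, 83_100, 78_900]
print(is_volume_anomaly(daily_rows, 200))     # True: likely a silent failure
print(is_volume_anomaly(daily_rows, 81_000))  # False: within the learned range
```

Unlike a fixed "must be between 50k and 100k" rule, the range moves automatically as the table grows.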
GE vs Monte Carlo: GE requires you to manually write rules (you define thresholds). Monte Carlo learns thresholds automatically. Use GE/Soda for explicit business rules; use Monte Carlo for anomaly detection and broader observability coverage.
33.5 — DATA OBSERVABILITY

Data Observability

Data Observability is the ability to understand the health of your data at any point in time. It is commonly described with five pillars (freshness, volume, distribution, schema, and lineage); this section covers the first four in depth: freshness, volume, distribution drift, and schema drift. Implementing them lets you know when something is broken before your users do.

🕐
Freshness
PILLAR 1
Concept
Table Freshness Checks
Freshness measures how recently your data was updated. If a pipeline is supposed to run every hour but hasn't updated a table in 5 hours, you have a staleness problem. Decisions made on stale data can be worse than having no data at all, because nothing tells the user that the numbers are out of date.
Freshness check logic:
  max_timestamp = max(updated_at) from orders
  current_time  = now()
  staleness     = current_time - max_timestamp
  IF staleness > threshold:
      → ALERT: "orders table is N hours stale!"
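The same logic in plain Python, using timezone-aware datetimes so that "now" and the newest timestamp are on the same clock (comparing a naive local time with UTC is a common source of false freshness alerts). The 2-hour default mirrors the PySpark check in this section; the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def staleness_hours(max_updated_at: datetime, now: datetime) -> float:
    """Hours between the newest record and now; both datetimes must be tz-aware."""
    return (now - max_updated_at).total_seconds() / 3600


def freshness_status(max_updated_at: datetime, now: datetime,
                     threshold_hours: float = 2.0) -> str:
    """Return "STALE" when the newest record is older than the threshold."""
    return "STALE" if staleness_hours(max_updated_at, now) > threshold_hours else "OK"


now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(hours=1), now))  # OK
print(freshness_status(now - timedelta(hours=5), now))  # STALE
```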
python — freshness check in pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Freshness").getOrCreate()
df = spark.read.parquet("s3://my-bucket/orders/")

# ── Freshness check ──────────────────────────────────
# Compute staleness inside Spark so both timestamps come from the same clock/timezone
row = df.select(
    F.max("updated_at").alias("max_ts"),
    ((F.unix_timestamp(F.current_timestamp())
      - F.unix_timestamp(F.max("updated_at"))) / 3600).alias("staleness_hours"),
).collect()[0]
max_ts, staleness_hours = row["max_ts"], row["staleness_hours"]

if max_ts is None:
    raise Exception("FRESHNESS ALERT: orders table has no updated_at values")

threshold_hours = 2   # alert if data is older than 2 hours
status = "OK" if staleness_hours <= threshold_hours else "STALE"

# ── Last updated timestamp monitoring ────────────────
# Log every run (OK and STALE) to a control table for SLA tracking
freshness_record = spark.createDataFrame(
    [("orders", max_ts, float(staleness_hours), status)],
    "table_name STRING, max_updated_at TIMESTAMP, staleness_hours DOUBLE, status STRING",
).withColumn("check_time", F.current_timestamp())
freshness_record.write.mode("append").saveAsTable("dq.freshness_log")

if status == "STALE":
    msg = (
        f"⚠️ FRESHNESS ALERT: orders table is "
        f"{staleness_hours:.1f}h stale! "
        f"Max updated_at = {max_ts}"
    )
    print(msg)
    # send_slack_alert(msg)  or sns.publish(...)
    raise Exception(msg)

print(f"✅ Freshness OK: data is {staleness_hours:.1f}h old")
📊
Volume
PILLAR 2
Concept
Row Count Monitoring
Volume monitoring tracks the number of rows added to a table per pipeline run. Sudden drops (silent failures, upstream truncations) or unexpected spikes (data duplication) are both problems.
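Comparing only against yesterday makes a volume check sensitive to a single bad day. A slightly more robust baseline is the median of recent runs, with separate thresholds for drops and spikes. The sketch below is one way to do that; the function and both ratios are hypothetical choices, not a standard.

```python
import statistics
from typing import List, Optional


def volume_check(history: List[int], today: int,
                 max_drop: float = 0.5, max_spike: float = 2.0) -> Optional[str]:
    """Return an alert message if today's count is far from the recent median."""
    if not history:
        return None  # first run: nothing to compare against
    baseline = statistics.median(history)
    if baseline == 0:
        return None if today == 0 else f"SPIKE: {today:,} rows vs median 0"
    ratio = today / baseline
    if ratio < max_drop:
        return f"DROP: {today:,} rows vs median {baseline:,.0f}"
    if ratio > max_spike:
        return f"SPIKE: {today:,} rows vs median {baseline:,.0f}"
    return None


last_7_runs = [100_000, 98_000, 102_000, 99_500, 101_000, 97_000, 100_500]
print(volume_check(last_7_runs, 40_000))  # DROP: 40,000 rows vs median 100,000
print(volume_check(last_7_runs, 99_000))  # None
```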
python — volume monitoring
import boto3
from datetime import date, timedelta

# Run date is usually supplied by the orchestrator (e.g. Airflow's logical date)
run_date = date(2024, 1, 15)
df = spark.read.parquet(f"s3://my-bucket/orders/date={run_date.isoformat()}/")
today_count = df.count()

# ── Compare against historical baseline ──────────────
# Load the previous day's row count from a DynamoDB control table
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("pipeline_volume_history")

previous_day = (run_date - timedelta(days=1)).isoformat()
response = table.get_item(Key={"table_name": "orders", "date": previous_day})
# DynamoDB returns numbers as Decimal; on the first run there is no history,
# so fall back to today's count (0% change)
yesterday_count = int(response.get("Item", {}).get("row_count", today_count))

# ── Percent-change check against the previous day ────
change_pct = abs(today_count - yesterday_count) / max(yesterday_count, 1) * 100
threshold_pct = 30  # alert if count changes by more than 30%

if change_pct > threshold_pct:
    msg = (
        f"⚠️ VOLUME ANOMALY: orders row count changed by {change_pct:.1f}%! "
        f"Today: {today_count:,} | Yesterday: {yesterday_count:,}"
    )
    print(msg)
    # Publish CloudWatch metric for alerting
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="DataQuality",
        MetricData=[{
            "MetricName": "VolumeChangePercent",
            "Dimensions": [{"Name": "TableName", "Value": "orders"}],
            "Value": change_pct,
            "Unit": "Percent"
        }]
    )

# Save today's count to history
table.put_item(Item={
    "table_name": "orders",
    "date": run_date.isoformat(),
    "row_count": today_count
})
print(f"✅ Volume check done: {today_count:,} rows")
📈
Distribution Drift
PILLAR 3
Concept
Statistical Distribution Checks
Distribution drift happens when the statistical properties of a column change over time, even if the row count and schema are fine. Example: a revenue column's mean drops from $87 to $3 because a bug is setting prices to zero for some products.
python — distribution drift detection
from pyspark.sql import functions as F

# ── Compute today's distribution stats ───────────────
stats = df.select(
    F.count("amount").alias("count"),
    F.mean("amount").alias("mean"),
    F.stddev("amount").alias("stddev"),
    F.min("amount").alias("min"),
    F.max("amount").alias("max"),
    F.percentile_approx("amount", 0.5).alias("median"),
    F.percentile_approx("amount", 0.95).alias("p95"),
    F.sum(F.when(F.col("amount").isNull(), 1).otherwise(0)).alias("null_count")
).collect()[0]

today_mean = stats["mean"]
today_std  = stats["stddev"]

# ── Z-score monitoring against baseline ──────────────
baseline_mean = 87.42  # historical average of the daily mean of amount
baseline_std  = 12.3   # std dev of that daily mean over the same window (not of single amounts)

z_score = abs(today_mean - baseline_mean) / max(baseline_std, 0.001)

if z_score > 3:  # more than 3 standard deviations = anomaly
    print(f"⚠️ DISTRIBUTION DRIFT: amount mean = {today_mean:.2f} "
          f"(baseline: {baseline_mean:.2f}, z-score: {z_score:.1f})")

# ── Histogram comparison ─────────────────────────────
# Today's value distribution by bucket; nulls get their own bucket
# so they are not counted in the otherwise() "1000+" bucket
buckets = df.select(
    F.when(F.col("amount").isNull(), "null")
    .when(F.col("amount") < 50, "0-50")
    .when(F.col("amount") < 200, "50-200")
    .when(F.col("amount") < 1000, "200-1000")
    .otherwise("1000+").alias("bucket")
).groupBy("bucket").count()
buckets.show()
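The histogram above is only displayed. One common way to turn it into an alert is the Population Stability Index (PSI), a technique swapped in here rather than something the code above already does. The sketch assumes both histograms are plain dicts of bucket to count (for example built with `{r["bucket"]: r["count"] for r in buckets.collect()}`); a frequently quoted rule of thumb treats PSI above 0.25 as significant drift.

```python
import math
from typing import Dict


def psi(expected: Dict[str, int], actual: Dict[str, int], eps: float = 1e-6) -> float:
    """Population Stability Index between two bucketed histograms (bucket -> count)."""
    e_total = sum(expected.values())
    a_total = sum(actual.values())
    if e_total == 0 or a_total == 0:
        raise ValueError("both histograms need at least one row")
    score = 0.0
    for bucket in set(expected) | set(actual):
        # convert counts to proportions; the eps floor avoids log(0) for empty buckets
        e = max(expected.get(bucket, 0) / e_total, eps)
        a = max(actual.get(bucket, 0) / a_total, eps)
        score += (a - e) * math.log(a / e)
    return score


baseline = {"0-50": 400, "50-200": 450, "200-1000": 140, "1000+": 10}
today = {"0-50": 900, "50-200": 80, "200-1000": 15, "1000+": 5}
drift = psi(baseline, today)
print(f"PSI = {drift:.3f}")
if drift > 0.25:  # common rule of thumb for significant drift
    print("DISTRIBUTION DRIFT in amount buckets")
```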
🏗️
Schema Drift
PILLAR 4
Concept
Schema Change Detection
Schema drift is when the structure of your data changes unexpectedly — a column is dropped, renamed, or its type changes. This silently breaks downstream pipelines. Detecting schema drift early is critical.
python — schema drift detection
from pyspark.sql.types import StructType
import json

# ── Expected schema (stored from last known-good run) ─
expected_schema_json = """
{
  "fields": [
    {"name": "order_id",     "type": "string",    "nullable": false},
    {"name": "customer_id",  "type": "integer",   "nullable": false},
    {"name": "amount",       "type": "double",    "nullable": true},
    {"name": "status",       "type": "string",    "nullable": true},
    {"name": "order_date",   "type": "timestamp", "nullable": true}
  ]
}
"""
expected_fields = {f["name"]: f["type"] for f in json.loads(expected_schema_json)["fields"]}

# ── Actual schema from today's data (same JSON type names as the stored schema) ─
actual_fields = {f["name"]: f["type"] for f in df.schema.jsonValue()["fields"]}

# ── Column addition/removal alerts ───────────────────
added_cols   = set(actual_fields.keys()) - set(expected_fields.keys())
removed_cols = set(expected_fields.keys()) - set(actual_fields.keys())

if added_cols:
    print(f"ℹ️  NEW columns detected: {added_cols}")

if removed_cols:
    print(f"❌ REMOVED columns: {removed_cols}")
    raise Exception(f"Schema drift! Columns removed: {removed_cols}")

# ── Data type change monitoring ───────────────────────
type_changes = []
for col in expected_fields:
    if col in actual_fields and actual_fields[col] != expected_fields[col]:
        type_changes.append(f"{col}: {expected_fields[col]} → {actual_fields[col]}")

if type_changes:
    print(f"⚠️  DATA TYPE CHANGES: {type_changes}")

if not removed_cols and not type_changes:
    print("✅ Schema drift check passed — no breaking changes")
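The check above ignores the `nullable` flag. A fuller diff can report nullability changes alongside added, removed, and retyped columns. This sketch assumes both schemas are stored in the JSON shape Spark produces with `df.schema.jsonValue()` (a `fields` list of `name`/`type`/`nullable` entries); `schema_diff` is a hypothetical helper.

```python
from typing import Dict, List


def schema_diff(expected: dict, actual: dict) -> Dict[str, List[str]]:
    """Diff two schemas stored in Spark's StructType JSON shape ({"fields": [...]})."""
    exp = {f["name"]: f for f in expected["fields"]}
    act = {f["name"]: f for f in actual["fields"]}
    diff: Dict[str, List[str]] = {
        "added": sorted(act.keys() - exp.keys()),
        "removed": sorted(exp.keys() - act.keys()),
        "type_changed": [],
        "nullability_changed": [],
    }
    for name in sorted(exp.keys() & act.keys()):
        if exp[name]["type"] != act[name]["type"]:
            diff["type_changed"].append(f"{name}: {exp[name]['type']} -> {act[name]['type']}")
        if exp[name]["nullable"] != act[name]["nullable"]:
            diff["nullability_changed"].append(name)
    return diff


expected = {"fields": [
    {"name": "order_id", "type": "string", "nullable": False},
    {"name": "customer_id", "type": "integer", "nullable": False},
    {"name": "amount", "type": "double", "nullable": True},
]}
actual = {"fields": [
    {"name": "order_id", "type": "string", "nullable": True},
    {"name": "customer_id", "type": "long", "nullable": False},
    {"name": "channel", "type": "string", "nullable": True},
]}
print(schema_diff(expected, actual))
```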
33.6 — CUSTOM DATA QUALITY FRAMEWORK

Custom DQ Framework

In many production environments you'll build your own lightweight DQ framework on top of PySpark — especially when you need tight integration with your pipeline's audit tables, alerting system, and quarantine logic. This section covers building one from scratch.

⚙️
Rule Engine Design
ARCHITECTURE
Design
Building a Rule Engine
A rule engine is a system where DQ rules are defined as data (not code). Rules are stored in a registry (table or YAML), and a single generic runner applies all rules to any DataFrame. This makes it metadata-driven and maintainable.
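Rules-as-data also makes it easy to evaluate every rule in a single scan: each rule becomes one `SUM(CASE WHEN <check_expr> THEN 1 ELSE 0 END)` column in one aggregate query. The sketch uses SQLite so it runs anywhere, and the registry contents are hypothetical. In PySpark the equivalent would be one `df.agg(...)` over `F.sum(F.when(F.expr(rule.check_expr), 1).otherwise(0))` columns. Because the expressions are spliced into SQL, the registry must come from a trusted source.

```python
import json
import sqlite3
from typing import Dict, List

# Hypothetical registry rows, as they might be exported from a rules table
REGISTRY_JSON = """
[
  {"rule_id": "R1", "check_expr": "order_id IS NOT NULL", "threshold": 1.0, "severity": "ERROR"},
  {"rule_id": "R2", "check_expr": "amount >= 0",          "threshold": 1.0, "severity": "ERROR"},
  {"rule_id": "R3", "check_expr": "email LIKE '%@%'",     "threshold": 0.9, "severity": "WARNING"}
]
"""


def run_rules(conn: sqlite3.Connection, table: str, rules: List[Dict]) -> List[Dict]:
    """Evaluate all rules in ONE scan; rows where check_expr is NULL count as failures."""
    # check_expr is spliced into SQL: only load rules from a trusted registry
    cases = ", ".join(f"SUM(CASE WHEN {r['check_expr']} THEN 1 ELSE 0 END)" for r in rules)
    row = conn.execute(f"SELECT COUNT(*), {cases} FROM {table}").fetchone()
    total, pass_counts = row[0], row[1:]
    results = []
    for rule, pass_count in zip(rules, pass_counts):
        pass_count = pass_count or 0  # SUM over an empty table returns NULL
        pass_rate = pass_count / max(total, 1)
        results.append({
            "rule_id": rule["rule_id"],
            "severity": rule["severity"],
            "pass_rate": round(pass_rate, 4),
            "passed": pass_rate >= rule["threshold"],
        })
    return results


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL, email TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    (1, 10.0, "a@x.com"), (2, 20.0, "b@x.com"), (3, -5.0, "c@x.com"), (4, 7.0, "d@x.com"),
])
results = run_rules(conn, "orders", json.loads(REGISTRY_JSON))
for r in results:
    print(r)
```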
python — rule engine design
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class Severity(Enum):
    WARNING = "WARNING"   # report only
    ERROR   = "ERROR"     # fail the pipeline

@dataclass
class DQRule:
    """A single data quality rule."""
    rule_id:     str
    description: str
    check_expr:  str           # Spark SQL expression that is TRUE for VALID rows
    column:      Optional[str] # None means table-level rule
    severity:    Severity = Severity.ERROR
    threshold:   float = 1.0   # fraction of rows that must pass (1.0 = all)

class DQRuleEngine:
    """Generic rule engine that applies DQ rules to any DataFrame."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.results: List[dict] = []

    def run(self, df: DataFrame, rules: List[DQRule], table_name: str) -> List[dict]:
        self.results = []  # reset so repeated runs don't accumulate old results

        # Evaluate all rules in ONE aggregation pass instead of one Spark job per rule.
        # F.when() treats a NULL check result as not matched, so NULLs count as failures.
        aggs = [F.count(F.lit(1)).alias("total_rows")] + [
            F.sum(F.when(F.expr(rule.check_expr), 1).otherwise(0)).alias(f"pass_{i}")
            for i, rule in enumerate(rules)
        ]
        row = df.agg(*aggs).collect()[0]
        total_rows = row["total_rows"]

        for i, rule in enumerate(rules):
            pass_count = row[f"pass_{i}"] or 0  # SUM over zero rows returns NULL
            # An empty DataFrame gives pass_rate 0.0, so every rule fails on empty input
            pass_rate = pass_count / max(total_rows, 1)
            passed = pass_rate >= rule.threshold

            self.results.append({
                "table_name": table_name,
                "rule_id": rule.rule_id,
                "description": rule.description,
                "severity": rule.severity.value,
                "total_rows": total_rows,
                "pass_count": pass_count,
                "fail_count": total_rows - pass_count,
                "pass_rate": round(pass_rate, 4),
                "threshold": rule.threshold,
                "passed": passed,
            })

        return self.results

    def get_results_df(self) -> DataFrame:
        return self.spark.createDataFrame(self.results)

    def has_errors(self) -> bool:
        return any(
            not r["passed"] and r["severity"] == "ERROR"
            for r in self.results
        )
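To unit-test thresholds without a Spark session, the per-rule arithmetic of `DQRuleEngine.run` can be mirrored in plain Python. This is a minimal sketch under stated assumptions: rows are plain dicts and each check is a Python callable standing in for the SQL `check_expr`; `evaluate_rules`, `null_safe_ge` and the tuple-based rule format are hypothetical helpers, not part of any library. As in the Spark version, a check that returns `None` (the analogue of SQL NULL) counts as a failure.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Check = Callable[[Row], Optional[bool]]
# Hypothetical rule format for this sketch: (rule_id, severity, threshold, check)
RuleSpec = Tuple[str, str, float, Check]

def evaluate_rules(rows: List[Row], rules: List[RuleSpec]) -> List[dict]:
    """Mirror the engine's pass_count / pass_rate / passed logic in plain Python."""
    total_rows = len(rows)
    results = []
    for rule_id, severity, threshold, check in rules:
        # Only an explicit True counts as a pass; False and None (NULL) both fail
        pass_count = sum(1 for row in rows if check(row) is True)
        pass_rate = pass_count / max(total_rows, 1)
        results.append({
            "rule_id": rule_id,
            "severity": severity,
            "pass_count": pass_count,
            "fail_count": total_rows - pass_count,
            "pass_rate": round(pass_rate, 4),
            "passed": pass_rate >= threshold,
        })
    return results

def null_safe_ge(column: str, bound: float) -> Check:
    """Hypothetical helper mimicking SQL `column >= bound`: returns None for NULLs."""
    def check(row: Row) -> Optional[bool]:
        value = row.get(column)
        return None if value is None else value >= bound
    return check

rows = [{"amount": 10.0}, {"amount": -5.0}, {"amount": None}, {"amount": 0.0}]
rules = [("ORD-002", "ERROR", 1.0, null_safe_ge("amount", 0))]
for result in evaluate_rules(rows, rules):
    print(result)
```

Here the negative amount and the NULL amount both fail, so ORD-002 ends with a 0.5 pass rate and `passed` is false.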
📋
Business Rule Registry & Threshold Validation
RULES
Implementation
Defining the Rule Registry
The business rule registry is where all DQ rules are defined. In a metadata-driven system, this lives in a database table. Here we show it as a Python list for clarity, but in production you'd load it from a DynamoDB or Delta table.
python — business rule registry + threshold validation
# ── Rule Registry ─────────────────────────────────────
# In production, load this from a Delta/DynamoDB table
orders_rules = [
    DQRule(
        rule_id="ORD-001",
        description="order_id must not be null",
        check_expr="order_id IS NOT NULL",
        column="order_id",
        severity=Severity.ERROR,
        threshold=1.0   # ALL rows must pass
    ),
    DQRule(
        rule_id="ORD-002",
        description="amount must be non-negative",
        check_expr="amount >= 0",
        column="amount",
        severity=Severity.ERROR,
        threshold=1.0
    ),
    DQRule(
        rule_id="ORD-003",
        description="status must be valid",
        check_expr="status IN ('placed','shipped','delivered','cancelled')",
        column="status",
        severity=Severity.ERROR,
        threshold=0.999   # allow 0.1% bad rows (noise in source)
    ),
    DQRule(
        rule_id="ORD-004",
        description="email format valid (warning only)",
        # Raw string: Spark's SQL parser unescapes string literals, so '\\.' reaches the regex as '\.'
        check_expr=r"email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'",
        column="email",
        severity=Severity.WARNING,
        threshold=0.95   # 95% of emails must be valid format
    ),
    DQRule(
        rule_id="ORD-005",
        description="extreme amounts flagged",
        check_expr="amount < 100000",
        column="amount",
        severity=Severity.WARNING,
        threshold=0.999
    ),
]

# ── Run the engine ────────────────────────────────────
engine = DQRuleEngine(spark)
engine.run(df, orders_rules, table_name="orders")

results_df = engine.get_results_df()
results_df.show(truncate=False)

if engine.has_errors():
    raise Exception("ERROR-severity DQ rules failed. Pipeline halted.")
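Because the registry is data, it can also live in a JSON or YAML file under version control instead of a Python list. The sketch below assumes a JSON array whose keys match the `DQRule` fields; `load_rules` is a hypothetical helper, and `Severity`/`DQRule` are repeated in trimmed form so the block runs on its own. It rejects duplicate rule IDs and thresholds outside [0, 1] before any data is scanned.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

class Severity(Enum):
    WARNING = "WARNING"
    ERROR = "ERROR"

@dataclass
class DQRule:
    rule_id: str
    description: str
    check_expr: str
    column: Optional[str]
    severity: Severity = Severity.ERROR
    threshold: float = 1.0

def load_rules(registry_json: str) -> List[DQRule]:
    """Hypothetical helper: parse a JSON rule registry into validated DQRule objects."""
    rules, seen_ids = [], set()
    for record in json.loads(registry_json):
        if record["rule_id"] in seen_ids:
            raise ValueError(f"Duplicate rule_id: {record['rule_id']}")
        threshold = float(record.get("threshold", 1.0))
        if not 0.0 <= threshold <= 1.0:
            raise ValueError(f"{record['rule_id']}: threshold {threshold} not in [0, 1]")
        seen_ids.add(record["rule_id"])
        rules.append(DQRule(
            rule_id=record["rule_id"],
            description=record["description"],
            check_expr=record["check_expr"],
            column=record.get("column"),  # missing or null => table-level rule
            # Severity("ERROR") looks up the enum member by its value
            severity=Severity(record.get("severity", "ERROR")),
            threshold=threshold,
        ))
    return rules

registry = """[
  {"rule_id": "ORD-001", "description": "order_id must not be null",
   "check_expr": "order_id IS NOT NULL", "column": "order_id"},
  {"rule_id": "ORD-005", "description": "extreme amounts flagged",
   "check_expr": "amount < 100000", "column": "amount",
   "severity": "WARNING", "threshold": 0.999}
]"""
for rule in load_rules(registry):
    print(rule.rule_id, rule.severity.value, rule.threshold)
```

Validating the registry at load time means a typo such as a threshold of 95 instead of 0.95 fails fast rather than silently failing every run.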
🏆
Quality Scoring
SCORING
Implementation
Computing a DQ Score
A quality score is a single number (0–100) summarising the overall health of a dataset. It's useful for dashboards, trend tracking, and SLA reporting. You compute it as a weighted average of individual rule pass rates.
python — quality scoring
from datetime import datetime, timezone

def compute_dq_score(results: list) -> float:
    """
    Compute a weighted quality score 0-100.
    ERROR rules have weight 2, WARNING rules weight 1.
    """
    total_weight = 0
    weighted_score = 0.0

    for r in results:
        weight = 2 if r["severity"] == "ERROR" else 1
        weighted_score += r["pass_rate"] * weight
        total_weight += weight

    score = (weighted_score / max(total_weight, 1)) * 100
    return round(score, 2)

# Compute score
dq_score = compute_dq_score(engine.results)
print(f"📊 Data Quality Score: {dq_score:.1f} / 100")

# Classify score
if dq_score >= 98:
    grade = "🟢 EXCELLENT"
elif dq_score >= 90:
    grade = "🟡 GOOD"
elif dq_score >= 70:
    grade = "🟠 FAIR"
else:
    grade = "🔴 POOR"

print(f"Grade: {grade}")

# Save score to audit table (run_id is an example value; generate a unique one per run)
audit_record = spark.createDataFrame([{
    "run_id": "run_20240115_001",
    "table_name": "orders",
    "dq_score": dq_score,
    "total_rules": len(engine.results),
    "failed_rules": sum(1 for r in engine.results if not r["passed"]),
    "run_timestamp": datetime.now(timezone.utc).isoformat()
}])
audit_record.write.mode("append").saveAsTable("dq.audit_log")
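A worked example makes the weighting concrete. With one ERROR rule at a 0.99 pass rate and one WARNING rule at 0.90, the score is (0.99 × 2 + 0.90 × 1) / 3 × 100 = 96.0. The block below repeats `compute_dq_score` so it runs on its own; the result dicts are made-up inputs. It also shows a property of this design worth knowing: an ERROR rule can fail its threshold while the score still lands in the EXCELLENT band, so the score complements `has_errors()` rather than replacing it.

```python
def compute_dq_score(results: list) -> float:
    """Weighted quality score 0-100: ERROR rules weight 2, WARNING rules weight 1."""
    total_weight = 0
    weighted_score = 0.0
    for r in results:
        weight = 2 if r["severity"] == "ERROR" else 1
        weighted_score += r["pass_rate"] * weight
        total_weight += weight
    return round((weighted_score / max(total_weight, 1)) * 100, 2)

# Made-up results: (0.99*2 + 0.90*1) / 3 * 100 = 96.0
example = [
    {"severity": "ERROR", "pass_rate": 0.99},
    {"severity": "WARNING", "pass_rate": 0.90},
]
print(compute_dq_score(example))

# A failing ERROR rule (0.995 < threshold 1.0) still yields a high score:
# (0.995*2 + 1.0*1) / 3 * 100 = 99.67
failing = [
    {"severity": "ERROR", "pass_rate": 0.995, "passed": False},
    {"severity": "WARNING", "pass_rate": 1.0, "passed": True},
]
print(compute_dq_score(failing))
```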
🚨
Alerting Integration
OPERATIONS
Implementation
Wiring DQ to Alerts
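Your DQ framework is only as good as its alerting: when rules fail, engineers need to know immediately. The target flow is CloudWatch metric → Alarm → SNS → Slack. The code below publishes the score and failed-rule count as CloudWatch metrics (an alarm can then be defined on them) and sends a detailed SNS notification whenever an ERROR rule fails; the alarm itself and the Slack subscription to the topic are configured separately.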
python — dq alerting (sns + cloudwatch)
import json
from datetime import datetime, timezone

import boto3

def publish_dq_metrics(results: list, dq_score: float, table_name: str):
    """Publish DQ metrics to CloudWatch and send an SNS alert on ERROR failures."""
    cw = boto3.client("cloudwatch")
    sns = boto3.client("sns")

    # Publish overall score and failed-rule count as custom metrics
    cw.put_metric_data(
        Namespace="DataQuality",
        MetricData=[
            {
                "MetricName": "DQScore",
                "Dimensions": [{"Name": "TableName", "Value": table_name}],
                "Value": dq_score,
                "Unit": "None",
            },
            {
                "MetricName": "FailedRules",
                "Dimensions": [{"Name": "TableName", "Value": table_name}],
                "Value": sum(1 for r in results if not r["passed"]),
                "Unit": "Count",
            }
        ]
    )

    # If errors found → publish SNS alert with details
    error_rules = [r for r in results if not r["passed"] and r["severity"] == "ERROR"]
    if error_rules:
        message = {
            "alert_type": "DQ_FAILURE",
            "table_name": table_name,
            "dq_score": dq_score,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "failed_rules": [
                {
                    "rule_id": r["rule_id"],
                    "description": r["description"],
                    "fail_count": r["fail_count"],
                    "pass_rate": r["pass_rate"]
                }
                for r in error_rules
            ]
        }
        sns.publish(
            TopicArn="arn:aws:sns:us-east-1:123456789012:dq-alerts",  # placeholder account ID
            Subject=f"🚨 DQ FAILURE: {table_name} score {dq_score:.1f}",
            Message=json.dumps(message, indent=2)
        )
        print(f"🚨 Alert published to SNS for {len(error_rules)} failed rules")

# ── Full pipeline pattern ─────────────────────────────
engine = DQRuleEngine(spark)
engine.run(df, orders_rules, "orders")
score = compute_dq_score(engine.results)
publish_dq_metrics(engine.results, score, "orders")

if engine.has_errors():
    raise Exception("Pipeline stopped due to DQ errors.")

# Only write to Silver if DQ passed
df.write.mode("append").saveAsTable("silver.orders")
print("✅ Data written to Silver layer successfully.")
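The alerting flow ends in Slack, while the code above stops at `sns.publish`. One common way to close that gap is a Lambda function subscribed to the topic that reformats the message and posts it to a Slack incoming webhook. This is a minimal, standard-library-only sketch under assumptions: the `SLACK_WEBHOOK_URL` environment variable and the `format_slack_text` helper are hypothetical, and the message body is assumed to be the JSON built by `publish_dq_metrics`. Lambda receives SNS deliveries as `Records[].Sns.Message`, and Slack incoming webhooks accept a JSON body with a `text` field.

```python
import json
import os
import urllib.request

def format_slack_text(message: dict) -> str:
    """Hypothetical helper: turn the DQ_FAILURE payload into Slack message text."""
    lines = [
        f":rotating_light: DQ failure on *{message['table_name']}* "
        f"(score {message['dq_score']:.1f}) at {message['timestamp']}"
    ]
    for rule in message["failed_rules"]:
        lines.append(
            f"- {rule['rule_id']}: {rule['description']} "
            f"({rule['fail_count']} failing rows, pass rate {rule['pass_rate']:.2%})"
        )
    return "\n".join(lines)

def lambda_handler(event, context):
    """Entry point for a Lambda subscribed to the dq-alerts SNS topic."""
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
        body = json.dumps({"text": format_slack_text(message)}).encode("utf-8")
        request = urllib.request.Request(
            os.environ["SLACK_WEBHOOK_URL"],  # hypothetical env var holding the webhook URL
            data=body,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(request, timeout=10)

# Local check of the formatting only (no network call)
sample = {
    "table_name": "orders",
    "dq_score": 91.234,
    "timestamp": "2024-01-15T00:00:00+00:00",
    "failed_rules": [{"rule_id": "ORD-002", "description": "amount must be non-negative",
                      "fail_count": 12, "pass_rate": 0.9988}],
}
print(format_slack_text(sample))
```

Keeping the formatting in a pure function lets it be unit-tested without AWS or Slack credentials.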
🔄
Complete End-to-End DQ Pipeline Pattern
PRODUCTION
Pattern
Where DQ fits in a Medallion Architecture
MEDALLION + DQ GATES

                Raw Source Data
                       │
                       ▼
┌─────────────────────────────────────────────┐
│ BRONZE LAYER                                │
│ - Ingest raw data as-is (no transforms)     │
│ - DQ Gate 1: Schema drift check             │
│              freshness check                │
│              row count check (volume)       │
└──────────────────────┬──────────────────────┘
                       │ pass
                       ▼
┌─────────────────────────────────────────────┐
│ SILVER LAYER                                │
│ - Clean, deduplicate, standardise           │
│ - DQ Gate 2: Business rule validation       │
│              null checks on keys            │
│              range checks on amounts        │
│              referential integrity          │
│              distribution drift             │
└──────────────────────┬──────────────────────┘
                       │ pass
                       ▼
┌─────────────────────────────────────────────┐
│ GOLD LAYER                                  │
│ - Aggregated, dimensional models            │
│ - DQ Gate 3: Reconciliation checks          │
│              Silver count == Gold count     │
│              sum(amounts) reconciliation    │
└──────────────────────┬──────────────────────┘
                       │ pass
                       ▼
           Dashboards / ML / Reports

─────────────────────────────────────────────
QUARANTINE: on DQ failure at any gate
  Bad records   → s3://bad-records/<table>/<date>/
  Failure event → SNS → Slack / PagerDuty
  DQ score      → CloudWatch → Dashboard
Key interview insight: Always say DQ gates live between layers, not just at the end. Catching bad data at Bronze (before any transforms) is much cheaper than catching it after 3 hours of Gold processing.
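The quarantine path in the diagram can be sketched as a split: rows that fail any rule go to a quarantine location tagged with the rules they broke, and the rest continue to the next layer. Plain Python keeps the example self-contained; in PySpark the same idea could be expressed as two `filter` calls on a combined rule expression. The `split_for_quarantine` and `quarantine_path` helpers and the rule tuples are hypothetical; only the `s3://bad-records/<table>/<date>/` layout comes from the diagram.

```python
from datetime import date
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]

def quarantine_path(table: str, run_date: date) -> str:
    """Hypothetical helper building the prefix s3://bad-records/<table>/<date>/."""
    return f"s3://bad-records/{table}/{run_date.isoformat()}/"

def split_for_quarantine(
    rows: List[Row], rules: List[Tuple[str, Callable[[Row], bool]]]
) -> Tuple[List[Row], List[Row]]:
    """Return (good_rows, bad_rows); bad rows carry the IDs of the rules they failed."""
    good, bad = [], []
    for row in rows:
        failed = [rule_id for rule_id, check in rules if check(row) is not True]
        if failed:
            bad.append({**row, "_failed_rules": failed})
        else:
            good.append(row)
    return good, bad

rules = [
    ("ORD-001", lambda r: r.get("order_id") is not None),
    ("ORD-002", lambda r: r.get("amount") is not None and r["amount"] >= 0),
]
rows = [
    {"order_id": 1, "amount": 25.0},
    {"order_id": None, "amount": 10.0},
    {"order_id": 3, "amount": -4.0},
]
good, bad = split_for_quarantine(rows, rules)
print(len(good), "rows continue to the next layer")
print(len(bad), "rows go to", quarantine_path("orders", date(2024, 1, 15)))
```

Tagging each quarantined row with its failed rule IDs makes the bad-records location directly triageable instead of requiring a rerun of the checks.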
MODULE 33 — QUIZ & SUMMARY

Quiz & Summary

Test your understanding of the Data Quality Ecosystem. These are the kinds of questions that come up in senior data engineering interviews.

Q1. In Great Expectations, what is a "Checkpoint"?
✅ Correct! A Checkpoint is the mechanism that ties a batch of data + an expectation suite + actions together. It's the unit you typically trigger from an Airflow DAG to run GE validation.
❌ The expectation suite is the JSON file with definitions. A Checkpoint is the validation runner that ties data + suite + actions together.
Q2. What is the key architectural difference between Deequ and Great Expectations?
✅ Correct! Deequ's two-phase model (Analyzers → Constraints) is designed for Spark and allows reusing computed metrics across multiple constraint checks efficiently.
❌ Deequ is open-source (AWS), both support Python, and they have fundamentally different APIs. Deequ's Analyzer→Constraint split is its key design pattern.
Q3. Which data observability pillar does this scenario describe: "The orders table usually has ~100k rows per day, but today only 200 rows were ingested"?
✅ Correct! A sudden drop in row count is a Volume anomaly — the table is there and fresh, but the data pipeline silently dropped most rows.
❌ This is a Volume anomaly — the count dropped dramatically. Freshness would be about the table not updating at all; distribution drift is about statistical properties like mean/std changing.
Q4. How does Monte Carlo differ from Great Expectations for detecting data anomalies?
✅ Correct! Monte Carlo's key differentiator is ML-powered, automatic anomaly detection. GE is great for explicit business rules; Monte Carlo catches unexpected anomalies you didn't think to write rules for.
❌ Monte Carlo learns patterns automatically — that's its key advantage. GE requires you to manually write each expectation with explicit thresholds.
Q5. In a Medallion architecture, where should DQ gates be placed?
✅ Correct! DQ gates at every layer boundary catch issues early and cheaply. Bad data caught at Bronze is cheap to fix; bad data that reaches Gold costs hours of reprocessing.
❌ DQ gates should exist at every layer boundary — it's far cheaper to catch problems early (at Bronze) than after expensive Gold processing has already run.
📌
Module 33 — Complete Summary
Tool | Type | Best For | Integration
Great Expectations | Open Source | Explicit rule-based validation, Data Docs, Airflow integration | Spark, Pandas, SQL
Deequ | Open Source (AWS) | Large-scale Spark DQ, metric history, EMR workloads | PySpark (pydeequ)
Soda | Open Source + SaaS | YAML-first checks, non-engineer readable, Snowflake-centric | Spark, Snowflake, BigQuery
Monte Carlo | Enterprise SaaS | ML anomaly detection, automatic observability, no rule writing | Connector-based (Snowflake, Databricks, Redshift)
Custom Framework | DIY | Full control, pipeline-native, metadata-driven rule registry | Any PySpark pipeline
4 Observability Pillars
- Freshness: is data recent?
- Volume: right number of rows?
- Distribution Drift: stats stable?
- Schema Drift: structure unchanged?
Custom Framework Components
- DQRule dataclass
- DQRuleEngine runner
- Business Rule Registry
- Quality Score (0–100)
- CloudWatch + SNS Alerting
- Audit Log table
Interview cheat-sheet: When asked about DQ, say: "We validate at every layer boundary — schema/freshness/volume at Bronze entry, business rules (nulls, ranges, referential integrity) at Silver, and reconciliation at Gold. Failures route bad records to a quarantine path and publish metrics to CloudWatch for alarming. We track a quality score per run in an audit table."
🎉 Module 33 Complete! You now understand the full data quality ecosystem — Great Expectations (expectations, suites, checkpoints, Data Docs, Spark integration), Deequ (analyzers, constraints, metrics repository), Soda (SodaCL, Soda Core, Soda Cloud), Monte Carlo (ML-powered observability, integration patterns), all 4 observability pillars (freshness, volume, distribution drift, schema drift), and building a production custom DQ framework with rule engine, scoring, and alerting. Next: Module 34 — CI/CD and Production Deployment.