Data Quality Ecosystem
Bad data costs companies millions. Data Quality (DQ) is the practice of systematically detecting, measuring, and preventing data problems before they reach consumers. This module covers the full ecosystem — open-source tools, cloud-native solutions, and patterns every senior data engineer must know.
Great Expectations
Great Expectations (GE) is one of the most widely used open-source data quality libraries. You define what good data looks like (expectations), validate your DataFrames against those rules, and publish human-readable reports called Data Docs.
Examples: "Column user_id should never be null", "Column age should be between 0 and 120", "Column status should only contain ['active','inactive','pending']".
| Expectation | What it checks |
|---|---|
| expect_column_values_to_not_be_null | No nulls in column |
| expect_column_values_to_be_unique | All values unique |
| expect_column_values_to_be_in_set | Values in allowed list |
| expect_column_values_to_be_between | Numeric range check |
| expect_column_values_to_match_regex | Regex pattern check |
| expect_table_row_count_to_be_between | Row count range |
| expect_column_to_exist | Column presence check |
| expect_column_values_to_be_of_type | Data type check |
import great_expectations as ge
import pandas as pd
# Wrap a Pandas DataFrame in a GE DataFrame (legacy pre-1.0 API; pin your GE version)
df = ge.from_pandas(pd.DataFrame({
"user_id": [1, 2, 3, None],
"age": [25, 200, 30, 22],
"status": ["active", "inactive", "unknown", "active"]
}))
# Expectation 1: user_id should never be null
result = df.expect_column_values_to_not_be_null("user_id")
print(result["success"]) # False — one null found
# Expectation 2: age must be between 0 and 120
result = df.expect_column_values_to_be_between("age", 0, 120)
print(result["success"]) # False — 200 is out of range
# Expectation 3: status must be in allowed set
result = df.expect_column_values_to_be_in_set(
"status", ["active", "inactive", "pending"]
)
print(result["success"]) # False — "unknown" is not allowed
# Expectation 4: row count should be between 100 and 10000
result = df.expect_table_row_count_to_be_between(100, 10000)
print(result["success"]) # False — only 4 rows
Expectations are grouped into named Expectation Suites (for example, orders.warning or users.critical). Suites are stored as JSON files.
import great_expectations as ge
# Initialize a Data Context (project configuration)
context = ge.get_context()
# Create a new expectation suite (or replace an existing one with the same name)
suite = context.add_or_update_expectation_suite(
    expectation_suite_name="orders.critical"
)
# Get a validator to add expectations interactively
batch_request = {
"datasource_name": "my_spark_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "orders",
}
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders.critical"
)
# Add expectations to the suite
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("amount", 0, 100000)
validator.expect_column_values_to_be_in_set(
"status", ["placed", "shipped", "delivered", "cancelled"]
)
validator.expect_table_row_count_to_be_between(1, 10_000_000)
# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)
print("Suite saved! All expectations stored to JSON.")
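Because suites are plain JSON, they can be inspected or diffed without loading the library. The sketch below parses a hand-written suite and lists its expectations. The key layout (`expectation_suite_name`, `expectations`, `expectation_type`, `kwargs`) is an assumption based on the pre-1.0 file format, and `summarize_suite` is a hypothetical helper, not part of GE.

```python
import json

# Assumed shape of a stored suite file; key names may differ between GE versions.
suite_json = """
{
  "expectation_suite_name": "orders.critical",
  "expectations": [
    {"expectation_type": "expect_column_values_to_not_be_null",
     "kwargs": {"column": "order_id"}, "meta": {}},
    {"expectation_type": "expect_column_values_to_be_between",
     "kwargs": {"column": "amount", "min_value": 0, "max_value": 100000},
     "meta": {}},
    {"expectation_type": "expect_table_row_count_to_be_between",
     "kwargs": {"min_value": 1, "max_value": 10000000}, "meta": {}}
  ],
  "meta": {}
}
"""

suite = json.loads(suite_json)


def summarize_suite(suite: dict) -> list:
    """Return one 'expectation_type(column)' string per stored expectation."""
    return [
        f"{e['expectation_type']}({e['kwargs'].get('column', '<table>')})"
        for e in suite["expectations"]
    ]


for line in summarize_suite(suite):
    print(line)
```

A listing like this is handy in code review: a changed threshold shows up as a one-line diff in the JSON file.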
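The Data Context is the entry point to a GE project. It loads datasources, suites, and checkpoints from the great_expectations.yml config file.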
import great_expectations as ge
# Get the Data Context (reads great_expectations.yml)
context = ge.get_context()
# List available expectation suites
print(context.list_expectation_suite_names())
# Output: ['orders.critical', 'users.warning']
# List datasources configured
print(context.list_datasources())
# Get a specific suite
suite = context.get_expectation_suite("orders.critical")
print(len(suite.expectations), "expectations in suite")
# Ephemeral context (no filesystem, good for notebooks/testing)
context = ge.get_context(mode="ephemeral")
Checkpoints are how you integrate GE into an Airflow DAG or Spark pipeline.
import great_expectations as ge
context = ge.get_context()
# Add a SimpleCheckpoint — simplest form
checkpoint = context.add_or_update_checkpoint(
name="orders_daily_checkpoint",
validations=[
{
"batch_request": {
"datasource_name": "my_spark_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "orders",
},
"expectation_suite_name": "orders.critical",
}
],
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
{
"name": "send_slack_notification", # on failure
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK}",
"notify_on": "failure",
},
},
],
)
# Run the checkpoint
result = checkpoint.run()
# Check if validation passed
if not result["success"]:
    raise Exception("Data quality validation failed! Stopping pipeline.")
else:
    print("✅ All expectations passed. Continuing pipeline.")
# Data Docs are automatically built after checkpoint.run()
# You can also build them manually:
context.build_data_docs()
# Open in browser (local)
context.open_data_docs()
# Publish to S3 for team access
# Configure in great_expectations.yml:
# data_docs_sites:
#   s3_site:
#     class_name: SiteBuilder
#     store_backend:
#       class_name: TupleS3StoreBackend
#       bucket: my-data-docs-bucket
#       prefix: data_docs/
# What Data Docs show:
# ✅ Each expectation and whether it passed/failed
# 📊 Column statistics (mean, min, max, null %, unique %)
# 📋 Sample unexpected values (rows that failed)
# 📅 Historical validation results (trend over time)
from pyspark.sql import SparkSession
import great_expectations as ge
from great_expectations.dataset import SparkDFDataset
spark = SparkSession.builder.appName("GE_Demo").getOrCreate()
# Load a Spark DataFrame
df = spark.read.parquet("s3://my-bucket/orders/2024/01/15/")
# Wrap in GE SparkDFDataset
ge_df = SparkDFDataset(df)
# Run expectations — they execute as Spark jobs!
results = []
results.append(ge_df.expect_column_values_to_not_be_null("order_id"))
results.append(ge_df.expect_column_values_to_not_be_null("customer_id"))
results.append(ge_df.expect_column_values_to_be_between("amount", 0, 100000))
results.append(ge_df.expect_column_values_to_match_regex(
"email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
))
results.append(ge_df.expect_table_row_count_to_be_between(1000, 10_000_000))
# Check all results
failures = [r for r in results if not r["success"]]
if failures:
    for f in failures:
        print(f"❌ FAILED: {f['expectation_config']['expectation_type']}")
        print(f"   Unexpected: {f['result'].get('unexpected_count', 'N/A')}")
    raise Exception(f"{len(failures)} DQ checks failed!")
else:
    print("✅ All DQ checks passed!")
Deequ (AWS)
Deequ is an open-source library by AWS, built on top of Apache Spark. It is designed for large-scale data quality validation on massive datasets — think billions of rows on EMR. It uses a two-phase model: first compute metrics, then verify constraints.
# pip install pydeequ
from pyspark.sql import SparkSession
from pydeequ.analyzers import (
AnalysisRunner, AnalyzerContext,
Completeness, Distinctness, Mean,
StandardDeviation, Minimum, Maximum,
Size, Compliance, Uniqueness
)
spark = SparkSession.builder \
.config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.3-spark-3.3") \
.getOrCreate()
df = spark.read.parquet("s3://my-bucket/orders/")
# Run multiple analyzers in a single Spark scan
analysis_result = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Size()) \
.addAnalyzer(Completeness("order_id")) \
.addAnalyzer(Completeness("customer_id")) \
.addAnalyzer(Distinctness("order_id")) \
.addAnalyzer(Uniqueness(["order_id"])) \
.addAnalyzer(Mean("amount")) \
.addAnalyzer(StandardDeviation("amount")) \
.addAnalyzer(Minimum("amount")) \
.addAnalyzer(Maximum("amount")) \
.addAnalyzer(Compliance("valid_status", "status IN ('placed','shipped','delivered')")) \
.run()
# Convert results to a DataFrame
metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
metrics_df.show(truncate=False)
# Output looks like:
# | entity | instance | name | value |
# | Dataset | * | Size | 125000 |
# | Column | order_id | Completeness | 1.0 |
# | Column | amount | Mean | 87.42 |
# | Column | valid_status | Compliance | 0.987 |
Constraints are declared on a Check and evaluated by a VerificationSuite. Constraints say things like: "Completeness of order_id must equal 1.0" or "Mean of amount must be between 50 and 200".
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
# Define a Check (named group of constraints)
check = Check(spark, CheckLevel.Error, "Orders DQ Check")
check_result = VerificationSuite(spark) \
.onData(df) \
.addCheck(
check
.hasSize(lambda x: x > 0) # table not empty
.isComplete("order_id") # no nulls
.isComplete("customer_id")
.isUnique("order_id") # primary key
.isContainedIn("status", ["placed", "shipped", "delivered", "cancelled"])
.isNonNegative("amount") # no negative amounts
.satisfies("amount < 100000", "Amount below cap") # business rule
.hasCompleteness("email", lambda c: c >= 0.95) # 95% not null
.hasDistinctness(["order_id"], lambda d: d == 1.0) # all distinct
) \
.run()
# Get results as DataFrame
result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
result_df.show(truncate=False)
# Fail pipeline if any constraint violated
if check_result.status != "Success":
    failures = result_df.filter(result_df.constraint_status != "Success")
    failures.show(truncate=False)
    raise Exception("Deequ constraints failed! Halting pipeline.")
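The two-phase model (compute metrics first, then verify constraints against them) can be sketched in plain Python without Spark. This is only an illustration of the idea, not how Deequ is implemented; `compute_metrics` and `verify` are hypothetical names, and the metric names merely mirror the analyzers used above.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, Optional[float]]
# (label, metric name, assertion over the metric value)
Constraint = Tuple[str, str, Callable[[float], bool]]


def compute_metrics(rows: List[Row], column: str) -> Dict[str, float]:
    """Phase 1: one pass over the data that produces metrics for a column."""
    total = len(rows)
    values = [r[column] for r in rows if r.get(column) is not None]
    return {
        "Size": float(total),
        "Completeness": len(values) / total if total else 0.0,
        "Mean": sum(values) / len(values) if values else float("nan"),
    }


def verify(metrics: Dict[str, float], constraints: List[Constraint]) -> Dict[str, bool]:
    """Phase 2: evaluate each constraint against the already-computed metrics."""
    return {label: assertion(metrics[name]) for label, name, assertion in constraints}


rows = [{"amount": 80.0}, {"amount": 120.0}, {"amount": None}, {"amount": 100.0}]
metrics = compute_metrics(rows, "amount")
report = verify(metrics, [
    ("table not empty", "Size", lambda v: v > 0),
    ("amount fully populated", "Completeness", lambda v: v == 1.0),
    ("mean amount in range", "Mean", lambda v: 50 <= v <= 200),
])
print(metrics)
print(report)
```

Separating the phases means the data is scanned once, while any number of constraints can be evaluated (or re-evaluated later) against the stored metrics.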
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
import time
# Create a file-system backed metrics repository
metrics_path = "s3://my-bucket/dq-metrics/orders/"
repository = FileSystemMetricsRepository(spark, metrics_path)
# Tag this run with a timestamp (used to query historical metrics)
result_key = ResultKey(spark, int(time.time() * 1000), {
"pipeline": "orders_daily",
"environment": "production"
})
# Run analysis and save to repository
AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Completeness("order_id")) \
.addAnalyzer(Mean("amount")) \
.useRepository(repository) \
.saveOrAppendResult(result_key) \
.run()
# Load historical metrics
previous_metrics = repository.load() \
.forAnalyzers([Completeness("order_id")]) \
.getSuccessMetricsAsDataFrame()
previous_metrics.show()
# Shows completeness score for every day you've run this pipeline
Soda
Soda makes data quality checks as simple as writing YAML. With SodaCL (Soda Check Language), non-engineers can read and write quality checks. Soda Core is the open-source library; Soda Cloud is the SaaS dashboard for teams.
# Install for Spark DataFrames (the package that provides add_spark_session)
pip install soda-core-spark-df
# Install for Snowflake
pip install soda-core-snowflake
# Install for BigQuery
pip install soda-core-bigquery
# checks.yml: define all your DQ rules here
checks for orders:  # checks for table/dataset named "orders"
  # Row count must be positive
  - row_count > 0
  # order_id must never be null
  - missing_count(order_id) = 0:
      name: Order ID must not be null
  # customer_id must not be null
  - missing_count(customer_id) = 0
  # order_id must be unique (primary key)
  - duplicate_count(order_id) = 0
  # amount must be between 0 and 100000
  - min(amount) >= 0
  - max(amount) <= 100000
  - avg(amount) between 10 and 10000
  # Status must only have valid values
  - invalid_count(status) = 0:
      valid values: [placed, shipped, delivered, cancelled]
  # Email format validation (regex)
  - invalid_count(email) = 0:
      valid format: email
  # Freshness check: data must be at most 1 day old
  - freshness(order_date) < 1d:
      name: Orders must arrive within 1 day
  # Schema check: these columns must exist
  - schema:
      name: Expected columns present
      fail:
        when required column missing: [order_id, customer_id, amount, status]
from pyspark.sql import SparkSession
from soda.scan import Scan
spark = SparkSession.builder.appName("SodaDemo").getOrCreate()
# Load DataFrame
df = spark.read.parquet("s3://my-bucket/orders/")
df.createOrReplaceTempView("orders") # Soda queries via SQL on the view
# Create a Soda Scan
scan = Scan()
scan.set_scan_definition_name("orders_daily_check")
scan.set_data_source_name("spark_datasource")
# Add checks from file
scan.add_sodacl_yaml_file("checks.yml")
# Register the Spark session under the same data source name set above
scan.add_spark_session(spark, data_source_name="spark_datasource")
# Run the scan
scan.execute()
# Check results
print(scan.get_logs_text())
if scan.has_check_fails():
    print("❌ Soda checks FAILED:")
    print(scan.get_checks_fail())
    raise Exception("Data quality checks failed!")
else:
    print("✅ All Soda checks passed!")
# ~/.soda/configuration.yml
soda_cloud:
  host: cloud.soda.io
  api_key_id: ${SODA_API_KEY_ID}
  api_key_secret: ${SODA_API_KEY_SECRET}
Monte Carlo
Monte Carlo is an enterprise data observability platform. Unlike GE/Deequ/Soda where you write rules manually, Monte Carlo uses ML to automatically learn what "normal" looks like for your data and alerts you when anomalies occur — without you having to write explicit thresholds.
It learns baselines automatically (e.g., "orders table usually has 50k–100k rows per day") and alerts you when something deviates — before downstream consumers notice.
# pip install pycarlo
from pycarlo.core import Client, Session
session = Session(mcd_id="YOUR_MCD_ID", mcd_token="YOUR_MCD_TOKEN")
client = Client(session=session)
# NOTE: pycarlo's Client executes GraphQL Query/Mutation objects. The
# method-style calls below are illustrative only; confirm the operation names
# and arguments against the Monte Carlo API reference before relying on them.
# Send a custom event to Monte Carlo
# (e.g., notify MC when your pipeline finishes)
client.create_or_update_lineage_node(
    node_name="orders",
    object_type="table",
    resource_name="snowflake"
)
# Report a custom metric to MC
client.report_dataset_status(
    status="success",
    dataset_name="orders",
    row_count=125000
)
- Volume anomaly: Table had 80k rows yesterday, today only 200 — silent pipeline failure?
- Freshness anomaly: Orders table usually updates every hour, but it's been 6 hours since the last update.
- Distribution anomaly: Revenue column suddenly has negative values or extreme outliers.
- Schema change: Column user_email was silently dropped from a table.
- Field health: Null rate for shipping_address jumped from 2% to 45%.
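To make "learning a baseline" concrete, here is a minimal sketch that derives an expected row-count range from recent history and flags values outside it. It uses a simple mean ± k·standard-deviation band, which is a stand-in chosen for illustration; Monte Carlo's actual models are not described in this module. `learn_bounds`, `is_volume_anomaly`, and the sample counts are hypothetical.

```python
from statistics import mean, stdev
from typing import List, Tuple


def learn_bounds(history: List[int], k: float = 3.0) -> Tuple[float, float]:
    """Derive a 'normal' range from past daily row counts (needs >= 2 points)."""
    mu = mean(history)
    sigma = stdev(history)
    return mu - k * sigma, mu + k * sigma


def is_volume_anomaly(today: int, history: List[int], k: float = 3.0) -> bool:
    """True when today's count falls outside the learned range."""
    low, high = learn_bounds(history, k)
    return not (low <= today <= high)


# Hypothetical last seven daily row counts for the orders table
history = [78_000, 81_500, 80_200, 79_400, 82_100, 80_900, 79_900]

print(learn_bounds(history))
print(is_volume_anomaly(200, history))      # the "80k yesterday, 200 today" case
print(is_volume_anomaly(80_500, history))   # an ordinary day
```

No threshold is written by hand here: the band widens or narrows as the history changes, which is the property that distinguishes observability tools from rule-based checks.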
Data Observability
Data Observability is the ability to understand the health of your data at any point in time. It has 4 pillars — freshness, volume, distribution drift, and schema drift. Implementing these lets you know when something is broken before your users do.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime, timedelta
spark = SparkSession.builder.appName("Freshness").getOrCreate()
df = spark.read.parquet("s3://my-bucket/orders/")
# ── Freshness check ──────────────────────────────────
max_ts = df.select(F.max("updated_at")).collect()[0][0]
now = datetime.utcnow()
staleness_hours = (now - max_ts).total_seconds() / 3600
threshold_hours = 2 # alert if data is older than 2 hours
if staleness_hours > threshold_hours:
    msg = (
        f"⚠️ FRESHNESS ALERT: orders table is "
        f"{staleness_hours:.1f}h stale! "
        f"Max updated_at = {max_ts}"
    )
    print(msg)
    # send_slack_alert(msg) or sns.publish(...)
    raise Exception(msg)
else:
    print(f"✅ Freshness OK: data is {staleness_hours:.1f}h old")
# ── Last updated timestamp monitoring ────────────────
# Store the max_ts in a control table for SLA tracking
freshness_record = spark.createDataFrame([{
"table_name": "orders",
"max_updated_at": max_ts,
"staleness_hours": staleness_hours,
"check_time": now,
"status": "OK" if staleness_hours <= threshold_hours else "STALE"
}])
freshness_record.write.mode("append").saveAsTable("dq.freshness_log")
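One pitfall in the freshness calculation is mixing naive and timezone-aware timestamps, which either raises a `TypeError` or silently skews the result by the UTC offset. The sketch below normalizes both sides to UTC before subtracting. The assumption that naive source timestamps are already in UTC is exactly that, an assumption to verify for your source; `staleness_hours` here is a hypothetical helper.

```python
from datetime import datetime, timezone


def staleness_hours(max_ts: datetime, now: datetime) -> float:
    """Hours between the newest record and `now`, both interpreted as UTC."""
    if max_ts.tzinfo is None:
        # Assumption: naive timestamps coming from the source are in UTC.
        max_ts = max_ts.replace(tzinfo=timezone.utc)
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    return (now - max_ts).total_seconds() / 3600


now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
fresh = datetime(2024, 1, 15, 11, 30)                       # naive, treated as UTC
stale = datetime(2024, 1, 15, 6, 0, tzinfo=timezone.utc)    # already aware

print(staleness_hours(fresh, now))  # 0.5
print(staleness_hours(stale, now))  # 6.0
```

In a live check, `datetime.now(timezone.utc)` would supply `now`; it is passed in explicitly here so the function stays deterministic and testable.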
from pyspark.sql import functions as F
import boto3, json
from datetime import date, timedelta
df = spark.read.parquet("s3://my-bucket/orders/date=2024-01-15/")
today_count = df.count()
# ── Compare against historical baseline ──────────────
# Load yesterday's row count from DynamoDB control table
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("pipeline_volume_history")
yesterday = (date.today() - timedelta(days=1)).isoformat()
response = table.get_item(Key={"table_name": "orders", "date": yesterday})
# DynamoDB returns numbers as Decimal, so cast to int.
# If there is no history yet, fall back to today's count (0% change).
yesterday_count = int(response.get("Item", {}).get("row_count", today_count))
# ── Anomaly detection with a day-over-day percent-change threshold ──
change_pct = abs(today_count - yesterday_count) / max(yesterday_count, 1) * 100
threshold_pct = 30  # alert if count changes by more than 30%
if change_pct > threshold_pct:
    msg = (
        f"⚠️ VOLUME ANOMALY: orders row count changed by {change_pct:.1f}%! "
        f"Today: {today_count:,} | Yesterday: {yesterday_count:,}"
    )
    print(msg)
    # Publish CloudWatch metric for alerting
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="DataQuality",
        MetricData=[{
            "MetricName": "VolumeChangePercent",
            "Dimensions": [{"Name": "TableName", "Value": "orders"}],
            "Value": change_pct,
            "Unit": "Percent"
        }]
    )
# Save today's count to history
table.put_item(Item={
"table_name": "orders",
"date": date.today().isoformat(),
"row_count": today_count
})
print(f"✅ Volume check done: {today_count:,} rows")
from pyspark.sql import functions as F
# ── Compute today's distribution stats ───────────────
stats = df.select(
F.count("amount").alias("count"),
F.mean("amount").alias("mean"),
F.stddev("amount").alias("stddev"),
F.min("amount").alias("min"),
F.max("amount").alias("max"),
F.percentile_approx("amount", 0.5).alias("median"),
F.percentile_approx("amount", 0.95).alias("p95"),
F.sum(F.when(F.col("amount").isNull(), 1).otherwise(0)).alias("null_count")
).collect()[0]
today_mean = stats["mean"]
today_std = stats["stddev"]
# ── Z-score monitoring against baseline ──────────────
baseline_mean = 87.42 # from historical average
baseline_std = 12.3
z_score = abs(today_mean - baseline_mean) / max(baseline_std, 0.001)
if z_score > 3:  # more than 3 standard deviations = anomaly
    print(f"⚠️ DISTRIBUTION DRIFT: amount mean = {today_mean:.2f} "
          f"(baseline: {baseline_mean:.2f}, z-score: {z_score:.1f})")
# ── Histogram comparison ─────────────────────────────
# Today's value distribution by bucket
buckets = df.select(
F.when(F.col("amount") < 50, "0-50")
.when(F.col("amount") < 200, "50-200")
.when(F.col("amount") < 1000, "200-1000")
.otherwise("1000+").alias("bucket")
).groupBy("bucket").count()
buckets.show()
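The bucket counts above only describe today's distribution; to detect drift they have to be compared with a baseline histogram. One common way to do that is the Population Stability Index (PSI), sketched below in plain Python. PSI is a technique introduced here for illustration and is not part of the original pipeline; the bucket counts are made up, and the 0.25 cut-off is a widely quoted rule of thumb rather than a fixed standard.

```python
import math
from typing import Dict


def psi(baseline: Dict[str, int], today: Dict[str, int], eps: float = 1e-6) -> float:
    """Population Stability Index between two bucketed distributions.

    PSI = sum over buckets of (t - b) * ln(t / b), where b and t are the
    baseline and current share of rows in each bucket.
    """
    b_total = sum(baseline.values())
    t_total = sum(today.values())
    score = 0.0
    for bucket in sorted(set(baseline) | set(today)):
        # eps avoids log(0) and division by zero for empty buckets
        b = max(baseline.get(bucket, 0) / b_total, eps)
        t = max(today.get(bucket, 0) / t_total, eps)
        score += (t - b) * math.log(t / b)
    return score


# Hypothetical bucket counts using the same buckets as the Spark snippet
baseline_hist = {"0-50": 400, "50-200": 450, "200-1000": 130, "1000+": 20}
today_hist = {"0-50": 150, "50-200": 400, "200-1000": 350, "1000+": 100}

score = psi(baseline_hist, today_hist)
print(f"PSI = {score:.3f}")
if score > 0.25:  # rule-of-thumb threshold for a significant shift
    print("Distribution drift detected")
```

In the Spark pipeline, the two dictionaries would come from collecting the `groupBy("bucket").count()` result for today and for a stored known-good run.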
from pyspark.sql.types import StructType
import json
# ── Expected schema (stored from last known-good run) ─
expected_schema_json = """
{
"fields": [
{"name": "order_id", "type": "string", "nullable": false},
{"name": "customer_id", "type": "integer", "nullable": false},
{"name": "amount", "type": "double", "nullable": true},
{"name": "status", "type": "string", "nullable": true},
{"name": "order_date", "type": "timestamp", "nullable": true}
]
}
"""
expected_fields = {f["name"]: f["type"] for f in json.loads(expected_schema_json)["fields"]}
# ── Actual schema from today's data ──────────────────
# typeName() yields "integer", "string", "double", ... which matches the JSON
# type names above; simpleString() would yield "int" and always report a change.
actual_fields = {f.name: f.dataType.typeName() for f in df.schema.fields}
# ── Column addition/removal alerts ───────────────────
added_cols = set(actual_fields.keys()) - set(expected_fields.keys())
removed_cols = set(expected_fields.keys()) - set(actual_fields.keys())
if added_cols:
    print(f"ℹ️ NEW columns detected: {added_cols}")
if removed_cols:
    print(f"❌ REMOVED columns: {removed_cols}")
    raise Exception(f"Schema drift! Columns removed: {removed_cols}")
# ── Data type change monitoring ───────────────────────
type_changes = []
for col in expected_fields:
    if col in actual_fields and actual_fields[col] != expected_fields[col]:
        type_changes.append(f"{col}: {expected_fields[col]} → {actual_fields[col]}")
if type_changes:
    print(f"⚠️ DATA TYPE CHANGES: {type_changes}")
if not removed_cols and not type_changes:
    print("✅ Schema drift check passed: no breaking changes")
Custom DQ Framework
In many production environments you'll build your own lightweight DQ framework on top of PySpark — especially when you need tight integration with your pipeline's audit tables, alerting system, and quarantine logic. This section covers building one from scratch.
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
class Severity(Enum):
    WARNING = "WARNING"
    ERROR = "ERROR"  # fail the pipeline


@dataclass
class DQRule:
    """A single data quality rule."""
    rule_id: str
    description: str
    check_expr: str          # SQL expression that returns True for VALID rows
    column: Optional[str]    # None means table-level rule
    severity: Severity = Severity.ERROR
    threshold: float = 1.0   # fraction of rows that must pass (1.0 = all)


class DQRuleEngine:
    """Generic rule engine that applies DQ rules to any DataFrame."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.results = []

    def run(self, df: DataFrame, rules: List[DQRule], table_name: str) -> list:
        df.createOrReplaceTempView("__dq_input__")
        total_rows = df.count()
        for rule in rules:
            # Count rows that PASS the rule
            pass_count = self.spark.sql(
                f"SELECT COUNT(*) FROM __dq_input__ WHERE {rule.check_expr}"
            ).collect()[0][0]
            pass_rate = pass_count / max(total_rows, 1)
            passed = pass_rate >= rule.threshold
            self.results.append({
                "table_name": table_name,
                "rule_id": rule.rule_id,
                "description": rule.description,
                "severity": rule.severity.value,
                "total_rows": total_rows,
                "pass_count": pass_count,
                "fail_count": total_rows - pass_count,
                "pass_rate": round(pass_rate, 4),
                "threshold": rule.threshold,
                "passed": passed,
            })
        return self.results

    def get_results_df(self) -> DataFrame:
        return self.spark.createDataFrame(self.results)

    def has_errors(self) -> bool:
        return any(
            not r["passed"] and r["severity"] == "ERROR"
            for r in self.results
        )
# ── Rule Registry ─────────────────────────────────────
# In production, load this from a Delta/DynamoDB table
orders_rules = [
DQRule(
rule_id="ORD-001",
description="order_id must not be null",
check_expr="order_id IS NOT NULL",
column="order_id",
severity=Severity.ERROR,
threshold=1.0 # ALL rows must pass
),
DQRule(
rule_id="ORD-002",
description="amount must be non-negative",
check_expr="amount >= 0",
column="amount",
severity=Severity.ERROR,
threshold=1.0
),
DQRule(
rule_id="ORD-003",
description="status must be valid",
check_expr="status IN ('placed','shipped','delivered','cancelled')",
column="status",
severity=Severity.ERROR,
threshold=0.999 # allow 0.1% bad rows (noise in source)
),
DQRule(
rule_id="ORD-004",
description="email format valid (warning only)",
# [.] rather than \\. because Spark SQL unescapes backslashes in string literals by default
check_expr="email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+[.][a-zA-Z]{2,}$'",
column="email",
severity=Severity.WARNING,
threshold=0.95 # 95% of emails must be valid format
),
DQRule(
rule_id="ORD-005",
description="extreme amounts flagged",
check_expr="amount < 100000",
column="amount",
severity=Severity.WARNING,
threshold=0.999
),
]
# ── Run the engine ────────────────────────────────────
engine = DQRuleEngine(spark)
engine.run(df, orders_rules, table_name="orders")
results_df = engine.get_results_df()
results_df.show(truncate=False)
if engine.has_errors():
    raise Exception("ERROR-severity DQ rules failed. Pipeline halted.")
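The engine above reports pass rates but does not separate the bad rows. The quarantine logic mentioned at the start of this section can be sketched as a split into valid and quarantined records, each quarantined row tagged with the rules it failed. This is a plain-Python stand-in over dictionaries rather than DataFrames; `split_quarantine` and the sample rows are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Rule = Tuple[str, Callable[[Row], bool]]  # (rule_id, predicate that is True for VALID rows)


def split_quarantine(rows: List[Row], rules: List[Rule]) -> Tuple[List[Row], List[Row]]:
    """Route each row to 'valid' or 'quarantined', recording which rules failed."""
    valid, quarantined = [], []
    for row in rows:
        failed = [rule_id for rule_id, predicate in rules if not predicate(row)]
        if failed:
            quarantined.append({**row, "failed_rules": failed})
        else:
            valid.append(row)
    return valid, quarantined


rules = [
    ("ORD-001", lambda r: r.get("order_id") is not None),
    ("ORD-002", lambda r: isinstance(r.get("amount"), (int, float)) and r["amount"] >= 0),
]
rows = [
    {"order_id": "A1", "amount": 25.0},
    {"order_id": None, "amount": 10.0},
    {"order_id": "A3", "amount": -5.0},
]

valid, quarantined = split_quarantine(rows, rules)
print(len(valid), "valid rows")
for q in quarantined:
    print(q["order_id"], q["failed_rules"])
```

In PySpark the same split is usually two filters on the rule expression. Rows where the expression evaluates to NULL match neither the expression nor its negation, so they need explicit handling to avoid being dropped from both outputs.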
def compute_dq_score(results: list) -> float:
    """
    Compute a weighted quality score 0-100.
    ERROR rules have weight 2, WARNING rules weight 1.
    """
    total_weight = 0
    weighted_score = 0
    for r in results:
        weight = 2 if r["severity"] == "ERROR" else 1
        weighted_score += r["pass_rate"] * weight
        total_weight += weight
    score = (weighted_score / max(total_weight, 1)) * 100
    return round(score, 2)

# Compute score
dq_score = compute_dq_score(engine.results)
print(f"📊 Data Quality Score: {dq_score:.1f} / 100")
# Classify score
if dq_score >= 98:
    grade = "🟢 EXCELLENT"
elif dq_score >= 90:
    grade = "🟡 GOOD"
elif dq_score >= 70:
    grade = "🟠 FAIR"
else:
    grade = "🔴 POOR"
print(f"Grade: {grade}")
# Save score to audit table
audit_record = spark.createDataFrame([{
"run_id": "run_20240115_001",
"table_name": "orders",
"dq_score": dq_score,
"total_rules": len(engine.results),
"failed_rules": sum(1 for r in engine.results if not r["passed"]),
"run_timestamp": datetime.utcnow().isoformat()
}])
audit_record.write.mode("append").saveAsTable("dq.audit_log")
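A small worked example helps check the weighting by hand. The function below repeats the scoring formula so the snippet runs on its own; the three sample results are made up for illustration.

```python
def compute_dq_score(results: list) -> float:
    """Weighted score 0-100: ERROR rules weigh 2, WARNING rules weigh 1."""
    total_weight = 0
    weighted_score = 0.0
    for r in results:
        weight = 2 if r["severity"] == "ERROR" else 1
        weighted_score += r["pass_rate"] * weight
        total_weight += weight
    return round(weighted_score / max(total_weight, 1) * 100, 2)


# Hypothetical results: two ERROR rules and one WARNING rule
sample = [
    {"rule_id": "ORD-001", "severity": "ERROR", "pass_rate": 1.00},
    {"rule_id": "ORD-003", "severity": "ERROR", "pass_rate": 0.98},
    {"rule_id": "ORD-004", "severity": "WARNING", "pass_rate": 0.90},
]

# (2 * 1.00 + 2 * 0.98 + 1 * 0.90) / (2 + 2 + 1) = 4.86 / 5 = 0.972
sample_score = compute_dq_score(sample)
print(sample_score)  # 97.2
```

Note that 97.2 lands in the GOOD band even though ORD-003, at 98%, would miss a 0.999 threshold and fail as an ERROR rule. The score summarizes overall health, while the pass/fail gate is decided per rule, so the two should be read together.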
import boto3, json
from datetime import datetime
def publish_dq_metrics(results: list, dq_score: float, table_name: str):
    """Publish DQ metrics to CloudWatch and alert via SNS on ERROR failures."""
    cw = boto3.client("cloudwatch")
    sns = boto3.client("sns")
    # Publish overall score metric
    cw.put_metric_data(
        Namespace="DataQuality",
        MetricData=[
            {
                "MetricName": "DQScore",
                "Dimensions": [{"Name": "TableName", "Value": table_name}],
                "Value": dq_score,
                "Unit": "None",
            },
            {
                "MetricName": "FailedRules",
                "Dimensions": [{"Name": "TableName", "Value": table_name}],
                "Value": sum(1 for r in results if not r["passed"]),
                "Unit": "Count",
            }
        ]
    )
    # If errors found, publish an SNS alert with details
    error_rules = [r for r in results if not r["passed"] and r["severity"] == "ERROR"]
    if error_rules:
        message = {
            "alert_type": "DQ_FAILURE",
            "table_name": table_name,
            "dq_score": dq_score,
            "timestamp": datetime.utcnow().isoformat(),
            "failed_rules": [
                {
                    "rule_id": r["rule_id"],
                    "description": r["description"],
                    "fail_count": r["fail_count"],
                    "pass_rate": r["pass_rate"]
                }
                for r in error_rules
            ]
        }
        sns.publish(
            TopicArn="arn:aws:sns:us-east-1:123456789:dq-alerts",
            # SNS subjects must be ASCII, so no emoji or typographic dashes here
            Subject=f"DQ FAILURE - {table_name} score: {dq_score:.1f}",
            Message=json.dumps(message, indent=2)
        )
        print(f"🚨 Alert published to SNS for {len(error_rules)} failed rules")
# ── Full pipeline pattern ─────────────────────────────
engine = DQRuleEngine(spark)
engine.run(df, orders_rules, "orders")
score = compute_dq_score(engine.results)
publish_dq_metrics(engine.results, score, "orders")
if engine.has_errors():
    raise Exception("Pipeline stopped due to DQ errors.")
# Only write to Silver if DQ passed
df.write.mode("append").saveAsTable("silver.orders")
print("✅ Data written to Silver layer successfully.")
Quiz & Summary
Test your understanding of the Data Quality Ecosystem. These are the kinds of questions that come up in senior data engineering interviews.
| Tool | Type | Best For | Integration |
|---|---|---|---|
| Great Expectations | Open Source | Explicit rule-based validation, Data Docs, Airflow integration | Spark, Pandas, SQL |
| Deequ | Open Source (AWS) | Large-scale Spark DQ, metric history, EMR workloads | PySpark (pydeequ) |
| Soda | Open Source + SaaS | YAML-first checks, non-engineer readable, Snowflake-centric | Spark, Snowflake, BigQuery |
| Monte Carlo | Enterprise SaaS | ML anomaly detection, automatic observability, no rule writing | Connector-based (Snowflake, Databricks, Redshift) |
| Custom Framework | DIY | Full control, pipeline-native, metadata-driven rule registry | Any PySpark pipeline |