MODULE 32 Data Governance
MODULE 32 - OVERVIEW

Data Governance

Data Governance answers the question: "Who owns what data, where did it come from, who can access it, and how do we ensure it's trustworthy?" It is the set of policies, tools, and processes that make data reliable, discoverable, and compliant at enterprise scale.

Data Lineage
Track the full journey of data (source → transform → target) using OpenLineage and Marquez.
Metadata Management
DataHub and Apache Atlas give you a searchable, tagged catalog of every dataset in your organisation.
Data Catalogs
Glue Catalog, Hive Metastore, Unity Catalog: the table registry where Spark finds schemas and partitions.
Classification & Best Practices
PII tagging, sensitivity labels, data ownership, access reviews: operationalise governance at scale.
MODULE 32 - WHAT YOU WILL LEARN
32.1 Data Lineage → OpenLineage standard, emitting lineage from Spark, Marquez server + lineage graph visualisation
32.2 Metadata Management → DataHub (architecture, ingestion, lineage, discovery); Apache Atlas (connector, classification, tagging)
32.3 Data Catalogs → Glue Catalog (tables, partitions, schema versions); Hive Metastore (managed vs external tables); Unity Catalog (3-level namespace, lineage, search)
32.4 Data Classification → PII tagging, sensitivity labels, tag propagation, automated classification
32.5 Governance Best Practices → data ownership, access reviews, policy enforcement, audit trail requirements
Why this matters for your career: data governance questions come up in senior DE and lead engineer interviews. Interviewers want to know how you track where data came from, how you classify PII, and how you enforce access policies, not just how you write Spark code.
32.1 - DATA LINEAGE

Data Lineage

Data lineage is the ability to answer: "Where did this column's value come from?" and "Which downstream tables are broken if I change this source?" It is the backbone of trustworthy data engineering.
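The second question is a graph walk over downstream lineage edges. Below is a minimal sketch. It assumes lineage has already been collected into a plain Python dict that maps each dataset to the datasets built from it. The dataset names are hypothetical.

```python
from collections import deque

# Hypothetical lineage: dataset -> datasets that read from it
DOWNSTREAM = {
    "bronze/orders": ["silver/orders"],
    "silver/orders": ["gold/orders_agg", "gold/revenue_report"],
    "gold/orders_agg": ["dashboard/customer_ltv"],
}

def impacted_by(source: str, edges: dict) -> dict:
    """Breadth-first walk returning every downstream node and its hop distance."""
    hops = {}
    queue = deque([(source, 0)])
    while queue:
        node, dist = queue.popleft()
        for child in edges.get(node, []):
            if child not in hops:          # visit each node once (handles diamonds)
                hops[child] = dist + 1
                queue.append((child, dist + 1))
    return hops

print(impacted_by("bronze/orders", DOWNSTREAM))
```

Breadth-first order gives the shortest hop count to each affected asset. That is handy for prioritising which owners to notify first.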

OpenLineage Standard
OPEN STANDARD
Concept
What is OpenLineage?
OpenLineage is a vendor-neutral open standard for collecting and sharing data lineage metadata. Think of it as a common language that every data tool (Spark, Airflow, dbt, Flink) can use to say: "I just ran a job that read from table A and wrote to table B."

Without a standard, every tool invents its own lineage format → silos. With OpenLineage, you get a unified lineage graph across all tools.
Without OpenLineage:
  Spark   ──(proprietary)──► Spark lineage store    (can't see Airflow jobs)
  Airflow ──(proprietary)──► Airflow lineage store  (can't see Spark jobs)
  dbt     ──(proprietary)──► dbt lineage store      (isolated)

With OpenLineage:
  Spark   ──(OpenLineage event)──┐
  Airflow ──(OpenLineage event)──┼──► Marquez / DataHub (UNIFIED GRAPH)
  dbt     ──(OpenLineage event)──┘
Key idea: OpenLineage defines the event format (a JSON schema). It does NOT store events itself; you need a backend like Marquez to receive and store them.
Event Format
OpenLineage Event Structure
Every OpenLineage event is a JSON object emitted when a job starts, completes, or fails. The key fields are:
{
  "eventType": "COMPLETE",                  ← START / COMPLETE / FAIL / ABORT
  "eventTime": "2024-03-01T10:00:00Z",
  "run": { "runId": "uuid-of-this-run" },
  "job": {
    "namespace": "my-spark-cluster",
    "name": "etl_orders_to_gold"            ← the job name
  },
  "inputs": [                               ← what was READ
    {
      "namespace": "s3://my-bucket",
      "name": "bronze/orders",
      "facets": { "schema": { ... } }       ← schema of input
    }
  ],
  "outputs": [                              ← what was WRITTEN
    {
      "namespace": "s3://my-bucket",
      "name": "gold/orders_agg",
      "facets": { "schema": { ... } },
      "outputFacets": {                     ← output-only facets
        "outputStatistics": { "rowCount": 1000000 }
      }
    }
  ],
  "producer": "...",                        ← URI of the emitting integration (required)
  "schemaURL": "..."                        ← spec version the event follows (required)
}
Facets are extensible metadata blocks (schema, statistics, column-level lineage, etc.). OpenLineage's facet system is what makes it rich and extensible.
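Because the format is plain JSON, an event can be built by hand. The sketch below builds a COMPLETE run event and can POST it to a lineage backend such as Marquez (covered below), using only the standard library. Several values are illustrative assumptions: the `producer` URI, the `schemaURL` value, and the `http://localhost:5000/api/v1/lineage` endpoint. In practice an integration such as the Spark listener fills these in for you.

```python
import json
import uuid
import urllib.request
from datetime import datetime, timezone

def make_run_event(event_type, job_namespace, job_name, inputs, outputs, run_id=None):
    """Build a minimal OpenLineage RunEvent as a dict.
    inputs/outputs are lists of (dataset_namespace, dataset_name) tuples."""
    return {
        "eventType": event_type,                        # START / COMPLETE / FAIL / ABORT
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},  # reuse the same runId for START and COMPLETE
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": n} for ns, n in inputs],
        "outputs": [{"namespace": ns, "name": n} for ns, n in outputs],
        # Assumed values identifying the emitter and the spec version
        "producer": "https://example.com/my-hand-written-emitter",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    }

def send_event(event, url="http://localhost:5000/api/v1/lineage"):
    """POST the event as JSON (assumed Marquez endpoint); returns the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status

event = make_run_event(
    "COMPLETE", "my-spark-cluster", "etl_orders_to_gold",
    inputs=[("s3://my-bucket", "bronze/orders")],
    outputs=[("s3://my-bucket", "gold/orders_agg")],
)
print(json.dumps(event, indent=2))
# send_event(event)  # uncomment when a lineage server is running
```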
Integration
Emitting Lineage from Spark
The OpenLineage Spark integration is a Spark listener that automatically emits lineage events as each Spark job starts and completes, with zero code changes needed in your PySpark scripts.
python - spark-submit config
# Option 1: Pass as spark-submit flags
# spark-submit \
#   --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
#   --conf spark.openlineage.transport.type=http \
#   --conf spark.openlineage.transport.url=http://marquez:5000 \
#   --conf spark.openlineage.transport.endpoint=/api/v1/lineage \
#   --conf spark.openlineage.namespace=my-spark-cluster \
#   my_etl_job.py

# Option 2: SparkSession config in code
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("orders_etl")
    .config("spark.extraListeners",
             "io.openlineage.spark.agent.OpenLineageSparkListener")
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://marquez:5000")
    .config("spark.openlineage.transport.endpoint", "/api/v1/lineage")
    .config("spark.openlineage.namespace", "production-spark")
    .getOrCreate()
)

# Your normal PySpark code: lineage is emitted AUTOMATICALLY
df = spark.read.parquet("s3://bucket/bronze/orders/")
agg = df.groupBy("customer_id").agg({"amount": "sum"})
agg.write.parquet("s3://bucket/gold/orders_agg/")

# OpenLineage listener will emit:
# START event when the job begins
# COMPLETE event when it finishes (with input/output datasets + schemas)
Dependency: Add the OpenLineage Spark jar to your cluster. Maven coord: io.openlineage:openlineage-spark_2.12:1.x.x
Column-Level Lineage
Column-Level Lineage Facet
Beyond table-level lineage ("job read table A, wrote table B"), OpenLineage can track column-level lineage: "output column total_revenue was derived from input columns amount and discount."
python - column lineage example
from pyspark.sql import functions as F

# Column lineage is captured automatically by the listener when you do transforms
df = spark.read.parquet("s3://bucket/bronze/sales")

result = df.withColumn(
    "total_revenue",
    F.col("amount") - F.col("discount")   # OpenLineage captures this derivation
).withColumn(
    "profit_margin",
    F.col("total_revenue") / F.col("cost")
)

result.write.parquet("s3://bucket/gold/revenue_report")

# The emitted lineage event will contain a ColumnLineageDatasetFacet:
# total_revenue  ← [bronze/sales.amount, bronze/sales.discount]
# profit_margin  ← [bronze/sales.amount, bronze/sales.discount, bronze/sales.cost]  (via total_revenue)
# This lets analysts trace where any output column came from.
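On the consuming side, the mapping lives in each output dataset's `columnLineage` facet, where every output field lists its `inputFields`. The sketch below follows those mappings back across datasets to the original source columns. Both the `FACETS` payload and the `trace` helper are hypothetical. The payload is hand-written in the shape of the ColumnLineageDatasetFacet, with the `_producer`/`_schemaURL` keys omitted.

```python
# Hypothetical columnLineage facets keyed by (namespace, dataset name),
# as they might be collected from several output datasets' events
FACETS = {
    ("s3://bucket", "silver/sales_clean"): {"fields": {
        "net_amount": {"inputFields": [
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "amount"},
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "discount"},
        ]},
    }},
    ("s3://bucket", "gold/revenue_report"): {"fields": {
        "total_revenue": {"inputFields": [
            {"namespace": "s3://bucket", "name": "silver/sales_clean", "field": "net_amount"},
        ]},
    }},
}

def trace(namespace, dataset, field, facets=FACETS):
    """Return the set of (dataset, field) source columns a field ultimately derives from.
    Assumes the column lineage is acyclic."""
    mapping = facets.get((namespace, dataset), {}).get("fields", {}).get(field)
    if mapping is None:            # no recorded lineage: treat it as a source column
        return {(dataset, field)}
    sources = set()
    for inp in mapping["inputFields"]:
        sources |= trace(inp["namespace"], inp["name"], inp["field"], facets)
    return sources

print(sorted(trace("s3://bucket", "gold/revenue_report", "total_revenue")))
# [('bronze/sales', 'amount'), ('bronze/sales', 'discount')]
```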
Marquez: Open Source Lineage Server
OPEN SOURCE
Concept
What is Marquez?
Marquez is the open-source backend server that receives OpenLineage events and stores them in a lineage graph. Think of OpenLineage as the language, and Marquez as the database + UI that speaks it.

It was created at WeWork and later contributed to the Linux Foundation (LF AI & Data). It is the reference implementation of the OpenLineage API; other backends, such as DataHub, can also consume OpenLineage events.
HOW MARQUEZ FITS IN

  Spark Job    ──OpenLineage event──►  Marquez API (port 5000)
  Airflow DAG  ──OpenLineage event──►        │
  dbt run      ──OpenLineage event──►        │
                                             ▼
                                    Marquez Postgres DB
                                             │
                                             ▼
                                   Marquez UI (port 3000)
                                  [lineage graph + search]
Setup
Marquez Server Setup (Docker)
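yaml - docker-compose
# docker-compose.yml: run Marquez locally
version: '3'
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"     # API port
      - "5001:5001"     # Admin port
    environment:
      - MARQUEZ_PORT=5000
      - MARQUEZ_ADMIN_PORT=5001
      # Point Marquez at the postgres service below. These variable names follow
      # Marquez's bundled config; verify them for your Marquez version.
      - POSTGRES_HOST=postgres
      - POSTGRES_PORT=5432
      - POSTGRES_DB=marquez
      - POSTGRES_USER=marquez
      - POSTGRES_PASSWORD=password
    depends_on:
      - postgres

  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000"     # UI port: open http://localhost:3000
    environment:
      - MARQUEZ_HOST=marquez
      - MARQUEZ_PORT=5000
    depends_on:
      - marquez

  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: password
      POSTGRES_DB: marquez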
Quick start: docker-compose up → open http://localhost:3000 → run a Spark job with the OpenLineage listener configured → watch the lineage graph appear.
Marquez API
Querying Marquez API
Marquez exposes a REST API so you can query lineage programmatically, which is useful for building custom lineage dashboards or audit tools.
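python - querying marquez API
import requests
from urllib.parse import quote

MARQUEZ = "http://localhost:5000"

# List all namespaces (job namespaces and dataset namespaces)
resp = requests.get(f"{MARQUEZ}/api/v1/namespaces", timeout=10)
resp.raise_for_status()
print([ns["name"] for ns in resp.json()["namespaces"]])

# List datasets in a namespace. Dataset namespaces usually reflect the storage
# location (e.g. s3://bucket), not the Spark job namespace, so URL-encode it
dataset_ns = quote("s3://bucket", safe="")
resp = requests.get(f"{MARQUEZ}/api/v1/namespaces/{dataset_ns}/datasets", timeout=10)
resp.raise_for_status()
for ds in resp.json()["datasets"]:
    print(ds["name"], ds.get("description"))

# Get the lineage graph around a specific dataset
# (node ids look like dataset:<namespace>:<name> or job:<namespace>:<name>)
resp = requests.get(
    f"{MARQUEZ}/api/v1/lineage",
    params={
        "nodeId": "dataset:s3://bucket:gold/orders_agg",
        "depth": 3,    # how many hops upstream/downstream
    },
    timeout=10,
)
resp.raise_for_status()
lineage = resp.json()
# 'graph' holds both upstream and downstream nodes (datasets and jobs)
for node in lineage["graph"]:
    print(node["type"], node["id"])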
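The lineage endpoint returns both directions in a single `graph` list, so separating upstream from downstream is left to the client. The sketch below walks `inEdges` backwards from a starting node. It assumes each node carries an `id` and `inEdges` entries with an `origin` field, which is how we understand Marquez's lineage response. The sample graph is hand-written, not real server output.

```python
def upstream_ids(graph, start_id):
    """Collect every node id reachable by following inEdges backwards from start_id."""
    nodes = {node["id"]: node for node in graph}
    seen, stack = set(), [start_id]
    while stack:
        node = nodes.get(stack.pop())
        if node is None:
            continue
        for edge in node.get("inEdges", []):
            origin = edge["origin"]
            if origin not in seen:
                seen.add(origin)
                stack.append(origin)
    return seen

# Hand-written graph shaped like a Marquez lineage response ("data" fields omitted)
sample_graph = [
    {"id": "dataset:s3://bucket:bronze/orders", "type": "DATASET", "inEdges": []},
    {"id": "job:production-spark:orders_etl", "type": "JOB",
     "inEdges": [{"origin": "dataset:s3://bucket:bronze/orders",
                  "destination": "job:production-spark:orders_etl"}]},
    {"id": "dataset:s3://bucket:gold/orders_agg", "type": "DATASET",
     "inEdges": [{"origin": "job:production-spark:orders_etl",
                  "destination": "dataset:s3://bucket:gold/orders_agg"}]},
]

for node_id in sorted(upstream_ids(sample_graph, "dataset:s3://bucket:gold/orders_agg")):
    print(node_id)
```

With a live server, the same function would be called as `upstream_ids(resp.json()["graph"], "<node id>")`.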
32.2 - METADATA MANAGEMENT

Metadata Management

Metadata is data about your data: schema, owner, tags, lineage, quality scores. Without a metadata management platform, data engineers spend hours hunting for the right table. With one, finding a dataset and judging whether to trust it becomes a search.

DataHub
LINKEDIN OSS
Architecture
DataHub Architecture
DataHub (built by LinkedIn, open-sourced in 2020) is a modern data catalog and metadata platform. It models everything as an entity (dataset, pipeline, user, chart) with aspects (schema, ownership, lineage, tags).
DATAHUB ARCHITECTURE

  ┌──────────────────────────────────────────────────┐
  │ PRODUCERS (push metadata)                        │
  │ Spark │ Airflow │ dbt │ Kafka │ REST API         │
  └──────────────┬───────────────────────────────────┘
                 │  MCE (Metadata Change Events) via Kafka
                 ▼
  ┌──────────────────────────┐
  │ DataHub Ingestion Layer  │  ← ingestion-framework connectors
  │ (Glue, Hive, JDBC, S3..) │
  └──────────┬───────────────┘
             │
             ▼
  ┌──────────────────────────┐      ┌──────────────────┐
  │ Metadata Store (GMS)     │◄────►│ Graph DB         │
  │ (Generalized Metadata    │      │ (entity graph)   │
  │  Service: REST/GraphQL)  │      └──────────────────┘
  └──────────┬───────────────┘
             │
             ▼
  ┌──────────────────────────┐
  │ DataHub UI + Search      │  ← data discovery for analysts
  └──────────────────────────┘
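Every DataHub entity is identified by a URN, and its aspects are stored against that key. The sketch below shows how a dataset URN is laid out, without any dependencies; the format matches the URN strings used later in this module. Real code should use `make_dataset_urn` from `datahub.emitter.mce_builder`. The two helpers here are hypothetical.

```python
import re

def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Compose a DataHub-style dataset URN: platform URN, dataset name, environment."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

# Greedy name group, anchored on the trailing ",ENV)" part
URN_RE = re.compile(r"^urn:li:dataset:\(urn:li:dataPlatform:([^,]+),(.+),([A-Z]+)\)$")

def parse_dataset_urn(urn: str) -> tuple:
    """Split a dataset URN back into (platform, name, env); raise ValueError if malformed."""
    m = URN_RE.match(urn)
    if m is None:
        raise ValueError(f"not a dataset URN: {urn}")
    return m.groups()

urn = dataset_urn("s3", "gold/orders_agg")
print(urn)                     # urn:li:dataset:(urn:li:dataPlatform:s3,gold/orders_agg,PROD)
print(parse_dataset_urn(urn))  # ('s3', 'gold/orders_agg', 'PROD')
```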
Ingestion
Ingesting Spark Metadata into DataHub
DataHub has two ways to get Spark metadata: (1) the DataHub Spark agent (listens to jobs, similar to OpenLineage), and (2) the ingestion framework (crawls Glue/Hive metastore to pull table schemas).
python - datahub spark agent config
# The DataHub Spark listener ships as a JVM jar, not a pip package. Add it to the
# cluster, e.g. spark-submit --packages io.acryl:acryl-spark-lineage:<version>
# (check the DataHub docs for the exact artifact name and Scala suffix).
from pyspark.sql import SparkSession

# In your SparkSession builder:
spark = (
    SparkSession.builder
    .appName("orders_pipeline")
    .config("spark.extraListeners",
             "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server",
             "http://datahub-gms:8080")          # DataHub GMS endpoint
    .config("spark.datahub.rest.token",
             "your_personal_access_token")
    .config("spark.datahub.metadata.pipeline.platformInstance",
             "production")
    .getOrCreate()
)

# Every Spark job now auto-emits lineage to DataHub
yaml - ingestion recipe (crawl glue catalog)
# datahub_glue_recipe.yml
# Run: datahub ingest -c datahub_glue_recipe.yml

source:
  type: glue
  config:
    aws_region: us-east-1
    extract_transforms: true      # pull Glue ETL job lineage too

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
    token: "your_token_here"
Lineage in DataHub
Viewing Lineage in DataHub
DataHub renders a lineage graph where you can see upstream (where data came from) and downstream (who depends on this table) entities. This is critical for impact analysis: before changing a table, see all dashboards and pipelines that depend on it.
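python - programmatic lineage query via datahub SDK
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import UpstreamLineageClass

# Connect to DataHub GMS
graph = DataHubGraph(DatahubClientConfig(
    server="http://localhost:8080",
    token="your_token"
))

entity_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,gold/orders_agg,PROD)"

# Walk UPSTREAM lineage for up to 3 hops by reading each dataset's
# upstreamLineage aspect (downstream impact needs a reverse-edge query,
# e.g. through DataHub's GraphQL API)
frontier, seen = [entity_urn], {entity_urn}
for hop in range(1, 4):
    next_frontier = []
    for urn in frontier:
        lineage = graph.get_aspect(urn, UpstreamLineageClass)
        for upstream in (lineage.upstreams if lineage else []):
            if upstream.dataset not in seen:
                seen.add(upstream.dataset)
                next_frontier.append(upstream.dataset)
                print(f"Upstream (hop {hop}): {upstream.dataset}")
    frontier = next_frontier

# Search for datasets matching "orders" (yields URNs)
for urn in graph.get_urns_by_filter(entity_types=["dataset"], query="orders"):
    print(urn)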
Discovery
Data Discovery Features
DataHub's UI lets users search for datasets by name, column name, tag, or owner. Every dataset has a detail page showing schema, sample data, lineage, and ownership. This transforms data discovery from tribal knowledge into self-service.
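Searching by column name comes down to an inverted index from column names to datasets. The sketch below is a toy, in-memory version of that idea; DataHub itself delegates search to Elasticsearch. The catalog contents are hypothetical.

```python
from collections import defaultdict

# Hypothetical catalog: dataset -> column names
SCHEMAS = {
    "bronze/customers": ["customer_id", "email", "phone"],
    "silver/orders": ["order_id", "customer_id", "amount"],
    "gold/revenue": ["region", "total_revenue"],
}

def build_column_index(schemas):
    """Map each lower-cased column name to the sorted list of datasets containing it."""
    index = defaultdict(set)
    for dataset, columns in schemas.items():
        for col in columns:
            index[col.lower()].add(dataset)
    return {col: sorted(ds) for col, ds in index.items()}

def search_column(index, term):
    """Return datasets having a column whose name contains term (case-insensitive)."""
    term = term.lower()
    hits = set()
    for col, datasets in index.items():
        if term in col:
            hits.update(datasets)
    return sorted(hits)

index = build_column_index(SCHEMAS)
print(search_column(index, "customer_id"))  # ['bronze/customers', 'silver/orders']
```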
Full-text search
Search "customer_id" to find every table containing that column.
Schema browsing
See every column, its type, description, and sample values.
Ownership
Every dataset has an assigned owner: "who do I ask about this table?"
Glossary terms
Attach business glossary terms ("LTV", "MRR") to datasets for business context.
Apache Atlas
APACHE
Architecture
Atlas Architecture
Apache Atlas is the traditional metadata and governance platform in the Hadoop ecosystem. It is deeply integrated with HDP (Hortonworks) clusters and Apache Ranger (for access control). It uses a type system to model entities and relationships.
ATLAS ARCHITECTURE

  ┌──────────────────────────────────────────┐
  │ METADATA SOURCES                         │
  │ Hive │ HBase │ Kafka │ Spark             │
  │ (via Spark Atlas Connector)              │
  └────────────────┬─────────────────────────┘
                   │  Kafka messages (entity events)
                   ▼
  ┌──────────────────────────────┐
  │ Atlas Ingest (Kafka topic)   │
  └──────────┬───────────────────┘
             │
             ▼
  ┌──────────────────────────────┐
  │ Atlas Core                   │
  │ ├─ Type System               │  ← entity types and relationships
  │ ├─ Graph DB (JanusGraph)     │  ← stores entities + lineage
  │ ├─ Search (Solr/ES)          │  ← full-text search
  │ └─ Notification (Kafka)      │
  └──────────┬───────────────────┘
             │
             ▼
  ┌──────────────────────────────┐
  │ Atlas UI + REST API          │
  └──────────────────────────────┘
Spark Connector
Spark Atlas Connector
The Spark Atlas Connector (SAC) automatically captures lineage from Spark jobs and pushes entities (datasets, columns) and relationships (lineage) into Atlas, similar to the OpenLineage listener.
python - spark atlas connector config
# SAC only publishes SNAPSHOT builds: build the assembly jar from the
# hortonworks-spark/spark-atlas-connector repository and pass it with
# spark-submit --jars <path-to-assembly-jar>
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("orders_pipeline")
    .config("spark.extraListeners",
             "com.hortonworks.spark.atlas.SparkAtlasEventTracker")
    .config("spark.sql.queryExecutionListeners",
             "com.hortonworks.spark.atlas.SparkAtlasEventTracker")
    .config("spark.sql.streaming.streamingQueryListeners",
             "com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker")
    .getOrCreate()
)

# atlas-application.properties must also be on the driver classpath:
# atlas.client.type=kafka
# atlas.kafka.bootstrap.servers=kafka:9092
Classification & Tagging
Atlas Classification and Tagging
Atlas's key differentiator is its classification system: you can tag entities with labels like PII, SENSITIVE, FINANCIAL. These tags propagate through the lineage graph, so if a source column is tagged PII, every downstream column derived from it automatically inherits that tag.
python - atlas REST API - add classification
import requests

ATLAS = "http://atlas-server:21000"
AUTH = ("admin", "admin")   # Atlas default credentials; change them in production

# 1. Find the entity GUID for a table
resp = requests.get(
    f"{ATLAS}/api/atlas/v2/search/basic",
    params={"typeName": "hive_table", "query": "customers"},
    auth=AUTH,
    timeout=10,
)
resp.raise_for_status()
entities = resp.json().get("entities", [])
if not entities:
    raise SystemExit("No hive_table matching 'customers' found")
guid = entities[0]["guid"]

# 2. Add a PII classification to the entity
#    (the PII classification type must already exist in Atlas's type system)
payload = [
    {
        "typeName": "PII",
        "attributes": {},
        "propagate": True     # let the tag flow to downstream lineage
    }
]
resp = requests.post(
    f"{ATLAS}/api/atlas/v2/entity/guid/{guid}/classifications",
    json=payload,
    auth=AUTH,
    timeout=10,
)
print("Classification added:", resp.status_code)

# 3. Tag propagation: downstream datasets derived from 'customers'
# inherit the PII tag through lineage, so a search for PII-tagged
# datasets surfaces them for compliance audits
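Tag propagation is Atlas's superpower. Tag a raw source column as PII once, and every derived table, view, and report connected to it through lineage inherits the tag (unless propagation is turned off for that classification). Finding every dataset in scope for a GDPR request becomes a search query.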
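The classification POST above only succeeds if a classification type named `PII` already exists in Atlas's type system. The sketch below registers one through the `/api/atlas/v2/types/typedefs` endpoint. It uses only the standard library, and the request is sent only when you call the function. The description text and helper names are our own assumptions.

```python
import json
import urllib.request
from base64 import b64encode

def pii_typedef_payload(name="PII", description="Personally identifiable information"):
    """Body for Atlas's typedefs endpoint declaring one classification type."""
    return {
        "classificationDefs": [{
            "name": name,
            "description": description,
            "superTypes": [],
            "attributeDefs": [],
        }]
    }

def create_classification_type(atlas_url, user, password, payload):
    """POST the typedef to Atlas with HTTP basic auth; returns the HTTP status code."""
    token = b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{atlas_url}/api/atlas/v2/types/typedefs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status

payload = pii_typedef_payload()
print(json.dumps(payload, indent=2))
# create_classification_type("http://atlas-server:21000", "admin", "admin", payload)
```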
Comparison
DataHub vs Apache Atlas
Aspect            | DataHub                                           | Apache Atlas
------------------|---------------------------------------------------|-------------------------------------------
Created by        | LinkedIn                                          | Hortonworks (now Cloudera), Apache project
Best fit          | Modern cloud-native stacks (Spark, dbt, Airflow)  | Hadoop ecosystem (Hive, HBase, Cloudera)
Lineage standard  | OpenLineage + custom                              | Custom Kafka events
UI                | Modern React UI, fast search                      | Older UI, functional
Storage           | MySQL/Postgres + Elasticsearch (optional Neo4j)   | JanusGraph + Solr
Tag propagation   | Yes (via lineage)                                 | Yes (first-class feature)
Active dev        | Very active (Acryl Data)                          | Moderate
32.3 - DATA CATALOGS

Data Catalogs

A data catalog is the table registry that Spark consults to find where tables live, what their schemas are, and how they are partitioned. Without a shared, persistent catalog, Spark can resolve only names registered in the current session (such as temporary views), and other engines cannot see those tables at all.

AWS Glue Catalog
AWS
Concept
Table Definitions
The AWS Glue Data Catalog is a managed Hive-compatible metastore. It stores database and table definitions (schema, location on S3, file format, partition info). Spark on EMR, Athena, and Glue jobs all read from the same catalog: one source of truth.
python - create table in glue catalog via pyspark
from pyspark.sql import SparkSession

# Configure Spark to use the Glue Data Catalog as its Hive metastore.
# On EMR this is usually set through the spark-hive-site configuration
# classification instead; the Glue client jar must be on the classpath.
spark = (
    SparkSession.builder
    .appName("catalog_example")
    .config("spark.hadoop.hive.metastore.client.factory.class",
             "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
    .enableHiveSupport()
    .getOrCreate()
)

# Create a database in Glue Catalog
spark.sql("CREATE DATABASE IF NOT EXISTS gold_layer")

# Create a table: definition stored in Glue, data in S3.
# Hive DDL order matters: PARTITIONED BY comes before STORED AS and LOCATION.
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_layer.orders_agg (
        customer_id  STRING,
        total_amount DOUBLE,
        order_count  BIGINT,
        last_order   DATE
    )
    PARTITIONED BY (order_year INT, order_month INT)
    STORED AS PARQUET
    LOCATION 's3://my-datalake/gold/orders_agg/'
""")

# After writing data directly to S3, register partitions so Athena/Spark can find them
spark.sql("MSCK REPAIR TABLE gold_layer.orders_agg")
# or use: spark.sql("ALTER TABLE gold_layer.orders_agg ADD PARTITION ...")

# Now any Athena query or EMR Spark job can refer to this table by name
df = spark.sql("SELECT * FROM gold_layer.orders_agg WHERE order_year = 2024")
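`MSCK REPAIR TABLE` discovers partitions by scanning the table location for Hive-style `key=value` directories. The sketch below is a simplified, pure-Python version of that discovery step over a list of object keys. The keys are hypothetical, and real implementations also handle escaping and type casting.

```python
def discover_partitions(object_keys, table_prefix, partition_cols):
    """Return sorted partition value tuples found under table_prefix.
    Only paths whose leading directories match partition_cols in order are kept."""
    found = set()
    for key in object_keys:
        if not key.startswith(table_prefix):
            continue
        parts = key[len(table_prefix):].split("/")[:len(partition_cols)]
        values = []
        for col, part in zip(partition_cols, parts):
            name, sep, value = part.partition("=")
            if sep != "=" or name != col or not value:
                break                      # not a key=value directory for this column
            values.append(value)
        if len(values) == len(partition_cols):
            found.add(tuple(values))
    return sorted(found)

keys = [
    "gold/orders_agg/order_year=2024/order_month=3/part-0000.parquet",
    "gold/orders_agg/order_year=2024/order_month=3/part-0001.parquet",
    "gold/orders_agg/order_year=2024/order_month=4/part-0000.parquet",
    "gold/orders_agg/_SUCCESS",
]
print(discover_partitions(keys, "gold/orders_agg/", ["order_year", "order_month"]))
# [('2024', '3'), ('2024', '4')]
```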
Partition Management
Partition Management in Glue
Glue catalog tracks every partition of a table, and partition metadata must be registered: data written directly to S3 paths (rather than through a catalog-aware INSERT or saveAsTable) is invisible to Spark and Athena until its partition is added to the catalog.
python - partition management
import boto3

glue = boto3.client("glue", region_name="us-east-1")

PARQUET_SD = {
    "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
    "SerdeInfo": {
        "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
    },
}

def partition_input(year, month):
    """PartitionInput for order_year=<year>/order_month=<month> (values are strings)."""
    return {
        "Values": [str(year), str(month)],
        "StorageDescriptor": {
            **PARQUET_SD,
            "Location": f"s3://my-datalake/gold/orders_agg/order_year={year}/order_month={month}/",
        },
    }

# Manually add a single partition to Glue Catalog
glue.create_partition(
    DatabaseName="gold_layer",
    TableName="orders_agg",
    PartitionInput=partition_input(2024, 3),
)

# Batch add partitions (much faster for large datasets)
partition_inputs = [partition_input(y, m) for y in [2023, 2024] for m in range(1, 13)]

# batch_create_partition accepts at most 100 partitions per call, so send chunks
for i in range(0, len(partition_inputs), 100):
    resp = glue.batch_create_partition(
        DatabaseName="gold_layer",
        TableName="orders_agg",
        PartitionInputList=partition_inputs[i:i + 100],
    )
    # Failures (e.g. 2024/3, which already exists) are returned, not raised
    for err in resp.get("Errors", []):
        print(err["PartitionValues"], err["ErrorDetail"]["ErrorCode"])
Schema Versions
Schema Versioning and Evolution in Glue
Glue Catalog keeps a version history for each table: every update_table call (including crawler updates) stores a new table version, which you can list with get_table_versions. There is no one-step rollback; to revert, re-apply an older version's definition with update_table.
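python - schema version management via boto3
import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Inspect the current schema
resp = glue.get_table(DatabaseName="gold_layer", TableName="orders_agg")
table = resp["Table"]
current_schema = table["StorageDescriptor"]["Columns"]
print("Current columns:", [c["Name"] for c in current_schema])

# List stored table versions (each update_table call creates a new one)
versions = glue.get_table_versions(DatabaseName="gold_layer", TableName="orders_agg")
for v in versions["TableVersions"]:
    print("version", v["VersionId"], [c["Name"] for c in v["Table"]["StorageDescriptor"]["Columns"]])

# Add a new column (schema evolution)
new_columns = current_schema + [{
    "Name": "avg_order_value",
    "Type": "double",
    "Comment": "Average order value per customer"
}]

# Build a TableInput from the fields update_table accepts; get_table also returns
# read-only fields (CreateTime, VersionId, CatalogId, ...) that update_table rejects
ALLOWED = ["Name", "Description", "Owner", "Retention", "StorageDescriptor",
           "PartitionKeys", "TableType", "Parameters",
           "ViewOriginalText", "ViewExpandedText"]
table_input = {k: table[k] for k in ALLOWED if k in table}
table_input["StorageDescriptor"] = {**table["StorageDescriptor"], "Columns": new_columns}

glue.update_table(DatabaseName="gold_layer", TableInput=table_input)
print("Schema evolved: new column added")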
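Before applying a change or reverting to an older version, it helps to see what actually changed. The sketch below diffs two Glue-style column lists, i.e. the `Columns` lists of dicts with `Name` and `Type` that `get_table` returns. The two example schemas are hypothetical.

```python
def diff_columns(old_cols, new_cols):
    """Compare two Glue-style column lists: added, removed and retyped columns."""
    old = {c["Name"]: c["Type"] for c in old_cols}
    new = {c["Name"]: c["Type"] for c in new_cols}
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "type_changed": sorted(
            (name, old[name], new[name])
            for name in set(old) & set(new) if old[name] != new[name]
        ),
    }

v1 = [{"Name": "customer_id", "Type": "string"}, {"Name": "total_amount", "Type": "double"}]
v2 = [{"Name": "customer_id", "Type": "string"}, {"Name": "total_amount", "Type": "decimal(18,2)"},
      {"Name": "avg_order_value", "Type": "double"}]
print(diff_columns(v1, v2))
```

Added columns are usually safe for readers. Removed or retyped columns are the changes worth running through the impact analysis described in 32.1 before they ship.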
Hive Metastore
HADOOP
Architecture
Metastore Architecture
The Hive Metastore (HMS) is a Thrift service that stores table metadata in a relational database (usually MySQL or PostgreSQL). Spark connects to it through the HiveMetaStoreClient. It is the original, battle-tested catalog of the big data world.
HIVE METASTORE ARCHITECTURE

  Spark (on EMR/K8s)     Airflow Hive Operator
          │                       │
          └──────────┬────────────┘
                     │  Thrift protocol (port 9083)
                     ▼
    ┌───────────────────────────────┐
    │ Hive Metastore Service        │
    │ (HMS: Java service)           │
    └──────────────┬────────────────┘
                   │  JDBC
                   ▼
    ┌───────────────────────────────┐
    │ MySQL / PostgreSQL            │
    │ (metastore DB)                │
    │ Tables: DBS, TBLS, COLUMNS_V2 │
    │         PARTITIONS, SDS       │
    └───────────────────────────────┘
Managed vs External
Managed Tables vs External Tables
This is one of the most important Spark/Hive concepts for a data engineer to understand deeply.
🏠 Managed Table (Internal)
Hive/Spark owns BOTH the metadata AND the data files. DROP TABLE deletes the data from the warehouse directory. Use for intermediate or temporary data.
πŸ“‚ External Table
Hive/Spark owns only the metadata. Data lives at a user-specified path. DROP TABLE only removes metadata β€” data stays on S3/HDFS. Use for production data that other tools also access.
python - managed vs external tables
# MANAGED table: Spark controls the data location
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold_layer.orders_managed (
        customer_id STRING,
        amount      DOUBLE
    ) STORED AS PARQUET
""")
# Data stored at: spark.sql.warehouse.dir/gold_layer.db/orders_managed/
# DROP TABLE gold_layer.orders_managed  →  DELETES the files too!

# EXTERNAL table: you control the data location
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS gold_layer.orders_external (
        customer_id STRING,
        amount      DOUBLE
    )
    STORED AS PARQUET
    LOCATION 's3://my-datalake/gold/orders/'
""")
# DROP TABLE gold_layer.orders_external  →  only removes catalog entry
# Data on S3 remains intact!

# Check if a table is managed or external
df = spark.sql("DESCRIBE EXTENDED gold_layer.orders_external")
df.filter(df.col_name == "Type").show()
Production rule: for data lakes shared through a Hive or Glue catalog, use EXTERNAL tables for production data. A mistaken DROP TABLE on a managed table can permanently delete terabytes of data.

Unity Catalog (Databricks)
DATABRICKS
Concept
Three-Level Namespace
Unity Catalog extends the traditional two-level namespace (database.table) to a three-level namespace: catalog.schema.table. This lets you organise data across multiple catalogs (dev, staging, prod, or by business domain) under one metastore, which can be shared by several Databricks workspaces.
UNITY CATALOG: THREE-LEVEL NAMESPACE

  catalog
  ├── main                        ← production catalog
  │   ├── sales_schema
  │   │   ├── orders              ← main.sales_schema.orders
  │   │   └── customers           ← main.sales_schema.customers
  │   └── finance_schema
  │       └── revenue_agg         ← main.finance_schema.revenue_agg
  │
  ├── dev                         ← development catalog
  │   └── sales_schema
  │       └── orders              ← dev.sales_schema.orders
  │
  └── hive_metastore              ← legacy catalog (Spark 2-level tables)
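Within this hierarchy, a partially qualified name is resolved against the session's current catalog and schema, which is what `USE CATALOG` and `USE SCHEMA` set. The sketch below is a simplified model of that resolution rule. The `qualify` helper is hypothetical, ignores backtick quoting, and is not Databricks' actual resolver.

```python
def qualify(name: str, current_catalog: str, current_schema: str) -> str:
    """Expand a 1-, 2- or 3-part table name to catalog.schema.table."""
    parts = name.split(".")
    if len(parts) == 1:                       # table
        return f"{current_catalog}.{current_schema}.{parts[0]}"
    if len(parts) == 2:                       # schema.table
        return f"{current_catalog}.{parts[0]}.{parts[1]}"
    if len(parts) == 3:                       # catalog.schema.table
        return name
    raise ValueError(f"too many name parts: {name}")

print(qualify("orders", "main", "sales_schema"))                      # main.sales_schema.orders
print(qualify("finance_schema.revenue_agg", "main", "sales_schema"))  # main.finance_schema.revenue_agg
print(qualify("dev.sales_schema.orders", "main", "sales_schema"))     # dev.sales_schema.orders
```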
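python - unity catalog in pyspark
# Use three-level namespace in SQL
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA sales_schema")

# Create a managed Delta table in Unity Catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS main.sales_schema.orders (
        order_id    BIGINT,
        customer_id STRING,
        amount      DOUBLE,
        order_date  DATE,
        region      STRING
    )
    USING DELTA
    COMMENT 'Production orders table, owner: data-engineering team'
""")

# Or fully qualify in every query
df = spark.sql("SELECT * FROM main.sales_schema.orders")

# Grant permissions (Unity Catalog fine-grained access control).
# Readers also need USE CATALOG / USE SCHEMA on the parent objects.
spark.sql("GRANT USE CATALOG ON CATALOG main TO `analyst-group`")
spark.sql("GRANT USE SCHEMA ON SCHEMA main.sales_schema TO `analyst-group`")
spark.sql("GRANT SELECT ON TABLE main.sales_schema.orders TO `analyst-group`")
spark.sql("GRANT MODIFY ON TABLE main.sales_schema.orders TO `data-engineers`")

# Row filter: a SQL UDF that returns TRUE for rows the caller may see.
# Example rule: members of 'admins' see every row, everyone else only US rows.
spark.sql("""
    CREATE OR REPLACE FUNCTION main.sales_schema.row_filter_orders(region STRING)
    RETURN IF(is_account_group_member('admins'), true, region = 'US')
""")
spark.sql("""
    ALTER TABLE main.sales_schema.orders
    SET ROW FILTER main.sales_schema.row_filter_orders ON (region)
""")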
Features
Lineage Tracking and Discovery
Unity Catalog automatically captures table- and column-level lineage for queries that run on Unity Catalog-enabled compute, with no additional configuration. It also powers Databricks' search and discovery experience.

Auto lineage
Table- and column-level lineage captured automatically for queries run from notebooks, jobs, and SQL warehouses.
Data search
Search tables, columns, notebooks, and dashboards from one unified search bar.
Row + Column security
Row filters and column masks enforced at the catalog level, with no application-level code needed.
Volumes
Unity Catalog Volumes give the same governance to files (CSV, JSON) as to Delta tables.
32.4 - DATA CLASSIFICATION

Data Classification

Data classification is the process of labelling data by its sensitivity level, so that the right controls (encryption, masking, access restrictions) can be applied automatically. It is the foundation of GDPR, HIPAA, and PCI-DSS compliance.

PII Tagging, Sensitivity Labels, Tag Propagation, Automated Classification
COMPLIANCE
Concept
PII Tagging
PII (Personally Identifiable Information) is any data that can identify a specific person: name, email, phone, SSN, IP address, etc. Tagging PII columns in your catalog is the first step to compliance: you can't protect what you haven't identified.
python - automated PII detection in pyspark
import re

# Rule-based PII column name detection
PII_COLUMN_PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in [
        r"email", r"phone", r"ssn", r"social.?security",
        r"credit.?card", r"dob", r"date.?of.?birth",
        r"passport", r"national.?id", r"ip.?address",
        r"first.?name", r"last.?name", r"full.?name",
        r"home.?address", r"zip.?code", r"postal"
    ]
]

def detect_pii_columns(df):
    """Returns list of column names likely to contain PII"""
    pii_cols = []
    for col_name in df.columns:
        for pattern in PII_COLUMN_PATTERNS:
            if pattern.search(col_name):
                pii_cols.append(col_name)
                break
    return pii_cols

# Example usage
df = spark.read.parquet("s3://bucket/bronze/customers")
pii_detected = detect_pii_columns(df)
print(f"PII columns detected: {pii_detected}")
# e.g. for columns customer_id, email, phone, home_address, date_of_birth:
# PII columns detected: ['email', 'phone', 'home_address', 'date_of_birth']

# Content-based PII detection using regex on actual data values
EMAIL_REGEX = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
SSN_REGEX   = r'^\d{3}-\d{2}-\d{4}$'

def is_pii_by_content(df, col_name, sample_size=100, threshold=0.8):
    """Check the first non-null values of a column (not a random sample);
    return True if most of them look like emails or SSNs"""
    sample = [row[0] for row in df.select(col_name).dropna().limit(sample_size).collect()]
    if not sample:                      # empty or all-null column: nothing to judge
        return False
    for regex in (EMAIL_REGEX, SSN_REGEX):
        matches = sum(1 for v in sample if re.match(regex, str(v)))
        if matches / len(sample) > threshold:   # e.g. >80% match = likely PII
            return True
    return False
Sensitivity Labels
Sensitivity Label Hierarchy
Most enterprises define a tiered sensitivity label system. The label determines the required controls (encryption level, access restriction, retention policy).
Label         | Examples                          | Required controls
--------------|-----------------------------------|------------------------------------------------------------------
RESTRICTED    | SSN, credit card, medical records | Column encryption, strict access, audit logging, no copy to dev
CONFIDENTIAL  | Email, phone, salary, DOB         | Masking in non-prod, access review, encrypted at rest
INTERNAL      | Order amounts, employee IDs       | Internal access only, no public sharing
PUBLIC        | Product names, public prices      | No special controls needed
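Because the tiers are ordered, a table's label can be derived as the most sensitive label of any of its columns, and the label then selects the controls. The sketch below shows both rules. The label order follows the table above; the email-masking format is an assumption for illustration.

```python
# Order follows the sensitivity table, least to most sensitive
LEVELS = ["PUBLIC", "INTERNAL", "CONFIDENTIAL", "RESTRICTED"]

def table_label(column_labels: dict) -> str:
    """A table is as sensitive as its most sensitive column (PUBLIC if there are none)."""
    return max(column_labels.values(), key=LEVELS.index, default="PUBLIC")

def mask_email(value: str) -> str:
    """Keep the first character and the domain, e.g. for CONFIDENTIAL data in non-prod."""
    local, sep, domain = value.partition("@")
    if not sep or not local:
        return "***"
    return f"{local[0]}***@{domain}"

labels = {"order_id": "INTERNAL", "email": "CONFIDENTIAL", "product_name": "PUBLIC"}
print(table_label(labels))                 # CONFIDENTIAL
print(mask_email("jane.doe@example.com"))  # j***@example.com
```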
python - applying sensitivity labels via datahub SDK
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

emitter = DatahubRestEmitter("http://localhost:8080")

# URN of the dataset to label
dataset_urn = builder.make_dataset_urn(
    platform="s3",
    name="bronze/customers",
    env="PROD"
)

# Add RESTRICTED sensitivity label as a tag
tag_urn = builder.make_tag_urn("RESTRICTED")
tags_aspect = GlobalTagsClass(tags=[
    TagAssociationClass(tag=tag_urn)
])

# Wrap the tags aspect in a change proposal for the dataset and emit it.
# Note: this replaces the dataset's existing globalTags aspect wholesale.
mcp = MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=tags_aspect)
emitter.emit(mcp)
Tag Propagation
Tag Propagation Through Lineage
The most powerful feature in a governed data platform: when a RESTRICTED tag is applied to a source column, every downstream dataset derived from it automatically inherits the tag. This means one classification decision protects the entire data pipeline.
TAG PROPAGATION EXAMPLE

  bronze/customers.email         ── TAG: PII.CONFIDENTIAL
          │
          ▼  (Spark join + transform)
  silver/orders.customer_email   ── TAG: PII.CONFIDENTIAL  ← AUTO-PROPAGATED
          │
          ▼  (aggregation)
  gold/revenue.customer_email    ── TAG: PII.CONFIDENTIAL  ← AUTO-PROPAGATED

  Result: a GDPR "right to erasure" request means you search for all datasets
  tagged PII.CONFIDENTIAL that contain customer_email → the system shows all
  3 tables automatically → you can delete/anonymise in all 3 places.
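Mechanically, propagation is a traversal of column-level lineage that copies tags forward. The sketch below runs over a hand-written edge list that mirrors the example above; the `propagate` helper is hypothetical. Real platforms such as Atlas or DataHub apply their own propagation rules, and Atlas, for example, lets you switch propagation off per classification.

```python
from collections import deque

# Column-level lineage edges: source column -> derived columns (hypothetical)
EDGES = {
    "bronze/customers.email": ["silver/orders.customer_email"],
    "silver/orders.customer_email": ["gold/revenue.customer_email"],
}

def propagate(tags: dict, edges: dict) -> dict:
    """Return a new column -> tags map in which every tag flows to all downstream columns."""
    result = {col: set(t) for col, t in tags.items()}
    queue = deque(tags)
    while queue:
        col = queue.popleft()
        for child in edges.get(col, []):
            child_tags = result.setdefault(child, set())
            new = result[col] - child_tags
            if new:                    # only revisit a child when it gained a tag
                child_tags |= new
                queue.append(child)
    return result

tagged = propagate({"bronze/customers.email": {"PII.CONFIDENTIAL"}}, EDGES)
for col in sorted(tagged):
    print(col, sorted(tagged[col]))
```

The erasure search from the example then becomes a filter such as `[c for c, t in tagged.items() if "PII.CONFIDENTIAL" in t]`.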
Automated Classification
Automated Classification Pipeline
Rather than tagging tables manually, enterprises build automated classification pipelines that scan new tables on ingestion and apply tags based on column names and content patterns.
python - automated classification pipeline
from datetime import datetime, timezone

import boto3
from pyspark.sql import SparkSession

SAMPLE_ROWS = 500  # rows sampled per scan; enough for pattern matching, cheap to read

def classify_table(spark, database: str, table: str) -> dict:
    """
    Scan a Glue table and return a classification report.
    Called by Airflow after every new table lands in the bronze layer.
    """
    df = spark.sql(f"SELECT * FROM {database}.{table} LIMIT {SAMPLE_ROWS}")

    # detect_pii_columns is the detector defined earlier in this module; here it is
    # assumed to return a list of detected PII types (e.g. "ssn", "email")
    pii_columns = detect_pii_columns(df)

    # Determine overall table sensitivity label
    if any(p in pii_columns for p in ["ssn", "credit_card", "passport"]):
        label = "RESTRICTED"
    elif pii_columns:
        label = "CONFIDENTIAL"
    else:
        label = "INTERNAL"

    report = {
        "database": database,
        "table": table,
        "classification_label": label,
        "pii_columns": pii_columns,
        "classified_at": datetime.now(timezone.utc).isoformat(),
        "sampled_row_count": df.count()   # size of the sample, not of the whole table
    }

    # Store classification in DynamoDB audit table
    dynamo = boto3.resource("dynamodb")
    tbl = dynamo.Table("data_classification_audit")
    tbl.put_item(Item=report)

    # Apply tags in the Glue Catalog via boto3 (region and account ID are placeholders).
    # Check that Glue resource tagging covers catalog tables in your setup;
    # Lake Formation LF-Tags are the usual alternative for table-level labels.
    glue = boto3.client("glue")
    glue.tag_resource(
        ResourceArn=f"arn:aws:glue:us-east-1:123456789012:table/{database}/{table}",
        TagsToAdd={"sensitivity": label, "pii_detected": str(bool(pii_columns))}
    )

    return report

# Example usage - called from an Airflow task after table creation
spark = SparkSession.builder.appName("classify-new-table").getOrCreate()
result = classify_table(spark, "bronze_layer", "new_customers_table")
print(f"Table classified as: {result['classification_label']}")
print(f"PII columns: {result['pii_columns']}")
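The labelling rule inside `classify_table` has no Spark or AWS dependency, so it can be pulled out and unit-tested on its own. A minimal sketch, assuming `detect_pii_columns` returns a list of PII type names such as `"ssn"` or `"email"` (that return shape is an assumption; adapt the comparison if yours returns column names). `RESTRICTED_PII_TYPES` and `assign_label` are hypothetical names, and unlike the original rule this version lower-cases the input before comparing.

```python
# PII types that force a RESTRICTED label (same list as in classify_table)
RESTRICTED_PII_TYPES = frozenset({"ssn", "credit_card", "passport"})

def assign_label(pii_types: list[str]) -> str:
    """Map detected PII types to a table-level sensitivity label."""
    found = {p.lower() for p in pii_types}   # normalise case before comparing
    if found & RESTRICTED_PII_TYPES:
        return "RESTRICTED"
    if found:
        return "CONFIDENTIAL"
    # Like classify_table, never assign PUBLIC automatically
    return "INTERNAL"

print(assign_label(["email", "ssn"]), assign_label(["phone"]), assign_label([]))
```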
32.5 - GOVERNANCE BEST PRACTICES

Governance Best Practices

Tools alone don't create governance. Governance requires the right processes and culture β€” clear ownership, regular access reviews, automated policy enforcement, and a complete audit trail.

✅
Data Ownership, Access Reviews, Policy Enforcement, Audit Trail
Data Ownership
Data Ownership Model
Every dataset must have a named owner β€” a team or person accountable for its quality, documentation, and compliance. The ownership model defines who approves access requests, who fixes data quality issues, and who maintains the schema.
DATA OWNERSHIP MODEL

Dataset: main.sales_schema.orders
├── Technical Owner: Data Engineering Team (@de-team)
│     → responsible for pipeline, schema, SLA
├── Business Owner: Sales Analytics (@sales-analytics)
│     → responsible for business definitions, KPIs
└── Data Steward: Compliance Officer (@compliance)
      → responsible for PII classification, GDPR

Dataset: main.hr_schema.employees (RESTRICTED)
├── Technical Owner: Platform Engineering (@platform-eng)
├── Business Owner: HR Systems (@hr-systems)
└── Data Steward: HR Compliance Lead (@hr-compliance)
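The ownership model can be enforced as a completeness check over catalog metadata: every dataset needs a named owner for each of the three roles. A minimal sketch; the `DatasetOwnership` dataclass, the role keys, and the `main.sandbox.tmp_extract` dataset are hypothetical stand-ins for whatever your catalog actually stores.

```python
from dataclasses import dataclass, field

# The three roles from the ownership model
REQUIRED_ROLES = ("technical_owner", "business_owner", "data_steward")

@dataclass
class DatasetOwnership:
    dataset: str
    owners: dict[str, str] = field(default_factory=dict)  # role -> team or person handle

    def missing_roles(self) -> list[str]:
        """Roles in the ownership model with no named owner (absent or empty)."""
        return [role for role in REQUIRED_ROLES if not self.owners.get(role)]

orders = DatasetOwnership("main.sales_schema.orders", {
    "technical_owner": "@de-team",
    "business_owner": "@sales-analytics",
    "data_steward": "@compliance",
})
# Hypothetical dataset registered with only a technical owner
scratch = DatasetOwnership("main.sandbox.tmp_extract", {"technical_owner": "@de-team"})

for ds in (orders, scratch):
    gaps = ds.missing_roles()
    print(ds.dataset, "OK" if not gaps else f"missing {gaps}")
```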
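python - registering ownership in DataHub
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import OwnershipClass, OwnerClass, OwnershipTypeClass

emitter = DatahubRestEmitter("http://datahub:8080")

dataset_urn = builder.make_dataset_urn("s3", "gold/orders_agg", "PROD")

# One entry per role in the ownership model: technical, business, steward
ownership = OwnershipClass(owners=[
    OwnerClass(
        owner=builder.make_user_urn("john.doe"),
        type=OwnershipTypeClass.TECHNICAL_OWNER
    ),
    OwnerClass(
        owner=builder.make_group_urn("sales-analytics"),
        type=OwnershipTypeClass.BUSINESS_OWNER
    ),
    OwnerClass(
        owner=builder.make_group_urn("compliance"),
        type=OwnershipTypeClass.DATA_STEWARD
    )
])

# Emit the ownership aspect (this replaces any existing owner list on the dataset)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=ownership))
print("Ownership registered in DataHub")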
Access Review Cycles
Access reviews (sometimes called recertification campaigns) are periodic checks where data owners review who has access to their datasets and revoke any access that is no longer needed. A common cadence is quarterly for RESTRICTED data and annually for CONFIDENTIAL data.
python - automated access review report
from datetime import datetime, timedelta, timezone

import boto3

def generate_access_review_report(database: str, table: str) -> list[dict]:
    """
    Generate a quarterly access review report for a RESTRICTED table.
    Lists all principals with Lake Formation grants on the table and flags
    any that show no CloudTrail activity in the last 90 days.
    """
    lf = boto3.client("lakeformation")
    cloudtrail = boto3.client("cloudtrail")

    # 1. Get all principals with access to this table (results are paginated)
    permissions = []
    kwargs = {"Resource": {"Table": {"DatabaseName": database, "Name": table}}}
    while True:
        page = lf.list_permissions(**kwargs)
        permissions.extend(page["PrincipalResourcePermissions"])
        if not page.get("NextToken"):
            break
        kwargs["NextToken"] = page["NextToken"]

    # 2. For each principal, check for recent activity
    now = datetime.now(timezone.utc)
    ninety_days_ago = now - timedelta(days=90)
    report = []

    for perm in permissions:
        principal = perm["Principal"]["DataLakePrincipalIdentifier"]   # usually an IAM ARN
        actions = perm["Permissions"]

        # CloudTrail event history is searched by user name, not ARN, so use the last
        # ARN segment. This is an approximation: it finds any API activity under that
        # name, not only reads of this table, and assumed-role sessions may be
        # recorded under a session name instead.
        username = principal.split("/")[-1]
        events = cloudtrail.lookup_events(
            LookupAttributes=[{"AttributeKey": "Username", "AttributeValue": username}],
            StartTime=ninety_days_ago,
            EndTime=now
        )["Events"]

        last_access = max((e["EventTime"] for e in events), default=None)
        stale = last_access is None  # no activity in 90 days = stale

        report.append({
            "principal": principal,
            "permissions": actions,
            "last_access": last_access,
            "flag": "⚠️ REVIEW: no access in 90 days" if stale else "✅ Active"
        })

    return report

# Run quarterly from Airflow
report = generate_access_review_report("gold_layer", "orders_agg")
for row in report:
    print(f"{row['flag']}  {row['principal']}  last_access={row['last_access']}")
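The review cadence described at the start of this section (quarterly for RESTRICTED, annually for CONFIDENTIAL) and the 90-day staleness rule in the report can be written as two pure functions, which keeps the scheduling logic testable without AWS. A sketch under those assumptions; approximating "quarterly" as 91 days and the names `next_review_due` and `is_stale` are illustrative choices, not part of any AWS API.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed review intervals: "quarterly" approximated as 91 days
REVIEW_INTERVAL = {
    "RESTRICTED": timedelta(days=91),
    "CONFIDENTIAL": timedelta(days=365),
}
STALE_AFTER = timedelta(days=90)  # same window the access review report uses

def next_review_due(label: str, last_review: datetime) -> Optional[datetime]:
    """Date the next access review is due, or None if the label has no review cycle."""
    interval = REVIEW_INTERVAL.get(label)
    return last_review + interval if interval is not None else None

def is_stale(last_access: Optional[datetime], now: datetime) -> bool:
    """A grant is stale if it was never used, or not used within STALE_AFTER."""
    return last_access is None or now - last_access > STALE_AFTER

now = datetime(2024, 3, 1, tzinfo=timezone.utc)
print(next_review_due("RESTRICTED", datetime(2024, 1, 1, tzinfo=timezone.utc)))
print(is_stale(datetime(2023, 11, 1, tzinfo=timezone.utc), now))
```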
Policy Enforcement Automation
Governance policies should be enforced automatically β€” not by humans remembering to check. Tools like Lake Formation, Ranger, and Unity Catalog enforce access policies at query time. CI/CD pipelines should validate governance rules before deploying new tables.
python - governance policy check in CI/CD pipeline
#!/usr/bin/env python3
# governance_check.py - run in CI before deploying a new Spark job

import sys

import boto3
from botocore.exceptions import ClientError

def check_governance_policy(database: str, table: str) -> None:
    """
    Before deploying a Spark job that writes to a table, verify:
    1. Table has an owner tag
    2. Table has a sensitivity label tag
    3. If RESTRICTED, the table's S3 bucket defaults to KMS encryption
    Exits with status 1 on any violation so the CI stage fails.
    """
    glue = boto3.client("glue")
    violations = []

    try:
        # GetTable takes the table name as `Name`
        table_meta = glue.get_table(DatabaseName=database, Name=table)["Table"]
        tags = glue.get_tags(
            # Region and account ID are placeholders
            ResourceArn=f"arn:aws:glue:us-east-1:123456789012:table/{database}/{table}"
        )["Tags"]
    except glue.exceptions.EntityNotFoundException:
        print(f"Table {database}.{table} not found in catalog (new table), skipping check")
        return

    # Policy 1: Every table must have an owner
    if "owner" not in tags:
        violations.append("❌ No 'owner' tag: table must have an owner")

    # Policy 2: Every table must have a sensitivity label
    if "sensitivity" not in tags:
        violations.append("❌ No 'sensitivity' tag: classify before deploying")

    # Policy 3: RESTRICTED tables must store data in a KMS-encrypted S3 location
    if tags.get("sensitivity") == "RESTRICTED":
        location = table_meta.get("StorageDescriptor", {}).get("Location", "")
        algorithms = []
        if location.startswith("s3"):
            bucket = location.split("/")[2]   # s3://bucket/prefix -> bucket
            try:
                rules = boto3.client("s3").get_bucket_encryption(Bucket=bucket)[
                    "ServerSideEncryptionConfiguration"]["Rules"]
                algorithms = [r.get("ApplyServerSideEncryptionByDefault", {}).get("SSEAlgorithm", "")
                              for r in rules]
            except ClientError:
                pass   # no readable default-encryption config: treated as a violation below
        # "aws:kms" (SSE-KMS) and "aws:kms:dsse" (DSSE-KMS) both use KMS keys
        if not any(a.startswith("aws:kms") for a in algorithms):
            violations.append("❌ RESTRICTED table must be in KMS-encrypted bucket")

    if violations:
        print("GOVERNANCE POLICY VIOLATIONS:")
        for v in violations:
            print(f"  {v}")
        sys.exit(1)   # fail the CI pipeline
    print(f"✅ All governance policies satisfied for {database}.{table}")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.exit("usage: governance_check.py <database> <table>")
    check_governance_policy(sys.argv[1], sys.argv[2])
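Because the CI script mixes AWS calls with rule logic, the rules themselves are hard to test. One option, sketched here as a suggestion rather than the script's actual structure, is to evaluate the policies as a pure function over metadata that has already been fetched. `evaluate_policies` is a hypothetical name; it takes the table's tags and its bucket's default SSE algorithm (or `None` if none is configured).

```python
from typing import Optional

def evaluate_policies(tags: dict[str, str], sse_algorithm: Optional[str]) -> list[str]:
    """Return governance violations for one table; an empty list means it passes."""
    violations = []
    if "owner" not in tags:
        violations.append("No 'owner' tag: table must have an owner")
    if "sensitivity" not in tags:
        violations.append("No 'sensitivity' tag: classify before deploying")
    # "aws:kms" (SSE-KMS) and "aws:kms:dsse" (DSSE-KMS) both count as KMS encryption
    if tags.get("sensitivity") == "RESTRICTED" and not (sse_algorithm or "").startswith("aws:kms"):
        violations.append("RESTRICTED table must be in a KMS-encrypted bucket")
    return violations

# Example: a RESTRICTED table with an owner, in a bucket using SSE-S3 (AES256)
print(evaluate_policies({"owner": "@de-team", "sensitivity": "RESTRICTED"}, "AES256"))
```

With this split, the CI entry point only fetches tags and the bucket's encryption setting, calls `evaluate_policies`, and exits with status 1 when the list is non-empty.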
Audit Trail
Audit Trail Requirements
A complete audit trail answers: "Who accessed what data, when, and from where?" HIPAA and PCI-DSS explicitly require audit logging, and GDPR's accountability principle makes it effectively mandatory. The audit trail must be tamper-proof (write-once storage) and retained for the legally required period (often 7 years for financial data).
python - centralised audit logging from Spark jobs
import json
import uuid
from datetime import datetime, timezone

import boto3

def log_data_access(
    job_name: str,
    user: str,
    action: str,          # READ, WRITE, DELETE
    database: str,
    table: str,
    row_count: int,
    run_id: str
) -> None:
    """
    Write an audit log entry to S3 (write-once if the bucket has Object Lock)
    and to DynamoDB (queryable audit table).
    """
    now = datetime.now(timezone.utc)   # one timestamp for the record and the S3 key
    audit_record = {
        # Random suffix keeps IDs unique if a run logs the same action twice
        "audit_id":  f"{run_id}-{action}-{uuid.uuid4().hex[:8]}",
        "timestamp": now.isoformat(),
        "job_name":  job_name,
        "user":      user,
        "action":    action,
        "database":  database,
        "table":     table,
        "row_count": row_count,
        "run_id":    run_id
    }

    # Write to S3 audit bucket (enable Object Lock for tamper-proof storage).
    # Keying by audit_id stops the start and end entries of a run overwriting each other.
    s3 = boto3.client("s3")
    key = f"audit-logs/{now.strftime('%Y/%m/%d')}/{audit_record['audit_id']}.json"
    s3.put_object(
        Bucket="my-audit-logs-bucket",
        Key=key,
        Body=json.dumps(audit_record),
        ContentType="application/json",
        ChecksumAlgorithm="SHA256"   # uploads to Object Lock buckets need an integrity checksum
    )

    # Also write to DynamoDB for fast querying
    dynamo = boto3.resource("dynamodb")
    dynamo.Table("data_access_audit").put_item(Item=audit_record)

    print(f"Audit log written: {action} on {database}.{table} by {user}")

# Call at start and end of every Spark ETL job
log_data_access(
    job_name="orders_gold_etl",
    user="iam-role/emr-spark-role",
    action="READ",
    database="silver_layer",
    table="orders_clean",
    row_count=5000000,
    run_id="run-2024-03-01-001"
)
S3 Object Lock: Enable Object Lock in COMPLIANCE mode on your audit bucket with a 7-year default retention period. In COMPLIANCE mode, no user, not even the AWS account root user, can delete or overwrite a protected object version or shorten its retention until the retention window ends.
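A minimal boto3 sketch of the default-retention rule described above. It assumes the bucket already has versioning and Object Lock enabled, which S3 requires before a default retention rule can be set; the bucket name is a placeholder, and `object_lock_config` / `apply_object_lock` are hypothetical helpers. The request body is built separately so it can be checked without AWS access.

```python
def object_lock_config(years: int = 7) -> dict:
    """Default retention rule: COMPLIANCE mode for `years` years on every new object version."""
    return {
        "ObjectLockEnabled": "Enabled",
        "Rule": {"DefaultRetention": {"Mode": "COMPLIANCE", "Years": years}},
    }

def apply_object_lock(bucket: str, years: int = 7) -> None:
    """Apply the rule to an existing Object Lock bucket."""
    import boto3  # imported here so the config builder above works without boto3 installed

    # Objects written under COMPLIANCE retention cannot be deleted or have their
    # retention shortened until it expires, so try this on a scratch bucket first.
    s3 = boto3.client("s3")
    s3.put_object_lock_configuration(
        Bucket=bucket,
        ObjectLockConfiguration=object_lock_config(years),
    )
```

Usage would be a single call such as `apply_object_lock("my-audit-logs-bucket")`, run once when the audit bucket is provisioned.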
What your audit trail must capture: timestamp, who (user/role), what action (read/write/delete), which dataset, how many rows affected, and the job/run ID. For GDPR, you also need the legal basis for processing.
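The checklist above can be enforced before a record is written, so incomplete entries never reach the audit store. A sketch, assuming the field names used by `log_data_access` plus a hypothetical `legal_basis` field for GDPR-scoped datasets; `validate_audit_record` is an illustrative name.

```python
REQUIRED_FIELDS = ("timestamp", "user", "action", "database", "table", "row_count", "run_id")
VALID_ACTIONS = {"READ", "WRITE", "DELETE"}

def validate_audit_record(record: dict, gdpr_scope: bool = False) -> list[str]:
    """Return a list of problems; an empty list means the record is complete."""
    # None or "" counts as missing; row_count == 0 is a valid value
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if record.get(f) in (None, "")]
    action = record.get("action")
    if action and action not in VALID_ACTIONS:
        problems.append(f"invalid action: {action!r}")
    if gdpr_scope and not record.get("legal_basis"):
        problems.append("missing field: legal_basis (required for GDPR processing)")
    return problems
```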
MODULE 32 - QUIZ & SUMMARY

Test Your Knowledge

Answer these questions to solidify your Module 32 understanding. These are the kinds of questions that appear in senior data engineer interviews.

Q1. What is the role of OpenLineage in data governance?
✅ Correct! OpenLineage is a vendor-neutral event format standard. Tools emit lineage events in this format; backends like Marquez or DataHub receive and store them.
❌ Incorrect. OpenLineage is a standard event format, not a database and not a Spark optimizer. It defines what a lineage event looks like so all tools can speak the same language.
Q2. What is the difference between a Managed Table and an External Table in Hive/Spark?
✅ Correct! This is a critical distinction. For shared data-lake storage in production, EXTERNAL tables are the safer default, because dropping the table removes only the catalog entry and leaves the data files in place.
❌ Incorrect. The key difference is about data ownership on DROP. Managed = Spark owns the data (deletes on DROP). External = you own the data (catalog entry only is removed).
Q3. What does "tag propagation" mean in the context of Apache Atlas?
✅ Correct! Tag propagation follows the lineage graph. Tag a raw PII column once, and every downstream column derived from it gets the PII tag automatically. This is one of Atlas's most valuable features for GDPR compliance.
❌ Incorrect. Tag propagation specifically means lineage-based inheritance. If source column A is tagged PII, every column B, C, D derived from A through Spark transforms is also automatically tagged PII.
Q4. Unity Catalog introduces a three-level namespace. Which is correct?
✅ Correct! The three levels are catalog → schema → table. Example: main.sales_schema.orders. This lets you have dev/staging/prod catalogs within one Databricks workspace.
❌ Incorrect. Unity Catalog's three-level namespace is catalog.schema.table. E.g. main.sales_schema.orders. Not region, workspace, or cluster.
Q5. You are building an automated governance pipeline. A new table lands in your bronze layer. What should happen automatically?
✅ Correct! This is the governance-first ingestion pattern: classify → tag → register ownership → audit log → then expose. New data should never be accessible to consumers before it's classified.
❌ Incorrect. In a governed data platform, new tables should be automatically classified (PII scan, sensitivity label, ownership) before being made accessible. Exposing unclassified data is a compliance violation.
📋
Module 32 - Complete Summary
Key Takeaways
What You Learned
MODULE 32 - DATA GOVERNANCE SUMMARY

32.1 DATA LINEAGE
├── OpenLineage = open standard JSON event format for lineage
├── Spark listener auto-emits events on job complete (zero code changes)
├── Events include: inputs, outputs, schemas, column-level derivations
└── Marquez = open-source backend that receives + visualises lineage events

32.2 METADATA MANAGEMENT
├── DataHub (LinkedIn OSS): modern, best for cloud-native stacks
│   ├── Ingestion via Spark listener OR ingestion recipes (Glue, Hive)
│   ├── Search datasets by name, column, tag, or owner
│   └── Lineage graph + ownership registration via SDK
└── Apache Atlas: Hadoop ecosystem, first-class tag propagation
    ├── Type system models entities + relationships
    ├── Spark Atlas Connector captures lineage automatically
    └── Tags propagate through lineage graph (GDPR superpower)

32.3 DATA CATALOGS
├── Glue Catalog: AWS-managed Hive-compatible metastore
│   ├── table definitions, partitions, schema versions
│   └── boto3 APIs: create_table, batch_create_partition, update_table
├── Hive Metastore: original big data catalog (MySQL/Postgres backend)
│   ├── MANAGED tables (Spark owns data, DELETE on DROP)
│   └── EXTERNAL tables (you own data, safe for production!)
└── Unity Catalog: Databricks three-level catalog.schema.table
    ├── Auto column-level lineage, row filters, column masks
    └── Volumes for file governance (CSV, JSON, etc.)

32.4 DATA CLASSIFICATION
├── PII tagging: detect by column name patterns + content regex
├── Sensitivity labels: RESTRICTED → CONFIDENTIAL → INTERNAL → PUBLIC
├── Tag propagation: PII tag on source → auto-inherited downstream
└── Automated classification pipeline: scan on ingest, tag, audit

32.5 GOVERNANCE BEST PRACTICES
├── Data ownership: every dataset has Technical + Business + Steward owner
├── Access review cycles: quarterly for RESTRICTED, annual for CONFIDENTIAL
├── Policy enforcement in CI/CD: check tags + encryption before deploy
└── Audit trail: who + what + when + how many rows, S3 Object Lock (write-once)
Interview Cheat Sheet
Common Interview Questions for Module 32
  • Q: What is OpenLineage and how does it work with Spark? A: An open standard JSON event format. Add the OpenLineage listener to the SparkSession config; lineage events are then emitted automatically as jobs start and complete, and Marquez or DataHub receives them.
  • Q: DataHub vs Apache Atlas: when to use each? A: DataHub for modern cloud-native stacks (Spark, dbt, Airflow, Kafka). Atlas for Hadoop/Cloudera environments with Ranger integration.
  • Q: What happens when you DROP a managed table vs an external table? A: Managed: data files are deleted. External: only the catalog entry is removed; the S3 data survives.
  • Q: What is Unity Catalog's three-level namespace? A: catalog.schema.table, e.g. main.sales_schema.orders. It enables dev/prod isolation and cross-workspace sharing.
  • Q: How would you implement automated PII classification? A: Column-name regex + content-sampling regex → assign sensitivity label → write to DynamoDB audit table → tag in Glue Catalog via boto3.
  • Q: What is tag propagation in Atlas? A: A PII classification on a source column propagates through lineage to all downstream columns, which makes GDPR right-to-erasure requests auditable via a catalog search.
🎉 Module 32 Complete! You now understand data lineage (OpenLineage + Marquez), metadata management (DataHub + Atlas), data catalogs (Glue, Hive Metastore, Unity Catalog), classification, and enterprise governance practices. Next: Module 33, Data Quality Ecosystem (Great Expectations, Deequ, Soda, Monte Carlo, Data Observability).