Data Governance
Data Governance answers the question: "Who owns what data, where did it come from, who can access it, and how do we ensure it's trustworthy?" It is the set of policies, tools, and processes that make data reliable, discoverable, and compliant at enterprise scale.
Data Lineage
Data lineage is the ability to answer: "Where did this column's value come from?" and "Which downstream tables are broken if I change this source?" It is the backbone of trustworthy data engineering.
Without a standard, every tool invents its own lineage format, and lineage ends up in silos. With OpenLineage, an open standard for lineage events, every tool reports into one unified lineage graph.
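Once lineage lives in one graph, the impact question ("which downstream tables break if I change this source?") reduces to a graph traversal. The sketch below is a minimal illustration, assuming the lineage has already been loaded into a plain dictionary; the dataset names and the `downstream_of` helper are hypothetical and not part of any lineage tool.

```python
from collections import deque

# Hypothetical lineage edges: each dataset maps to the datasets derived from it.
LINEAGE = {
    "bronze/orders": ["silver/orders_clean"],
    "silver/orders_clean": ["gold/orders_agg", "gold/revenue_report"],
    "gold/orders_agg": ["dashboards/sales"],
}

def downstream_of(source, edges):
    """Return every dataset reachable from `source`, in breadth-first order."""
    seen = {source}
    order = []
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            if child not in seen:  # guard against revisiting shared descendants
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order

print(downstream_of("bronze/orders", LINEAGE))
```

A real lineage backend runs the same kind of traversal over its stored graph; the dictionary here only stands in for that store.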
# Option 1: Pass as spark-submit flags
# spark-submit \
# --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
# --conf spark.openlineage.transport.type=http \
# --conf spark.openlineage.transport.url=http://marquez:5000 \
# --conf spark.openlineage.transport.endpoint=/api/v1/lineage \
# --conf spark.openlineage.namespace=my-spark-cluster \
# my_etl_job.py
# Option 2: SparkSession config in code
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("orders_etl")
.config("spark.extraListeners",
"io.openlineage.spark.agent.OpenLineageSparkListener")
.config("spark.openlineage.transport.type", "http")
.config("spark.openlineage.transport.url", "http://marquez:5000")
.config("spark.openlineage.transport.endpoint", "/api/v1/lineage")
.config("spark.openlineage.namespace", "production-spark")
.getOrCreate()
)
# Your normal PySpark code: lineage is emitted automatically
df = spark.read.parquet("s3://bucket/bronze/orders/")
agg = df.groupBy("customer_id").agg({"amount": "sum"})
agg.write.parquet("s3://bucket/gold/orders_agg/")
# OpenLineage listener will emit:
# START event when the job begins
# COMPLETE event when it finishes (with input/output datasets + schemas)
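To make the START and COMPLETE events less abstract, here is a rough sketch of their shape, built by hand with the standard library. The field names (`eventType`, `eventTime`, `run.runId`, `job`, `inputs`, `outputs`, `producer`) follow the OpenLineage run event model, but this is a simplified illustration: real events also carry a `schemaURL` and facets, and the `make_run_event` helper, the producer URL, and the dataset namespace are assumptions made for the example.

```python
import json
import uuid
from datetime import datetime, timezone

def make_run_event(event_type, job_name, inputs, outputs, run_id,
                   namespace="production-spark"):
    """Build a dict shaped like an OpenLineage run event (core fields only)."""
    return {
        "eventType": event_type,  # e.g. "START" or "COMPLETE"
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},  # the same runId ties START and COMPLETE together
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": "s3://bucket", "name": n} for n in inputs],
        "outputs": [{"namespace": "s3://bucket", "name": n} for n in outputs],
        "producer": "https://example.com/hypothetical-producer",  # placeholder
    }

run_id = str(uuid.uuid4())
start = make_run_event("START", "orders_etl", ["bronze/orders"], [], run_id)
complete = make_run_event("COMPLETE", "orders_etl",
                          ["bronze/orders"], ["gold/orders_agg"], run_id)
print(json.dumps(complete, indent=2))
```

The listener produces and sends these events for you; building one by hand is only useful for understanding what the backend receives.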
io.openlineage:openlineage-spark_2.12:1.x.x
total_revenue was derived from input columns amount and discount.
from pyspark.sql import functions as F
# Column lineage is captured automatically by the listener when you do transforms
df = spark.read.parquet("s3://bucket/bronze/sales")
result = df.withColumn(
"total_revenue",
F.col("amount") - F.col("discount") # OpenLineage captures this derivation
).withColumn(
"profit_margin",
F.col("total_revenue") / F.col("cost")
)
result.write.parquet("s3://bucket/gold/revenue_report")
# The emitted lineage event will contain a ColumnLineageDatasetFacet:
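# total_revenue <- [bronze/sales.amount, bronze/sales.discount]
# profit_margin <- [bronze/sales.amount, bronze/sales.discount, bronze/sales.cost]
# (total_revenue is an intermediate column, so it resolves to its source columns)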
# This lets analysts trace where any output column came from.
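As a rough illustration of how such a facet can be consumed, the sketch below looks up the source columns for an output column. The nesting (`fields` -> column -> `inputFields` with `namespace`, `name`, `field`) follows the OpenLineage column lineage facet; the facet contents and the `source_columns` helper are hypothetical and written out by hand for the example.

```python
# Hand-written stand-in for a column lineage facet taken from an emitted event.
COLUMN_LINEAGE_FACET = {
    "fields": {
        "total_revenue": {"inputFields": [
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "amount"},
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "discount"},
        ]},
        "profit_margin": {"inputFields": [
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "amount"},
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "discount"},
            {"namespace": "s3://bucket", "name": "bronze/sales", "field": "cost"},
        ]},
    }
}

def source_columns(facet, output_column):
    """Return 'dataset.column' for every input feeding `output_column`, sorted."""
    entry = facet["fields"].get(output_column, {})
    return sorted(f"{f['name']}.{f['field']}" for f in entry.get("inputFields", []))

print(source_columns(COLUMN_LINEAGE_FACET, "profit_margin"))
```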
Marquez was created by WeWork and donated to the Linux Foundation. It is one of two major OpenLineage backends (the other being DataHub).
# docker-compose.yml: run Marquez locally
version: '3'
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000" # API port
      - "5001:5001" # Admin port
    environment:
      - MARQUEZ_PORT=5000
      - MARQUEZ_ADMIN_PORT=5001
  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000" # UI port: open http://localhost:3000
    environment:
      - MARQUEZ_HOST=marquez
      - MARQUEZ_PORT=5000
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: password
      POSTGRES_DB: marquez
Run docker-compose up, open http://localhost:3000, then run a Spark job with the OpenLineage listener configured and watch the lineage graph appear in real time.
import requests
MARQUEZ = "http://localhost:5000"
# List all namespaces
resp = requests.get(f"{MARQUEZ}/api/v1/namespaces")
print(resp.json())
# List datasets in a namespace
resp = requests.get(f"{MARQUEZ}/api/v1/namespaces/production-spark/datasets")
datasets = resp.json()['datasets']
for ds in datasets:
    # description is optional, so use .get() to avoid a KeyError
    print(ds['name'], ds.get('description'))
# Get lineage graph for a specific dataset
resp = requests.get(
    f"{MARQUEZ}/api/v1/lineage",
    params={
        "nodeId": "dataset:production-spark:gold/orders_agg",
        "depth": 3  # how many hops upstream/downstream
    }
)
lineage = resp.json()
# 'graph' holds every node within `depth` hops, both upstream and downstream
print("Lineage nodes:", lineage['graph'])
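The lineage response contains nodes in both directions, so separating out the upstream datasets takes one more step. The sketch below walks incoming edges, assuming each node in `graph` has an `id` and an `inEdges` list whose entries carry `origin` and `destination` node ids. That response shape is an assumption to verify against your Marquez version, and the sample payload and the `upstream_datasets` helper are hypothetical.

```python
# Hypothetical, trimmed-down lineage response: bronze/orders -> orders_etl -> gold/orders_agg
SAMPLE_LINEAGE = {
    "graph": [
        {"id": "dataset:production-spark:gold/orders_agg", "type": "DATASET",
         "inEdges": [{"origin": "job:production-spark:orders_etl",
                      "destination": "dataset:production-spark:gold/orders_agg"}],
         "outEdges": []},
        {"id": "job:production-spark:orders_etl", "type": "JOB",
         "inEdges": [{"origin": "dataset:production-spark:bronze/orders",
                      "destination": "job:production-spark:orders_etl"}],
         "outEdges": [{"origin": "job:production-spark:orders_etl",
                       "destination": "dataset:production-spark:gold/orders_agg"}]},
        {"id": "dataset:production-spark:bronze/orders", "type": "DATASET",
         "inEdges": [],
         "outEdges": [{"origin": "dataset:production-spark:bronze/orders",
                       "destination": "job:production-spark:orders_etl"}]},
    ]
}

def upstream_datasets(lineage, node_id):
    """Follow inEdges from `node_id` and return the ids of all upstream datasets."""
    nodes = {n["id"]: n for n in lineage["graph"]}
    seen, stack, found = {node_id}, [node_id], []
    while stack:
        current = nodes.get(stack.pop())
        if current is None:
            continue  # edge points outside the returned depth window
        for edge in current.get("inEdges", []):
            origin = edge["origin"]
            if origin not in seen:
                seen.add(origin)
                stack.append(origin)
                if origin.startswith("dataset:"):
                    found.append(origin)
    return found

print(upstream_datasets(SAMPLE_LINEAGE, "dataset:production-spark:gold/orders_agg"))
```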
Metadata Management
Metadata is data about your data: schema, owner, tags, lineage, quality scores. Without a metadata management platform, data engineers spend hours hunting for the right table. With one, a search surfaces the right table together with its owner, schema, and lineage.
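# The listener ships as a JAR, so it must be on the Spark classpath, e.g. via
# --packages or spark.jars.packages (artifact io.acryl:acryl-spark-lineage;
# check the DataHub docs for the exact coordinates for your Spark/Scala version)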
# In your SparkSession builder:
spark = (
SparkSession.builder
.appName("orders_pipeline")
.config("spark.extraListeners",
"datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server",
"http://datahub-gms:8080") # DataHub GMS endpoint
.config("spark.datahub.rest.token",
"your_personal_access_token")
.config("spark.datahub.metadata.pipeline.platformInstance",
"production")
.getOrCreate()
)
# Every Spark job now auto-emits lineage to DataHub
# datahub_glue_recipe.yml
# Run: datahub ingest -c datahub_glue_recipe.yml
source:
  type: glue
  config:
    aws_region: us-east-1
    extract_transforms: true # pull Glue ETL job lineage too
sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
    token: "your_token_here"
from itertools import islice
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import UpstreamLineageClass
# Connect to DataHub
graph = DataHubGraph(DatahubClientConfig(
    server="http://localhost:8080",
    token="your_token"
))
# Get the direct (one-hop) upstreams of a dataset from its upstreamLineage aspect.
# Multi-hop traversal needs repeated lookups or DataHub's GraphQL lineage queries.
entity_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,gold/orders_agg,PROD)"
lineage = graph.get_aspect(
    entity_urn=entity_urn,
    aspect_type=UpstreamLineageClass
)
for upstream in (lineage.upstreams if lineage else []):
    print(f"Upstream: {upstream.dataset}")
# Search for datasets: yields the URNs of matching entities
results = graph.get_urns_by_filter(
    entity_types=["dataset"],
    query="orders"
)
for urn in islice(results, 10):  # first 10 matches
    print(urn)
# spark-submit with Spark Atlas Connector
# --packages com.hortonworks.spark:spark-atlas-connector_2.12:0.1.0-SNAPSHOT
spark = (
SparkSession.builder
.appName("orders_pipeline")
.config("spark.extraListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker")
.config("spark.sql.queryExecutionListeners",
"com.hortonworks.spark.atlas.SparkAtlasEventTracker")
.config("spark.sql.streaming.streamingQueryListeners",
"com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker")
.getOrCreate()
)
# atlas-application.properties also needed on driver:
# atlas.client.type=kafka
# atlas.kafka.bootstrap.servers=kafka:9092
PII, SENSITIVE, FINANCIAL. These tags propagate through the lineage graph, so if a source column is tagged PII, every downstream column derived from it automatically inherits that tag.
import requests
import json
ATLAS = "http://atlas-server:21000"
AUTH = ("admin", "admin")
# 1. Find the entity GUID for a table
resp = requests.get(
f"{ATLAS}/api/atlas/v2/search/basic",
params={"typeName": "hive_table", "query": "customers"},
auth=AUTH
)
guid = resp.json()["entities"][0]["guid"]
# 2. Add a PII classification to the entity
payload = [
{
"typeName": "PII",
"attributes": {}
}
]
resp = requests.post(
f"{ATLAS}/api/atlas/v2/entity/guid/{guid}/classifications",
json=payload,
auth=AUTH
)
print("Classification added:", resp.status_code)
# 3. Tag propagation: all downstream datasets derived from
# 'customers' will automatically inherit the PII tag
# This makes compliance audits trivial: search for all PII-tagged datasets
PII once, and every derived table, view, and report that uses it gets tagged automatically. Finding every dataset in scope for GDPR becomes a search query.
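The propagation idea can be sketched independently of Atlas as a walk over the lineage graph that pushes tags from each tagged dataset to everything derived from it. This is a conceptual sketch only, not how Atlas implements it; the edge dictionary, dataset names, and `propagate_tags` helper are hypothetical.

```python
from collections import deque

def propagate_tags(edges, tags):
    """Push every tag from a dataset to all datasets derived from it.

    edges: dict mapping an upstream dataset to its direct downstream datasets
    tags:  dict mapping a dataset to the set of tags applied directly to it
    """
    result = {name: set(t) for name, t in tags.items()}
    queue = deque(result)  # start from every directly tagged dataset
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            child_tags = result.setdefault(child, set())
            missing = result[node] - child_tags
            if missing:  # only revisit a child when it gained a new tag
                child_tags |= missing
                queue.append(child)
    return result

edges = {
    "customers": ["customers_clean"],
    "customers_clean": ["customer_360", "marketing_export"],
    "orders": ["customer_360"],
}
tagged = propagate_tags(edges, {"customers": {"PII"}, "orders": {"FINANCIAL"}})
print(sorted(name for name, t in tagged.items() if "PII" in t))
```

Because a child is only re-queued when it gains a tag, the walk terminates even if the lineage graph contains a cycle.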
| Aspect | DataHub | Apache Atlas |
|---|---|---|
| Created by | LinkedIn | Apache / Cloudera |
| Best fit | Modern cloud-native stacks (Spark, dbt, Airflow) | Hadoop ecosystem (Hive, HBase, Cloudera) |
| Lineage standard | OpenLineage + custom | Custom Kafka events |
| UI | Modern React UI, fast search | Older UI, functional |
| Graph DB | Custom entity store + Elasticsearch | JanusGraph + Solr |
| Tag propagation | Yes (via lineage) | Yes (first-class feature) |
| Active dev | Very active (Acryl Data) | Moderate |
Data Catalogs
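A data catalog is the table registry that Spark consults to find where tables live, what their schemas are, and how they are partitioned. Without a persistent catalog, Spark can only resolve temporary views and tables registered in the current session, so table names cannot be shared across jobs or engines.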
# Configure Spark to use Glue Catalog as Hive Metastore
spark = (
SparkSession.builder
.appName("catalog_example")
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.enableHiveSupport()
.getOrCreate()
)
# Create a database in Glue Catalog
spark.sql("CREATE DATABASE IF NOT EXISTS gold_layer")
# Create a table: metadata stored in Glue, data in S3
spark.sql("""
CREATE TABLE IF NOT EXISTS gold_layer.orders_agg (
customer_id STRING,
total_amount DOUBLE,
order_count BIGINT,
last_order DATE
)
STORED AS PARQUET
LOCATION 's3://my-datalake/gold/orders_agg/'
PARTITIONED BY (order_year INT, order_month INT)
""")
# After writing data, add partitions so Athena/Spark can find them
spark.sql("MSCK REPAIR TABLE gold_layer.orders_agg")
# or use: spark.sql("ALTER TABLE gold_layer.orders_agg ADD PARTITION ...")
# Now any Athena query or EMR Spark job can refer to this table by name
df = spark.sql("SELECT * FROM gold_layer.orders_agg WHERE order_year = 2024")
import boto3
glue = boto3.client("glue", region_name="us-east-1")
# Manually add a partition to Glue Catalog
glue.create_partition(
DatabaseName="gold_layer",
TableName="orders_agg",
PartitionInput={
"Values": ["2024", "3"], # order_year=2024, order_month=3
"StorageDescriptor": {
"Location": "s3://my-datalake/gold/orders_agg/order_year=2024/order_month=3/",
"InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
}
}
}
)
# Batch add partitions (much faster for large datasets)
partition_inputs = []
for year in [2023, 2024]:
    for month in range(1, 13):
        partition_inputs.append({
            "Values": [str(year), str(month)],
            "StorageDescriptor": {
                "Location": f"s3://my-datalake/gold/orders_agg/order_year={year}/order_month={month}/",
                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                "SerdeInfo": {"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"}
            }
        })
# batch_create_partition takes up to 100 at a time, so send the list in chunks
for i in range(0, len(partition_inputs), 100):
    glue.batch_create_partition(
        DatabaseName="gold_layer",
        TableName="orders_agg",
        PartitionInputList=partition_inputs[i:i + 100]
    )
# Fetch the current table definition
resp = glue.get_table(
    DatabaseName="gold_layer",
    TableName="orders_agg"
)
current_schema = resp["Table"]["StorageDescriptor"]["Columns"]
print("Current columns:", [c["Name"] for c in current_schema])
# Add a new column (schema evolution)
new_columns = current_schema + [{
    "Name": "avg_order_value",
    "Type": "double",
    "Comment": "Average order value per customer"
}]
# Update the table definition
table_input = resp["Table"].copy()
table_input["StorageDescriptor"]["Columns"] = new_columns
# Remove read-only fields that get_table returns but update_table rejects
for key in ["DatabaseName", "CreateTime", "UpdateTime", "CreatedBy",
            "IsRegisteredWithLakeFormation", "CatalogId", "VersionId"]:
    table_input.pop(key, None)
glue.update_table(
    DatabaseName="gold_layer",
    TableInput=table_input
)
print("Schema evolved: new column added")
HiveMetaStoreClient. It is the original, battle-tested catalog of the big data world.
Managed table: DROP TABLE deletes the data from the warehouse directory. Use for intermediate or temporary data.
External table: DROP TABLE only removes metadata; the data stays on S3/HDFS. Use for production data that other tools also access.
# MANAGED table: Spark controls the data location
spark.sql("""
CREATE TABLE IF NOT EXISTS gold_layer.orders_managed (
customer_id STRING,
amount DOUBLE
) STORED AS PARQUET
""")
# Data stored at: spark.sql.warehouse.dir/gold_layer.db/orders_managed/
# DROP TABLE gold_layer.orders_managed: DELETES the files too!
# EXTERNAL table β you control the data location
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS gold_layer.orders_external (
customer_id STRING,
amount DOUBLE
)
STORED AS PARQUET
LOCATION 's3://my-datalake/gold/orders/'
""")
# DROP TABLE gold_layer.orders_external: only removes the catalog entry
# Data on S3 remains intact!
# Check if a table is managed or external
df = spark.sql("DESCRIBE EXTENDED gold_layer.orders_external")
df.filter(df.col_name == "Type").show()
catalog.schema.table. This lets you organise data across multiple catalogs (dev, staging, prod, or by business domain) within one Databricks workspace.
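To show how a name resolves under the three-level scheme, here is a small sketch that splits a table reference into its catalog, schema, and table parts, filling in defaults the way `USE CATALOG` and `USE SCHEMA` would. The `parse_table_name` helper and its default values are hypothetical; it is not a Databricks API and it does not handle backtick-quoted identifiers that contain dots.

```python
from typing import NamedTuple

class TableName(NamedTuple):
    catalog: str
    schema: str
    table: str

def parse_table_name(name, current_catalog="main", current_schema="default"):
    """Resolve a 1-, 2-, or 3-part table name against the current catalog/schema."""
    parts = name.split(".")
    if not all(parts) or len(parts) > 3:
        raise ValueError(f"Not a valid table name: {name!r}")
    if len(parts) == 3:
        return TableName(*parts)
    if len(parts) == 2:
        return TableName(current_catalog, *parts)
    return TableName(current_catalog, current_schema, parts[0])

print(parse_table_name("main.sales_schema.orders"))
print(parse_table_name("orders", current_catalog="dev", current_schema="sales_schema"))
```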
# Use three-level namespace in SQL
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA sales_schema")
# Or fully qualify in every query
df = spark.sql("SELECT * FROM main.sales_schema.orders")
# Create a managed Delta table in Unity Catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS main.sales_schema.orders (
order_id BIGINT,
customer_id STRING,
amount DOUBLE,
order_date DATE
)
USING DELTA
COMMENT 'Production orders table, owner: data-engineering team'
""")
# Grant permissions (Unity Catalog fine-grained access control)
spark.sql("GRANT SELECT ON TABLE main.sales_schema.orders TO `analyst-group`")
spark.sql("GRANT MODIFY ON TABLE main.sales_schema.orders TO `data-engineers`")
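# Row filter (only show rows where region = user's region)
# Assumes the table has a region column and that a SQL UDF named
# row_filter_orders already exists; the table created above has neither
spark.sql("""
    ALTER TABLE main.sales_schema.orders
    SET ROW FILTER row_filter_orders ON (region)
""")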
Data Classification
Data classification is the process of labelling data by its sensitivity level so that the right controls (encryption, masking, access restrictions) are automatically applied. It is the foundation of GDPR, HIPAA, and PCI-DSS compliance.
import re
from pyspark.sql import functions as F
# Rule-based PII column name detection
PII_COLUMN_PATTERNS = [
re.compile(p, re.IGNORECASE)
for p in [
r"email", r"phone", r"ssn", r"social.?security",
r"credit.?card", r"dob", r"date.?of.?birth",
r"passport", r"national.?id", r"ip.?address",
r"first.?name", r"last.?name", r"full.?name",
r"home.?address", r"zip.?code", r"postal"
]
]
def detect_pii_columns(df):
    """Returns list of column names likely to contain PII"""
    pii_cols = []
    for col_name in df.columns:
        for pattern in PII_COLUMN_PATTERNS:
            if pattern.search(col_name):
                pii_cols.append(col_name)
                break  # one matching pattern is enough for this column
    return pii_cols
# Example usage
df = spark.read.parquet("s3://bucket/bronze/customers")
pii_detected = detect_pii_columns(df)
print(f"PII columns detected: {pii_detected}")
# Example output (depends on the table's columns):
# PII columns detected: ['email', 'phone', 'home_address', 'date_of_birth']
# Content-based PII detection using regex on actual data values
EMAIL_REGEX = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
SSN_REGEX = r'^\d{3}-\d{2}-\d{4}$'
def is_pii_by_content(df, col_name, sample_size=100):
    """Sample a column and check if values look like emails or SSNs"""
    sample = (
        df.select(col_name)
        .dropna()
        .limit(sample_size)
        .rdd.flatMap(lambda x: x)
        .collect()
    )
    if not sample:
        return False  # empty or all-null column: nothing to inspect
    match_count = sum(
        1 for v in sample
        if re.match(EMAIL_REGEX, str(v)) or re.match(SSN_REGEX, str(v))
    )
    return match_count / len(sample) > 0.8  # >80% match = likely PII
| Label | Examples | Required Controls |
|---|---|---|
| RESTRICTED | SSN, credit card, medical records | Column encryption, strict access, audit logging, no copy to dev |
| CONFIDENTIAL | Email, phone, salary, DOB | Masking in non-prod, access review, encrypted at rest |
| INTERNAL | Order amounts, employee IDs | Internal access only, no public sharing |
| PUBLIC | Product names, public prices | No special controls needed |
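One way to apply these labels at table level is to treat them as an ordered scale and give a table the strictest label found among its columns. The sketch below assumes that rule, which the table above does not state explicitly; the `table_label` and `at_least` helpers and the example column labels are hypothetical.

```python
# Labels ordered from least to most sensitive, matching the table above.
SENSITIVITY_ORDER = ["PUBLIC", "INTERNAL", "CONFIDENTIAL", "RESTRICTED"]

def table_label(column_labels, default="INTERNAL"):
    """Return the strictest label among a table's columns.

    column_labels: dict mapping column name to its sensitivity label
    default: label to use when no column has been classified yet
    """
    if not column_labels:
        return default
    return max(column_labels.values(), key=SENSITIVITY_ORDER.index)

def at_least(label, threshold):
    """True if `label` is as sensitive as `threshold` or more so."""
    return SENSITIVITY_ORDER.index(label) >= SENSITIVITY_ORDER.index(threshold)

columns = {"email": "CONFIDENTIAL", "ssn": "RESTRICTED", "order_amount": "INTERNAL"}
label = table_label(columns)
print(label, "- mask or exclude in non-prod:", at_least(label, "CONFIDENTIAL"))
```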
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
import datahub.emitter.mce_builder as builder
emitter = DatahubRestEmitter("http://localhost:8080")
# Dataset that should carry the RESTRICTED label
dataset_urn = builder.make_dataset_urn(
    platform="s3",
    name="bronze/customers",
    env="PROD"
)
# Add RESTRICTED sensitivity label as a tag
tag_urn = builder.make_tag_urn("RESTRICTED")
tags_aspect = GlobalTagsClass(tags=[
    TagAssociationClass(tag=tag_urn)
])
# Wrap the tags aspect in a change proposal for this dataset.
# (The earlier draft emitted an empty lineage MCE, which never applied the tag.)
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=tags_aspect
)
# Emit the tag; note this replaces the dataset's existing globalTags aspect
emitter.emit(mcp)
RESTRICTED tag is applied to a source column, every downstream dataset derived from it automatically inherits the tag. This means one classification decision protects the entire data pipeline.
from pyspark.sql import SparkSession
import boto3
from datetime import datetime, timezone
def classify_table(spark, database: str, table: str) -> dict:
    """
    Scan a Glue table and return a classification report.
    Called by Airflow after every new table lands in the bronze layer.
    """
    df = spark.sql(f"SELECT * FROM {database}.{table} LIMIT 500")
    pii_columns = detect_pii_columns(df)  # function from earlier
    # Determine overall table sensitivity label
    # (exact column-name match; extend the list to suit your naming conventions)
    if any(p in pii_columns for p in ["ssn", "credit_card", "passport"]):
        label = "RESTRICTED"
    elif pii_columns:
        label = "CONFIDENTIAL"
    else:
        label = "INTERNAL"
    report = {
        "database": database,
        "table": table,
        "classification_label": label,
        "pii_columns": pii_columns,
        "classified_at": datetime.now(timezone.utc).isoformat(),
        "sampled_row_count": df.count()  # at most 500 because of the LIMIT above
    }
    # Store classification in DynamoDB audit table
    dynamo = boto3.resource("dynamodb")
    tbl = dynamo.Table("data_classification_audit")
    tbl.put_item(Item=report)
    # Apply tag in Glue Catalog via boto3 (account ID and region are placeholders)
    glue = boto3.client("glue")
    glue.tag_resource(
        ResourceArn=f"arn:aws:glue:us-east-1:123456789:table/{database}/{table}",
        TagsToAdd={"sensitivity": label, "pii_detected": str(bool(pii_columns))}
    )
    return report
# Example usage: called from Airflow DAG after table creation
result = classify_table(spark, "bronze_layer", "new_customers_table")
print(f"Table classified as: {result['classification_label']}")
print(f"PII columns: {result['pii_columns']}")
Governance Best Practices
Tools alone don't create governance. Governance requires the right processes and culture: clear ownership, regular access reviews, automated policy enforcement, and a complete audit trail.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
import datahub.emitter.mce_builder as builder
from datahub.metadata.schema_classes import OwnershipClass, OwnerClass, OwnershipTypeClass
emitter = DatahubRestEmitter("http://datahub:8080")
dataset_urn = builder.make_dataset_urn("s3", "gold/orders_agg", "PROD")
ownership = OwnershipClass(owners=[
    OwnerClass(
        owner=builder.make_user_urn("john.doe"),
        type=OwnershipTypeClass.TECHNICAL_OWNER
    ),
    OwnerClass(
        owner=builder.make_group_urn("sales-analytics"),
        type=OwnershipTypeClass.BUSINESS_OWNER
    )
])
# Emit the ownership aspect; without this step nothing is sent to DataHub
emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=ownership))
print("Ownership registered in DataHub")
import boto3
from datetime import datetime, timedelta, timezone
def generate_access_review_report(database: str, table: str):
    """
    Generate a quarterly access review report for a RESTRICTED table.
    Lists all IAM roles/users with access and flags any unused access.
    """
    lf = boto3.client("lakeformation")
    cloudtrail = boto3.client("cloudtrail")
    # 1. Get all principals with access to this table
    permissions = lf.list_permissions(
        Resource={
            "Table": {
                "DatabaseName": database,
                "Name": table
            }
        }
    )["PrincipalResourcePermissions"]
    # 2. For each principal, check for recent activity in CloudTrail.
    # Caveat: this matches any event by the principal, not only access to this
    # table, and CloudTrail's Username may differ from the principal identifier.
    now = datetime.now(timezone.utc)
    ninety_days_ago = now - timedelta(days=90)
    report = []
    for perm in permissions:
        principal = perm["Principal"]["DataLakePrincipalIdentifier"]
        actions = perm["Permissions"]
        # Check CloudTrail for last access (most recent event is returned first)
        events = cloudtrail.lookup_events(
            LookupAttributes=[{"AttributeKey": "Username", "AttributeValue": principal}],
            StartTime=ninety_days_ago,
            EndTime=now
        )["Events"]
        last_access = events[0]["EventTime"] if events else None
        stale = last_access is None  # no access in 90 days = stale
        report.append({
            "principal": principal,
            "permissions": actions,
            "last_access": last_access,
            "flag": "⚠️ REVIEW: no access in 90 days" if stale else "✅ Active"
        })
    return report
# Run quarterly from Airflow
report = generate_access_review_report("gold_layer", "orders_agg")
for row in report:
    print(f"{row['flag']} {row['principal']} last_access={row['last_access']}")
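The staleness rule itself is easier to test when it is separated from the AWS calls. The following sketch isolates that decision as a pure function, assuming the last-access timestamps have already been collected; the `flag_stale` helper, principal names, and dates are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def flag_stale(last_access_by_principal, now, max_age_days=90):
    """Return principals with no recorded access within `max_age_days`.

    last_access_by_principal: dict of principal -> last access datetime, or None
    now: the reference time, passed in so the function is deterministic in tests
    """
    cutoff = now - timedelta(days=max_age_days)
    return sorted(
        principal
        for principal, last in last_access_by_principal.items()
        if last is None or last < cutoff
    )

now = datetime(2024, 6, 30, tzinfo=timezone.utc)
last_seen = {
    "analyst-role": datetime(2024, 6, 1, tzinfo=timezone.utc),   # recent
    "legacy-etl-role": datetime(2024, 1, 15, tzinfo=timezone.utc),  # older than 90 days
    "contractor-role": None,                                      # never seen
}
print(flag_stale(last_seen, now))
```

Passing `now` in as an argument keeps the rule deterministic, so the quarterly job and its unit tests can share the same function.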
#!/usr/bin/env python3
# governance_check.py: run in CI before deploying new Spark job
import boto3, sys
from botocore.exceptions import ClientError
def check_governance_policy(database: str, table: str):
    """
    Before deploying a Spark job that writes to a table, verify:
    1. Table has an owner tag
    2. Table has a sensitivity label tag
    3. If RESTRICTED, encryption is enabled
    """
    glue = boto3.client("glue")
    violations = []
    try:
        table_meta = glue.get_table(DatabaseName=database, TableName=table)["Table"]
        tags = glue.get_tags(
            ResourceArn=f"arn:aws:glue:us-east-1:123456789:table/{database}/{table}"
        )["Tags"]
    except glue.exceptions.EntityNotFoundException:
        print(f"Table {database}.{table} not found in catalog")
        return True  # new table: skip check
    # Policy 1: Every table must have an owner
    if "owner" not in tags:
        violations.append("❌ No 'owner' tag: table must have an owner")
    # Policy 2: Every table must have a sensitivity label
    if "sensitivity" not in tags:
        violations.append("❌ No 'sensitivity' tag: classify before deploying")
    # Policy 3: RESTRICTED tables must store data in a KMS-encrypted S3 location
    if tags.get("sensitivity") == "RESTRICTED":
        location = table_meta["StorageDescriptor"]["Location"]
        s3 = boto3.client("s3")
        bucket = location.split("/")[2]  # s3://bucket/prefix -> bucket
        try:
            encryption = s3.get_bucket_encryption(Bucket=bucket)
            rules = encryption["ServerSideEncryptionConfiguration"]["Rules"]
        except ClientError:
            rules = []  # no encryption configuration could be read
        if not any(r["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "aws:kms"
                   for r in rules):
            violations.append("❌ RESTRICTED table must be in KMS-encrypted bucket")
    if violations:
        print("GOVERNANCE POLICY VIOLATIONS:")
        for v in violations:
            print(f" {v}")
        sys.exit(1)  # fail the CI pipeline
    print(f"✅ All governance policies satisfied for {database}.{table}")
    return True
if __name__ == "__main__":
    check_governance_policy(sys.argv[1], sys.argv[2])
import boto3, json
from datetime import datetime, timezone
def log_data_access(
    job_name: str,
    user: str,
    action: str,  # READ, WRITE, DELETE
    database: str,
    table: str,
    row_count: int,
    run_id: str
):
    """
    Write an audit log entry to S3 (write-once when Object Lock is enabled)
    and DynamoDB (queryable audit table).
    """
    now = datetime.now(timezone.utc)
    audit_record = {
        "audit_id": f"{run_id}-{action}",
        "timestamp": now.isoformat(),
        "job_name": job_name,
        "user": user,
        "action": action,
        "database": database,
        "table": table,
        "row_count": row_count,
        "run_id": run_id
    }
    # Write to S3 audit bucket (enable Object Lock for tamper-proof storage).
    # The key uses audit_id so READ and WRITE entries of one run do not collide.
    s3 = boto3.client("s3")
    key = f"audit-logs/{now.strftime('%Y/%m/%d')}/{audit_record['audit_id']}.json"
    s3.put_object(
        Bucket="my-audit-logs-bucket",
        Key=key,
        Body=json.dumps(audit_record),
        ContentType="application/json"
    )
    # Also write to DynamoDB for fast querying
    dynamo = boto3.resource("dynamodb")
    dynamo.Table("data_access_audit").put_item(Item=audit_record)
    print(f"Audit log written: {action} on {database}.{table} by {user}")
# Call at start and end of every Spark ETL job
log_data_access(
    job_name="orders_gold_etl",
    user="iam-role/emr-spark-role",
    action="READ",
    database="silver_layer",
    table="orders_clean",
    row_count=5000000,
    run_id="run-2024-03-01-001"
)
COMPLIANCE mode Object Lock on your audit bucket with a 7-year retention period. This makes audit logs legally tamper-proof: not even the AWS root user can delete them during the retention window.
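A default retention rule of this kind can be set with boto3's `put_object_lock_configuration`. The sketch below is one possible setup, assuming the bucket already has Object Lock (and therefore versioning) enabled; the `object_lock_config` and `apply_object_lock` helpers are hypothetical names, and the bucket name is the placeholder used earlier in this section.

```python
def object_lock_config(years=7, mode="COMPLIANCE"):
    """Build the default-retention configuration expected by S3 Object Lock."""
    return {
        "ObjectLockEnabled": "Enabled",
        "Rule": {"DefaultRetention": {"Mode": mode, "Years": years}},
    }

def apply_object_lock(bucket, years=7):
    """Apply a COMPLIANCE-mode default retention rule to `bucket`."""
    import boto3  # imported here so building the config needs no AWS dependency

    s3 = boto3.client("s3")
    s3.put_object_lock_configuration(
        Bucket=bucket,
        ObjectLockConfiguration=object_lock_config(years=years),
    )

print(object_lock_config())
# To apply it (requires AWS credentials and a bucket with Object Lock enabled):
# apply_object_lock("my-audit-logs-bucket")
```

Retention in COMPLIANCE mode cannot be shortened once applied, so it is worth trying the rule with a short period on a test bucket first.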
Test Your Knowledge
Answer these questions to solidify your Module 32 understanding. These are the kinds of questions that appear in senior data engineer interviews.
- Q: What is OpenLineage and how does it work with Spark? → Open standard JSON event format. Add the listener to SparkSession config. Events are emitted automatically when a job starts and completes. Marquez or DataHub receives them.
- Q: DataHub vs Apache Atlas: when to use each? → DataHub for modern cloud-native stacks (Spark, dbt, Airflow, Kafka). Atlas for Hadoop/Cloudera environments with Ranger integration.
- Q: What happens when you DROP a managed table vs external table? → Managed: data files deleted. External: only catalog entry removed, S3 data survives.
- Q: What is Unity Catalog's three-level namespace? → catalog.schema.table. E.g. main.sales_schema.orders. Enables dev/prod isolation and cross-workspace sharing.
- Q: How would you implement automated PII classification? → Column name regex + content sampling regex → assign sensitivity label → write to DynamoDB audit table → tag in Glue Catalog via boto3.
- Q: What is tag propagation in Atlas? → PII classification on a source column propagates through lineage to all downstream columns. Makes GDPR right-to-erasure auditable via a catalog search.