Implementation Example - Quick Start
Overview
This document provides a concrete implementation example showing how all the components work together. This is a simplified MVP version suitable for initial development.
Complete Working Example
1. Download Worker (Python)
# download_worker.py
"""
Ephemeral worker that downloads Terraform state from TFC API.
Security: No disk persistence, encrypted memory, auto-destruct.
"""
import os
import hashlib
import requests
from cryptography.fernet import Fernet
from dataclasses import dataclass
from typing import Optional
import gc
@dataclass
class EncryptedState:
    """Encrypted Terraform state together with the identifiers that trace it.

    Instances are produced by ``TerraformStateDownloader.download_current_state``.
    """

    ciphertext: bytes  # output of the downloader's Fernet cipher over the raw state bytes
    workspace_id: str  # workspace the state was downloaded for
    encryption_key_id: str  # first 16 hex characters of SHA-256 over the encryption key
class TerraformStateDownloader:
    """Download a workspace's current Terraform state and keep it encrypted.

    Each instance generates its own Fernet key, so an ``EncryptedState`` can
    only be decrypted by the instance that produced it. Nothing in this class
    writes the state to disk.
    """

    def __init__(self, workspace_id: str, tfc_token: str):
        """Store the workspace/token and create a per-instance Fernet cipher.

        Args:
            workspace_id: Identifier used in the ``/workspaces/{id}`` API path.
            tfc_token: API token sent as a bearer token.
        """
        self.workspace_id = workspace_id
        self.tfc_token = tfc_token
        self.base_url = "https://app.terraform.io/api/v2"
        self.encryption_key = Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)

    def download_current_state(self) -> EncryptedState:
        """Download the latest state and encrypt it in memory immediately.

        Returns:
            The encrypted state, tagged with the workspace id and a short
            identifier derived from the encryption key.

        Raises:
            requests.HTTPError: If either HTTP request returns an error status.
            KeyError: If the API response lacks the download URL.
        """
        from urllib.parse import urlparse

        headers = {
            "Authorization": f"Bearer {self.tfc_token}",
            "Content-Type": "application/vnd.api+json"
        }

        # Get current state version
        response = requests.get(
            f"{self.base_url}/workspaces/{self.workspace_id}/current-state-version",
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        download_url = response.json()["data"]["attributes"]["hosted-state-download-url"]

        # Download state JSON. The token is forwarded only when the download is
        # served from the API's own HTTPS origin, so it is never handed to a
        # different host.
        # NOTE(review): the state download endpoint is expected to require the
        # API token when served from the API host - confirm against the
        # State Versions API documentation.
        api_origin = urlparse(self.base_url)
        target = urlparse(download_url)
        download_headers = {}
        if target.scheme == "https" and target.netloc == api_origin.netloc:
            download_headers["Authorization"] = headers["Authorization"]
        state_response = requests.get(
            download_url,
            headers=download_headers,
            timeout=30
        )
        state_response.raise_for_status()
        raw_state_bytes = state_response.content

        # Encrypt immediately in memory
        encrypted_state = EncryptedState(
            ciphertext=self.cipher.encrypt(raw_state_bytes),
            workspace_id=self.workspace_id,
            encryption_key_id=hashlib.sha256(self.encryption_key).hexdigest()[:16]
        )

        # Drop every reference to the plaintext, including the response object
        # that still holds it. This is best effort: it releases the objects
        # but does not overwrite the memory they occupied.
        del raw_state_bytes
        del state_response
        gc.collect()
        return encrypted_state

    def decrypt_state(self, encrypted_state: EncryptedState) -> dict:
        """Decrypt and parse a state produced by this instance.

        The caller should use the result immediately and then discard it.

        Raises:
            cryptography.fernet.InvalidToken: If the ciphertext was not
                produced with this instance's key.
            json.JSONDecodeError: If the decrypted payload is not valid JSON.
        """
        import json

        decrypted_bytes = self.cipher.decrypt(encrypted_state.ciphertext)
        try:
            return json.loads(decrypted_bytes)
        finally:
            # Release the plaintext bytes even when parsing fails.
            del decrypted_bytes
            gc.collect()
# Usage
if __name__ == "__main__":
    # Demo run: both settings are required, a missing variable raises KeyError.
    workspace_id = os.environ["TFC_WORKSPACE_ID"]
    tfc_token = os.environ["TFC_TOKEN"]

    downloader = TerraformStateDownloader(workspace_id, tfc_token)
    encrypted_state = downloader.download_current_state()

    # Report what was fetched without printing any state content.
    print(
        f"Downloaded and encrypted state for workspace: {workspace_id}",
        f"Encryption key ID: {encrypted_state.encryption_key_id}",
        sep="\n",
    )
2. Sanitization Engine (Python)
# sanitization_engine.py
"""
Multi-stage sanitization engine with rule-based filtering.
"""
import re
import hashlib
import math
from typing import Any, Dict, List, Tuple
from dataclasses import dataclass
from enum import Enum
class Action(Enum):
    """Operation the engine applies to an attribute value when a rule matches."""

    REDACT = "REDACT"
    MASK = "MASK"
    HASH = "HASH"
    PRESERVE = "PRESERVE"


class SensitivityLevel(Enum):
    """Sensitivity label attached to a rule and copied into the audit log."""

    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"


@dataclass
class Rule:
    """How to treat one attribute of a resource type."""

    attribute_path: str  # dotted path of the attribute inside the resource's attributes
    action: Action
    sensitivity: SensitivityLevel
    reason: str  # human-readable justification, copied into the audit log


@dataclass
class AuditLogEntry:
    """Record of a single sanitization decision."""

    workspace_id: str
    resource_type: str
    attribute_path: str
    action: str  # ``Action`` value of the applied rule
    sensitivity: str  # ``SensitivityLevel`` value of the applied rule
    reason: str
    original_value_hash: str  # SHA-256 hex digest of ``str(original value)``


class SanitizationEngine:
    """Sanitize Terraform state dictionaries in three stages.

    For every attribute the engine tries, in order: an explicit rule for the
    resource type, a match against known secret patterns, and a Shannon
    entropy check. Dictionaries and lists are walked recursively, so strings
    stored inside lists go through the same pattern and entropy checks as
    scalar attributes. Every rule hit and every redaction is appended to
    ``audit_log``.
    """

    def __init__(self):
        """Load the built-in rules and compile the secret-detection patterns."""
        self.rules = self._load_base_rules()
        self.audit_log = []
        # Critical patterns (always redact)
        self.critical_patterns = [
            (re.compile(r"-----BEGIN .*PRIVATE KEY-----"), "PRIVATE_KEY"),
            (re.compile(r"AKIA[0-9A-Z]{16}"), "AWS_ACCESS_KEY"),
            (re.compile(r"AIza[0-9A-Za-z-_]{35}"), "GOOGLE_API_KEY"),
            (re.compile(r"ghp_[0-9a-zA-Z]{36}"), "GITHUB_TOKEN"),
        ]

    def _load_base_rules(self) -> Dict[str, Dict[str, Rule]]:
        """Return the built-in rules as ``{resource_type: {attribute_path: Rule}}``.

        In production, load from YAML files.
        """
        return {
            "google_sql_database_instance": {
                "root_password": Rule(
                    attribute_path="root_password",
                    action=Action.REDACT,
                    sensitivity=SensitivityLevel.CRITICAL,
                    reason="Database root password"
                ),
                "private_ip_address": Rule(
                    attribute_path="private_ip_address",
                    action=Action.MASK,
                    sensitivity=SensitivityLevel.HIGH,
                    reason="Internal network topology"
                ),
                "connection_name": Rule(
                    attribute_path="connection_name",
                    action=Action.PRESERVE,
                    sensitivity=SensitivityLevel.LOW,
                    reason="Connection name needed for service catalog"
                ),
            },
            "google_service_account_key": {
                "private_key": Rule(
                    attribute_path="private_key",
                    action=Action.REDACT,
                    sensitivity=SensitivityLevel.CRITICAL,
                    reason="Service account private key"
                ),
            },
        }

    def sanitize_state(self, state: dict, workspace_id: str) -> dict:
        """Return a sanitized copy of a Terraform state.

        Only ``terraform_version``, ``serial`` and ``resources`` are carried
        over from ``state``; every other top-level key is dropped.

        Args:
            state: Parsed Terraform state.
            workspace_id: Recorded in every audit log entry.
        """
        return {
            "terraform_version": state.get("terraform_version"),
            "serial": state.get("serial"),
            "resources": [
                self._sanitize_resource(resource, workspace_id)
                for resource in state.get("resources", [])
            ],
            "sanitization_version": "v1.0.0"
        }

    def _sanitize_resource(self, resource: dict, workspace_id: str) -> dict:
        """Sanitize a single Terraform resource.

        Keeps ``type``, ``name``, ``provider`` and, per instance,
        ``schema_version`` plus the sanitized ``attributes``.

        Raises:
            KeyError: If the resource has no ``type`` or ``name``.
        """
        resource_type = resource["type"]
        resource_name = resource["name"]
        sanitized_instances = []
        for instance in resource.get("instances", []):
            # ``attributes`` may be present but null; treat that as empty.
            attributes = instance.get("attributes") or {}
            sanitized_instances.append({
                "schema_version": instance.get("schema_version"),
                "attributes": self._sanitize_attributes(
                    resource_type,
                    attributes,
                    workspace_id
                )
            })
        return {
            "type": resource_type,
            "name": resource_name,
            "provider": resource.get("provider"),
            "instances": sanitized_instances
        }

    def _sanitize_attributes(
        self,
        resource_type: str,
        attributes: dict,
        workspace_id: str,
        path_prefix: str = ""
    ) -> dict:
        """Recursively sanitize a dictionary of attributes.

        Args:
            resource_type: Terraform resource type used to select the rules.
            attributes: Attribute dictionary to sanitize (not modified).
            workspace_id: Recorded in every audit log entry.
            path_prefix: Dotted path of ``attributes`` inside the resource;
                empty for the top level.

        Returns:
            A new dictionary with the same keys and sanitized values.
        """
        type_rules = self.rules.get(resource_type, {})
        sanitized = {}
        for key, value in attributes.items():
            attr_path = f"{path_prefix}.{key}" if path_prefix else key
            rule = type_rules.get(attr_path)
            if rule is not None:
                # Stage 1: an explicit rule wins over every heuristic.
                sanitized[key] = self._apply_action(rule.action, value, attr_path)
                self._log_sanitization(
                    workspace_id, resource_type, attr_path, rule, value
                )
            else:
                sanitized[key] = self._sanitize_value(
                    resource_type, value, workspace_id, attr_path
                )
        return sanitized

    def _sanitize_value(
        self,
        resource_type: str,
        value: Any,
        workspace_id: str,
        attr_path: str
    ) -> Any:
        """Sanitize one value that no explicit rule matched.

        Strings go through pattern and entropy detection, dictionaries and
        lists are walked recursively, anything else is returned unchanged.
        List elements share the attribute path of the list itself.
        """
        if isinstance(value, str):
            # Stage 2: Pattern-based detection
            if self._matches_critical_pattern(value):
                self._log_sanitization(
                    workspace_id,
                    resource_type,
                    attr_path,
                    Rule(attr_path, Action.REDACT, SensitivityLevel.CRITICAL, "Pattern match"),
                    value
                )
                return "[REDACTED:SECRET_PATTERN]"
            # Stage 3: Entropy analysis
            if self._is_high_entropy(value):
                self._log_sanitization(
                    workspace_id,
                    resource_type,
                    attr_path,
                    Rule(attr_path, Action.REDACT, SensitivityLevel.HIGH, "High entropy"),
                    value
                )
                return "[REDACTED:HIGH_ENTROPY]"
            return value
        if isinstance(value, dict):
            return self._sanitize_attributes(
                resource_type, value, workspace_id, attr_path
            )
        if isinstance(value, list):
            # Every element is inspected, not only dictionaries, so that a
            # secret stored as a list item (or in a nested list) is caught.
            return [
                self._sanitize_value(resource_type, item, workspace_id, attr_path)
                for item in value
            ]
        # Safe to preserve
        return value

    def _apply_action(self, action: Action, value: Any, attr_path: str) -> Any:
        """Apply a sanitization action to a value.

        Returns:
            REDACT: ``[REDACTED:<ATTR_PATH upper-cased>]``.
            MASK: the first octet followed by ``.x.x.x`` for dotted-quad
            strings, otherwise ``[MASKED]``.
            HASH: ``sha256:`` plus the first 16 hex digits of the digest of
            ``str(value)``.
            PRESERVE: the value unchanged.

        Raises:
            ValueError: If ``action`` is not one of the handled actions.
        """
        if action == Action.REDACT:
            return f"[REDACTED:{attr_path.upper()}]"
        if action == Action.MASK:
            if isinstance(value, str) and re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value):
                # Mask IP address
                octets = value.split(".")
                return f"{octets[0]}.x.x.x"
            return "[MASKED]"
        if action == Action.HASH:
            return f"sha256:{hashlib.sha256(str(value).encode()).hexdigest()[:16]}"
        if action == Action.PRESERVE:
            return value
        # Fail loudly instead of silently replacing the value with None.
        raise ValueError(f"Unsupported sanitization action: {action!r}")

    def _matches_critical_pattern(self, value: Any) -> bool:
        """Return True if ``value`` is a string containing a known secret pattern."""
        if not isinstance(value, str):
            return False
        return any(pattern.search(value) for pattern, _ in self.critical_patterns)

    def _is_high_entropy(self, value: str, threshold: float = 4.5) -> bool:
        """Detect random-looking strings via per-character Shannon entropy.

        Strings shorter than 16 characters are never flagged. Entropy cannot
        exceed log2 of the number of distinct characters, so with the default
        threshold a string needs at least 23 distinct characters to be
        flagged.

        Args:
            value: String to inspect.
            threshold: Entropy in bits per character above which the string
                counts as high entropy.
        """
        if len(value) < 16:  # Too short to be meaningful
            return False
        # Calculate frequency of each character
        freq = {}
        for char in value:
            freq[char] = freq.get(char, 0) + 1
        # Calculate Shannon entropy
        entropy = 0
        for count in freq.values():
            p = count / len(value)
            entropy -= p * math.log2(p)
        return entropy > threshold

    def _log_sanitization(
        self,
        workspace_id: str,
        resource_type: str,
        attribute_path: str,
        rule: Rule,
        original_value: Any
    ):
        """Append one decision to the audit trail.

        Only a SHA-256 digest of ``str(original_value)`` is stored.
        NOTE(review): the digest is unsalted, so a guessable original value
        can be confirmed from the audit log - consider a keyed hash.
        """
        self.audit_log.append(AuditLogEntry(
            workspace_id=workspace_id,
            resource_type=resource_type,
            attribute_path=attribute_path,
            action=rule.action.value,
            sensitivity=rule.sensitivity.value,
            reason=rule.reason,
            original_value_hash=hashlib.sha256(
                str(original_value).encode()
            ).hexdigest()
        ))

    def get_audit_log(self) -> List[Dict]:
        """Return the audit log as a list of plain dictionaries, in insertion order."""
        return [
            {
                "workspace_id": entry.workspace_id,
                "resource_type": entry.resource_type,
                "attribute_path": entry.attribute_path,
                "action": entry.action,
                "sensitivity": entry.sensitivity,
                "reason": entry.reason,
                "original_value_hash": entry.original_value_hash
            }
            for entry in self.audit_log
        ]
# Usage
if __name__ == "__main__":
    import json

    # Attributes of the single example database instance.
    example_attributes = {
        "name": "prod-db",
        "root_password": "SuperSecret123!",
        "private_ip_address": "10.128.0.45",
        "public_ip_address": "34.123.45.67",
        "connection_name": "my-project:us-central1:prod-db"
    }

    # Example Terraform state wrapping that instance.
    state = {
        "terraform_version": "1.5.0",
        "serial": 42,
        "resources": [
            {
                "type": "google_sql_database_instance",
                "name": "main",
                "provider": "provider[\"registry.terraform.io/hashicorp/google\"]",
                "instances": [
                    {"schema_version": 0, "attributes": example_attributes}
                ]
            }
        ]
    }

    engine = SanitizationEngine()
    sanitized_state = engine.sanitize_state(state, "ws-abc123")

    # Show the sanitized document first, then one line per audited decision.
    print("Sanitized State:")
    print(json.dumps(sanitized_state, indent=2))
    print("\nAudit Log:")
    for entry in engine.get_audit_log():
        print(f" - {entry['resource_type']}.{entry['attribute_path']}: {entry['action']}")
3. Entity Transformer (Python)
# entity_transformer.py
"""
Transform sanitized Terraform resources into Backstage entities.
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class BackstageEntity:
    """In-memory Backstage entity made of its four top-level fields."""

    apiVersion: str
    kind: str
    metadata: Dict
    spec: Dict


class EntityTransformer:
    """Turn sanitized Terraform resources into ``BackstageEntity`` objects.

    Only resource types registered in ``self.transformers`` are converted;
    every other resource is skipped.
    """

    def __init__(self):
        """Register one transformer method per supported resource type."""
        self.transformers = {
            "google_project": self._transform_project,
            "google_sql_database_instance": self._transform_database,
            "google_storage_bucket": self._transform_storage,
            "google_compute_instance": self._transform_compute,
        }

    def transform_state(self, sanitized_state: dict, workspace_id: str) -> List[BackstageEntity]:
        """Transform all supported resources of a sanitized state.

        Args:
            sanitized_state: State dictionary with a ``resources`` list.
            workspace_id: Stored in each entity's annotations.

        Returns:
            One entity per instance of every supported resource, in state order.
        """
        entities = []
        for resource in sanitized_state.get("resources", []):
            transform = self.transformers.get(resource["type"])
            if transform is None:
                continue
            for instance in resource.get("instances", []):
                # A missing or null ``attributes`` falls back to the resource name.
                entity = transform(
                    resource["name"],
                    instance.get("attributes") or {},
                    workspace_id
                )
                if entity:
                    entities.append(entity)
        return entities

    @staticmethod
    def _attr(attrs: dict, key: str, default):
        """Return ``attrs[key]``, or ``default`` when the key is absent or null."""
        value = attrs.get(key)
        return default if value is None else value

    @staticmethod
    def _annotations(workspace_id: str, resource_type: str, extra: Optional[Dict] = None) -> Dict:
        """Build the annotation map shared by all entities.

        Entries of ``extra`` whose value is ``None`` are left out so that no
        null annotation values are emitted.
        """
        annotations = {
            "terraform.io/workspace": workspace_id,
            "terraform.io/resource-type": resource_type,
        }
        for key, value in (extra or {}).items():
            if value is not None:
                annotations[key] = value
        return annotations

    def _transform_project(self, name: str, attrs: dict, workspace_id: str) -> Optional[BackstageEntity]:
        """Transform google_project to Backstage Component.

        The entity is named after ``project_id``, falling back to the
        Terraform resource name.
        """
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Component",
            metadata={
                "name": self._attr(attrs, "project_id", name),
                "description": self._attr(attrs, "name", ""),
                "labels": {
                    "environment": self._extract_environment(attrs),
                    "cloud-provider": "gcp",
                },
                "annotations": self._annotations(
                    workspace_id,
                    "google_project",
                    {"google.com/project-id": attrs.get("project_id")},
                ),
            },
            spec={
                "type": "gcp-project",
                "lifecycle": "production",
                "owner": "platform-team",
            }
        )

    def _transform_database(self, name: str, attrs: dict, workspace_id: str) -> Optional[BackstageEntity]:
        """Transform google_sql_database_instance to Backstage Resource.

        ``dependsOn`` references ``component:<project>`` when the instance
        has a ``project`` attribute and is empty otherwise.
        """
        project = attrs.get("project")
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": self._attr(attrs, "name", name),
                "labels": {
                    "database-type": "cloud-sql",
                    "database-version": self._attr(attrs, "database_version", "unknown"),
                    "cloud-provider": "gcp",
                },
                "annotations": self._annotations(
                    workspace_id,
                    "google_sql_database_instance",
                    {"google.com/region": attrs.get("region")},
                ),
            },
            spec={
                "type": "database",
                "owner": "data-team",
                "dependsOn": [f"component:{project}"] if project else [],
            }
        )

    def _transform_storage(self, name: str, attrs: dict, workspace_id: str) -> Optional[BackstageEntity]:
        """Transform google_storage_bucket to Backstage Resource."""
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": self._attr(attrs, "name", name),
                "labels": {
                    "storage-class": self._attr(attrs, "storage_class", "STANDARD"),
                    "cloud-provider": "gcp",
                },
                "annotations": self._annotations(
                    workspace_id,
                    "google_storage_bucket",
                    {"google.com/location": attrs.get("location")},
                ),
            },
            spec={
                "type": "storage",
                "owner": "platform-team",
            }
        )

    def _transform_compute(self, name: str, attrs: dict, workspace_id: str) -> Optional[BackstageEntity]:
        """Transform google_compute_instance to Backstage Resource."""
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": self._attr(attrs, "name", name),
                "labels": {
                    "machine-type": self._attr(attrs, "machine_type", "unknown"),
                    "zone": self._attr(attrs, "zone", "unknown"),
                    "cloud-provider": "gcp",
                },
                "annotations": self._annotations(
                    workspace_id,
                    "google_compute_instance",
                ),
            },
            spec={
                "type": "compute-instance",
                "owner": "compute-team",
            }
        )

    def _extract_environment(self, attrs: dict) -> str:
        """Extract the environment from labels or, failing that, the name.

        An ``environment`` label wins. Otherwise the lower-cased ``name`` is
        searched for ``prod``, ``dev``, then ``staging``/``stg``. Null or
        non-dictionary labels and a null name are tolerated.

        Returns:
            The label value, ``production``, ``development``, ``staging`` or
            ``unknown``.
        """
        labels = attrs.get("labels")
        if isinstance(labels, dict) and "environment" in labels:
            return labels["environment"]
        name = attrs.get("name")
        name = name.lower() if isinstance(name, str) else ""
        if "prod" in name:
            return "production"
        if "dev" in name:
            return "development"
        if "staging" in name or "stg" in name:
            return "staging"
        return "unknown"
# Usage
if __name__ == "__main__":
    import json

    # A sanitized state holding one Cloud SQL instance.
    database_attributes = {
        "name": "prod-db",
        "database_version": "POSTGRES_14",
        "region": "us-central1",
        "project": "my-project"
    }
    sanitized_state = {
        "resources": [
            {
                "type": "google_sql_database_instance",
                "name": "main",
                "instances": [{"attributes": database_attributes}]
            }
        ]
    }

    transformer = EntityTransformer()
    entities = transformer.transform_state(sanitized_state, "ws-abc123")

    # Print every entity as an indented JSON document.
    print("Backstage Entities:")
    for entity in entities:
        document = {
            "apiVersion": entity.apiVersion,
            "kind": entity.kind,
            "metadata": entity.metadata,
            "spec": entity.spec
        }
        print(json.dumps(document, indent=2))
4. Database Loader (Python)
# database_loader.py
"""
Load Backstage entities into PostgreSQL database.
"""
import psycopg2
import json
from typing import List
from entity_transformer import BackstageEntity
class DatabaseLoader:
    """Load Backstage entities into the ``catalog`` schema of a PostgreSQL database.

    One connection and one cursor are opened in the constructor and reused
    until ``close`` is called.
    """

    def __init__(self, connection_string: str):
        """Connect to the database and open a cursor."""
        self.conn = psycopg2.connect(connection_string)
        self.cursor = self.conn.cursor()

    def load_entities(
        self,
        entities: List[BackstageEntity],
        tenant_id: str,
        execution_id: str
    ):
        """Load entities into the database with tenant isolation.

        All upserts and the execution marker are committed together. If
        ``execution_id`` is already recorded, nothing is written.

        Args:
            entities: Entities to upsert.
            tenant_id: Tenant that owns the entities; also published as the
                ``app.current_tenant_id`` session setting.
            execution_id: Identifier used to skip already-completed runs.

        Raises:
            Exception: Any database error, re-raised after rolling back.
        """
        try:
            # Set tenant ID for row-level security. The value is passed as a
            # bound parameter instead of being formatted into the statement,
            # so a crafted tenant id cannot inject SQL. ``false`` keeps the
            # setting session-scoped, like a plain SET.
            self.cursor.execute(
                "SELECT set_config('app.current_tenant_id', %s, false)",
                (tenant_id,)
            )
            # Check if execution already completed (idempotency)
            if self._is_execution_complete(execution_id):
                print(f"Execution {execution_id} already completed, skipping")
                return
            # Upsert entities
            for entity in entities:
                self._upsert_entity(entity, tenant_id)
            # Mark execution as complete
            self._mark_execution_complete(execution_id, len(entities))
            self.conn.commit()
            print(f"Successfully loaded {len(entities)} entities for tenant {tenant_id}")
        except Exception as e:
            self.conn.rollback()
            print(f"Error loading entities: {e}")
            raise

    def _upsert_entity(self, entity: BackstageEntity, tenant_id: str):
        """Insert the entity, or update the existing row with the same reference.

        The namespace defaults to ``default`` when the metadata has none. On
        conflict the metadata, spec and sanitization version are replaced and
        ``updated_at`` is refreshed.
        """
        namespace = entity.metadata.get("namespace", "default")
        name = entity.metadata["name"]
        entity_ref = self._make_entity_ref(entity.kind, name, namespace)
        query = """
            INSERT INTO catalog.entities (
                tenant_id,
                entity_ref,
                kind,
                namespace,
                name,
                metadata,
                spec,
                sanitization_version,
                created_at,
                updated_at
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
            )
            ON CONFLICT (tenant_id, entity_ref)
            DO UPDATE SET
                metadata = EXCLUDED.metadata,
                spec = EXCLUDED.spec,
                sanitization_version = EXCLUDED.sanitization_version,
                updated_at = NOW()
        """
        self.cursor.execute(query, (
            tenant_id,
            entity_ref,
            entity.kind,
            namespace,
            name,
            json.dumps(entity.metadata),
            json.dumps(entity.spec),
            "v1.0.0"
        ))

    def _is_execution_complete(self, execution_id: str) -> bool:
        """Return True if ``execution_id`` is already recorded (idempotency)."""
        query = "SELECT 1 FROM catalog.executions WHERE execution_id = %s"
        self.cursor.execute(query, (execution_id,))
        return self.cursor.fetchone() is not None

    def _mark_execution_complete(self, execution_id: str, entity_count: int):
        """Record the execution together with the number of entities loaded."""
        query = """
            INSERT INTO catalog.executions (execution_id, entity_count, completed_at)
            VALUES (%s, %s, NOW())
        """
        self.cursor.execute(query, (execution_id, entity_count))

    def _make_entity_ref(self, kind: str, name: str, namespace: str) -> str:
        """Create the entity reference ``<kind lower-cased>:<namespace>/<name>``."""
        return f"{kind.lower()}:{namespace}/{name}"

    def close(self):
        """Close database connection."""
        self.cursor.close()
        self.conn.close()
# Usage
if __name__ == "__main__":
    from entity_transformer import BackstageEntity

    # A single example entity describing a Cloud SQL database.
    example_entity = BackstageEntity(
        apiVersion="backstage.io/v1alpha1",
        kind="Resource",
        metadata={
            "name": "prod-db",
            "namespace": "default",
            "labels": {"database-type": "cloud-sql"}
        },
        spec={
            "type": "database",
            "owner": "data-team"
        }
    )
    entities = [example_entity]

    # Load it for one tenant, then release the connection.
    loader = DatabaseLoader("postgresql://user:pass@localhost:5432/backstage")
    loader.load_entities(entities, tenant_id="client-1", execution_id="exec-123")
    loader.close()
5. End-to-End Workflow
# main.py
"""
Complete end-to-end sanitization workflow.
"""
import os
from download_worker import TerraformStateDownloader
from sanitization_engine import SanitizationEngine
from entity_transformer import EntityTransformer
from database_loader import DatabaseLoader
def process_workspace(
    workspace_id: str,
    tfc_token: str,
    tenant_id: str,
    db_connection_string: str
):
    """Run the complete workflow: Download → Sanitize → Transform → Load.

    Args:
        workspace_id: Workspace whose state is processed.
        tfc_token: API token handed to the downloader.
        tenant_id: Tenant the entities are loaded for.
        db_connection_string: Connection string handed to the loader.

    Returns:
        On success a dict with ``workspace_id``, ``status`` ("success"),
        ``entities_loaded`` and ``sanitizations``. On any exception a dict
        with ``workspace_id``, ``status`` ("failed") and ``error``; the
        exception itself is not propagated.
    """
    # Imported here because this file otherwise imports ``time`` only inside
    # the ``__main__`` guard, which made this function raise NameError when
    # called from an importing module.
    import time

    # NOTE(review): the id embeds the current time, so the loader's
    # "already completed" check only matches calls made within the same
    # second - confirm whether a stable id is intended.
    execution_id = f"exec-{workspace_id}-{int(time.time())}"
    try:
        # Step 1: Download state
        print(f"[{workspace_id}] Downloading state...")
        downloader = TerraformStateDownloader(workspace_id, tfc_token)
        encrypted_state = downloader.download_current_state()

        # Step 2: Decrypt and sanitize
        print(f"[{workspace_id}] Sanitizing state...")
        raw_state = downloader.decrypt_state(encrypted_state)
        engine = SanitizationEngine()
        sanitized_state = engine.sanitize_state(raw_state, workspace_id)
        # The unsanitized state is not needed past this point.
        del raw_state

        # Step 3: Transform to Backstage entities
        print(f"[{workspace_id}] Transforming to entities...")
        transformer = EntityTransformer()
        entities = transformer.transform_state(sanitized_state, workspace_id)

        # Step 4: Load into database
        print(f"[{workspace_id}] Loading into database...")
        loader = DatabaseLoader(db_connection_string)
        try:
            loader.load_entities(entities, tenant_id, execution_id)
        finally:
            # Release the connection even when the load fails.
            loader.close()

        # Step 5: Store audit log
        print(f"[{workspace_id}] Storing audit log...")
        audit_log = engine.get_audit_log()
        store_audit_log(workspace_id, audit_log)

        print(f"[{workspace_id}] ✅ Successfully processed")
        return {
            "workspace_id": workspace_id,
            "status": "success",
            "entities_loaded": len(entities),
            "sanitizations": len(audit_log)
        }
    except Exception as e:
        print(f"[{workspace_id}] ❌ Failed: {e}")
        return {
            "workspace_id": workspace_id,
            "status": "failed",
            "error": str(e)
        }
def store_audit_log(workspace_id: str, audit_log: list):
    """Upload the audit log as JSON to the ``sanitization-audit-logs`` bucket.

    The object is named ``<workspace_id>/audit-<unix timestamp>.json``.

    Args:
        workspace_id: Used as the object name prefix.
        audit_log: JSON-serializable list of audit entries.
    """
    import json
    # Imported here because this file otherwise imports ``time`` only inside
    # the ``__main__`` guard, which made this function raise NameError when
    # called from an importing module.
    import time
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("sanitization-audit-logs")
    blob = bucket.blob(f"{workspace_id}/audit-{int(time.time())}.json")
    blob.upload_from_string(
        json.dumps(audit_log, indent=2),
        content_type="application/json"
    )
if __name__ == "__main__":
    import time

    # Every setting is mandatory; a missing variable raises KeyError.
    workspace_id = os.environ["TFC_WORKSPACE_ID"]
    tfc_token = os.environ["TFC_TOKEN"]
    tenant_id = os.environ["TENANT_ID"]
    db_connection_string = os.environ["DATABASE_URL"]

    # Run the pipeline once and show its summary dictionary.
    result = process_workspace(
        workspace_id=workspace_id,
        tfc_token=tfc_token,
        tenant_id=tenant_id,
        db_connection_string=db_connection_string,
    )
    print(f"\nResult: {result}")
Running the Example
1. Setup
# Install dependencies
pip install requests cryptography psycopg2-binary google-cloud-storage
# Set environment variables
export TFC_WORKSPACE_ID="ws-abc123"
export TFC_TOKEN="your-tfc-token"
export TENANT_ID="client-1"
export DATABASE_URL="postgresql://user:pass@localhost:5432/backstage"
2. Database Schema
-- Create the schema that holds the catalog tables used by the loader
CREATE SCHEMA IF NOT EXISTS catalog;

-- Entities table: one row per (tenant_id, entity_ref); metadata and spec
-- are stored as JSONB
CREATE TABLE catalog.entities (
    entity_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(255) NOT NULL,
    entity_ref VARCHAR(512) NOT NULL,
    kind VARCHAR(64) NOT NULL,
    namespace VARCHAR(255) NOT NULL DEFAULT 'default',
    name VARCHAR(255) NOT NULL,
    metadata JSONB NOT NULL,
    spec JSONB NOT NULL,
    sanitization_version VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- Conflict target of the loader's ON CONFLICT (tenant_id, entity_ref) upsert
    CONSTRAINT entity_ref_per_tenant UNIQUE (tenant_id, entity_ref)
);

-- Executions table (for idempotency): one row per completed execution_id
-- NOTE(review): this table has no tenant_id column and no row-level
-- security policy - confirm that is intended.
CREATE TABLE catalog.executions (
    execution_id VARCHAR(255) PRIMARY KEY,
    entity_count INTEGER NOT NULL,
    completed_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Row-level security: a row is visible only when its tenant_id equals the
-- session's app.current_tenant_id setting
-- NOTE(review): table owners bypass row-level security unless
-- FORCE ROW LEVEL SECURITY is enabled, and current_setting() raises an error
-- when the setting has not been defined - confirm the role the loader
-- connects with and that the setting is always set first.
ALTER TABLE catalog.entities ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON catalog.entities
    USING (tenant_id = current_setting('app.current_tenant_id'));
3. Run
python main.py
Expected Output
[ws-abc123] Downloading state...
[ws-abc123] Sanitizing state...
[ws-abc123] Transforming to entities...
[ws-abc123] Loading into database...
[ws-abc123] Storing audit log...
[ws-abc123] ✅ Successfully processed
Result: {
'workspace_id': 'ws-abc123',
'status': 'success',
'entities_loaded': 3,
'sanitizations': 5
}
What This Example Demonstrates
- ✅ Ephemeral Processing: Raw state never persists to disk
- ✅ Multi-Stage Sanitization: Attribute name, pattern, and entropy detection
- ✅ Audit Trail: Complete log of all sanitization actions
- ✅ Idempotency: Safe to re-run without duplicates
- ✅ Tenant Isolation: Per-client separation enforced with PostgreSQL row-level security
Next Steps
- Add Orchestration: Wrap this in Temporal workflow or Step Function
- Add More Rules: Expand `_load_base_rules()` with full taxonomy
- Add Client Overrides: Load per-client rules from configuration
- Add Performance Monitoring: Instrument with metrics and tracing
- Add DLQ: Handle permanent failures with dead letter queue
Production Considerations
⚠️ This is a simplified example. For production:
- Use Secrets Manager (not environment variables)
- Add Comprehensive Error Handling (retries, DLQ, alerting)
- Implement Connection Pooling (for database)
- Add Schema Validation (before database insert)
- Enable Workload Identity (GCP) or IAM Roles (AWS)
- Add Performance Instrumentation (OpenTelemetry)
- Load Rules from Files (not hardcoded)
- Add Final Security Verification (before database insert)
References
- Full Design: SECURITY-PIPELINE-SUMMARY.md
- Architecture: sanitization-pipeline-architecture.md
- Rules Engine: sanitization-rules-engine.md
- Technology Choices: technology-choices.md