Skip to main content

Implementation Example - Quick Start

Overview

This document provides a concrete implementation example showing how all the components work together. This is a simplified MVP version suitable for initial development.


Complete Working Example

1. Download Worker (Python)

# download_worker.py
"""
Ephemeral worker that downloads Terraform state from TFC API.
Security: No disk persistence, encrypted memory, auto-destruct.
"""

import os
import hashlib
import requests
from cryptography.fernet import Fernet
from dataclasses import dataclass
from typing import Optional
import gc

@dataclass
class EncryptedState:
    """Encrypted Terraform state plus the identifiers that travel with it."""

    # Encrypted form of the raw state bytes.
    ciphertext: bytes
    # Terraform Cloud workspace the state belongs to.
    workspace_id: str
    # Short identifier for the encryption key (not the key itself).
    encryption_key_id: str

class TerraformStateDownloader:
    """Fetch a workspace's current Terraform state and hold it encrypted in memory.

    Each instance generates its own Fernet key, so an ``EncryptedState`` can only
    be decrypted by the downloader instance that produced it.
    """

    def __init__(self, workspace_id: str, tfc_token: str):
        """Store the workspace ID and API token and create a per-instance cipher."""
        self.workspace_id = workspace_id
        self.tfc_token = tfc_token
        self.base_url = "https://app.terraform.io/api/v2"
        self.encryption_key = Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)

    def download_current_state(self) -> EncryptedState:
        """
        Download latest state and encrypt in memory immediately.

        Returns:
            An EncryptedState with the Fernet ciphertext, the workspace ID and
            the first 16 hex characters of the SHA-256 of the encryption key.

        Raises:
            requests.HTTPError: If either HTTP request returns an error status.
            KeyError: If the API response does not contain the download URL.
        """
        headers = {
            "Authorization": f"Bearer {self.tfc_token}",
            "Content-Type": "application/vnd.api+json"
        }

        # Get current state version
        response = requests.get(
            f"{self.base_url}/workspaces/{self.workspace_id}/current-state-version",
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        download_url = response.json()["data"]["attributes"]["hosted-state-download-url"]

        # Download state JSON
        # NOTE(review): this request is sent without the bearer token; confirm
        # against the state-versions API docs whether the URL needs authentication.
        state_response = requests.get(download_url, timeout=30)
        state_response.raise_for_status()

        # Encrypt immediately in memory
        encrypted_state = EncryptedState(
            ciphertext=self.cipher.encrypt(state_response.content),
            workspace_id=self.workspace_id,
            encryption_key_id=hashlib.sha256(self.encryption_key).hexdigest()[:16]
        )

        # The plaintext lives inside the response object, so the response itself
        # must be released before collecting; deleting only a local alias of
        # `.content` would leave the bytes referenced. Python still cannot
        # guarantee the memory is zeroed.
        del state_response
        gc.collect()

        return encrypted_state

    def decrypt_state(self, encrypted_state: EncryptedState) -> dict:
        """
        Decrypt state for processing (use immediately then destroy).

        Args:
            encrypted_state: A payload produced by this same instance.

        Returns:
            The parsed state as returned by ``json.loads``.
        """
        import json

        decrypted_bytes = self.cipher.decrypt(encrypted_state.ciphertext)
        state_dict = json.loads(decrypted_bytes)

        # Clear decrypted bytes from memory; the parsed dict returned below
        # still holds the plaintext values and is the caller's to discard.
        del decrypted_bytes
        gc.collect()

        return state_dict


# Usage
if __name__ == "__main__":
    # Both variables are required; a missing one raises KeyError.
    workspace_id, tfc_token = (
        os.environ[var] for var in ("TFC_WORKSPACE_ID", "TFC_TOKEN")
    )

    encrypted_state = TerraformStateDownloader(
        workspace_id, tfc_token
    ).download_current_state()

    print(f"Downloaded and encrypted state for workspace: {workspace_id}")
    print(f"Encryption key ID: {encrypted_state.encryption_key_id}")

2. Sanitization Engine (Python)

# sanitization_engine.py
"""
Multi-stage sanitization engine with rule-based filtering.
"""

import re
import hashlib
import math
from typing import Any, Dict, List, Tuple
from dataclasses import dataclass
from enum import Enum

class Action(Enum):
    """Sanitization actions that a rule can request for an attribute value."""

    REDACT = "REDACT"
    MASK = "MASK"
    HASH = "HASH"
    PRESERVE = "PRESERVE"

class SensitivityLevel(Enum):
    """Sensitivity classification attached to a sanitization rule."""

    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"

@dataclass
class Rule:
    """A single sanitization rule for one attribute of a resource type."""

    # Dotted path of the attribute the rule applies to.
    attribute_path: str
    # What to do with the attribute's value.
    action: Action
    # How sensitive the attribute is considered.
    sensitivity: SensitivityLevel
    # Human-readable justification, copied into the audit log.
    reason: str

@dataclass
class AuditLogEntry:
    """One recorded sanitization decision."""

    workspace_id: str
    resource_type: str
    attribute_path: str
    # Name of the action that was applied, stored as a plain string.
    action: str
    # Sensitivity level of the rule, stored as a plain string.
    sensitivity: str
    reason: str
    # Hash of the original value; the value itself is not stored here.
    original_value_hash: str

class SanitizationEngine:
    """Rule-, pattern- and entropy-based sanitizer for Terraform state.

    Every sanitization decision is appended to ``self.audit_log``.
    """

    def __init__(self):
        """Load the base rules and compile the critical secret patterns."""
        self.rules = self._load_base_rules()
        self.audit_log = []

        # Critical patterns (always redact)
        self.critical_patterns = [
            (re.compile(r"-----BEGIN .*PRIVATE KEY-----"), "PRIVATE_KEY"),
            (re.compile(r"AKIA[0-9A-Z]{16}"), "AWS_ACCESS_KEY"),
            (re.compile(r"AIza[0-9A-Za-z-_]{35}"), "GOOGLE_API_KEY"),
            (re.compile(r"ghp_[0-9a-zA-Z]{36}"), "GITHUB_TOKEN"),
        ]

    def _load_base_rules(self) -> Dict[str, Dict[str, Rule]]:
        """
        Load base sanitization rules.
        In production, load from YAML files.

        Returns:
            Mapping of resource type -> attribute path -> Rule.
        """
        return {
            "google_sql_database_instance": {
                "root_password": Rule(
                    attribute_path="root_password",
                    action=Action.REDACT,
                    sensitivity=SensitivityLevel.CRITICAL,
                    reason="Database root password"
                ),
                "private_ip_address": Rule(
                    attribute_path="private_ip_address",
                    action=Action.MASK,
                    sensitivity=SensitivityLevel.HIGH,
                    reason="Internal network topology"
                ),
                "connection_name": Rule(
                    attribute_path="connection_name",
                    action=Action.PRESERVE,
                    sensitivity=SensitivityLevel.LOW,
                    reason="Connection name needed for service catalog"
                ),
            },
            "google_service_account_key": {
                "private_key": Rule(
                    attribute_path="private_key",
                    action=Action.REDACT,
                    sensitivity=SensitivityLevel.CRITICAL,
                    reason="Service account private key"
                ),
            },
        }

    def sanitize_state(self, state: dict, workspace_id: str) -> dict:
        """
        Sanitize entire Terraform state.

        Only ``terraform_version``, ``serial`` and the sanitized ``resources``
        are copied to the result; every other top-level key is dropped.
        """
        sanitized_resources = [
            self._sanitize_resource(resource, workspace_id)
            for resource in state.get("resources", [])
        ]

        return {
            "terraform_version": state.get("terraform_version"),
            "serial": state.get("serial"),
            "resources": sanitized_resources,
            "sanitization_version": "v1.0.0"
        }

    def _sanitize_resource(self, resource: dict, workspace_id: str) -> dict:
        """
        Sanitize a single Terraform resource.

        Raises:
            KeyError: If the resource lacks ``type`` or ``name``.
        """
        resource_type = resource["type"]
        resource_name = resource["name"]

        sanitized_instances = []
        for instance in resource.get("instances", []):
            sanitized_attributes = self._sanitize_attributes(
                resource_type,
                instance.get("attributes", {}),
                workspace_id
            )
            sanitized_instances.append({
                "schema_version": instance.get("schema_version"),
                "attributes": sanitized_attributes
            })

        return {
            "type": resource_type,
            "name": resource_name,
            "provider": resource.get("provider"),
            "instances": sanitized_instances
        }

    def _sanitize_attributes(
        self,
        resource_type: str,
        attributes: dict,
        workspace_id: str,
        path_prefix: str = ""
    ) -> dict:
        """
        Recursively sanitize attributes.

        Args:
            resource_type: Terraform resource type used to look up rules.
            attributes: Attribute mapping; ``None`` is treated as empty.
            workspace_id: Recorded in audit log entries.
            path_prefix: Dotted path of the enclosing attribute, if any.
        """
        sanitized = {}
        type_rules = self.rules.get(resource_type, {})

        for key, value in (attributes or {}).items():
            attr_path = f"{path_prefix}.{key}" if path_prefix else key

            # Stage 1: Check exact attribute name match
            rule = type_rules.get(attr_path)
            if rule is not None:
                sanitized[key] = self._apply_action(rule.action, value, attr_path)
                self._log_sanitization(
                    workspace_id, resource_type, attr_path, rule, value
                )
            else:
                sanitized[key] = self._sanitize_value(
                    resource_type, value, workspace_id, attr_path
                )

        return sanitized

    def _sanitize_value(
        self,
        resource_type: str,
        value: Any,
        workspace_id: str,
        attr_path: str
    ) -> Any:
        """Run pattern/entropy detection and recursion on a value with no explicit rule."""
        if isinstance(value, str):
            # Stage 2: Pattern-based detection
            if self._matches_critical_pattern(value):
                self._log_sanitization(
                    workspace_id,
                    resource_type,
                    attr_path,
                    Rule(attr_path, Action.REDACT, SensitivityLevel.CRITICAL, "Pattern match"),
                    value
                )
                return "[REDACTED:SECRET_PATTERN]"

            # Stage 3: Entropy analysis
            if self._is_high_entropy(value):
                self._log_sanitization(
                    workspace_id,
                    resource_type,
                    attr_path,
                    Rule(attr_path, Action.REDACT, SensitivityLevel.HIGH, "High entropy"),
                    value
                )
                return "[REDACTED:HIGH_ENTROPY]"

            return value

        # Recursive: Handle nested dictionaries
        if isinstance(value, dict):
            return self._sanitize_attributes(
                resource_type, value, workspace_id, attr_path
            )

        # Recursive: Handle lists. Every item goes through the same checks, so
        # secrets stored as list elements or in nested lists are also caught.
        # Items share the list's attribute path (no index component).
        if isinstance(value, list):
            return [
                self._sanitize_value(resource_type, item, workspace_id, attr_path)
                for item in value
            ]

        # Safe to preserve
        return value

    def _apply_action(self, action: Action, value: Any, attr_path: str) -> Any:
        """
        Apply sanitization action to value.

        Raises:
            ValueError: If ``action`` is not one of the handled actions.
        """
        if action == Action.REDACT:
            return f"[REDACTED:{attr_path.upper()}]"

        if action == Action.MASK:
            if isinstance(value, str) and re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value):
                # Mask IP address: keep only the first octet
                octets = value.split(".")
                return f"{octets[0]}.x.x.x"
            return "[MASKED]"

        if action == Action.HASH:
            return f"sha256:{hashlib.sha256(str(value).encode()).hexdigest()[:16]}"

        if action == Action.PRESERVE:
            return value

        # Fail loudly rather than silently replacing the value with None.
        raise ValueError(f"Unsupported sanitization action: {action!r}")

    def _matches_critical_pattern(self, value: Any) -> bool:
        """
        Check if value matches any critical secret pattern.

        Non-string values never match.
        """
        if not isinstance(value, str):
            return False

        return any(pattern.search(value) for pattern, _ in self.critical_patterns)

    def _is_high_entropy(self, value: str, threshold: float = 4.5) -> bool:
        """
        Calculate Shannon entropy to detect random strings (likely secrets).

        Returns True when the per-character entropy (in bits) exceeds
        ``threshold``. Entropy cannot exceed log2(number of distinct
        characters), so with the default threshold a string needs at least
        23 distinct characters to be flagged.
        """
        if len(value) < 16:  # Too short to be meaningful
            return False

        # Calculate frequency of each character
        freq = {}
        for char in value:
            freq[char] = freq.get(char, 0) + 1

        # Calculate Shannon entropy
        entropy = 0
        for count in freq.values():
            p = count / len(value)
            entropy -= p * math.log2(p)

        return entropy > threshold

    def _log_sanitization(
        self,
        workspace_id: str,
        resource_type: str,
        attribute_path: str,
        rule: Rule,
        original_value: Any
    ):
        """
        Log sanitization action to audit trail.
        """
        # NOTE(review): this is an unsalted SHA-256 of the original value; for
        # low-entropy secrets such as passwords it can be brute-forced offline.
        # Consider a keyed hash before persisting the audit log.
        self.audit_log.append(AuditLogEntry(
            workspace_id=workspace_id,
            resource_type=resource_type,
            attribute_path=attribute_path,
            action=rule.action.value,
            sensitivity=rule.sensitivity.value,
            reason=rule.reason,
            original_value_hash=hashlib.sha256(
                str(original_value).encode()
            ).hexdigest()
        ))

    def get_audit_log(self) -> List[Dict]:
        """
        Return audit log for compliance reporting.

        Returns:
            One plain dict per recorded entry, in insertion order.
        """
        return [
            {
                "workspace_id": entry.workspace_id,
                "resource_type": entry.resource_type,
                "attribute_path": entry.attribute_path,
                "action": entry.action,
                "sensitivity": entry.sensitivity,
                "reason": entry.reason,
                "original_value_hash": entry.original_value_hash
            }
            for entry in self.audit_log
        ]


# Usage
if __name__ == "__main__":
    import json

    # Attributes of the single example Cloud SQL instance.
    example_attributes = {
        "name": "prod-db",
        "root_password": "SuperSecret123!",
        "private_ip_address": "10.128.0.45",
        "public_ip_address": "34.123.45.67",
        "connection_name": "my-project:us-central1:prod-db"
    }

    # Example Terraform state
    state = {
        "terraform_version": "1.5.0",
        "serial": 42,
        "resources": [
            {
                "type": "google_sql_database_instance",
                "name": "main",
                "provider": "provider[\"registry.terraform.io/hashicorp/google\"]",
                "instances": [
                    {"schema_version": 0, "attributes": example_attributes}
                ]
            }
        ]
    }

    engine = SanitizationEngine()
    sanitized_state = engine.sanitize_state(state, "ws-abc123")

    print("Sanitized State:")
    print(json.dumps(sanitized_state, indent=2))

    print("\nAudit Log:")
    for entry in engine.get_audit_log():
        print(f" - {entry['resource_type']}.{entry['attribute_path']}: {entry['action']}")

3. Entity Transformer (Python)

# entity_transformer.py
"""
Transform sanitized Terraform resources into Backstage entities.
"""

from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class BackstageEntity:
    """In-memory form of a Backstage catalog entity."""

    # Field names are camelCase to mirror the keys used when serializing.
    apiVersion: str
    kind: str
    metadata: Dict
    spec: Dict

class EntityTransformer:
    """Convert sanitized Terraform resources into Backstage entities.

    Resource types without a registered transformer are skipped.
    """

    def __init__(self):
        """Register one transformer method per supported resource type."""
        self.transformers = {
            "google_project": self._transform_project,
            "google_sql_database_instance": self._transform_database,
            "google_storage_bucket": self._transform_storage,
            "google_compute_instance": self._transform_compute,
        }

    @staticmethod
    def _without_none(mapping: Dict) -> Dict:
        """Return a copy of ``mapping`` without keys whose value is None."""
        return {key: value for key, value in mapping.items() if value is not None}

    def transform_state(self, sanitized_state: dict, workspace_id: str) -> List["BackstageEntity"]:
        """
        Transform all resources in sanitized state to Backstage entities.

        Args:
            sanitized_state: State dict with an optional ``resources`` list.
            workspace_id: Recorded on each entity as an annotation.

        Returns:
            One entity per instance of every supported resource.
        """
        entities = []

        for resource in sanitized_state.get("resources", []):
            transform = self.transformers.get(resource["type"])
            if transform is None:
                continue

            for instance in resource.get("instances", []):
                # A missing or null attribute map is treated as empty.
                entity = transform(
                    resource["name"],
                    instance.get("attributes") or {},
                    workspace_id
                )
                if entity:
                    entities.append(entity)

        return entities

    def _transform_project(self, name: str, attrs: dict, workspace_id: str) -> Optional["BackstageEntity"]:
        """
        Transform google_project to Backstage Component.
        """
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Component",
            metadata={
                # Fall back to the Terraform resource name when the attribute
                # is absent, null or empty.
                "name": attrs.get("project_id") or name,
                "description": attrs.get("name") or "",
                "labels": {
                    "environment": self._extract_environment(attrs),
                    "cloud-provider": "gcp",
                },
                # Annotations with no value are omitted rather than emitted as null.
                "annotations": self._without_none({
                    "terraform.io/workspace": workspace_id,
                    "terraform.io/resource-type": "google_project",
                    "google.com/project-id": attrs.get("project_id"),
                }),
            },
            spec={
                "type": "gcp-project",
                "lifecycle": "production",
                "owner": "platform-team",
            }
        )

    def _transform_database(self, name: str, attrs: dict, workspace_id: str) -> Optional["BackstageEntity"]:
        """
        Transform google_sql_database_instance to Backstage Resource.
        """
        project = attrs.get("project")
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": attrs.get("name") or name,
                "labels": {
                    "database-type": "cloud-sql",
                    "database-version": attrs.get("database_version") or "unknown",
                    "cloud-provider": "gcp",
                },
                "annotations": self._without_none({
                    "terraform.io/workspace": workspace_id,
                    "terraform.io/resource-type": "google_sql_database_instance",
                    "google.com/region": attrs.get("region"),
                }),
            },
            spec={
                "type": "database",
                "owner": "data-team",
                # Only declare a dependency when the project is known.
                "dependsOn": [f"component:{project}"] if project else [],
            }
        )

    def _transform_storage(self, name: str, attrs: dict, workspace_id: str) -> Optional["BackstageEntity"]:
        """
        Transform google_storage_bucket to Backstage Resource.
        """
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": attrs.get("name") or name,
                "labels": {
                    "storage-class": attrs.get("storage_class") or "STANDARD",
                    "cloud-provider": "gcp",
                },
                "annotations": self._without_none({
                    "terraform.io/workspace": workspace_id,
                    "terraform.io/resource-type": "google_storage_bucket",
                    "google.com/location": attrs.get("location"),
                }),
            },
            spec={
                "type": "storage",
                "owner": "platform-team",
            }
        )

    def _transform_compute(self, name: str, attrs: dict, workspace_id: str) -> Optional["BackstageEntity"]:
        """
        Transform google_compute_instance to Backstage Resource.
        """
        return BackstageEntity(
            apiVersion="backstage.io/v1alpha1",
            kind="Resource",
            metadata={
                "name": attrs.get("name") or name,
                "labels": {
                    "machine-type": attrs.get("machine_type") or "unknown",
                    "zone": attrs.get("zone") or "unknown",
                    "cloud-provider": "gcp",
                },
                "annotations": {
                    "terraform.io/workspace": workspace_id,
                    "terraform.io/resource-type": "google_compute_instance",
                },
            },
            spec={
                "type": "compute-instance",
                "owner": "compute-team",
            }
        )

    def _extract_environment(self, attrs: dict) -> str:
        """
        Extract environment from labels or name.

        An explicit ``environment`` label wins; otherwise the name is searched
        for "prod", then "dev", then "staging"/"stg". Null ``labels`` or
        ``name`` values are treated as missing.

        Returns:
            The label value, or "production", "development", "staging" or
            "unknown".
        """
        labels = attrs.get("labels") or {}
        if "environment" in labels:
            return labels["environment"]

        name = (attrs.get("name") or "").lower()
        if "prod" in name:
            return "production"
        if "dev" in name:
            return "development"
        if "staging" in name or "stg" in name:
            return "staging"

        return "unknown"


# Usage
if __name__ == "__main__":
    import json

    # Minimal sanitized state holding one Cloud SQL instance.
    example_instance = {
        "attributes": {
            "name": "prod-db",
            "database_version": "POSTGRES_14",
            "region": "us-central1",
            "project": "my-project"
        }
    }
    sanitized_state = {
        "resources": [
            {
                "type": "google_sql_database_instance",
                "name": "main",
                "instances": [example_instance]
            }
        ]
    }

    transformer = EntityTransformer()
    entities = transformer.transform_state(sanitized_state, "ws-abc123")

    print("Backstage Entities:")
    for entity in entities:
        # Serialize the four entity fields in their declared order.
        payload = {
            field: getattr(entity, field)
            for field in ("apiVersion", "kind", "metadata", "spec")
        }
        print(json.dumps(payload, indent=2))

4. Database Loader (Python)

# database_loader.py
"""
Load Backstage entities into PostgreSQL database.
"""

import psycopg2
import json
from typing import List
from entity_transformer import BackstageEntity

class DatabaseLoader:
    """Load Backstage entities into PostgreSQL over one connection and cursor."""

    # Version tag written to catalog.entities.sanitization_version.
    SANITIZATION_VERSION = "v1.0.0"

    def __init__(self, connection_string: str):
        """Open the database connection and a cursor on it."""
        self.conn = psycopg2.connect(connection_string)
        self.cursor = self.conn.cursor()

    def load_entities(
        self,
        entities: List["BackstageEntity"],
        tenant_id: str,
        execution_id: str
    ):
        """
        Load entities into database with tenant isolation.

        Args:
            entities: Entities to upsert.
            tenant_id: Tenant the rows belong to; also set as the session's
                ``app.current_tenant_id`` setting.
            execution_id: Identifier used to skip executions already recorded.

        Raises:
            Exception: Any error is re-raised after rolling back the transaction.
        """
        try:
            # Set tenant ID for row-level security. The value is passed as a
            # bound parameter so a crafted tenant ID cannot inject SQL;
            # is_local=false keeps the same session scope as a plain SET.
            self.cursor.execute(
                "SELECT set_config('app.current_tenant_id', %s, false)",
                (tenant_id,)
            )

            # Check if execution already completed (idempotency)
            if self._is_execution_complete(execution_id):
                print(f"Execution {execution_id} already completed, skipping")
                return

            # Upsert entities
            for entity in entities:
                self._upsert_entity(entity, tenant_id)

            # Mark execution as complete
            self._mark_execution_complete(execution_id, len(entities))

            self.conn.commit()
            print(f"Successfully loaded {len(entities)} entities for tenant {tenant_id}")

        except Exception as e:
            self.conn.rollback()
            print(f"Error loading entities: {e}")
            raise

    def _upsert_entity(self, entity: "BackstageEntity", tenant_id: str):
        """
        Insert or update entity (idempotent).

        The row is keyed on (tenant_id, entity_ref); on conflict the metadata,
        spec and sanitization version are replaced and updated_at refreshed.
        """
        namespace = entity.metadata.get("namespace", "default")
        name = entity.metadata["name"]
        entity_ref = self._make_entity_ref(entity.kind, name, namespace)

        query = """
            INSERT INTO catalog.entities (
                tenant_id,
                entity_ref,
                kind,
                namespace,
                name,
                metadata,
                spec,
                sanitization_version,
                created_at,
                updated_at
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
            )
            ON CONFLICT (tenant_id, entity_ref)
            DO UPDATE SET
                metadata = EXCLUDED.metadata,
                spec = EXCLUDED.spec,
                sanitization_version = EXCLUDED.sanitization_version,
                updated_at = NOW()
        """

        self.cursor.execute(query, (
            tenant_id,
            entity_ref,
            entity.kind,
            namespace,
            name,
            json.dumps(entity.metadata),
            json.dumps(entity.spec),
            self.SANITIZATION_VERSION
        ))

    def _is_execution_complete(self, execution_id: str) -> bool:
        """
        Check if execution already completed (idempotency).
        """
        query = "SELECT 1 FROM catalog.executions WHERE execution_id = %s"
        self.cursor.execute(query, (execution_id,))
        return self.cursor.fetchone() is not None

    def _mark_execution_complete(self, execution_id: str, entity_count: int):
        """
        Mark execution as complete.
        """
        query = """
            INSERT INTO catalog.executions (execution_id, entity_count, completed_at)
            VALUES (%s, %s, NOW())
        """
        self.cursor.execute(query, (execution_id, entity_count))

    def _make_entity_ref(self, kind: str, name: str, namespace: str) -> str:
        """
        Create Backstage entity reference.

        Returns:
            ``<kind lower-cased>:<namespace>/<name>``.
        """
        return f"{kind.lower()}:{namespace}/{name}"

    def close(self):
        """Close database connection."""
        self.cursor.close()
        self.conn.close()


# Usage
if __name__ == "__main__":
    from entity_transformer import BackstageEntity

    # Example entities: a single hand-built database resource.
    prod_db = BackstageEntity(
        apiVersion="backstage.io/v1alpha1",
        kind="Resource",
        metadata={
            "name": "prod-db",
            "namespace": "default",
            "labels": {"database-type": "cloud-sql"}
        },
        spec={
            "type": "database",
            "owner": "data-team"
        }
    )
    entities = [prod_db]

    loader = DatabaseLoader("postgresql://user:pass@localhost:5432/backstage")
    loader.load_entities(entities, tenant_id="client-1", execution_id="exec-123")
    loader.close()

5. End-to-End Workflow

# main.py
"""
Complete end-to-end sanitization workflow.
"""

import os
from download_worker import TerraformStateDownloader
from sanitization_engine import SanitizationEngine
from entity_transformer import EntityTransformer
from database_loader import DatabaseLoader

def process_workspace(
    workspace_id: str,
    tfc_token: str,
    tenant_id: str,
    db_connection_string: str
) -> dict:
    """
    Complete workflow: Download → Sanitize → Transform → Load

    Args:
        workspace_id: Terraform Cloud workspace to process.
        tfc_token: API token used to download the state.
        tenant_id: Tenant the resulting entities are stored under.
        db_connection_string: Connection string passed to the database loader.

    Returns:
        A dict with ``workspace_id`` and ``status``. On success it also has
        ``entities_loaded`` and ``sanitizations``; on failure it has ``error``.
        Exceptions are caught and reported in the result, never raised.
    """
    # Function-scope import: the module only imports `time` under its
    # __main__ guard, so this function would otherwise raise NameError when
    # the module is imported rather than run as a script.
    import time

    # NOTE(review): the timestamp makes every run's execution_id unique, so
    # the loader's "already completed" check cannot match a retry of the same
    # logical run. Confirm whether that is intended.
    execution_id = f"exec-{workspace_id}-{int(time.time())}"

    try:
        # Step 1: Download state
        print(f"[{workspace_id}] Downloading state...")
        downloader = TerraformStateDownloader(workspace_id, tfc_token)
        encrypted_state = downloader.download_current_state()

        # Step 2: Decrypt and sanitize
        print(f"[{workspace_id}] Sanitizing state...")
        raw_state = downloader.decrypt_state(encrypted_state)
        engine = SanitizationEngine()
        sanitized_state = engine.sanitize_state(raw_state, workspace_id)
        # The unsanitized state is not needed past this point.
        del raw_state

        # Step 3: Transform to Backstage entities
        print(f"[{workspace_id}] Transforming to entities...")
        transformer = EntityTransformer()
        entities = transformer.transform_state(sanitized_state, workspace_id)

        # Step 4: Load into database
        print(f"[{workspace_id}] Loading into database...")
        loader = DatabaseLoader(db_connection_string)
        try:
            loader.load_entities(entities, tenant_id, execution_id)
        finally:
            # Release the connection even when the load fails.
            loader.close()

        # Step 5: Store audit log
        print(f"[{workspace_id}] Storing audit log...")
        audit_log = engine.get_audit_log()
        store_audit_log(workspace_id, audit_log)

        print(f"[{workspace_id}] ✅ Successfully processed")
        return {
            "workspace_id": workspace_id,
            "status": "success",
            "entities_loaded": len(entities),
            "sanitizations": len(audit_log)
        }

    except Exception as e:
        # Top-level boundary: report the failure to the caller as a result dict.
        print(f"[{workspace_id}] ❌ Failed: {e}")
        return {
            "workspace_id": workspace_id,
            "status": "failed",
            "error": str(e)
        }

def store_audit_log(workspace_id: str, audit_log: list):
    """
    Store audit log to GCS/S3 for compliance.

    Uploads the log as indented JSON to the ``sanitization-audit-logs`` bucket
    under ``<workspace_id>/audit-<unix timestamp>.json``.

    Args:
        workspace_id: Used as the object name prefix.
        audit_log: JSON-serializable list of audit entries.
    """
    import json
    # Function-scope import: the module only imports `time` under its
    # __main__ guard, so this function would otherwise raise NameError when
    # the module is imported rather than run as a script.
    import time
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("sanitization-audit-logs")
    blob = bucket.blob(f"{workspace_id}/audit-{int(time.time())}.json")
    blob.upload_from_string(
        json.dumps(audit_log, indent=2),
        content_type="application/json"
    )


if __name__ == "__main__":
    # Binds `time` as a module global; keep it unless every function in this
    # module that uses `time` imports it itself.
    import time

    # Configuration from environment, read in the order process_workspace
    # expects its arguments; a missing variable raises KeyError.
    settings = [
        os.environ[var]
        for var in ("TFC_WORKSPACE_ID", "TFC_TOKEN", "TENANT_ID", "DATABASE_URL")
    ]

    # Process workspace
    result = process_workspace(*settings)

    print(f"\nResult: {result}")

Running the Example

1. Setup

# Install dependencies
pip install requests cryptography psycopg2-binary google-cloud-storage

# Set environment variables
export TFC_WORKSPACE_ID="ws-abc123"
export TFC_TOKEN="your-tfc-token"
export TENANT_ID="client-1"
export DATABASE_URL="postgresql://user:pass@localhost:5432/backstage"

2. Database Schema

-- Create database schema
CREATE SCHEMA IF NOT EXISTS catalog;

-- Entities table
-- One row per (tenant_id, entity_ref); the unique constraint below is the
-- conflict target for upserts into this table.
-- NOTE(review): gen_random_uuid() is built in from PostgreSQL 13; earlier
-- versions need the pgcrypto extension. Confirm the target server version.
CREATE TABLE catalog.entities (
entity_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(255) NOT NULL,
entity_ref VARCHAR(512) NOT NULL,
kind VARCHAR(64) NOT NULL,
namespace VARCHAR(255) NOT NULL DEFAULT 'default',
name VARCHAR(255) NOT NULL,
metadata JSONB NOT NULL,
spec JSONB NOT NULL,
sanitization_version VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),

CONSTRAINT entity_ref_per_tenant UNIQUE (tenant_id, entity_ref)
);

-- Executions table (for idempotency)
-- NOTE(review): this table has no tenant_id column and no row-level security
-- policy, so execution IDs are shared across tenants. Confirm this is intended.
CREATE TABLE catalog.executions (
execution_id VARCHAR(255) PRIMARY KEY,
entity_count INTEGER NOT NULL,
completed_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Row-level security
-- NOTE(review): table owners and superusers bypass row-level security unless
-- FORCE ROW LEVEL SECURITY is set. Confirm which role the loader connects as.
ALTER TABLE catalog.entities ENABLE ROW LEVEL SECURITY;

-- Rows are visible only when tenant_id equals the session's
-- app.current_tenant_id setting.
-- NOTE(review): current_setting() raises an error when the setting has never
-- been set in the session. Confirm every connection sets it first.
CREATE POLICY tenant_isolation ON catalog.entities
USING (tenant_id = current_setting('app.current_tenant_id'));

3. Run

python main.py

Expected Output

[ws-abc123] Downloading state...
[ws-abc123] Sanitizing state...
[ws-abc123] Transforming to entities...
[ws-abc123] Loading into database...
[ws-abc123] Storing audit log...
[ws-abc123] ✅ Successfully processed

Result: {
'workspace_id': 'ws-abc123',
'status': 'success',
'entities_loaded': 3,
'sanitizations': 5
}

What This Example Demonstrates

✅ Ephemeral Processing: Raw state never persists to disk
✅ Multi-Stage Sanitization: Attribute name, pattern, entropy detection
✅ Audit Trail: Complete log of all sanitization actions
✅ Idempotency: Safe to re-run without duplicates
✅ Tenant Isolation: Per-client database separation


Next Steps

  1. Add Orchestration: Wrap this in Temporal workflow or Step Function
  2. Add More Rules: Expand _load_base_rules() with full taxonomy
  3. Add Client Overrides: Load per-client rules from configuration
  4. Add Performance Monitoring: Instrument with metrics and tracing
  5. Add DLQ: Handle permanent failures with dead letter queue

Production Considerations

⚠️ This is a simplified example. For production:

  1. Use Secrets Manager (not environment variables)
  2. Add Comprehensive Error Handling (retries, DLQ, alerting)
  3. Implement Connection Pooling (for database)
  4. Add Schema Validation (before database insert)
  5. Enable Workload Identity (GCP) or IAM Roles (AWS)
  6. Add Performance Instrumentation (OpenTelemetry)
  7. Load Rules from Files (not hardcoded)
  8. Add Final Security Verification (before database insert)

References