Sanitization Rules Engine Design

Executive Summary

This document specifies the design and implementation of the configurable, extensible rules engine that powers the sanitization pipeline. The rules engine enables per-resource-type, per-attribute, and per-client customization of sanitization behavior while maintaining security and compliance standards.


1. Rules Engine Architecture

1.1 Core Components

┌─────────────────────────────────────────────────────────────────┐
│ RULES ENGINE │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Repository (Version Controlled) │ │
│ │ - Base rules (provider-level defaults) │ │
│ │ - Resource-type rules (google_sql_*, aws_*) │ │
│ │ - Client overrides (per-tenant customization) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Compiler & Validator │ │
│ │ - YAML parsing │ │
│ │ - Syntax validation │ │
│ │ - Conflict resolution │ │
│ │ - Rule optimization (caching, indexing) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Evaluation Engine │ │
│ │ - Attribute path matching (glob patterns) │ │
│ │ - Condition evaluation (if/then logic) │ │
│ │ - Action execution (REDACT, MASK, PRESERVE) │ │
│ │ - Priority ordering (most specific wins) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Audit & Monitoring │ │
│ │ - Rule application metrics │ │
│ │ - Coverage analysis (% of attributes matched) │ │
│ │ - Performance profiling │ │
│ └────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
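
The evaluation engine matches rule paths such as `settings.ip_configuration.authorized_networks[*].value` against concrete attribute paths. A minimal sketch of one way to do this follows. The helper names `compile_path_glob` and `path_matches`, and the wildcard semantics (`[*]` for any list index, `*` for exactly one path segment), are assumptions for illustration, not part of the specified design.

```python
import re


def compile_path_glob(pattern: str) -> re.Pattern:
    """Translate a rule path such as 'a.b[*].value' into a compiled regex."""
    parts = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("[*]", i):
            parts.append(r"\[\d+\]")        # any list index, e.g. [0]
            i += 3
        elif pattern[i] == "*":
            parts.append(r"[^.\[\]]+")      # exactly one path segment
            i += 1
        else:
            parts.append(re.escape(pattern[i]))  # literal character
            i += 1
    return re.compile("".join(parts))


def path_matches(pattern: str, concrete_path: str) -> bool:
    """True if the whole concrete path matches the rule path pattern."""
    return compile_path_glob(pattern).fullmatch(concrete_path) is not None
```

Using `fullmatch` means a rule for `root_password` does not accidentally apply to `root_password_hint`; such attributes would fall through to the pattern rules instead.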

2. Rule Definition Format

2.1 Base Rule Structure

# rules/base/google/sql/google_sql_database_instance.yaml

# Metadata
rule_file_version: "1.0"
resource_type: "google_sql_database_instance"
provider: "google"
category: "database"
compliance_tags:
  - SOC2_CC6.1
  - HIPAA_164.312
  - GDPR_Article_32

# Default sensitivity level for this resource type
default_sensitivity: HIGH

# Attribute-specific rules
attributes:
  # Critical: Always redact
  - path: "root_password"
    action: REDACT
    sensitivity: CRITICAL
    reason: "Database root password"
    compliance_requirement: mandatory
    redaction_template: "[REDACTED:SQL_ROOT_PASSWORD]"
    preserve_metadata:
      - length  # Store password length for policy compliance checking

  - path: "settings.database_flags[?(@.name == 'cloudsql.iam_authentication')].value"
    action: PRESERVE
    sensitivity: LOW
    reason: "IAM authentication flag is non-sensitive configuration"

  # Connection details
  - path: "connection_name"
    action: PRESERVE
    sensitivity: LOW
    reason: "Connection name needed for service catalog"

  - path: "private_ip_address"
    action: MASK
    sensitivity: HIGH
    reason: "Internal network topology"
    mask_pattern: "10.x.x.x"
    conditions:
      - if: "value matches /^10\\./"
        then: MASK
      - else: REDACT

  - path: "public_ip_address"
    action: PRESERVE
    sensitivity: MEDIUM
    reason: "Public IP may be needed for firewall rules visualization"
    conditions:
      - if: "client_policy.allow_public_ips == true"
        then: PRESERVE
      - else: MASK

  # Network configuration
  - path: "settings.ip_configuration.authorized_networks[*].value"
    action: EVALUATE_PER_ITEM
    sensitivity: HIGH
    reason: "Authorized networks may contain sensitive IP ranges"
    rules:
      - if: "value == '0.0.0.0/0'"
        then:
          action: PRESERVE
          flag: PUBLIC_EXPOSURE_WARNING
      - if: "value matches /^10\\.|^172\\.(1[6-9]|2[0-9]|3[0-1])\\.|^192\\.168\\./"
        then:
          action: MASK
          mask_pattern: "PRIVATE_CIDR"
      - else:
          action: REDACT

  # Backup configuration (safe metadata)
  - path: "settings.backup_configuration"
    action: PRESERVE
    sensitivity: LOW
    reason: "Backup settings needed for disaster recovery visualization"

  # Encryption configuration
  - path: "encryption_key_name"
    action: PRESERVE
    sensitivity: LOW
    reason: "KMS key reference (not the key itself)"

# Pattern-based rules (for dynamic/unknown attributes)
patterns:
  - pattern: ".*password.*"
    action: REDACT
    sensitivity: CRITICAL
    case_insensitive: true

  - pattern: ".*secret.*"
    action: REDACT
    sensitivity: CRITICAL
    case_insensitive: true

  - pattern: ".*key.*"
    action: EVALUATE
    sensitivity: HIGH
    reason: "Context-dependent: encryption_key_name (reference) vs. service_account_key (secret)"
    conditions:
      - if: "attribute_path ends_with '_name' or '_id' or '_reference'"
        then: PRESERVE
      - else: REDACT

# Metadata preservation (always keep these for entity relationships)
preserve_always:
  - "project"
  - "name"
  - "region"
  - "labels"
  - "terraform_labels"

# Testing examples
test_cases:
  - description: "Redact root password"
    input:
      root_password: "super_secret_123"
    expected:
      root_password: "[REDACTED:SQL_ROOT_PASSWORD]"

  - description: "Mask private IP"
    input:
      private_ip_address: "10.128.0.45"
    expected:
      private_ip_address: "10.x.x.x"

  - description: "Preserve public IP if allowed"
    input:
      public_ip_address: "34.123.45.67"
    expected:
      public_ip_address: "34.123.45.67"  # If client policy allows

2.2 Client Override Rules

# rules/clients/fintech_client_1/overrides.yaml

client_id: "fintech_client_1"
policy_version: "2.1.0"
compliance_profile: "SOC2_Type2_HIPAA"

# More restrictive than base rules
resource_overrides:
  google_sql_database_instance:
    # Override base rules
    attributes:
      - path: "public_ip_address"
        action: REDACT  # Base rule: PRESERVE
        reason: "FinTech client requires no public IP exposure"

      - path: "settings.ip_configuration.authorized_networks[*].value"
        action: REDACT_ALL  # Base rule: MASK private IPs
        reason: "No network topology disclosure"

      - path: "connection_name"
        action: MASK  # Base rule: PRESERVE
        mask_pattern: "[CONNECTION_NAME_REDACTED]"
        reason: "Client security policy: hide all infrastructure details"

  google_compute_instance:
    # Additional restrictions
    attributes:
      - path: "metadata"
        action: DEEP_SCAN_AND_REDACT
        sensitivity: CRITICAL
        reason: "Scan all metadata keys for sensitive data"

# Allow-list approach (only these attributes preserved)
allow_lists:
  google_project:
    - project_id
    - name
    - labels.environment
    - labels.cost_center
    # All other attributes: REDACT

# Deny-list approach (these patterns always redacted, regardless of base rules)
deny_patterns:
  - ".*email.*"
  - ".*phone.*"
  - ".*ssn.*"
  - ".*credit.*card.*"

# Custom sanitization functions
custom_functions:
  - name: "anonymize_email"
    description: "Replace email with anonymized version"
    implementation: |
      import hashlib

      def anonymize_email(email: str) -> str:
          # Split on the last "@" so a local part containing "@" does not raise
          local, domain = email.rsplit('@', 1)
          return f"{hashlib.sha256(local.encode()).hexdigest()[:8]}@{domain}"

  - name: "hash_identifier"
    description: "One-way hash for identifiers"
    implementation: |
      import hashlib

      def hash_identifier(value: str) -> str:
          return f"HASH-{hashlib.sha256(value.encode()).hexdigest()[:16]}"

2.3 Development Environment Rules (Less Restrictive)

# rules/clients/dev_client_2/overrides.yaml

client_id: "dev_client_2"
policy_version: "1.0.0"
compliance_profile: "Development"

# More permissive than base rules
resource_overrides:
  google_sql_database_instance:
    attributes:
      - path: "private_ip_address"
        action: PRESERVE  # Base rule: MASK
        reason: "Dev environment needs full visibility"

      - path: "settings.ip_configuration.authorized_networks[*].value"
        action: PRESERVE  # Base rule: MASK/REDACT
        reason: "Dev team needs to see network configuration"

  google_compute_instance:
    attributes:
      - path: "metadata.startup-script"
        action: PRESERVE  # Base rule: SCAN_FOR_SECRETS
        reason: "Dev environment, startup scripts safe to expose"

# Still redact critical secrets
enforce_base_rules_for:
  - ".*password.*"
  - ".*private_key.*"
  - ".*api_key.*"
  - ".*secret.*"
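
The `enforce_base_rules_for` list implies that a permissive client override must not win for protected attribute names. A minimal sketch of that guard follows; the function `resolve_action`, its parameters, and the choice of case-insensitive full-path matching are assumptions for illustration, since the document does not specify how this list is enforced.

```python
import re
from typing import Iterable, Optional


def resolve_action(
    attribute_path: str,
    base_action: str,
    client_action: Optional[str],
    enforced_patterns: Iterable[str],
) -> str:
    """Return the client override unless the path is protected by an enforced pattern."""
    protected = any(
        re.fullmatch(pattern, attribute_path, flags=re.IGNORECASE)
        for pattern in enforced_patterns
    )
    # Protected paths, and paths with no override, keep the base action.
    if protected or client_action is None:
        return base_action
    return client_action


enforced = [".*password.*", ".*private_key.*", ".*api_key.*", ".*secret.*"]
print(resolve_action("root_password", "REDACT", "PRESERVE", enforced))
print(resolve_action("private_ip_address", "MASK", "PRESERVE", enforced))
```

With the dev overrides above, `root_password` stays `REDACT` even if a client file tries to preserve it, while `private_ip_address` takes the client's `PRESERVE`.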

3. Rule Evaluation Algorithm

3.1 Rule Precedence & Conflict Resolution

from typing import Any


class RulePrecedence:
    """
    Rule application order (highest to lowest priority):
    1. Client-specific explicit attribute rules
    2. Client-specific pattern rules
    3. Base resource-type explicit attribute rules
    4. Base resource-type pattern rules
    5. Provider-level default rules
    6. Global fallback (default: REDACT unknown high-entropy strings)
    """

    def get_applicable_rule(
        self,
        resource_type: str,
        attribute_path: str,
        attribute_value: Any,
        client_config: ClientConfig
    ) -> Rule:
        # 1. Check client-specific explicit rules
        if client_rule := self.client_rules.get_exact_match(
            client_config.client_id,
            resource_type,
            attribute_path
        ):
            return client_rule

        # 2. Check client-specific pattern rules
        if client_pattern_rule := self.client_rules.get_pattern_match(
            client_config.client_id,
            resource_type,
            attribute_path
        ):
            return client_pattern_rule

        # 3. Check base explicit rules
        if base_rule := self.base_rules.get_exact_match(
            resource_type,
            attribute_path
        ):
            return base_rule

        # 4. Check base pattern rules
        if base_pattern_rule := self.base_rules.get_pattern_match(
            resource_type,
            attribute_path
        ):
            return base_pattern_rule

        # 5. Provider-level default
        provider = self._extract_provider(resource_type)
        if provider_rule := self.provider_defaults.get(provider):
            return provider_rule

        # 6. Global fallback
        return self._evaluate_fallback_rule(attribute_value)

    def _evaluate_fallback_rule(self, value: Any) -> Rule:
        """
        Conservative default: if high entropy or looks like secret, redact.
        """
        if isinstance(value, str):
            entropy = self._calculate_entropy(value)
            if entropy > 4.5:  # High randomness (bits per character)
                return Rule(action=Action.REDACT, reason="High-entropy unknown attribute")

        return Rule(action=Action.PRESERVE, reason="Low-risk unknown attribute")
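
The fallback rule depends on `_calculate_entropy`, which is not shown. A minimal sketch, assuming it computes Shannon entropy in bits per character over the string's own character frequencies (the standalone function name `shannon_entropy` is illustrative):

```python
import math
from collections import Counter


def shannon_entropy(value: str) -> float:
    """Shannon entropy of `value` in bits per character."""
    if not value:
        return 0.0
    counts = Counter(value)
    total = len(value)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())
```

Under this definition the entropy of a string with k distinct characters cannot exceed log2(k), so a value needs at least 23 distinct characters to score above 4.5. Short secrets would therefore pass the fallback check, which is one reason the pattern rules in section 2.1 run before it.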

3.2 Condition Evaluation Engine

import re


class ConditionEvaluator:
    """
    Evaluate complex conditions in rules.
    """

    def evaluate(self, condition: Condition, context: EvaluationContext) -> bool:
        """
        Supported conditions:
        - value matches regex
        - value in list
        - value equals constant
        - client_policy.setting == value
        - attribute_path ends_with suffix
        """
        if condition.type == "matches":
            # Only strings can be regex-matched; other types never match
            return isinstance(context.value, str) and bool(
                re.match(condition.pattern, context.value)
            )

        elif condition.type == "in":
            return context.value in condition.allowed_values

        elif condition.type == "equals":
            return context.value == condition.expected_value

        elif condition.type == "client_policy":
            policy_value = self._get_client_policy_value(
                context.client_config,
                condition.policy_key
            )
            return policy_value == condition.expected_value

        elif condition.type == "attribute_path_suffix":
            return context.attribute_path.endswith(condition.suffix)

        else:
            raise ValueError(f"Unknown condition type: {condition.type}")

# Example usage
evaluator = ConditionEvaluator()

condition = Condition(
    type="matches",
    # Plain Python regex: the /.../ delimiters used in the YAML rules
    # must be stripped before the pattern reaches re.match
    pattern=r"^10\.|^172\.(1[6-9]|2[0-9]|3[0-1])\.|^192\.168\."
)

context = EvaluationContext(
    attribute_path="private_ip_address",
    value="10.128.0.45",
    resource_type="google_compute_instance",
    client_config=client_config  # a previously loaded ClientConfig
)

is_private_ip = evaluator.evaluate(condition, context)  # True
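
The YAML rules express conditions as strings such as `value matches /^10\./`, while the evaluator works on typed `Condition` objects. A minimal sketch of the parsing step in between follows. The `ParsedCondition` dataclass and `parse_condition` function are hypothetical stand-ins, and only three of the string forms from section 2.1 are handled.

```python
import re
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ParsedCondition:
    type: str
    pattern: Optional[str] = None
    expected_value: Any = None
    policy_key: Optional[str] = None


_MATCHES = re.compile(r"^value matches /(?P<regex>.*)/$")
_EQUALS = re.compile(r"^value == '(?P<const>.*)'$")
_POLICY = re.compile(r"^client_policy\.(?P<key>\w+) == (?P<flag>true|false)$")


def parse_condition(text: str) -> ParsedCondition:
    """Turn a rule-file condition string into a typed condition."""
    text = text.strip()
    if m := _MATCHES.match(text):
        # Strip the /.../ delimiters so the pattern is a plain Python regex
        return ParsedCondition(type="matches", pattern=m.group("regex"))
    if m := _EQUALS.match(text):
        return ParsedCondition(type="equals", expected_value=m.group("const"))
    if m := _POLICY.match(text):
        return ParsedCondition(
            type="client_policy",
            policy_key=m.group("key"),
            expected_value=(m.group("flag") == "true"),
        )
    raise ValueError(f"Unsupported condition: {text!r}")
```

Rejecting unknown forms with `ValueError` at compile time keeps malformed conditions from silently evaluating to false during sanitization.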

4. Action Types & Implementation

4.1 Supported Actions

import hashlib
import re
from enum import Enum
from typing import Any, Dict

class Action(Enum):
    REDACT = "REDACT"                        # Replace with placeholder
    MASK = "MASK"                            # Partially hide
    HASH = "HASH"                            # Cryptographic hash
    PRESERVE = "PRESERVE"                    # Keep original
    ANONYMIZE = "ANONYMIZE"                  # Pseudonymization
    EVALUATE_PER_ITEM = "EVALUATE_PER_ITEM"  # For arrays
    DEEP_SCAN = "DEEP_SCAN"                  # Recursive scan for nested secrets
    CUSTOM_FUNCTION = "CUSTOM_FUNCTION"      # Client-defined function

class ActionExecutor:
    def execute(self, action: Action, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        handlers = {
            Action.REDACT: self._handle_redact,
            Action.MASK: self._handle_mask,
            Action.HASH: self._handle_hash,
            Action.PRESERVE: self._handle_preserve,
            Action.ANONYMIZE: self._handle_anonymize,
            Action.EVALUATE_PER_ITEM: self._handle_evaluate_per_item,
            Action.DEEP_SCAN: self._handle_deep_scan,
            Action.CUSTOM_FUNCTION: self._handle_custom_function,
        }

        handler = handlers.get(action)
        if not handler:
            raise ValueError(f"Unknown action: {action}")

        return handler(value, rule, context)

    def _handle_redact(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Replace value with redaction placeholder"""
        template = rule.redaction_template or "[REDACTED:{type}]"
        redaction_type = self._infer_type(context.attribute_path, value)
        return template.format(type=redaction_type)

    def _handle_mask(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Partially hide value while preserving structure"""
        if rule.mask_pattern:
            return rule.mask_pattern

        # Auto-detect mask pattern
        if isinstance(value, str):
            if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value):
                # IP address
                octets = value.split(".")
                return f"{octets[0]}.x.x.x"

            elif "@" in value:
                # Email; local[:1] avoids an IndexError when the local part is empty
                local, domain = value.split("@", 1)
                return f"{local[:1]}***@{domain}"

        return "[MASKED]"

    def _handle_hash(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Replace with cryptographic hash"""
        hash_value = hashlib.sha256(str(value).encode()).hexdigest()
        return f"sha256:{hash_value[:16]}"

    def _handle_preserve(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Keep original value"""
        return value

    def _handle_anonymize(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Pseudonymization with consistent mapping"""
        # Deterministic one-way transformation. An unkeyed hash of a low-entropy
        # value (e.g. an IP address) can be brute-forced, so a keyed construction
        # may be preferable where that matters.
        client_id = context.client_config.client_id
        hash_input = f"{client_id}:{context.attribute_path}:{value}"
        hash_output = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
        return f"ANON-{hash_output}"

    def _handle_evaluate_per_item(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Evaluate each item in array individually"""
        if not isinstance(value, list):
            return value

        return [
            self._evaluate_item(item, rule, context)
            for item in value
        ]

    def _handle_deep_scan(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Recursively scan nested structures for secrets"""
        if isinstance(value, dict):
            return {
                k: self._handle_deep_scan(v, rule, context)
                for k, v in value.items()
            }
        elif isinstance(value, list):
            return [
                self._handle_deep_scan(item, rule, context)
                for item in value
            ]
        elif isinstance(value, str):
            # Check if string contains secrets
            if self._contains_secret_pattern(value):
                return self._handle_redact(value, rule, context)
        return value

    def _handle_custom_function(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Execute client-defined custom function"""
        function_name = rule.custom_function_name
        function = context.client_config.custom_functions.get(function_name)

        if not function:
            raise ValueError(f"Custom function not found: {function_name}")

        return function(value)
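
`_handle_evaluate_per_item` delegates to `_evaluate_item`, which is not shown. The sketch below illustrates what per-item evaluation could look like for the authorized-network rules in section 2.1. It swaps the regex prefix test for the standard-library `ipaddress` module and checks the three RFC 1918 blocks explicitly; the function name `classify_network` and the returned dict shape are assumptions, and the `[REDACTED:PUBLIC_IP_RANGE]` placeholder is borrowed from test case sql_005.

```python
import ipaddress

# The three private IPv4 blocks targeted by the regex in section 2.1
_PRIVATE_BLOCKS = [
    ipaddress.ip_network(block)
    for block in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]


def classify_network(cidr: str) -> dict:
    """Apply the authorized-network rules to a single CIDR string."""
    if cidr == "0.0.0.0/0":
        # Preserved on purpose so the exposure stays visible downstream
        return {"value": cidr, "flag": "PUBLIC_EXPOSURE_WARNING"}
    try:
        network = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        # Unparseable input: fail closed and redact
        return {"value": "[REDACTED:PUBLIC_IP_RANGE]"}
    if network.version == 4 and any(network.subnet_of(b) for b in _PRIVATE_BLOCKS):
        return {"value": "PRIVATE_CIDR"}
    return {"value": "[REDACTED:PUBLIC_IP_RANGE]"}
```

Explicit blocks are used instead of `is_private` because that property also covers documentation and other reserved ranges such as `203.0.113.0/24`, which the rules in section 2.1 treat as public and redact.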

5. Rule Repository Structure

rules/
├── base/                          # Provider-level default rules
│   ├── google/
│   │   ├── compute/
│   │   │   ├── google_compute_instance.yaml
│   │   │   ├── google_compute_address.yaml
│   │   │   └── google_compute_network.yaml
│   │   ├── sql/
│   │   │   ├── google_sql_database_instance.yaml
│   │   │   └── google_sql_user.yaml
│   │   ├── kms/
│   │   │   ├── google_kms_crypto_key.yaml
│   │   │   └── google_kms_secret_ciphertext.yaml
│   │   ├── iam/
│   │   │   ├── google_service_account.yaml
│   │   │   ├── google_service_account_key.yaml
│   │   │   └── google_project_iam_binding.yaml
│   │   └── storage/
│   │       ├── google_storage_bucket.yaml
│   │       └── google_storage_bucket_iam_binding.yaml
│   ├── aws/
│   │   ├── ec2/
│   │   ├── rds/
│   │   └── iam/
│   └── azure/
│       ├── compute/
│       └── storage/
│
├── clients/                       # Client-specific overrides
│   ├── fintech_client_1/
│   │   ├── overrides.yaml
│   │   ├── custom_functions.py
│   │   └── test_cases.yaml
│   ├── healthcare_client_2/
│   │   └── overrides.yaml
│   └── dev_client_3/
│       └── overrides.yaml
│
├── patterns/                      # Reusable pattern libraries
│   ├── credentials.yaml           # Private keys, passwords, tokens
│   ├── network.yaml               # IPs, CIDRs, URLs
│   ├── pii.yaml                   # Emails, phone numbers, SSN
│   └── encryption.yaml            # Keys, certificates
│
├── compliance/                    # Compliance-driven rule sets
│   ├── soc2.yaml
│   ├── hipaa.yaml
│   ├── gdpr.yaml
│   └── pci_dss.yaml
│
└── test_suites/                   # Comprehensive test cases
    ├── google_resources_test.yaml
    ├── aws_resources_test.yaml
    └── edge_cases_test.yaml

6. Rule Testing & Validation

6.1 Test Case Format

# test_suites/google_sql_database_instance_test.yaml

resource_type: google_sql_database_instance
test_cases:
  - test_id: "sql_001"
    description: "Redact root password"
    client_policy: "default"
    input:
      root_password: "P@ssw0rd123!"
    expected_output:
      root_password: "[REDACTED:SQL_ROOT_PASSWORD]"
    expected_audit_log:
      action: "REDACT"
      sensitivity: "CRITICAL"
      rule_matched: "google_sql_database_instance/root_password"

  - test_id: "sql_002"
    description: "Mask private IP address"
    client_policy: "default"
    input:
      private_ip_address: "10.128.0.45"
    expected_output:
      private_ip_address: "10.x.x.x"
    expected_audit_log:
      action: "MASK"
      sensitivity: "HIGH"

  - test_id: "sql_003"
    description: "Preserve connection name"
    client_policy: "default"
    input:
      connection_name: "my-project:us-central1:my-db"
    expected_output:
      connection_name: "my-project:us-central1:my-db"
    expected_audit_log:
      action: "PRESERVE"
      sensitivity: "LOW"

  - test_id: "sql_004"
    description: "Client override: redact public IP"
    client_policy: "fintech_client_1"
    input:
      public_ip_address: "34.123.45.67"
    expected_output:
      public_ip_address: "[REDACTED:PUBLIC_IP]"
    expected_audit_log:
      action: "REDACT"
      sensitivity: "HIGH"
      reason: "Client security policy override"

  - test_id: "sql_005"
    description: "Evaluate authorized networks individually"
    client_policy: "default"
    input:
      settings:
        ip_configuration:
          authorized_networks:
            - name: "office"
              value: "203.0.113.0/24"
            - name: "vpn"
              value: "10.0.0.0/8"
            - name: "public"
              value: "0.0.0.0/0"
    expected_output:
      settings:
        ip_configuration:
          authorized_networks:
            - name: "office"
              value: "[REDACTED:PUBLIC_IP_RANGE]"
            - name: "vpn"
              value: "PRIVATE_CIDR"
            - name: "public"
              value: "0.0.0.0/0"
              flag: "PUBLIC_EXPOSURE_WARNING"

6.2 Automated Testing Framework

from pathlib import Path
from typing import Any, Dict

import yaml  # PyYAML


class RuleTester:
    def __init__(self, rules_engine: RulesEngine):
        self.rules_engine = rules_engine

    def run_test_suite(self, test_suite_path: str) -> TestReport:
        """
        Run all test cases in a test suite file.
        """
        test_suite = yaml.safe_load(Path(test_suite_path).read_text())

        report = TestReport()
        for test_case in test_suite["test_cases"]:
            result = self.run_test_case(test_case, test_suite["resource_type"])
            report.add_result(result)

        return report

    def run_test_case(self, test_case: Dict, resource_type: str) -> TestResult:
        """
        Execute a single test case.
        """
        # Create mock resource
        resource = TerraformResource(
            type=resource_type,
            name="test-resource",
            attributes=test_case["input"]
        )

        # Apply sanitization with client policy
        client_config = self._get_client_config(test_case["client_policy"])
        sanitized = self.rules_engine.sanitize_resource(resource, client_config)

        # Compare output
        expected = test_case["expected_output"]
        actual = sanitized.attributes

        # Validate
        if self._deep_equals(expected, actual):
            return TestResult(
                test_id=test_case["test_id"],
                status="PASS",
                description=test_case["description"]
            )
        else:
            return TestResult(
                test_id=test_case["test_id"],
                status="FAIL",
                description=test_case["description"],
                expected=expected,
                actual=actual,
                diff=self._generate_diff(expected, actual)
            )

    def _deep_equals(self, expected: Any, actual: Any) -> bool:
        """Deep equality comparison for nested structures"""
        if type(expected) != type(actual):
            return False

        if isinstance(expected, dict):
            if set(expected.keys()) != set(actual.keys()):
                return False
            return all(
                self._deep_equals(expected[k], actual[k])
                for k in expected.keys()
            )

        elif isinstance(expected, list):
            if len(expected) != len(actual):
                return False
            return all(
                self._deep_equals(e, a)
                for e, a in zip(expected, actual)
            )

        else:
            return expected == actual
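
A failing test case calls `_generate_diff`, which is referenced but not shown. A minimal sketch of a path-oriented diff for nested attributes follows; the standalone function `generate_diff` and its message format are illustrative assumptions, and dictionary keys are assumed to be strings, as they are when loaded from the YAML test suites.

```python
from typing import Any, List


def generate_diff(expected: Any, actual: Any, path: str = "") -> List[str]:
    """List human-readable differences between two nested structures."""
    if isinstance(expected, dict) and isinstance(actual, dict):
        lines: List[str] = []
        for key in sorted(set(expected) | set(actual)):
            child = f"{path}.{key}" if path else str(key)
            if key not in actual:
                lines.append(f"{child}: missing in actual")
            elif key not in expected:
                lines.append(f"{child}: unexpected in actual")
            else:
                lines.extend(generate_diff(expected[key], actual[key], child))
        return lines
    if isinstance(expected, list) and isinstance(actual, list):
        lines = []
        if len(expected) != len(actual):
            lines.append(f"{path}: expected {len(expected)} items, got {len(actual)}")
        # Compare the overlapping items position by position
        for index, (e, a) in enumerate(zip(expected, actual)):
            lines.extend(generate_diff(e, a, f"{path}[{index}]"))
        return lines
    if expected != actual:
        return [f"{path}: expected {expected!r}, got {actual!r}"]
    return []
```

Because the diff echoes actual values, it is best limited to synthetic fixtures like the ones in section 6.1 so that a failed sanitization does not copy a real secret into test reports.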

7. Rule Coverage Analysis

7.1 Coverage Metrics

class RuleCoverageAnalyzer:
    """
    Analyze rule coverage to identify gaps.
    """

    def analyze(
        self,
        state: TerraformState,
        rules: RulesEngine,
        client_config: ClientConfig
    ) -> CoverageReport:
        """
        Calculate coverage across all attributes in state.
        """
        total_attributes = 0
        matched_attributes = 0
        unmatched_attributes = []

        for resource in state.resources:
            for attr_path, value in self._flatten_attributes(resource.attributes):
                total_attributes += 1

                rule = rules.get_applicable_rule(
                    resource.type,
                    attr_path,
                    value,
                    client_config
                )

                if rule and rule.source != "fallback":
                    matched_attributes += 1
                else:
                    unmatched_attributes.append({
                        "resource_type": resource.type,
                        "attribute_path": attr_path,
                        "value_type": type(value).__name__,
                        "sample_value": self._safe_sample(value)
                    })

        # Guard against an empty state to avoid dividing by zero
        coverage_percentage = (
            (matched_attributes / total_attributes) * 100
            if total_attributes else 0.0
        )

        return CoverageReport(
            total_attributes=total_attributes,
            matched_attributes=matched_attributes,
            coverage_percentage=coverage_percentage,
            unmatched_attributes=unmatched_attributes
        )

    def _safe_sample(self, value: Any, max_length: int = 50) -> str:
        """Return safe sample of value without exposing secrets"""
        value_str = str(value)
        # Check the full value before truncating, so a long secret
        # is not judged (and leaked) by its first few characters
        if self._looks_like_secret(value_str):
            return "[REDACTED_IN_SAMPLE]"
        if len(value_str) > max_length:
            value_str = value_str[:max_length] + "..."
        return value_str

# Example coverage report
"""
Rule Coverage Report
====================
Total Attributes Scanned: 12,345
Matched by Explicit Rules: 10,234 (82.9%)
Matched by Pattern Rules: 1,456 (11.8%)
Matched by Fallback: 655 (5.3%)

Coverage by Resource Type:
- google_sql_database_instance: 96% (23/24 attributes)
- google_compute_instance: 88% (45/51 attributes)
- google_storage_bucket: 100% (12/12 attributes)
- google_project_iam_binding: 75% (9/12 attributes)

Unmatched Attributes (requiring new rules):
1. google_compute_instance: metadata.custom_script_url
2. google_project_iam_binding: condition.expression
3. google_container_cluster: node_config.workload_metadata_config
"""

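The analyzer iterates over `_flatten_attributes`, which is not shown. A minimal sketch of such a flattener follows, assuming dotted keys and `[index]` list notation so that the resulting paths line up with rule paths like `settings.ip_configuration.authorized_networks[*].value`. The standalone name `flatten_attributes` is illustrative.

```python
from typing import Any, Iterator, Tuple


def flatten_attributes(value: Any, prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (path, leaf_value) pairs for every scalar in a nested structure."""
    if isinstance(value, dict):
        for key, child in value.items():
            path = f"{prefix}.{key}" if prefix else str(key)
            yield from flatten_attributes(child, path)
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from flatten_attributes(child, f"{prefix}[{index}]")
    else:
        # Scalar leaf: strings, numbers, booleans, None
        yield prefix, value


attributes = {
    "root_password": "example",
    "settings": {"ip_configuration": {"authorized_networks": [{"value": "10.0.0.0/8"}]}},
}
for path, leaf in flatten_attributes(attributes):
    print(path, "=", leaf)
```

Empty dicts and lists yield nothing in this sketch, so they do not count toward `total_attributes`.
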
8. Rule Versioning & Migration

8.1 Rule Version Management

# rules/base/google/compute/google_compute_instance.yaml

# Version tracking
rule_file_version: "2.1.0"
changelog:
  - version: "2.1.0"
    date: "2025-01-13"
    changes:
      - "Added deep scan for metadata fields"
      - "Improved SSH key detection in metadata"
    author: "security-team"

  - version: "2.0.0"
    date: "2025-01-01"
    changes:
      - "Breaking: Changed private_ip_address from PRESERVE to MASK"
      - "Added support for nested metadata attributes"
    author: "security-team"

  - version: "1.0.0"
    date: "2024-12-01"
    changes:
      - "Initial rule definition"
    author: "security-team"

# Backward compatibility
deprecated_attributes:
  - path: "metadata.ssh-keys"
    deprecated_in: "2.0.0"
    replacement: "metadata.ssh_keys"
    migration_note: "Underscore-separated key names now preferred"

8.2 Migration Strategy

class RuleMigrator:
    """
    Safely migrate rules to new versions.
    """

    def migrate(
        self,
        from_version: str,
        to_version: str,
        dry_run: bool = True
    ) -> MigrationReport:
        """
        Migrate rules from one version to another.
        """
        migration_steps = self._plan_migration(from_version, to_version)

        report = MigrationReport()
        for step in migration_steps:
            if dry_run:
                report.add_planned_step(step)
            else:
                result = self._execute_migration_step(step)
                report.add_executed_step(step, result)

        return report

    def validate_migration(self, report: MigrationReport) -> ValidationResult:
        """
        Run test suites to validate migration didn't break anything.
        """
        # Run all test suites with new rules
        test_results = []
        for test_suite in self._get_all_test_suites():
            result = self.test_runner.run_test_suite(test_suite)
            test_results.append(result)

        # Check for regressions
        regressions = [
            r for r in test_results
            if r.status == "FAIL" and r.previous_status == "PASS"
        ]

        if regressions:
            return ValidationResult(
                status="FAILED",
                regressions=regressions
            )

        return ValidationResult(status="PASSED")
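
`migrate` relies on `_plan_migration`, which is not shown. One plausible approach, sketched below, is to select the changelog entries from section 8.1 that fall between the two versions and apply them oldest first. The functions `parse_version` and `plan_migration` are hypothetical, and version strings are assumed to be purely numeric `MAJOR.MINOR.PATCH`.

```python
from typing import Dict, List, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Convert '2.10.0' into (2, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def plan_migration(changelog: List[Dict], from_version: str, to_version: str) -> List[Dict]:
    """Return changelog entries after from_version up to to_version, oldest first."""
    low, high = parse_version(from_version), parse_version(to_version)
    if low > high:
        raise ValueError("Downgrades are not handled in this sketch")
    steps = [
        entry for entry in changelog
        if low < parse_version(entry["version"]) <= high
    ]
    return sorted(steps, key=lambda entry: parse_version(entry["version"]))
```

Comparing integer tuples instead of raw strings matters once a component reaches two digits: as strings, "2.10.0" sorts before "2.9.0".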

9. Performance Optimization

9.1 Rule Caching

import time
from typing import Optional


class RuleCache:
    """
    Cache compiled rules for performance.
    """

    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_rule(
        self,
        resource_type: str,
        attribute_path: str,
        client_id: str
    ) -> Optional[Rule]:
        """
        Get rule from cache or compile if not cached.
        """
        cache_key = self._make_key(resource_type, attribute_path, client_id)

        if cache_key in self.cache:
            entry = self.cache[cache_key]
            if time.time() - entry.timestamp < self.ttl:
                return entry.rule

        # Cache miss or expired
        rule = self._compile_rule(resource_type, attribute_path, client_id)
        self.cache[cache_key] = CacheEntry(rule=rule, timestamp=time.time())
        return rule

    def _make_key(self, resource_type: str, attribute_path: str, client_id: str) -> str:
        return f"{client_id}:{resource_type}:{attribute_path}"

9.2 Rule Indexing

import re
from typing import List, Optional


class RuleIndex:
    """
    Pre-index rules for fast lookup.
    """

    def __init__(self):
        self.exact_match_index = {}    # attribute_path -> Rule
        self.prefix_match_index = {}   # prefix -> List[Rule]
        self.pattern_match_index = []  # List[(compiled_regex, Rule)]

    def build_index(self, rules: List[Rule]):
        """
        Build optimized lookup structures.
        """
        for rule in rules:
            if rule.match_type == "exact":
                self.exact_match_index[rule.attribute_path] = rule

            elif rule.match_type == "prefix":
                prefix = rule.attribute_path.rstrip("*")
                if prefix not in self.prefix_match_index:
                    self.prefix_match_index[prefix] = []
                self.prefix_match_index[prefix].append(rule)

            elif rule.match_type == "pattern":
                compiled_pattern = re.compile(rule.pattern)
                self.pattern_match_index.append((compiled_pattern, rule))

    def lookup(self, attribute_path: str) -> Optional[Rule]:
        """
        O(1) lookup for exact matches; prefix and pattern rules are
        scanned linearly, which is acceptable while those sets stay small.
        """
        # Try exact match first (O(1))
        if attribute_path in self.exact_match_index:
            return self.exact_match_index[attribute_path]

        # Try prefix match, longest prefix first so the most specific rule wins
        for prefix in sorted(self.prefix_match_index, key=len, reverse=True):
            if attribute_path.startswith(prefix):
                return self.prefix_match_index[prefix][0]

        # Try pattern match (O(n) but typically small n)
        for pattern, rule in self.pattern_match_index:
            if pattern.match(attribute_path):
                return rule

        return None

10. Rule Documentation & Discoverability

10.1 Rule Catalog

# catalog/index.yaml

catalog_version: "1.0"
total_rules: 234
total_resource_types: 87

categories:
  - name: "Google Cloud Platform"
    resource_types: 45
    rules: 156
    coverage: 95%

  - name: "Amazon Web Services"
    resource_types: 32
    rules: 68
    coverage: 88%

  - name: "Microsoft Azure"
    resource_types: 10
    rules: 10
    coverage: 65%

by_sensitivity:
  CRITICAL: 45 rules
  HIGH: 89 rules
  MEDIUM: 78 rules
  LOW: 22 rules

by_action:
  REDACT: 98 rules
  MASK: 45 rules
  PRESERVE: 67 rules
  HASH: 12 rules
  CUSTOM: 12 rules

most_common_patterns:
  - ".*password.*": 67 matches
  - ".*secret.*": 54 matches
  - ".*key.*": 89 matches (context-dependent)
  - ".*token.*": 34 matches

11. Monitoring & Observability

11.1 Rule Performance Metrics

Metrics to Track:
- rule_evaluation_duration_seconds{rule_id, resource_type}
- rule_match_rate{rule_id, action}
- rule_cache_hit_ratio
- rule_coverage_percentage{resource_type}
- rule_false_positive_rate{rule_id}
- rule_version{rule_file, version}

Alerts:
- LowCoverage: coverage < 90% for any resource type
- SlowRuleEvaluation: p99 evaluation time > 100ms
- HighFalsePositiveRate: false_positive_rate > 5%
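
The alert thresholds above can be expressed as a small evaluation function. The sketch below is illustrative only: `percentile` and `firing_alerts` are hypothetical names, durations are assumed to be in seconds (matching `rule_evaluation_duration_seconds`), coverage is assumed to be a percentage, and the false positive rate a fraction between 0 and 1. In practice these checks would more likely live in the monitoring system's own alert rules.

```python
import math
from typing import Dict, List, Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile for pct in (0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def firing_alerts(
    coverage_by_type: Dict[str, float],
    eval_seconds: Sequence[float],
    false_positive_rate: float,
) -> List[str]:
    """Return the names of the alerts whose thresholds are crossed."""
    alerts = []
    if any(coverage < 90.0 for coverage in coverage_by_type.values()):
        alerts.append("LowCoverage")
    if eval_seconds and percentile(eval_seconds, 99) > 0.100:  # 100 ms
        alerts.append("SlowRuleEvaluation")
    if false_positive_rate > 0.05:  # 5%
        alerts.append("HighFalsePositiveRate")
    return alerts
```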

Version History

  • v1.0 (2025-01-13): Initial rules engine design
  • v1.1 (TBD): Add machine learning-based secret detection
  • v1.2 (TBD): Add real-time rule updates without restart
