Sanitization Rules Engine Design
Executive Summary
This document specifies the design and implementation of the configurable, extensible rules engine that powers the sanitization pipeline. The rules engine enables per-resource-type, per-attribute, and per-client customization of sanitization behavior while maintaining security and compliance standards.
1. Rules Engine Architecture
1.1 Core Components
┌─────────────────────────────────────────────────────────────────┐
│ RULES ENGINE │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Repository (Version Controlled) │ │
│ │ - Base rules (provider-level defaults) │ │
│ │ - Resource-type rules (google_sql_*, aws_*) │ │
│ │ - Client overrides (per-tenant customization) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Compiler & Validator │ │
│ │ - YAML parsing │ │
│ │ - Syntax validation │ │
│ │ - Conflict resolution │ │
│ │ - Rule optimization (caching, indexing) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Evaluation Engine │ │
│ │ - Attribute path matching (glob patterns) │ │
│ │ - Condition evaluation (if/then logic) │ │
│ │ - Action execution (REDACT, MASK, PRESERVE) │ │
│ │ - Priority ordering (most specific wins) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Rule Audit & Monitoring │ │
│ │ - Rule application metrics │ │
│ │ - Coverage analysis (% of attributes matched) │ │
│ │ - Performance profiling │ │
│ └────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
2. Rule Definition Format
2.1 Base Rule Structure
# rules/base/google/sql/google_sql_database_instance.yaml
# Metadata
rule_file_version: "1.0"
resource_type: "google_sql_database_instance"
provider: "google"
category: "database"
compliance_tags:
  - SOC2_CC6.1
  - HIPAA_164.312
  - GDPR_Article_32
# Default sensitivity level for this resource type
default_sensitivity: HIGH
# Attribute-specific rules
attributes:
  # Critical: Always redact
  - path: "root_password"
    action: REDACT
    sensitivity: CRITICAL
    reason: "Database root password"
    compliance_requirement: mandatory
    redaction_template: "[REDACTED:SQL_ROOT_PASSWORD]"
    preserve_metadata:
      - length  # Store password length for policy compliance checking
  - path: "settings.database_flags[?(@.name == 'cloudsql.iam_authentication')].value"
    action: PRESERVE
    sensitivity: LOW
    reason: "IAM authentication flag is non-sensitive configuration"
  # Connection details
  - path: "connection_name"
    action: PRESERVE
    sensitivity: LOW
    reason: "Connection name needed for service catalog"
  - path: "private_ip_address"
    action: MASK
    sensitivity: HIGH
    reason: "Internal network topology"
    mask_pattern: "10.x.x.x"
    conditions:
      - if: "value matches /^10\\./"
        then: MASK
      - else: REDACT
  - path: "public_ip_address"
    action: PRESERVE
    sensitivity: MEDIUM
    reason: "Public IP may be needed for firewall rules visualization"
    conditions:
      - if: "client_policy.allow_public_ips == true"
        then: PRESERVE
      - else: MASK
  # Network configuration
  - path: "settings.ip_configuration.authorized_networks[*].value"
    action: EVALUATE_PER_ITEM
    sensitivity: HIGH
    reason: "Authorized networks may contain sensitive IP ranges"
    rules:
      - if: "value == '0.0.0.0/0'"
        then:
          action: PRESERVE
          flag: PUBLIC_EXPOSURE_WARNING
      - if: "value matches /^10\\.|^172\\.(1[6-9]|2[0-9]|3[0-1])\\.|^192\\.168\\./"
        then:
          action: MASK
          mask_pattern: "PRIVATE_CIDR"
      - else:
          action: REDACT
  # Backup configuration (safe metadata)
  - path: "settings.backup_configuration"
    action: PRESERVE
    sensitivity: LOW
    reason: "Backup settings needed for disaster recovery visualization"
  # Encryption configuration
  - path: "encryption_key_name"
    action: PRESERVE
    sensitivity: LOW
    reason: "KMS key reference (not the key itself)"
# Pattern-based rules (for dynamic/unknown attributes)
patterns:
  - pattern: ".*password.*"
    action: REDACT
    sensitivity: CRITICAL
    case_insensitive: true
  - pattern: ".*secret.*"
    action: REDACT
    sensitivity: CRITICAL
    case_insensitive: true
  - pattern: ".*key.*"
    action: EVALUATE
    sensitivity: HIGH
    reason: "Context-dependent: encryption_key_name (reference) vs. service_account_key (secret)"
    conditions:
      - if: "attribute_path ends_with '_name' or '_id' or '_reference'"
        then: PRESERVE
      - else: REDACT
# Metadata preservation (always keep these for entity relationships)
preserve_always:
  - "project"
  - "name"
  - "region"
  - "labels"
  - "terraform_labels"
# Testing examples
test_cases:
  - description: "Redact root password"
    input:
      root_password: "super_secret_123"
    expected:
      root_password: "[REDACTED:SQL_ROOT_PASSWORD]"
  - description: "Mask private IP"
    input:
      private_ip_address: "10.128.0.45"
    expected:
      private_ip_address: "10.x.x.x"
  - description: "Preserve public IP if allowed"
    input:
      public_ip_address: "34.123.45.67"
    expected:
      public_ip_address: "34.123.45.67"  # If client policy allows
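The rule file above implies a few structural invariants that the Rule Compiler & Validator stage could check before any rule is evaluated. The following is a minimal sketch, assuming the file has already been parsed into a dict. The function name `validate_rule_file` is hypothetical, and the accepted action set is copied from the `Action` enum in section 4.1 rather than from any confirmed schema.

```python
from typing import Any, Dict, List

# Assumption: the accepted actions are exactly the members of the Action enum (section 4.1).
KNOWN_ACTIONS = {
    "REDACT", "MASK", "HASH", "PRESERVE", "ANONYMIZE",
    "EVALUATE_PER_ITEM", "DEEP_SCAN", "CUSTOM_FUNCTION",
}
SENSITIVITY_LEVELS = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}


def validate_rule_file(doc: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the file passed."""
    errors: List[str] = []
    for key in ("rule_file_version", "resource_type", "provider"):
        if key not in doc:
            errors.append(f"missing top-level key: {key}")

    seen_paths = set()
    for i, attr in enumerate(doc.get("attributes", [])):
        path = attr.get("path")
        if not path:
            errors.append(f"attributes[{i}]: missing path")
            continue
        if path in seen_paths:
            errors.append(f"attributes[{i}]: duplicate path {path!r}")
        seen_paths.add(path)
        if attr.get("action") not in KNOWN_ACTIONS:
            errors.append(f"{path}: unknown action {attr.get('action')!r}")
        if attr.get("sensitivity") not in SENSITIVITY_LEVELS:
            errors.append(f"{path}: unknown sensitivity {attr.get('sensitivity')!r}")
    return errors
```

Under this assumed action set, names such as `EVALUATE`, `REDACT_ALL` and `DEEP_SCAN_AND_REDACT`, which appear in the rule files of sections 2.1 and 2.2, would be reported as unknown, so either the enum or the rule files would need to be aligned.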
2.2 Client Override Rules
# rules/clients/fintech_client_1/overrides.yaml
client_id: "fintech_client_1"
policy_version: "2.1.0"
compliance_profile: "SOC2_Type2_HIPAA"
# More restrictive than base rules
resource_overrides:
  google_sql_database_instance:
    # Override base rules
    attributes:
      - path: "public_ip_address"
        action: REDACT  # Base rule: PRESERVE
        reason: "FinTech client requires no public IP exposure"
      - path: "settings.ip_configuration.authorized_networks[*].value"
        action: REDACT_ALL  # Base rule: MASK private IPs
        reason: "No network topology disclosure"
      - path: "connection_name"
        action: MASK  # Base rule: PRESERVE
        mask_pattern: "[CONNECTION_NAME_REDACTED]"
        reason: "Client security policy: hide all infrastructure details"
  google_compute_instance:
    # Additional restrictions
    attributes:
      - path: "metadata"
        action: DEEP_SCAN_AND_REDACT
        sensitivity: CRITICAL
        reason: "Scan all metadata keys for sensitive data"
# Allow-list approach (only these attributes preserved)
allow_lists:
  google_project:
    - project_id
    - name
    - labels.environment
    - labels.cost_center
    # All other attributes: REDACT
# Deny-list approach (these patterns always redacted, regardless of base rules)
deny_patterns:
  - ".*email.*"
  - ".*phone.*"
  - ".*ssn.*"
  - ".*credit.*card.*"
# Custom sanitization functions
custom_functions:
  - name: "anonymize_email"
    description: "Replace email with anonymized version"
    implementation: |
      def anonymize_email(email: str) -> str:
          # Split on the last '@' so a local part containing '@' does not break unpacking
          local, domain = email.rsplit('@', 1)
          return f"{hashlib.sha256(local.encode()).hexdigest()[:8]}@{domain}"
  - name: "hash_identifier"
    description: "One-way hash for identifiers"
    implementation: |
      def hash_identifier(value: str) -> str:
          return f"HASH-{hashlib.sha256(value.encode()).hexdigest()[:16]}"
2.3 Development Environment Rules (Less Restrictive)
# rules/clients/dev_client_3/overrides.yaml
client_id: "dev_client_3"
policy_version: "1.0.0"
compliance_profile: "Development"
# More permissive than base rules
resource_overrides:
  google_sql_database_instance:
    attributes:
      - path: "private_ip_address"
        action: PRESERVE  # Base rule: MASK
        reason: "Dev environment needs full visibility"
      - path: "settings.ip_configuration.authorized_networks[*].value"
        action: PRESERVE  # Base rule: MASK/REDACT
        reason: "Dev team needs to see network configuration"
  google_compute_instance:
    attributes:
      - path: "metadata.startup-script"
        action: PRESERVE  # Base rule: SCAN_FOR_SECRETS
        reason: "Dev environment, startup scripts safe to expose"
# Still redact critical secrets
enforce_base_rules_for:
  - ".*password.*"
  - ".*private_key.*"
  - ".*api_key.*"
  - ".*secret.*"
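The `enforce_base_rules_for` list only has an effect if it is consulted before a permissive client override is applied. One way this could work is sketched below. The helper names `is_enforced` and `resolve_action` are hypothetical, and matching the whole attribute path case-insensitively is an assumption, not documented behaviour.

```python
import re
from typing import List, Optional


def is_enforced(attribute_path: str, enforce_patterns: List[str]) -> bool:
    """True if any enforce_base_rules_for pattern matches the whole path."""
    return any(
        re.fullmatch(pattern, attribute_path, flags=re.IGNORECASE)
        for pattern in enforce_patterns
    )


def resolve_action(
    attribute_path: str,
    base_action: str,
    client_action: Optional[str],
    enforce_patterns: List[str],
) -> str:
    """Use the client override unless the path is protected by an enforced pattern."""
    if client_action is None or is_enforced(attribute_path, enforce_patterns):
        return base_action
    return client_action


enforce = [".*password.*", ".*private_key.*", ".*api_key.*", ".*secret.*"]
print(resolve_action("private_ip_address", "MASK", "PRESERVE", enforce))  # override applies
print(resolve_action("root_password", "REDACT", "PRESERVE", enforce))     # base rule wins
```

Under this reading, a development client can relax network visibility but cannot switch off redaction of anything whose path looks like a credential.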
3. Rule Evaluation Algorithm
3.1 Rule Precedence & Conflict Resolution
from typing import Any

class RulePrecedence:
    """
    Rule application order (highest to lowest priority):
    1. Client-specific explicit attribute rules
    2. Client-specific pattern rules
    3. Base resource-type explicit attribute rules
    4. Base resource-type pattern rules
    5. Provider-level default rules
    6. Global fallback (default: REDACT unknown high-entropy strings)
    """
    def get_applicable_rule(
        self,
        resource_type: str,
        attribute_path: str,
        attribute_value: Any,
        client_config: ClientConfig
    ) -> Rule:
        # 1. Check client-specific explicit rules
        if client_rule := self.client_rules.get_exact_match(
            client_config.client_id,
            resource_type,
            attribute_path
        ):
            return client_rule
        # 2. Check client-specific pattern rules
        if client_pattern_rule := self.client_rules.get_pattern_match(
            client_config.client_id,
            resource_type,
            attribute_path
        ):
            return client_pattern_rule
        # 3. Check base explicit rules
        if base_rule := self.base_rules.get_exact_match(
            resource_type,
            attribute_path
        ):
            return base_rule
        # 4. Check base pattern rules
        if base_pattern_rule := self.base_rules.get_pattern_match(
            resource_type,
            attribute_path
        ):
            return base_pattern_rule
        # 5. Provider-level default
        provider = self._extract_provider(resource_type)
        if provider_rule := self.provider_defaults.get(provider):
            return provider_rule
        # 6. Global fallback
        return self._evaluate_fallback_rule(attribute_value)

    def _evaluate_fallback_rule(self, value: Any) -> Rule:
        """
        Conservative default: if high entropy or looks like secret, redact.
        Fallback rules are tagged source="fallback" so that coverage analysis
        (section 7.1) can tell them apart from explicit and pattern matches.
        """
        if isinstance(value, str):
            entropy = self._calculate_entropy(value)
            if entropy > 4.5:  # High randomness
                return Rule(
                    action=Action.REDACT,
                    reason="High-entropy unknown attribute",
                    source="fallback"
                )
        return Rule(
            action=Action.PRESERVE,
            reason="Low-risk unknown attribute",
            source="fallback"
        )
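The fallback rule relies on `_calculate_entropy`, which is not shown. A common choice is Shannon entropy over the characters of the string, measured in bits per character. The sketch below assumes that definition; the standalone function name `shannon_entropy` is illustrative.

```python
import math
from collections import Counter


def shannon_entropy(value: str) -> float:
    """Shannon entropy of the string, in bits per character."""
    if not value:
        return 0.0
    counts = Counter(value)
    total = len(value)
    # Sum of -p * log2(p) over the observed character frequencies
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


print(shannon_entropy("abcd"))              # four equally likely symbols
print(shannon_entropy("super_secret_123"))  # a short, human-chosen password
```

With this definition, a string with k distinct characters can score at most log2(k) bits, so the 4.5 threshold can only be exceeded by values with at least 23 distinct characters. Short secrets such as the `super_secret_123` example from section 2.1 would therefore pass through the fallback as PRESERVE unless an explicit or pattern rule catches them first.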
3.2 Condition Evaluation Engine
import re

class ConditionEvaluator:
    """
    Evaluate complex conditions in rules.
    """
    def evaluate(self, condition: Condition, context: EvaluationContext) -> bool:
        """
        Supported conditions:
        - value matches regex
        - value in list
        - value equals constant
        - client_policy.setting == value
        - attribute_path ends_with suffix
        """
        if condition.type == "matches":
            # Regex conditions only apply to strings; re.match raises TypeError otherwise
            return isinstance(context.value, str) and bool(
                re.match(condition.pattern, context.value)
            )
        elif condition.type == "in":
            return context.value in condition.allowed_values
        elif condition.type == "equals":
            return context.value == condition.expected_value
        elif condition.type == "client_policy":
            policy_value = self._get_client_policy_value(
                context.client_config,
                condition.policy_key
            )
            return policy_value == condition.expected_value
        elif condition.type == "attribute_path_suffix":
            return context.attribute_path.endswith(condition.suffix)
        else:
            raise ValueError(f"Unknown condition type: {condition.type}")

# Example usage
# The pattern is a plain Python regex, so the /.../ delimiters used in the YAML
# condition strings must be stripped before the Condition is built.
evaluator = ConditionEvaluator()
condition = Condition(
    type="matches",
    pattern=r"^10\.|^172\.(1[6-9]|2[0-9]|3[0-1])\.|^192\.168\."
)
context = EvaluationContext(
    attribute_path="private_ip_address",
    value="10.128.0.45",
    resource_type="google_compute_instance",
    client_config=client_config  # ClientConfig loaded elsewhere
)
is_private_ip = evaluator.evaluate(condition, context)  # True
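The rule files express conditions as strings such as `value matches /^10\./`, while the evaluator above works on typed `Condition` objects, so some parsing step has to sit between the two. The sketch below shows one possible parser for three of the condition forms used in section 2.1. `ParsedCondition` is a hypothetical stand-in for the real `Condition` type, and the accepted grammar is an assumption inferred from the examples.

```python
import re
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ParsedCondition:
    """Hypothetical stand-in for the Condition type used by ConditionEvaluator."""
    type: str
    pattern: Optional[str] = None
    expected_value: Any = None
    policy_key: Optional[str] = None


_MATCHES = re.compile(r"^value\s+matches\s+/(?P<pattern>.*)/$")
_EQUALS = re.compile(r"^value\s*==\s*'(?P<literal>.*)'$")
_POLICY = re.compile(r"^client_policy\.(?P<key>\w+)\s*==\s*(?P<literal>true|false)$")


def parse_condition(text: str) -> ParsedCondition:
    """Turn a YAML condition string into a typed condition, or raise ValueError."""
    text = text.strip()
    if m := _MATCHES.match(text):
        # Strip the surrounding slashes; what remains is a plain regex
        return ParsedCondition(type="matches", pattern=m.group("pattern"))
    if m := _EQUALS.match(text):
        return ParsedCondition(type="equals", expected_value=m.group("literal"))
    if m := _POLICY.match(text):
        return ParsedCondition(
            type="client_policy",
            policy_key=m.group("key"),
            expected_value=(m.group("literal") == "true"),
        )
    raise ValueError(f"Unsupported condition: {text!r}")
```

Rejecting unrecognised strings with an exception, rather than treating them as false, keeps a typo in a rule file from silently disabling a redaction.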
4. Action Types & Implementation
4.1 Supported Actions
import hashlib
import re
from enum import Enum
from typing import Any, Dict

class Action(Enum):
    REDACT = "REDACT"                        # Replace with placeholder
    MASK = "MASK"                            # Partially hide
    HASH = "HASH"                            # Cryptographic hash
    PRESERVE = "PRESERVE"                    # Keep original
    ANONYMIZE = "ANONYMIZE"                  # Pseudonymization
    EVALUATE_PER_ITEM = "EVALUATE_PER_ITEM"  # For arrays
    DEEP_SCAN = "DEEP_SCAN"                  # Recursive scan for nested secrets
    CUSTOM_FUNCTION = "CUSTOM_FUNCTION"      # Client-defined function

class ActionExecutor:
    def execute(self, action: Action, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        handlers = {
            Action.REDACT: self._handle_redact,
            Action.MASK: self._handle_mask,
            Action.HASH: self._handle_hash,
            Action.PRESERVE: self._handle_preserve,
            Action.ANONYMIZE: self._handle_anonymize,
            Action.EVALUATE_PER_ITEM: self._handle_evaluate_per_item,
            Action.DEEP_SCAN: self._handle_deep_scan,
            Action.CUSTOM_FUNCTION: self._handle_custom_function,
        }
        handler = handlers.get(action)
        if not handler:
            raise ValueError(f"Unknown action: {action}")
        return handler(value, rule, context)

    def _handle_redact(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Replace value with redaction placeholder"""
        template = rule.redaction_template or "[REDACTED:{type}]"
        redaction_type = self._infer_type(context.attribute_path, value)
        return template.format(type=redaction_type)

    def _handle_mask(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Partially hide value while preserving structure"""
        if rule.mask_pattern:
            return rule.mask_pattern
        # Auto-detect mask pattern
        if isinstance(value, str):
            if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", value):
                # IP address
                octets = value.split(".")
                return f"{octets[0]}.x.x.x"
            elif "@" in value:
                # Email; local[:1] avoids an IndexError when the local part is empty
                local, domain = value.split("@", 1)
                return f"{local[:1]}***@{domain}"
        return "[MASKED]"

    def _handle_hash(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Replace with cryptographic hash"""
        hash_value = hashlib.sha256(str(value).encode()).hexdigest()
        return f"sha256:{hash_value[:16]}"

    def _handle_preserve(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Keep original value"""
        return value

    def _handle_anonymize(self, value: Any, rule: Rule, context: EvaluationContext) -> str:
        """Pseudonymization with consistent mapping"""
        # Deterministic one-way transformation. The hash is unkeyed, so
        # low-entropy inputs remain guessable by brute force.
        client_id = context.client_config.client_id
        hash_input = f"{client_id}:{context.attribute_path}:{value}"
        hash_output = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
        return f"ANON-{hash_output}"

    def _handle_evaluate_per_item(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Evaluate each item in array individually"""
        if not isinstance(value, list):
            # Fail closed: an unexpected shape is redacted rather than passed through
            return self._handle_redact(value, rule, context)
        return [
            self._evaluate_item(item, rule, context)
            for item in value
        ]

    def _handle_deep_scan(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Recursively scan nested structures for secrets"""
        if isinstance(value, dict):
            return {
                k: self._handle_deep_scan(v, rule, context)
                for k, v in value.items()
            }
        elif isinstance(value, list):
            return [
                self._handle_deep_scan(item, rule, context)
                for item in value
            ]
        elif isinstance(value, str):
            # Check if string contains secrets
            if self._contains_secret_pattern(value):
                return self._handle_redact(value, rule, context)
        return value

    def _handle_custom_function(self, value: Any, rule: Rule, context: EvaluationContext) -> Any:
        """Execute client-defined custom function"""
        function_name = rule.custom_function_name
        function = context.client_config.custom_functions.get(function_name)
        if not function:
            raise ValueError(f"Custom function not found: {function_name}")
        return function(value)
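`_handle_evaluate_per_item` delegates to `_evaluate_item`, which is not defined above. For the `authorized_networks` rule in section 2.1, the per-item logic could be written with the standard `ipaddress` module instead of a regex. The sketch below is one such reading; the function name `classify_authorized_network` and the `[REDACTED:UNPARSEABLE_NETWORK]` placeholder are assumptions introduced for illustration.

```python
import ipaddress

# RFC 1918 private ranges, the same three covered by the regex in section 2.1
_RFC1918 = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def classify_authorized_network(cidr: str) -> dict:
    """Apply the three authorized_networks rules to a single CIDR value."""
    if cidr == "0.0.0.0/0":
        # Open to the world: keep the value visible and flag it
        return {"value": cidr, "flag": "PUBLIC_EXPOSURE_WARNING"}
    try:
        network = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        # Fail closed on anything that does not parse (placeholder name is hypothetical)
        return {"value": "[REDACTED:UNPARSEABLE_NETWORK]"}
    if network.version == 4 and any(network.subnet_of(block) for block in _RFC1918):
        return {"value": "PRIVATE_CIDR"}
    return {"value": "[REDACTED:PUBLIC_IP_RANGE]"}
```

Explicit RFC 1918 blocks are used here instead of the `is_private` property because `is_private` covers more special-purpose ranges than the three named in the rule, which would change which values get masked rather than redacted.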
5. Rule Repository Structure
rules/
├── base/                       # Provider-level default rules
│   ├── google/
│   │   ├── compute/
│   │   │   ├── google_compute_instance.yaml
│   │   │   ├── google_compute_address.yaml
│   │   │   └── google_compute_network.yaml
│   │   ├── sql/
│   │   │   ├── google_sql_database_instance.yaml
│   │   │   └── google_sql_user.yaml
│   │   ├── kms/
│   │   │   ├── google_kms_crypto_key.yaml
│   │   │   └── google_kms_secret_ciphertext.yaml
│   │   ├── iam/
│   │   │   ├── google_service_account.yaml
│   │   │   ├── google_service_account_key.yaml
│   │   │   └── google_project_iam_binding.yaml
│   │   └── storage/
│   │       ├── google_storage_bucket.yaml
│   │       └── google_storage_bucket_iam_binding.yaml
│   ├── aws/
│   │   ├── ec2/
│   │   ├── rds/
│   │   └── iam/
│   └── azure/
│       ├── compute/
│       └── storage/
│
├── clients/                    # Client-specific overrides
│   ├── fintech_client_1/
│   │   ├── overrides.yaml
│   │   ├── custom_functions.py
│   │   └── test_cases.yaml
│   ├── healthcare_client_2/
│   │   └── overrides.yaml
│   └── dev_client_3/
│       └── overrides.yaml
│
├── patterns/                   # Reusable pattern libraries
│   ├── credentials.yaml        # Private keys, passwords, tokens
│   ├── network.yaml            # IPs, CIDRs, URLs
│   ├── pii.yaml                # Emails, phone numbers, SSN
│   └── encryption.yaml         # Keys, certificates
│
├── compliance/                 # Compliance-driven rule sets
│   ├── soc2.yaml
│   ├── hipaa.yaml
│   ├── gdpr.yaml
│   └── pci_dss.yaml
│
└── test_suites/                # Comprehensive test cases
    ├── google_resources_test.yaml
    ├── aws_resources_test.yaml
    └── edge_cases_test.yaml
6. Rule Testing & Validation
6.1 Test Case Format
# test_suites/google_sql_database_instance_test.yaml
resource_type: google_sql_database_instance
test_cases:
  - test_id: "sql_001"
    description: "Redact root password"
    client_policy: "default"
    input:
      root_password: "P@ssw0rd123!"
    expected_output:
      root_password: "[REDACTED:SQL_ROOT_PASSWORD]"
    expected_audit_log:
      action: "REDACT"
      sensitivity: "CRITICAL"
      rule_matched: "google_sql_database_instance/root_password"
  - test_id: "sql_002"
    description: "Mask private IP address"
    client_policy: "default"
    input:
      private_ip_address: "10.128.0.45"
    expected_output:
      private_ip_address: "10.x.x.x"
    expected_audit_log:
      action: "MASK"
      sensitivity: "HIGH"
  - test_id: "sql_003"
    description: "Preserve connection name"
    client_policy: "default"
    input:
      connection_name: "my-project:us-central1:my-db"
    expected_output:
      connection_name: "my-project:us-central1:my-db"
    expected_audit_log:
      action: "PRESERVE"
      sensitivity: "LOW"
  - test_id: "sql_004"
    description: "Client override: redact public IP"
    client_policy: "fintech_client_1"
    input:
      public_ip_address: "34.123.45.67"
    expected_output:
      public_ip_address: "[REDACTED:PUBLIC_IP]"
    expected_audit_log:
      action: "REDACT"
      sensitivity: "HIGH"
      reason: "Client security policy override"
  - test_id: "sql_005"
    description: "Evaluate authorized networks individually"
    client_policy: "default"
    input:
      settings:
        ip_configuration:
          authorized_networks:
            - name: "office"
              value: "203.0.113.0/24"
            - name: "vpn"
              value: "10.0.0.0/8"
            - name: "public"
              value: "0.0.0.0/0"
    expected_output:
      settings:
        ip_configuration:
          authorized_networks:
            - name: "office"
              value: "[REDACTED:PUBLIC_IP_RANGE]"
            - name: "vpn"
              value: "PRIVATE_CIDR"
            - name: "public"
              value: "0.0.0.0/0"
              flag: "PUBLIC_EXPOSURE_WARNING"
6.2 Automated Testing Framework
from pathlib import Path
from typing import Any, Dict

import yaml  # PyYAML

class RuleTester:
    def __init__(self, rules_engine: RulesEngine):
        self.rules_engine = rules_engine

    def run_test_suite(self, test_suite_path: str) -> TestReport:
        """
        Run all test cases in a test suite file.
        """
        test_suite = yaml.safe_load(Path(test_suite_path).read_text())
        report = TestReport()
        for test_case in test_suite["test_cases"]:
            result = self.run_test_case(test_case, test_suite["resource_type"])
            report.add_result(result)
        return report

    def run_test_case(self, test_case: Dict, resource_type: str) -> TestResult:
        """
        Execute a single test case.
        """
        # Create mock resource
        resource = TerraformResource(
            type=resource_type,
            name="test-resource",
            attributes=test_case["input"]
        )
        # Apply sanitization with client policy
        client_config = self._get_client_config(test_case["client_policy"])
        sanitized = self.rules_engine.sanitize_resource(resource, client_config)
        # Compare output
        expected = test_case["expected_output"]
        actual = sanitized.attributes
        # Validate
        if self._deep_equals(expected, actual):
            return TestResult(
                test_id=test_case["test_id"],
                status="PASS",
                description=test_case["description"]
            )
        else:
            return TestResult(
                test_id=test_case["test_id"],
                status="FAIL",
                description=test_case["description"],
                expected=expected,
                actual=actual,
                diff=self._generate_diff(expected, actual)
            )

    def _deep_equals(self, expected: Any, actual: Any) -> bool:
        """Deep equality comparison for nested structures"""
        if type(expected) != type(actual):
            return False
        if isinstance(expected, dict):
            if set(expected.keys()) != set(actual.keys()):
                return False
            return all(
                self._deep_equals(expected[k], actual[k])
                for k in expected.keys()
            )
        elif isinstance(expected, list):
            if len(expected) != len(actual):
                return False
            return all(
                self._deep_equals(e, a)
                for e, a in zip(expected, actual)
            )
        else:
            return expected == actual
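A failing test result carries a `diff` produced by `_generate_diff`, which is not shown. One simple approach, sketched here under the assumption that test inputs and outputs are JSON-serializable, is to render both structures as sorted, indented JSON and feed the lines to `difflib`. The standalone name `generate_diff` is illustrative.

```python
import difflib
import json
from typing import Any, List


def generate_diff(expected: Any, actual: Any) -> List[str]:
    """Unified diff of two structures, with keys sorted so the output is stable."""
    def render(obj: Any) -> List[str]:
        # default=str keeps non-JSON types (dates, enums) from raising
        return json.dumps(obj, indent=2, sort_keys=True, default=str).splitlines()

    return list(difflib.unified_diff(
        render(expected),
        render(actual),
        fromfile="expected",
        tofile="actual",
        lineterm="",
    ))


for line in generate_diff(
    {"private_ip_address": "10.x.x.x"},
    {"private_ip_address": "10.128.0.45"},
):
    print(line)
```

Because a failing sanitization test may leave a real secret in `actual`, the diff itself can contain sensitive values and probably should not be written to shared logs without the same sampling safeguards used in coverage reports.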
7. Rule Coverage Analysis
7.1 Coverage Metrics
class RuleCoverageAnalyzer:
    """
    Analyze rule coverage to identify gaps.
    """
    def analyze(
        self,
        state: TerraformState,
        rules: RulesEngine,
        client_config: ClientConfig
    ) -> CoverageReport:
        """
        Calculate coverage across all attributes in state.
        """
        total_attributes = 0
        matched_attributes = 0
        unmatched_attributes = []
        for resource in state.resources:
            for attr_path, value in self._flatten_attributes(resource.attributes):
                total_attributes += 1
                rule = rules.get_applicable_rule(
                    resource.type,
                    attr_path,
                    value,
                    client_config
                )
                if rule and rule.source != "fallback":
                    matched_attributes += 1
                else:
                    unmatched_attributes.append({
                        "resource_type": resource.type,
                        "attribute_path": attr_path,
                        "value_type": type(value).__name__,
                        "sample_value": self._safe_sample(value)
                    })
        # Guard against an empty state to avoid dividing by zero
        coverage_percentage = (
            (matched_attributes / total_attributes) * 100
            if total_attributes else 0.0
        )
        return CoverageReport(
            total_attributes=total_attributes,
            matched_attributes=matched_attributes,
            coverage_percentage=coverage_percentage,
            unmatched_attributes=unmatched_attributes
        )

    def _safe_sample(self, value: Any, max_length: int = 50) -> str:
        """Return safe sample of value without exposing secrets"""
        value_str = str(value)
        # Check the full value before truncating, so a secret that is cut
        # short by truncation cannot slip past the detector
        if self._looks_like_secret(value_str):
            return "[REDACTED_IN_SAMPLE]"
        if len(value_str) > max_length:
            value_str = value_str[:max_length] + "..."
        return value_str
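The analyzer depends on `_flatten_attributes` to turn a nested attribute tree into `(path, value)` pairs. A minimal sketch follows, assuming dotted keys and `[i]` list indexes so that the resulting paths line up with rule paths such as `settings.ip_configuration.authorized_networks[*].value`. The standalone function name is illustrative.

```python
from typing import Any, Iterator, Tuple


def flatten_attributes(attributes: Any, prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (path, leaf_value) pairs using dotted keys and [i] list indexes."""
    if isinstance(attributes, dict):
        for key, value in attributes.items():
            child = f"{prefix}.{key}" if prefix else str(key)
            yield from flatten_attributes(value, child)
    elif isinstance(attributes, list):
        for index, item in enumerate(attributes):
            yield from flatten_attributes(item, f"{prefix}[{index}]")
    else:
        # Scalars (and None) are leaves
        yield prefix, attributes


attrs = {
    "name": "db",
    "settings": {
        "ip_configuration": {
            "authorized_networks": [{"name": "office", "value": "203.0.113.0/24"}]
        }
    },
}
for path, value in flatten_attributes(attrs):
    print(path, "=", value)
```

Empty dicts and lists yield nothing in this sketch, so they do not count toward `total_attributes`.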
# Example coverage report
"""
Rule Coverage Report
====================
Total Attributes Scanned: 12,345
Matched by Explicit Rules: 10,234 (82.9%)
Matched by Pattern Rules: 1,456 (11.8%)
Matched by Fallback: 655 (5.3%)
Coverage by Resource Type:
- google_sql_database_instance: 96% (23/24 attributes)
- google_compute_instance: 88% (45/51 attributes)
- google_storage_bucket: 100% (12/12 attributes)
- google_project_iam_binding: 75% (9/12 attributes)
Unmatched Attributes (requiring new rules):
1. google_compute_instance: metadata.custom_script_url
2. google_project_iam_binding: condition.expression
3. google_container_cluster: node_config.workload_metadata_config
"""
8. Rule Versioning & Migration
8.1 Rule Version Management
# rules/base/google/compute/google_compute_instance.yaml
# Version tracking
rule_file_version: "2.1.0"
changelog:
  - version: "2.1.0"
    date: "2025-01-13"
    changes:
      - "Added deep scan for metadata fields"
      - "Improved SSH key detection in metadata"
    author: "security-team"
  - version: "2.0.0"
    date: "2025-01-01"
    changes:
      - "Breaking: Changed private_ip_address from PRESERVE to MASK"
      - "Added support for nested metadata attributes"
    author: "security-team"
  - version: "1.0.0"
    date: "2024-12-01"
    changes:
      - "Initial rule definition"
    author: "security-team"
# Backward compatibility
deprecated_attributes:
  - path: "metadata.ssh-keys"
    deprecated_in: "2.0.0"
    replacement: "metadata.ssh_keys"
    migration_note: "Underscore-separated key names now preferred"
8.2 Migration Strategy
class RuleMigrator:
    """
    Safely migrate rules to new versions.
    """
    def migrate(
        self,
        from_version: str,
        to_version: str,
        dry_run: bool = True
    ) -> MigrationReport:
        """
        Migrate rules from one version to another.
        """
        migration_steps = self._plan_migration(from_version, to_version)
        report = MigrationReport()
        for step in migration_steps:
            if dry_run:
                report.add_planned_step(step)
            else:
                result = self._execute_migration_step(step)
                report.add_executed_step(step, result)
        return report

    def validate_migration(self, report: MigrationReport) -> ValidationResult:
        """
        Run test suites to validate migration didn't break anything.
        """
        # Run all test suites with new rules
        test_reports = []
        for test_suite in self._get_all_test_suites():
            test_reports.append(self.test_runner.run_test_suite(test_suite))
        # Check for regressions. run_test_suite returns one TestReport per suite,
        # so look at the individual results inside each report
        # (assumes TestReport exposes them as .results).
        regressions = [
            result
            for test_report in test_reports
            for result in test_report.results
            if result.status == "FAIL" and result.previous_status == "PASS"
        ]
        if regressions:
            return ValidationResult(
                status="FAILED",
                regressions=regressions
            )
        return ValidationResult(status="PASSED")
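`_plan_migration` has to decide which changelog versions lie between `from_version` and `to_version`. Comparing version strings directly would order "10.0.0" before "9.0.0", so the sketch below parses them into integer tuples first. The helper names are hypothetical, and treating a major-version bump as breaking is an assumption based on the "Breaking:" entry for 2.0.0 in the changelog.

```python
from typing import List, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Parse '2.1.0' (or the two-part '1.0' used by some rule files) into a tuple."""
    parts = [int(part) for part in text.split(".")]
    parts += [0] * (3 - len(parts))  # pad '1.0' to (1, 0, 0)
    return tuple(parts[:3])


def plan_versions(available: List[str], from_version: str, to_version: str) -> List[str]:
    """Versions to apply, oldest first: everything in (from_version, to_version]."""
    low, high = parse_version(from_version), parse_version(to_version)
    if low > high:
        raise ValueError("Downgrades are not handled in this sketch")
    return sorted(
        (v for v in available if low < parse_version(v) <= high),
        key=parse_version,
    )


def has_breaking_change(from_version: str, to_version: str) -> bool:
    """Assumption: a major-version bump signals a breaking rule change."""
    return parse_version(to_version)[0] > parse_version(from_version)[0]


print(plan_versions(["2.1.0", "1.0.0", "2.0.0"], "1.0.0", "2.1.0"))
```

A migrator could use `has_breaking_change` to require an explicit confirmation before leaving `dry_run` mode.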
9. Performance Optimization
9.1 Rule Caching
import time
from typing import Optional

class RuleCache:
    """
    Cache compiled rules for performance.
    """
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_rule(
        self,
        resource_type: str,
        attribute_path: str,
        client_id: str
    ) -> Optional[Rule]:
        """
        Get rule from cache or compile if not cached.
        """
        cache_key = self._make_key(resource_type, attribute_path, client_id)
        if cache_key in self.cache:
            entry = self.cache[cache_key]
            if time.time() - entry.timestamp < self.ttl:
                return entry.rule
        # Cache miss or expired
        rule = self._compile_rule(resource_type, attribute_path, client_id)
        self.cache[cache_key] = CacheEntry(rule=rule, timestamp=time.time())
        return rule

    def _make_key(self, resource_type: str, attribute_path: str, client_id: str) -> str:
        return f"{client_id}:{resource_type}:{attribute_path}"
9.2 Rule Indexing
import re
from typing import List, Optional

class RuleIndex:
    """
    Pre-index rules for fast lookup.
    """
    def __init__(self):
        self.exact_match_index = {}    # attribute_path -> Rule
        self.prefix_match_index = {}   # prefix -> List[Rule]
        self.pattern_match_index = []  # List[(compiled_regex, Rule)]
        self.sorted_prefixes = []      # prefixes, longest first

    def build_index(self, rules: List[Rule]):
        """
        Build optimized lookup structures.
        """
        for rule in rules:
            if rule.match_type == "exact":
                self.exact_match_index[rule.attribute_path] = rule
            elif rule.match_type == "prefix":
                prefix = rule.attribute_path.rstrip("*")
                if prefix not in self.prefix_match_index:
                    self.prefix_match_index[prefix] = []
                self.prefix_match_index[prefix].append(rule)
            elif rule.match_type == "pattern":
                compiled_pattern = re.compile(rule.pattern)
                self.pattern_match_index.append((compiled_pattern, rule))
        # Longest prefix first, so the most specific prefix rule is found first
        self.sorted_prefixes = sorted(self.prefix_match_index, key=len, reverse=True)

    def lookup(self, attribute_path: str) -> Optional[Rule]:
        """
        O(1) lookup for exact matches; prefix and pattern matches are
        linear scans over the registered prefixes and patterns.
        """
        # Try exact match first (O(1))
        if attribute_path in self.exact_match_index:
            return self.exact_match_index[attribute_path]
        # Try prefix match (O(p) over p prefixes, longest prefix wins)
        for prefix in self.sorted_prefixes:
            if attribute_path.startswith(prefix):
                return self.prefix_match_index[prefix][0]
        # Try pattern match (O(n) but typically small n)
        for pattern, rule in self.pattern_match_index:
            if pattern.match(attribute_path):
                return rule
        return None
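The index above covers exact, prefix and regex matches, while the rule files also use glob-style paths such as `settings.ip_configuration.authorized_networks[*].value`. One way to support them is to compile each glob into a regex and store it in the pattern index. The sketch below assumes that `[*]` stands for any list index and a bare `*` for a single path segment; both meanings are inferred from the examples rather than specified anywhere.

```python
import re


def compile_path_glob(glob: str) -> re.Pattern:
    """Compile an attribute-path glob into an anchored regular expression."""
    escaped = re.escape(glob)
    # '[*]' (escaped as '\[\*\]') matches any numeric list index
    escaped = escaped.replace(r"\[\*\]", r"\[\d+\]")
    # a remaining '*' matches within one segment: no dots or brackets
    escaped = escaped.replace(r"\*", r"[^.\[\]]*")
    return re.compile(f"^{escaped}$")


pattern = compile_path_glob("settings.ip_configuration.authorized_networks[*].value")
print(bool(pattern.match("settings.ip_configuration.authorized_networks[0].value")))
print(bool(pattern.match("settings.ip_configuration.authorized_networks[0].name")))
```

Escaping the glob first means the dots in a path are matched literally, so `settings.ip` cannot accidentally match `settingsXip`.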
10. Rule Documentation & Discoverability
10.1 Rule Catalog
# catalog/index.yaml
catalog_version: "1.0"
total_rules: 234
total_resource_types: 87
categories:
  - name: "Google Cloud Platform"
    resource_types: 45
    rules: 156
    coverage: 95%
  - name: "Amazon Web Services"
    resource_types: 32
    rules: 68
    coverage: 88%
  - name: "Microsoft Azure"
    resource_types: 10
    rules: 10
    coverage: 65%
by_sensitivity:
  CRITICAL: 45 rules
  HIGH: 89 rules
  MEDIUM: 78 rules
  LOW: 22 rules
by_action:
  REDACT: 98 rules
  MASK: 45 rules
  PRESERVE: 67 rules
  HASH: 12 rules
  CUSTOM: 12 rules
most_common_patterns:
  - ".*password.*": 67 matches
  - ".*secret.*": 54 matches
  - ".*key.*": 89 matches (context-dependent)
  - ".*token.*": 34 matches
11. Monitoring & Observability
11.1 Rule Performance Metrics
Metrics to Track:
- rule_evaluation_duration_seconds{rule_id, resource_type}
- rule_match_rate{rule_id, action}
- rule_cache_hit_ratio
- rule_coverage_percentage{resource_type}
- rule_false_positive_rate{rule_id}
- rule_version{rule_file, version}
Alerts:
- LowCoverage: coverage < 90% for any resource type
- SlowRuleEvaluation: p99 evaluation time > 100ms
- HighFalsePositiveRate: false_positive_rate > 5%
Version History
- v1.0 (2025-01-13): Initial rules engine design
- v1.1 (TBD): Add machine learning-based secret detection
- v1.2 (TBD): Add real-time rule updates without restart