# A biopharma company maintains three layers of data governance: technical catalog (CSVs), contracts (.md), business catalog (.md). See the module docstring below.
"""
Demo: Biopharma Data Governance — Cross-Layer Consistency.
Scenario: A biopharma company maintains three layers of data governance:
1. Technical catalog (CSVs) — what actually exists in the data platform
2. Contracts (.md) — legal agreements with external data providers
3. Business catalog (.md) — business-facing data product descriptions
Each layer is authored and maintained independently. Over time, drift
accumulates: paths change, contracts expire, business catalog entries
reference datasets that no longer exist, refresh frequencies diverge.
This demo loads all three layers as Parseltongue documents, extracts
facts from each, and runs cross-layer consistency checks. A synthetic
generator (generate.py) injects ~15-20% corruptions across layers,
and Parseltongue catches every one.
Run:
python generate.py --clean # first, generate the data
python demo.py # then, run the consistency checks
"""
import csv
import json
import logging
import re
import sys
from pathlib import Path
from operators import GOVERNANCE_EFFECTS
from parseltongue.core import System, load_source
RESOURCES = Path(__file__).parent / "resources"
# ── document loading ─────────────────────────────────────────────────
def load_all_documents(system: System):
    """Load every resource file as a named Parseltongue document.

    Walks the three governance layers under RESOURCES in a fixed order
    (technical catalog CSVs, contract markdown, business catalog markdown)
    and registers each file under a layer-prefixed document name, e.g.
    "tech:discovery" or "contract:acme".

    Returns the number of documents loaded.
    """
    layers = (
        ("technical_catalog", "*.csv", "tech"),
        ("contracts", "*.md", "contract"),
        ("business_catalog", "*.md", "business"),
    )
    total = 0
    for subdir, pattern, prefix in layers:
        for path in sorted((RESOURCES / subdir).glob(pattern)):
            system.load_document(f"{prefix}:{path.stem}", str(path))
            total += 1
    return total
# ── fact extraction ──────────────────────────────────────────────────
def _escape(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"')
def extract_technical_facts(system: System) -> dict:
    """Extract facts from technical catalog CSVs and emit them into *system*.

    Each CSV under resources/technical_catalog describes one department's
    datasets, one per row. For every dataset, five facts are emitted
    (path, cadence, table, source-type, owner), each with evidence
    pointing at the originating tech document.

    Fix: every value interpolated into the generated Parseltongue source
    is now passed through _escape. Previously refresh_cadence and
    source_type were interpolated raw, so a stray quote or backslash in
    the CSV would have broken the generated source.

    Returns:
        {dataset_id: {name, path, table, cadence, source_type, provider,
                      owner, dept, doc_name, safe_id}}
    """
    tech_dir = RESOURCES / "technical_catalog"
    datasets = {}
    for csv_file in sorted(tech_dir.glob("*.csv")):
        dept = csv_file.stem
        doc_name = f"tech:{dept}"
        with open(csv_file) as f:
            reader = csv.DictReader(f)
            for row in reader:
                ds_id = row["dataset_id"]
                # Fact identifiers must be symbol-safe: lowercase, no dashes.
                safe_id = ds_id.lower().replace("-", "_")
                datasets[ds_id] = {
                    "name": row["name"],
                    "path": row["storage_path"],
                    "table": row["table_name"],
                    "cadence": row["refresh_cadence"],
                    "source_type": row["source_type"],
                    "provider": row["provider"],
                    "owner": row["owner"],
                    "dept": dept,
                    "doc_name": doc_name,
                    "safe_id": safe_id,
                }
                # Escape once, reuse for both the fact value and the quote.
                name = _escape(row["name"])
                path = _escape(row["storage_path"])
                table = _escape(row["table_name"])
                cadence = _escape(row["refresh_cadence"])
                source_type = _escape(row["source_type"])
                owner = _escape(row["owner"])
                # Emit facts
                load_source(
                    system,
                    f"""
(fact {safe_id}-path "{path}"
  :evidence (evidence "{doc_name}"
    :quotes ("{path}")
    :explanation "Storage path for {name}"))
(fact {safe_id}-cadence "{cadence}"
  :evidence (evidence "{doc_name}"
    :quotes ("{cadence}")
    :explanation "Refresh cadence for {name}"))
(fact {safe_id}-table "{table}"
  :evidence (evidence "{doc_name}"
    :quotes ("{table}")
    :explanation "Table name for {name}"))
(fact {safe_id}-source-type "{source_type}"
  :evidence (evidence "{doc_name}"
    :quotes ("{source_type}")
    :explanation "Source type for {name}"))
(fact {safe_id}-owner "{owner}"
  :evidence (evidence "{doc_name}"
    :quotes ("{owner}")
    :explanation "Owner of {name}"))
""",
                )
    return datasets
def extract_contract_facts(system: System) -> dict:
    """Extract facts from contract .md files and emit them into *system*.

    Parses the header fields and the covered-datasets markdown table of
    every contract under resources/contracts, emitting per-contract facts
    (sla, retention, classification, expiry, status) plus a "covers" and
    a "permitted use" fact for every dataset the contract lists.

    Fix: every value interpolated into the generated Parseltongue source
    is now passed through _escape. Previously sla, classification, expiry,
    status and the permitted-use fact value were interpolated raw, so a
    quote or backslash in the document would have broken the source.

    Returns:
        {provider_slug: {provider, datasets: [{dataset_id, name,
                         permitted_use}], sla, retention, classification,
                         expiry, status, doc_name, safe_slug}}
    """
    ctr_dir = RESOURCES / "contracts"
    contracts = {}
    for md_file in sorted(ctr_dir.glob("*.md")):
        slug = md_file.stem
        doc_name = f"contract:{slug}"
        text = md_file.read_text()
        # Parse header fields; fall back to defaults when a field is absent.
        provider = _md_field(text, "Provider") or slug
        sla = _md_field(text, "Refresh SLA") or "unknown"
        retention = _md_field(text, "Retention Limit") or "unknown"
        classification = _md_field(text, "Data Classification") or "unknown"
        expiry = _md_field(text, "Expiry Date") or "unknown"
        status = _md_field(text, "Status") or "unknown"
        # Parse covered datasets table: | DS-#### | name | permitted use |
        covered = []
        for m in re.finditer(r"\|\s*(DS-\d+)\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|", text):
            covered.append(
                {
                    "dataset_id": m.group(1).strip(),
                    "name": m.group(2).strip(),
                    "permitted_use": m.group(3).strip(),
                }
            )
        safe_slug = slug.replace("-", "_")
        contracts[slug] = {
            "provider": provider,
            "datasets": covered,
            "sla": sla,
            "retention": retention,
            "classification": classification,
            "expiry": expiry,
            "status": status,
            "doc_name": doc_name,
            "safe_slug": safe_slug,
        }
        # Escape once, reuse for both the fact value and the quote.
        esc_provider = _escape(provider)
        esc_sla = _escape(sla)
        esc_retention = _escape(retention)
        esc_classification = _escape(classification)
        esc_expiry = _escape(expiry)
        esc_status = _escape(status)
        # Emit facts
        load_source(
            system,
            f"""
(fact ctr-{safe_slug}-sla "{esc_sla}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_sla}")
    :explanation "Refresh SLA for {esc_provider} contract"))
(fact ctr-{safe_slug}-retention "{esc_retention}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_retention}")
    :explanation "Retention limit for {esc_provider} contract"))
(fact ctr-{safe_slug}-classification "{esc_classification}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_classification}")
    :explanation "Data classification for {esc_provider} contract"))
(fact ctr-{safe_slug}-expiry "{esc_expiry}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_expiry}")
    :explanation "Expiry date for {esc_provider} contract"))
(fact ctr-{safe_slug}-status "{esc_status}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_status}")
    :explanation "Contract status for {esc_provider}"))
""",
        )
        # Facts for each covered dataset
        for ds in covered:
            ds_safe = ds["dataset_id"].lower().replace("-", "_")
            esc_use = _escape(ds["permitted_use"])
            load_source(
                system,
                f"""
(fact ctr-{safe_slug}-covers-{ds_safe} true
  :evidence (evidence "{doc_name}"
    :quotes ("{_escape(ds['dataset_id'])}")
    :explanation "{esc_provider} contract covers {_escape(ds['name'])}"))
(fact ctr-{safe_slug}-use-{ds_safe} "{esc_use}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_use}")
    :explanation "Permitted use for {_escape(ds['name'])} under {esc_provider}"))
""",
            )
    return contracts
def extract_business_facts(system: System) -> dict:
    """Extract facts from business catalog .md files and emit them into *system*.

    Parses each product page's header fields (owner, classification,
    refresh, retention) and its six-column source-datasets table, emitting
    per-product facts plus one "uses" fact per referenced source dataset.

    Fix: every value interpolated into the generated Parseltongue source
    is now passed through _escape. Previously classification and refresh
    were interpolated raw, so a quote or backslash in the document would
    have broken the generated source.

    Returns:
        {product_slug: {name, owner, classification, refresh,
                        retention_days, sources: [...], doc_name, safe_slug}}
    """
    biz_dir = RESOURCES / "business_catalog"
    products = {}
    for md_file in sorted(biz_dir.glob("*.md")):
        slug = md_file.stem
        doc_name = f"business:{slug}"
        text = md_file.read_text()
        name = _md_heading(text) or slug
        owner = _md_field(text, "Owner") or "unknown"
        classification = _md_field(text, "Classification") or "unknown"
        refresh = _md_field(text, "Refresh Frequency") or "unknown"
        retention_raw = _md_field(text, "Data Retention") or "unlimited"
        # "N days" → N; anything else (e.g. "unlimited") → 0 (no limit claimed).
        retention_days = 0
        ret_m = re.search(r"(\d+)\s*days", retention_raw)
        if ret_m:
            retention_days = int(ret_m.group(1))
        # Parse source datasets table (6 columns: id, name, domain, source_type, storage_path, table_name)
        sources = []
        for m in re.finditer(
            r"\|\s*(DS-\d+)\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|\s*([^|]+)\s*\|",
            text,
        ):
            sources.append(
                {
                    "dataset_id": m.group(1).strip(),
                    "name": m.group(2).strip(),
                    "domain": m.group(3).strip(),
                    "source_type": m.group(4).strip(),
                    "storage_path": m.group(5).strip(),
                    "table_name": m.group(6).strip(),
                }
            )
        safe_slug = slug.replace("-", "_")
        products[slug] = {
            "name": name,
            "owner": owner,
            "classification": classification,
            "refresh": refresh,
            "retention_days": retention_days,
            "sources": sources,
            "doc_name": doc_name,
            "safe_slug": safe_slug,
        }
        # Escape once, reuse for both the fact value and the quote.
        esc_name = _escape(name)
        esc_owner = _escape(owner)
        esc_classification = _escape(classification)
        esc_refresh = _escape(refresh)
        # Emit facts
        load_source(
            system,
            f"""
(fact bp-{safe_slug}-owner "{esc_owner}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_owner}")
    :explanation "Owner of {esc_name}"))
(fact bp-{safe_slug}-classification "{esc_classification}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_classification}")
    :explanation "Classification of {esc_name}"))
(fact bp-{safe_slug}-refresh "{esc_refresh}"
  :evidence (evidence "{doc_name}"
    :quotes ("{esc_refresh}")
    :explanation "Refresh frequency of {esc_name}"))
""",
        )
        # Facts for each source dataset reference
        for src in sources:
            ds_safe = src["dataset_id"].lower().replace("-", "_")
            load_source(
                system,
                f"""
(fact bp-{safe_slug}-uses-{ds_safe} true
  :evidence (evidence "{doc_name}"
    :quotes ("{_escape(src['dataset_id'])}")
    :explanation "{esc_name} uses {_escape(src['name'])}"))
""",
            )
    return products
def _md_field(text: str, label: str) -> str | None:
m = re.search(rf"\*\*{re.escape(label)}\*\*:\s*(.+)", text)
return m.group(1).strip() if m else None
def _md_heading(text: str) -> str | None:
m = re.search(r"^# .+:\s*(.+)", text, re.MULTILINE)
return m.group(1).strip() if m else None
# ── cross-layer checks ──────────────────────────────────────────────
def check_contract_coverage(system: System, datasets: dict, contracts: dict):
    """Every external dataset should have a contract covering it.

    Emits a `<safe_id>-has-contract` fact for every external dataset
    (true or false), and for uncovered ones additionally emits a derive
    that surfaces the coverage gap inside the Parseltongue system.

    Returns the list of human-readable issue lines (empty when clean).
    """
    # Build set of dataset_ids covered by any contract
    covered_ids = set()
    for ctr in contracts.values():
        for ds in ctr["datasets"]:
            covered_ids.add(ds["dataset_id"])
    issues = []
    for ds_id, ds in datasets.items():
        # Internal datasets need no contract; only external ones are checked.
        if ds["source_type"] == "external":
            safe_id = ds["safe_id"]
            has_contract = ds_id in covered_ids
            # Record coverage as a fact regardless of outcome, so downstream
            # derivations can reference it.
            load_source(
                system,
                f"""
(fact {safe_id}-has-contract {"true" if has_contract else "false"}
:origin "Derived from contract coverage analysis")
""",
            )
            if not has_contract:
                issues.append(f" MISSING CONTRACT: {ds_id} ({ds['name']}) from {ds['provider']}")
                # The derive makes the gap a first-class finding in the system.
                load_source(
                    system,
                    f"""
(derive {safe_id}-coverage-gap
(= {safe_id}-has-contract false)
:using ({safe_id}-has-contract))
""",
                )
    return issues
def check_sla_consistency(system: System, datasets: dict, contracts: dict):
    """Contract refresh SLA should match technical cadence for covered datasets.

    For each dataset a contract claims to cover (and which exists in the
    technical catalog), compares the contract's SLA to the technical refresh
    cadence. On mismatch, records an issue line and emits an `sla-check-*`
    diff into the system.

    Fix: removed the unused `cadence_order` dict (dead code — the check is
    a strict equality comparison, no ranking is performed here).

    Returns the list of human-readable issue lines (empty when clean).
    """
    issues = []
    for ctr_slug, ctr in contracts.items():
        safe_slug = ctr["safe_slug"]
        for ds_entry in ctr["datasets"]:
            ds_id = ds_entry["dataset_id"]
            if ds_id not in datasets:
                # Contract entries for unknown datasets are handled by the
                # referential-integrity check, not here.
                continue
            ds = datasets[ds_id]
            safe_id = ds["safe_id"]
            tech_cadence = ds["cadence"]
            ctr_sla = ctr["sla"]
            matches = tech_cadence == ctr_sla
            if not matches:
                issues.append(
                    f" SLA MISMATCH: {ds_id} ({ds['name']}) — " f"technical={tech_cadence}, contract={ctr_sla}"
                )
                # Record the divergence as a diff between the two facts.
                load_source(
                    system,
                    f"""
(diff sla-check-{safe_id}
:replace {safe_id}-cadence
:with ctr-{safe_slug}-sla)
""",
                )
    return issues
def check_referential_integrity(system: System, datasets: dict, products: dict):
    """Business product source datasets should exist in the technical catalog.

    For every dangling reference, records an issue line and emits an
    `<ds>-exists-in-tech false` fact plus a derive flagging the phantom
    reference inside the Parseltongue system.

    Returns the list of human-readable issue lines (empty when clean).
    """
    issues = []
    for bp_slug, bp in products.items():
        safe_slug = bp["safe_slug"]
        for src in bp["sources"]:
            ds_id = src["dataset_id"]
            # Symbol-safe form of the dataset ID for fact names.
            ds_safe = ds_id.lower().replace("-", "_")
            exists = ds_id in datasets
            if not exists:
                issues.append(
                    f" PHANTOM REFERENCE: {bp['name']} references {ds_id} "
                    f"({src['name']}) — not in technical catalog"
                )
                # Record the missing dataset and derive the phantom-reference
                # finding so it is visible to the system's consistency report.
                load_source(
                    system,
                    f"""
(fact {ds_safe}-exists-in-tech false
:origin "Dataset {ds_id} not found in any technical catalog CSV")
(derive bp-{safe_slug}-phantom-{ds_safe}
(= {ds_safe}-exists-in-tech false)
:using ({ds_safe}-exists-in-tech))
""",
                )
    return issues
def check_refresh_consistency(system: System, datasets: dict, products: dict):
    """Business product refresh should match fastest source dataset cadence.

    The product's declared refresh frequency is compared against the most
    frequent cadence among its source datasets that exist in the technical
    catalog. Products with no resolvable sources are skipped.

    Returns the list of human-readable issue lines (empty when clean).
    """
    rank = {"real-time": 0, "hourly": 1, "daily": 2, "weekly": 3, "monthly": 4, "quarterly": 5}
    issues = []
    for bp in products.values():
        declared = bp["refresh"]
        # Cadences of sources that actually exist in the technical catalog.
        cadences = [
            datasets[src["dataset_id"]]["cadence"]
            for src in bp["sources"]
            if src["dataset_id"] in datasets
        ]
        if not cadences:
            continue
        # Unknown cadence strings rank last (99) and never win the min.
        fastest = min(cadences, key=lambda c: rank.get(c, 99))
        if declared != fastest:
            issues.append(f" REFRESH MISMATCH: {bp['name']} says {declared}, " f"fastest source is {fastest}")
    return issues
def check_expired_contracts(system: System, contracts: dict):
    """Flag contracts with status=expired that still cover active datasets.

    Returns one issue line per expired contract, listing every dataset the
    contract still claims to cover (empty when clean).
    """
    issues = []
    for ctr in contracts.values():
        if ctr["status"] != "expired":
            continue
        covered = ", ".join(entry["name"] for entry in ctr["datasets"])
        issues.append(
            f" EXPIRED CONTRACT: {ctr['provider']} (expired {ctr['expiry']}) " f"still covers: {covered}"
        )
    return issues
def check_classification_conflicts(system: System, products: dict, contracts: dict):
    """Business product classification should not be weaker than contract classification.

    Strength is ordered restricted < confidential < internal < public
    (lower number = stronger). A product is in conflict when its own
    classification is strictly weaker than the classification required by
    the contract covering one of its source datasets.

    Returns the list of human-readable issue lines (empty when clean).
    """
    strength = {"restricted": 0, "confidential": 1, "internal": 2, "public": 3}
    # dataset_id → classification of the (last-seen) covering contract.
    contract_class: dict[str, str] = {}
    for ctr in contracts.values():
        for entry in ctr["datasets"]:
            contract_class[entry["dataset_id"]] = ctr["classification"]
    issues = []
    for bp in products.values():
        bp_class = bp["classification"]
        # Unknown labels rank 99, i.e. weakest — they never trigger on the
        # contract side but always conflict on the product side.
        bp_rank = strength.get(bp_class, 99)
        for src in bp["sources"]:
            required = contract_class.get(src["dataset_id"])
            if required is None:
                continue
            if bp_rank > strength.get(required, 99):
                issues.append(
                    f" CLASSIFICATION CONFLICT: {bp['name']} is '{bp_class}' "
                    f"but source {src['dataset_id']} contract requires '{required}'"
                )
    return issues
def check_path_drift(system: System, datasets: dict, products: dict):
    """Business catalog storage_path should match technical catalog storage_path.

    Sources missing from the technical catalog or with no storage_path in
    the business catalog are skipped (handled by other checks).

    Returns the list of human-readable issue lines (empty when clean).
    """
    issues = []
    for bp in products.values():
        for src in bp["sources"]:
            ds_id = src["dataset_id"]
            ds = datasets.get(ds_id)
            if ds is None:
                continue
            claimed = src.get("storage_path", "")
            actual = ds["path"]
            if claimed and claimed != actual:
                issues.append(
                    f" PATH DRIFT: {bp['name']} expects {ds_id} at {claimed}, " f"technical catalog says {actual}"
                )
    return issues
def check_table_rename(system: System, datasets: dict, products: dict):
    """Business catalog table_name should match technical catalog table_name.

    Sources missing from the technical catalog or with no table_name in
    the business catalog are skipped (handled by other checks).

    Returns the list of human-readable issue lines (empty when clean).
    """
    issues = []
    for bp in products.values():
        for src in bp["sources"]:
            ds_id = src["dataset_id"]
            ds = datasets.get(ds_id)
            if ds is None:
                continue
            claimed = src.get("table_name", "")
            actual = ds["table"]
            if claimed and claimed != actual:
                issues.append(
                    f" TABLE RENAME: {bp['name']} expects {ds_id} table '{claimed}', "
                    f"technical catalog says '{actual}'"
                )
    return issues
def check_retention_conflicts(system: System, products: dict, contracts: dict):
    """Business product retention should not exceed contract retention limits.

    Each dataset's limit is the minimum number of days parsed from any
    covering contract's retention field (contracts with no parseable number
    are ignored). Products claiming 0 retention days (unlimited/unparsed)
    are skipped.

    Returns the list of human-readable issue lines (empty when clean).
    """
    # dataset_id → strictest (minimum) contract retention in days.
    limits: dict[str, int] = {}
    for ctr in contracts.values():
        match = re.search(r"(\d+)", ctr["retention"])
        if match is None:
            continue
        days = int(match.group(1))
        for entry in ctr["datasets"]:
            ds_id = entry["dataset_id"]
            if ds_id not in limits or days < limits[ds_id]:
                limits[ds_id] = days
    issues = []
    for bp in products.values():
        claimed = bp.get("retention_days", 0)
        if claimed == 0:
            continue
        for src in bp["sources"]:
            limit = limits.get(src["dataset_id"])
            if limit is not None and claimed > limit:
                issues.append(
                    f" RETENTION CONFLICT: {bp['name']} claims {claimed} days "
                    f"but {src['dataset_id']} contract limits to {limit} days"
                )
    return issues
def check_owner_department(system: System, products: dict):
    """Business product owner should belong to the product's primary department.

    The primary department is the most common `domain` among the product's
    sources (ties broken by first occurrence). Products whose sources carry
    no domains, or whose primary department is unknown, are skipped.

    Returns the list of human-readable issue lines (empty when clean).
    """
    dept_owners = {
        "discovery": {"Dr. Elena Rossi", "Dr. James Okafor", "Dr. Wei Zhang", "Dr. Priya Sharma"},
        "translational": {"Dr. Sarah Chen", "Dr. Marcus Rivera", "Dr. Yuki Tanaka", "Dr. Amir Hassan"},
        "clinical": {"Dr. Lisa Patel", "Dr. Robert Kim", "Dr. Fatima Al-Said", "Dr. Thomas Weber"},
        "commercial": {"Jennifer Liu", "Michael Torres", "Anna Kowalski", "David Osei"},
    }
    issues = []
    for bp in products.values():
        owner = bp["owner"]
        # Tally source domains to determine the primary department.
        counts: dict[str, int] = {}
        for src in bp["sources"]:
            domain = src.get("domain", "")
            if domain:
                counts[domain] = counts.get(domain, 0) + 1
        if not counts:
            continue
        primary = max(counts, key=counts.get)
        expected = dept_owners.get(primary)
        if expected is None or owner in expected:
            continue
        # Reverse lookup: which department does this owner actually sit in?
        actual = next(
            (dept for dept, members in dept_owners.items() if owner in members),
            "unknown",
        )
        issues.append(
            f" OWNER DRIFT: {bp['name']} primary dept is {primary} "
            f"but owner '{owner}' belongs to {actual}"
        )
    return issues
# ── main ─────────────────────────────────────────────────────────────
def main():
    """Run the full governance demo end to end.

    Configures parseltongue logging, regenerates the synthetic data estate
    (with injected corruptions), loads all documents, extracts facts from
    each governance layer, runs the ten cross-layer checks, then prints the
    Parseltongue consistency report and a summary cross-checked against the
    corruption manifest.

    Improvement: the ten check invocations shared identical print/collect
    boilerplate; they are now driven from a (title, all-clear message,
    runner) table. Output is unchanged.
    """
    plog = logging.getLogger("parseltongue")
    plog.setLevel(logging.WARNING)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(" [%(levelname)s] %(message)s"))
    plog.addHandler(handler)

    # Regenerate data with corruptions (idempotent, deterministic seed).
    # generate.main reads sys.argv, so fake the CLI args around the call.
    from generate import main as generate_main

    sys.argv = ["generate", "--clean"]
    generate_main()
    sys.argv = sys.argv[:1]

    system = System(overridable=True, effects=GOVERNANCE_EFFECTS)
    print("=" * 72)
    print("Parseltongue — Biopharma Data Governance Cross-Layer Consistency")
    print("=" * 72)

    # Phase 0: Load documents
    print("\n--- Phase 0: Load all documents ---")
    n_docs = load_all_documents(system)
    print(f" Loaded {n_docs} documents")

    # Phase 1: Extract technical facts
    print("\n--- Phase 1: Extract technical catalog facts ---")
    datasets = extract_technical_facts(system)
    n_internal = sum(1 for d in datasets.values() if d["source_type"] == "internal")
    n_external = sum(1 for d in datasets.values() if d["source_type"] == "external")
    print(f" {len(datasets)} datasets ({n_internal} internal, {n_external} external)")

    # Phase 2: Extract contract facts
    print("\n--- Phase 2: Extract contract facts ---")
    contracts = extract_contract_facts(system)
    n_covered = sum(len(c["datasets"]) for c in contracts.values())
    print(f" {len(contracts)} contracts covering {n_covered} datasets")

    # Phase 3: Extract business catalog facts
    print("\n--- Phase 3: Extract business catalog facts ---")
    products = extract_business_facts(system)
    n_sources = sum(len(p["sources"]) for p in products.values())
    print(f" {len(products)} data products referencing {n_sources} source datasets")

    # Phase 4: Cross-layer checks, table-driven.
    # Each entry: (title, all-clear message, zero-arg runner).
    print("\n--- Phase 4: Cross-layer consistency checks ---")
    checks = [
        ("[Check 1] Contract coverage (every external dataset has a contract)",
         "All external datasets have contracts",
         lambda: check_contract_coverage(system, datasets, contracts)),
        ("[Check 2] SLA consistency (contract SLA matches technical cadence)",
         "All SLAs match technical cadences",
         lambda: check_sla_consistency(system, datasets, contracts)),
        ("[Check 3] Referential integrity (business products reference existing datasets)",
         "All business product references are valid",
         lambda: check_referential_integrity(system, datasets, products)),
        ("[Check 4] Refresh consistency (business refresh matches source cadences)",
         "All refresh frequencies are consistent",
         lambda: check_refresh_consistency(system, datasets, products)),
        ("[Check 5] Expired contracts",
         "No expired contracts",
         lambda: check_expired_contracts(system, contracts)),
        ("[Check 6] Classification conflicts (business vs contract classification)",
         "No classification conflicts",
         lambda: check_classification_conflicts(system, products, contracts)),
        ("[Check 7] Path drift (business catalog paths vs technical catalog)",
         "All storage paths are consistent",
         lambda: check_path_drift(system, datasets, products)),
        ("[Check 8] Table rename (business catalog tables vs technical catalog)",
         "All table names are consistent",
         lambda: check_table_rename(system, datasets, products)),
        ("[Check 9] Retention conflicts (business retention vs contract limits)",
         "No retention conflicts",
         lambda: check_retention_conflicts(system, products, contracts)),
        ("[Check 10] Owner department alignment",
         "All product owners match their department",
         lambda: check_owner_department(system, products)),
    ]
    all_issues = []
    for title, all_clear, run in checks:
        print(f"\n {title}")
        issues = run()
        all_issues.extend(issues)
        for line in issues:
            print(line)
        if not issues:
            print(f" ✓ {all_clear}")

    # Phase 5: System consistency report
    print("\n--- Phase 5: Parseltongue consistency report ---")
    report = system.consistency()
    print(f" {report}")

    # Summary
    print("\n" + "=" * 72)
    print(f"Data estate: {len(datasets)} datasets, {len(contracts)} contracts, {len(products)} products")
    print(f"Issues found: {len(all_issues)}")
    if all_issues:
        by_type = {}
        for issue in all_issues:
            # Issue lines look like " TYPE: details" — bucket by TYPE.
            parts = issue.strip().split(":")
            itype = parts[0] if parts else "OTHER"
            by_type[itype] = by_type.get(itype, 0) + 1
        for itype, count in sorted(by_type.items()):
            print(f" {itype}: {count}")

    # Cross-check against manifest if available
    manifest_path = Path(__file__).parent / "manifest.json"
    if manifest_path.exists():
        manifest = json.loads(manifest_path.read_text())
        n_injected = len(manifest["corruptions"])
        n_found = len(all_issues)
        print(f"\nManifest: {n_injected} corruptions were injected")
        print(f"Detection coverage: {n_found}/{n_injected} issues surfaced")
        if n_found > n_injected:
            # One injected corruption can trip several checks (e.g. a path
            # change surfaces as both drift and phantom reference).
            print(f" ({n_found - n_injected} extra — single corruptions cascade across layers)")
    print(f"\nFinal system: {system}")
# Script entry point: run the demo when executed directly.
if __name__ == "__main__":
    main()
# Data Governance Demo — Cross-Layer Consistency
A biopharma company maintains three independent layers of data governance:
| Layer | Format | Contains |
|-------|--------|----------|
| **Technical catalog** | CSV | What actually exists in the data platform — paths, tables, cadences, owners |
| **Contracts** | Markdown | Legal agreements with external data providers — SLAs, retention, classification |
| **Business catalog** | Markdown | Business-facing data product descriptions — sources, refresh, classification |
Each layer is authored and maintained independently. Over time, drift accumulates: paths change, contracts expire, business entries reference datasets that no longer exist, refresh frequencies diverge.
## What Parseltongue does
The checker (`checker.pltg`) loads all three layers as documents, discovers entities at runtime via effects (`csv-rows`, `regex-match`, `list-tree-paths`, `doc-text`), and runs 9 cross-layer compliance checks — entirely through axiom pattern matching and splat reductions:
1. **Contract coverage** — every external dataset must appear in at least one contract
2. **Expired contracts** — expired contracts revoke all permitted uses
3. **Phantom references** — business products must not reference non-existent datasets
4. **Classification propagation** — product classification must be at least as restrictive as its most restrictive source
5. **Omics classification** — genomic/proteomic/transcriptomic data must be classified restricted
6. **Technical cross-check** — CSV fields must match generated facts
7. **Contract cross-check** — contract status facts must match document extraction
8. **Business cross-check** — product classification facts must match document extraction
9. **Final policy** — all checks ANDed into a single `policy-consistent` theorem
The final `policy-check` diff compares the derived `policy-consistent` result against the expected `true`.
## The vital stain
The visualization uses a **vital stain** — a runtime execution trace that captures the actual dependency edges as Parseltongue evaluates the checker. This is not static analysis. Every node in the graph is a real evaluated fact, and every edge is a real resolution that happened during execution. The stain propagates through the provenance chain, showing exactly which upstream facts contributed to each compliance decision.
## Quick start
```bash
pip install -e .
```
### Run the shell script
```bash
cd parseltongue/core/demos/data_governance_pltg
./run_viz.sh
```
This will:
1. Generate a **clean** (consistent) data estate and open its provenance visualization
2. Wait for you to press Enter
3. Inject ~15-20% corruptions across all three layers and open the corrupted visualization
4. Compare the two side by side
### Run manually
```bash
# Generate consistent baseline
python generate.py --clean --consistent-only
# Start bench with effects
pg-bench serve checker.pltg --effects parseltongue.core.demos.data_governance_pltg.operators:GOVERNANCE_EFFECTS &
pg-bench wait
# Generate visualization
pg-bench eval '(fmt "viz" (scope hologram (dissect (stain policy-check))))' > viz-results/clean.html
open viz-results/clean.html
# Now inject corruptions and regenerate
python generate.py --clean
# Restart bench (runtime data changed)
pkill -f pg-bench; sleep 1
pg-bench serve checker.pltg --effects parseltongue.core.demos.data_governance_pltg.operators:GOVERNANCE_EFFECTS &
pg-bench wait
pg-bench eval '(fmt "viz" (scope hologram (dissect (stain policy-check))))' > viz-results/corrupt.html
open viz-results/corrupt.html
```
## Files
| File | Role |
|------|------|
| `checker.pltg` | Hand-written. The compliance checker — imports manifest + policy rules, runs all 9 checks |
| `policy_rules.pltg` | Hand-written. Axioms for compliance predicates (`contract-ok`, `class-ok`, `sla-ok`, etc.) |
| `util.pltg` | Hand-written. Utility axioms (`concat`, `resolve-all`, `cons-prepend`) |
| `main.pltg` | Hand-written. Entry point that imports checker |
| `operators.py` | Hand-written. Python effects: `csv-rows`, `regex-match`, `list-tree-paths`, `doc-text`, `s` |
| `generate.py` | Hand-written. Synthetic data generator — consistent baseline + corruption injection |
| `demo.py` | Hand-written. Python-side consistency checker (alternative to the .pltg checker) |
| `resources/` | **Generated.** Governance policy/protocol docs (hand-written), catalogs and contracts (generated) |
| `src/` | **Generated.** .pltg fact modules extracted from resources by `generate.py` |
| `manifest.json` | **Generated.** Log of every injected corruption |
## What corruptions look like
The generator (`generate.py`) injects ~15-20% corruptions:
- **Path drift** — business catalog says `s3://old-path`, technical catalog says `s3://new-path`
- **Table rename** — business references old table name, tech catalog has the new one
- **Phantom references** — business product references a dataset ID that doesn't exist in tech catalog
- **Contract expiry** — contract status flipped to "expired"
- **Classification downgrade** — business product classification weakened below contract requirement
- **SLA mismatch** — contract SLA doesn't match technical refresh cadence
- **Owner drift** — product owner changed to someone from the wrong department
Every corruption is logged to `manifest.json` with the exact field, old value, and new value.
; ==========================================================
; Data Governance Checker
; ==========================================================
;
; Hand-written. Imports the generated manifest (all facts),
; the policy rules (axioms + predicates + splat reductions),
; and the governance documents. Uses general-purpose operators
; (csv-rows, regex-match, list-tree-paths, doc-text) to
; discover entities from raw source documents, then fans out
; compliance checks via axiom pattern matching and folds
; results with splat reductions.
;
; The checker is independent of HOW facts are generated —
; it discovers what exists and validates against policy.
;
; *** LIB PATH ***
;
; This demo requires lib_paths=[parseltongue/core/] so the
; loader qualifies sub-module facts with the full dotted path
; (e.g. src.manifest.technical.ds_5385-source-type). Without
; lib_paths the facts lack the "src.manifest." prefix and the
; (s ...) effect can't resolve them. pg-bench supplies this
; automatically via Bench.STD_PATH; standalone scripts must
; pass lib_paths=[core_dir] to LazyLoader.
;
; *** CRITICAL: :using IN DERIVES ***
;
; :using is NOT a convenience annotation — it controls which axioms
; are in scope during rewrite-based evaluation. Adding unnecessary
; axioms to :using causes INCORRECT structural rewrites: an axiom
; you didn't intend to fire will pattern-match your intermediate
; expressions and destructure them before they evaluate.
;
; Rules:
; 1. MINIMAL :using — only the axioms/terms the derive actually needs.
; Deps expand transitively, so you never need to list indirect deps.
; 2. Do NOT dump "all the things" into :using. Every extra axiom is
; a potential unwanted rewrite that silently corrupts evaluation.
; 3. Use (strict ...) to force evaluation of subexpressions BEFORE
; they reach axiom pattern matching. Axiom matching is structural:
; (strict X) as an arg to a non-callable head gets force-evaluated
; at the call site (engine line 884), producing a concrete value
; that the next axiom sees instead of an unevaluated expression.
; 4. Nest stricts when needed: (strict (util.concat "" (strict (regex-match ...)) ""))
; — inner strict forces regex-match before concat, outer strict forces
; concat before the consuming axiom's pattern matching.
;
; ==========================================================
(load-document "governance_protocol" "resources/governance_protocol.md")
(load-document "governance_policy" "resources/governance_policy.md")
; Import all generated facts via manifest
(import (quote src.manifest))
; Import policy axioms + compliance predicates + splat reductions
(import (quote policy_rules))
; Import utility axioms (unwrap, fact-name builders)
(import (quote util))
; Re-export policy_rules terms into local scope
; Each defterm below re-binds a policy_rules predicate under a local name,
; attaching an :evidence record (source doc + verbatim quote) so the local
; binding is traceable to the governance policy text.
(defterm contract-ok policy_rules.contract-ok
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "per-dataset contract compliance predicate"))
(defterm class-ok policy_rules.class-ok
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "per-source classification compliance predicate"))
(defterm contract-valid policy_rules.contract-valid
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "contract validity predicate"))
(defterm cadence-rank policy_rules.cadence-rank
:evidence (evidence "governance_policy"
:quotes ("the contractual refresh SLA for a dataset must be at least as frequent as the technical refresh cadence")
:explanation "cadence frequency ranking for SLA comparison"))
(defterm sla-ok policy_rules.sla-ok
:evidence (evidence "governance_policy"
:quotes ("If the technical cadence is faster than the contractual SLA, the platform may be pulling data more frequently than the provider guarantees, creating a reliability risk.")
:explanation "SLA alignment predicate"))
(defterm retention-ok policy_rules.retention-ok
:evidence (evidence "governance_policy"
:quotes ("Data products must not retain records longer than the shortest retention limit specified by any contract covering their source datasets.")
:explanation "retention limit compliance predicate"))
; Variadic combinators used by the per-section folds and the final policy.
(defterm all-true policy_rules.all-true
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "variadic AND — all items must be compliant"))
(defterm count-violations policy_rules.count-violations
:evidence (evidence "governance_policy"
:quotes ("A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.")
:explanation "variadic violation counter"))
(defterm any-true policy_rules.any-true
:evidence (evidence "governance_policy"
:quotes ("Data products consuming datasets covered only by expired contracts must be flagged for review.")
:explanation "variadic OR — any violation triggers flag"))
; ── fact-name helpers: use util.concat to build namespaced fact names ──
; These replace inline (+ (strict (fact-X (strict (regex-match ...)))) "-suffix") chains.
; Each takes a raw ID/doc-name and a suffix, returns the resolved fact name string.
(defterm tech-fact
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "build and resolve technical fact name from dataset ID"))
; (tech-fact "DS-5385" "-source-type"):
; 1. regex-match pulls the digits ("5385"); strict forces the effect
;    before axiom pattern matching sees the result.
; 2. util.concat maps prefix+suffix over the match list, yielding
;    "src.manifest.technical.ds_5385-source-type".
; 3. s resolves the constructed name in the engine env → fact value.
(axiom tech-fact-rule
(= (tech-fact ?id ?suffix)
(s (util.concat "src.manifest.technical.ds_" (strict (regex-match "\\d+" ?id)) ?suffix)))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "extract numeric ID, build fact name, resolve via s"))
(defterm ctr-fact
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "build and resolve contract fact name from doc name"))
; Same pipeline; the capture group strips the "contract:" doc-name prefix
; ("contract:10x_genomics" → slug "10x_genomics" → contracts.ctr-10x_genomics-*).
(axiom ctr-fact-rule
(= (ctr-fact ?doc-name ?suffix)
(s (util.concat "src.manifest.contracts.ctr-" (strict (regex-match "contract:(.*)" ?doc-name)) ?suffix)))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "extract slug, build fact name, resolve via s"))
(defterm bp-fact
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "build and resolve business product fact name from doc name"))
; Same pipeline for business docs ("business:<slug>" → business.bp-<slug>-*).
(axiom bp-fact-rule
(= (bp-fact ?doc-name ?suffix)
(s (util.concat "src.manifest.business.bp-" (strict (regex-match "business:(.*)" ?doc-name)) ?suffix)))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "extract slug, build fact name, resolve via s"))
; ==========================================================
; HOW FACT RETRIEVAL AND RESOLUTION WORKS
; ==========================================================
;
; The manifest (src.manifest) generates flat facts from raw documents:
; technical.ds_5385-source-type = "external"
; contracts.ctr-10x_genomics-classification = "restricted"
; business.bp-genomic_variant_panel-classification = "restricted"
;
; The checker discovers entities at runtime from the same raw documents
; via effects (csv-rows, regex-match, list-tree-paths, doc-text).
; It needs to resolve manifest facts by name — but names depend on
; IDs extracted from the raw docs, so they can't be hardcoded.
;
; Pipeline for a single fact:
; 1. regex-match extracts the variable part from raw data
; e.g. (regex-match "\\d+" "DS-5385") → ("5385")
; 2. strict forces the regex-match effect to evaluate NOW,
; before axiom pattern matching tries to match the result
; 3. util.concat maps prefix+suffix over the regex result list:
; (util.concat "technical.ds_" ("5385") "-source-type")
; → "technical.ds_5385-source-type"
; 4. s resolves the constructed name string as a Symbol in the
; engine env, returning the fact's value: "external"
;
; For multi-match (regex returns multiple hits):
; (regex-match "DS-\\d+" text) → ("DS-1488" "DS-6408")
; util.concat maps over the list:
; (util.concat "technical.ds_" ("1488" "6408") "-source-type")
; → ("technical.ds_1488-source-type" "technical.ds_6408-source-type")
; util.resolve-all maps s over the name list:
; (util.resolve-all (...)) → (value1 value2)
;
; The three helpers — tech-fact, ctr-fact, bp-fact — encapsulate
; this pipeline. Each takes a raw identifier and a suffix:
; (tech-fact "DS-5385" "-source-type") → "external"
; (ctr-fact "contract:10x_genomics" "-classification") → "restricted"
; (bp-fact "business:genomic_variant_panel" "-classification") → "restricted"
;
; OLD PATTERN (formerly used by sections 6, 8; now fully replaced below):
; (s (+ (strict (fact-id (strict (regex-match ...)))) "-suffix"))
; Problems it had: wrong "src.manifest." prefix, manual + chaining,
; fact-id/fact-ctr-prefix/fact-bp-prefix each only handled
; single-element regex results (no multi-match support).
;
; NEW PATTERN (tech-fact/ctr-fact/bp-fact, used throughout sections 6-8):
; (tech-fact ?id "-source-type")
; Handles single AND multi-match via util.concat internally.
; ==========================================================
; STRATEGY FOR EACH COMPLIANCE CHECK
; ==========================================================
;
; ── 3. CONTRACT COVERAGE (per-row, already implemented) ──
; For each CSV row: if source-type = "external", search all
; contract text for the dataset ID. Uses regex-match + if/true/false.
; No fact resolution needed — pure document search.
; STATUS: Done, working. No changes needed.
;
; ── 4. EXPIRED CONTRACTS (per-doc, already implemented) ──
; For each contract doc: regex for "expired" in doc text.
; Applies contract-valid predicate. Pure document search.
; STATUS: Done, working. No changes needed.
;
; ── 5. PHANTOM REFERENCES (cross-catalog, already implemented) ──
; Extract all DS-NNNN from business docs, check each exists
; in tech catalog text. Pure document search.
; STATUS: Done, working. No changes needed.
;
; ── 6. CLASSIFICATION PROPAGATION (cross-match, implemented) ──
; For each business product: extract source DS-IDs, find which
; contract covers each, get contract classification, check against
; product classification via class-ok.
; The inner fold find-ds-contract-class scans contracts for a DS-ID
; and resolves the covering contract's classification via
; (ctr-fact ?doc-name "-classification"); check-product-class-rule
; resolves the product's own classification via
; (bp-fact ?product-doc "-classification").
; STATUS: Done — migrated to the ctr-fact/bp-fact helpers.
;
; ── 7. OMICS CLASSIFICATION (per-contract, implemented) ──
; For each contract: regex for omics keywords, if present check
; classification = "restricted" via omics-ok predicate, using
; (ctr-fact ?doc-name "-classification").
; STATUS: Done — migrated to the ctr-fact helper.
;
; ── 8a. TECH CATALOG CROSS-CHECK (per-row, implemented) ──
; For each CSV row: resolve 5 facts by constructed name, compare
; to positionally-extracted CSV values:
; (= ?stype (tech-fact ?id "-source-type"))
; (= ?cadence (tech-fact ?id "-cadence"))
; etc. Handles multi-match if needed.
; STATUS: Done — migrated to the tech-fact helper.
;
; ── 8b. CONTRACT CROSS-CHECK (per-doc, implemented) ──
; For each contract doc: resolve status fact via
; (ctr-fact ?doc-name "-status"), apply contract-valid.
; STATUS: Done — migrated to the ctr-fact helper.
;
; ── 8c. BUSINESS CROSS-CHECK (per-doc, implemented) ──
; For each business doc: resolve classification fact via
; (bp-fact ?product-doc "-classification"), apply class-ok.
; STATUS: Done — migrated to the bp-fact helper.
;
; ── 9. FINAL POLICY (already implemented) ──
; ANDs all check results. No fact resolution.
; STATUS: Done. Will work once upstream checks are fixed.
; NOTE: policy-consistent currently has a loader error because
; _build_restricted_env re-evaluates theorem WFFs containing
; strict, hitting bare symbol. Fixing sections 6-8 to use
; the new helpers (which still use strict internally) won't
; fix this — the _build_restricted_env issue is an engine bug.
; ── csv-rows tag ──
; Rows arrive from csv-rows as
; (dx dataset_id name storage_path table_name schema_columns
;     format refresh_cadence source_type provider owner created_date row_count)
; — the 12-column shape every row axiom below destructures.
(defterm dx
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "csv-rows tag for dataset rows"))
; ══════════════════════════════════════════════════════════
; 1. DISCOVERY — what entities exist in the raw documents?
; ══════════════════════════════════════════════════════════
; Discover technical catalog CSV documents
; list-tree-paths globs the document names registered by demo.py's
; load_all_documents ("tech:*", "contract:*", "business:*").
(defterm tech-docs
(list-tree-paths "tech:*")
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "technical catalog document names"))
; Parse each CSV — tagged rows with all columns
; CSV columns: dataset_id, name, storage_path, table_name, schema_columns,
; format, refresh_cadence, source_type, provider, owner,
; created_date, row_count
(defterm discovery-rows
(csv-rows "tech:discovery" "dx")
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "discovery department rows"))
(defterm translational-rows
(csv-rows "tech:translational" "dx")
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "translational department rows"))
(defterm clinical-rows
(csv-rows "tech:clinical" "dx")
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "clinical department rows"))
(defterm commercial-rows
(csv-rows "tech:commercial" "dx")
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "commercial department rows"))
; Discover contract documents
(defterm contract-docs
(list-tree-paths "contract:*")
:evidence (evidence "governance_protocol"
:quotes ("Contract markdown files are parsed for provider metadata")
:explanation "contract document names"))
; Discover business catalog documents
(defterm business-docs
(list-tree-paths "business:*")
:evidence (evidence "governance_protocol"
:quotes ("Business catalog markdown files are parsed for product metadata")
:explanation "business catalog document names"))
; Combined document texts for cross-document searches
; (doc-text with a glob — per the :explanations, the combined text of all
; matching documents; used as a haystack for regex-match lookups below).
(defterm all-contract-text
(doc-text "contract:*")
:evidence (evidence "governance_protocol"
:quotes ("Contract markdown files are parsed for provider metadata")
:explanation "combined text of all contract documents"))
(defterm all-tech-text
(doc-text "tech:*")
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "combined text of all technical catalog documents"))
(defterm all-business-text
(doc-text "business:*")
:evidence (evidence "governance_protocol"
:quotes ("Business catalog markdown files are parsed for product metadata")
:explanation "combined text of all business catalog documents"))
; ══════════════════════════════════════════════════════════
; 2. PER-ROW EXTRACTION — pull fields from CSV tagged rows
; ══════════════════════════════════════════════════════════
; Extract dataset ID from a tagged CSV row
; Each extractor destructures the full 12-column (dx ...) row pattern and
; returns a single column by position.
(defterm row-id
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "dataset_id from row"))
(axiom row-id-rule
(= (row-id (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
?id)
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "First column is the dataset identifier"))
; Extract source-type from a tagged CSV row
(defterm row-source-type
:evidence (evidence "governance_protocol"
:quotes ("{id}-source-type")
:explanation "source_type from row"))
(axiom row-source-type-rule
(= (row-source-type (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
?stype)
:evidence (evidence "governance_protocol"
:quotes ("{id}-source-type")
:explanation "source_type column from technical catalog"))
; Extract cadence from a tagged CSV row
(defterm row-cadence
:evidence (evidence "governance_protocol"
:quotes ("{id}-cadence")
:explanation "refresh_cadence from row"))
(axiom row-cadence-rule
(= (row-cadence (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
?cadence)
:evidence (evidence "governance_protocol"
:quotes ("{id}-cadence")
:explanation "refresh_cadence column from technical catalog"))
; Extract path from a tagged CSV row
(defterm row-path
:evidence (evidence "governance_protocol"
:quotes ("{id}-path")
:explanation "storage_path from row"))
(axiom row-path-rule
(= (row-path (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
?path)
:evidence (evidence "governance_protocol"
:quotes ("{id}-path")
:explanation "storage_path column from technical catalog"))
; Extract table from a tagged CSV row
(defterm row-table
:evidence (evidence "governance_protocol"
:quotes ("{id}-table")
:explanation "table_name from row"))
(axiom row-table-rule
(= (row-table (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
?table)
:evidence (evidence "governance_protocol"
:quotes ("{id}-table")
:explanation "table_name column from technical catalog"))
; ══════════════════════════════════════════════════════════
; 3. CONTRACT COVERAGE — per external dataset
; ══════════════════════════════════════════════════════════
;
; For each CSV row: external datasets must appear in at
; least one contract document (coverage + permitted use).
; Internal datasets are exempt.
(defterm check-row-contract
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy.")
:explanation "per-row contract compliance"))
; External rows: the dataset ID is searched across the combined contract
; text; regex-match truthiness is normalized to true/false via if.
; NOTE(review): both contract-ok arguments are the identical coverage
; search, yet the policy quote names two conditions (valid dates and a
; permitted-use policy) — confirm against policy_rules.contract-ok-rule
; whether a distinct check was intended for the second argument.
(axiom check-row-contract-external
(= (check-row-contract (dx ?id ?name ?path ?table ?schema ?fmt ?cadence "external" ?provider ?owner ?created ?rows))
(contract-ok "external"
(if (regex-match ?id all-contract-text) true false)
(if (regex-match ?id all-contract-text) true false)))
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy.")
:explanation "External datasets require contract coverage — search all contract docs for dataset ID"))
; Internal rows match the literal "internal" in the source_type position
; and pass unconditionally.
(axiom check-row-contract-internal
(= (check-row-contract (dx ?id ?name ?path ?table ?schema ?fmt ?cadence "internal" ?provider ?owner ?created ?rows))
true)
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "Internal datasets are exempt from contract requirements"))
; Fold per department — splat recursion
(defterm check-dept-contracts
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "department contract compliance"))
; Splat fold: base matches a one-row list; step peels the head row and
; recurses with the tail kept wrapped as a list, (?...rest), so the
; base/step patterns keep matching.
(axiom check-dept-contracts-base
(= (check-dept-contracts (?row))
(check-row-contract ?row))
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "Single row: its own compliance"))
(axiom check-dept-contracts-step
(= (check-dept-contracts (?row ?...rest))
(and (check-row-contract ?row) (check-dept-contracts (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
:explanation "All rows must pass"))
; Per-department contract compliance
; strict forces the csv-rows effect so the fold sees a concrete row list;
; :using names the definitions each derivation relies on.
(derive discovery-contracts-ok
(check-dept-contracts (strict discovery-rows))
:using (discovery-rows check-dept-contracts check-dept-contracts-base
check-dept-contracts-step check-row-contract-external
check-row-contract-internal contract-ok policy_rules.contract-ok-rule
all-contract-text))
(derive translational-contracts-ok
(check-dept-contracts (strict translational-rows))
:using (translational-rows check-dept-contracts check-dept-contracts-base
check-dept-contracts-step check-row-contract-external
check-row-contract-internal contract-ok policy_rules.contract-ok-rule
all-contract-text))
(derive clinical-contracts-ok
(check-dept-contracts (strict clinical-rows))
:using (clinical-rows check-dept-contracts check-dept-contracts-base
check-dept-contracts-step check-row-contract-external
check-row-contract-internal contract-ok policy_rules.contract-ok-rule
all-contract-text))
(derive commercial-contracts-ok
(check-dept-contracts (strict commercial-rows))
:using (commercial-rows check-dept-contracts check-dept-contracts-base
check-dept-contracts-step check-row-contract-external
check-row-contract-internal contract-ok policy_rules.contract-ok-rule
all-contract-text))
; All departments
(derive all-contracts-ok
(all-true discovery-contracts-ok translational-contracts-ok
clinical-contracts-ok commercial-contracts-ok)
:using (discovery-contracts-ok translational-contracts-ok
clinical-contracts-ok commercial-contracts-ok
all-true policy_rules.all-true-base policy_rules.all-true-step))
; ══════════════════════════════════════════════════════════
; 4. EXPIRED CONTRACTS — contract validity check
; ══════════════════════════════════════════════════════════
;
; Extract "Status: ..." from each contract doc via regex,
; apply contract-valid predicate. Fold with all-true.
(defterm check-contract-valid
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "per-contract validity check"))
; Maps the doc's text to a status literal ("expired"/"active") via a
; regex search, then feeds it to the contract-valid predicate.
(axiom check-contract-valid-rule
(= (check-contract-valid ?doc-name)
(contract-valid
(if (regex-match "expired" (doc-text ?doc-name))
"expired"
"active")))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "Check contract status via regex on document text"))
; Fold over all contract docs
(defterm check-all-contracts-valid
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "all contracts validity fold"))
; Base: a one-element doc list reduces to the single doc's validity check.
(axiom check-all-contracts-valid-base
(= (check-all-contracts-valid (?doc))
(check-contract-valid ?doc))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "Single contract: its own validity"))
; Step: AND the head's validity with the fold over the tail. The recursive
; call passes (?...rest) — the tail kept wrapped as a LIST — so the
; base/step patterns keep matching, consistent with every other fold here
; (check-dept-contracts-step, check-no-phantoms-step, check-dept-facts-step).
; The previous bare ?...rest splatted the tail into separate arguments,
; which neither fold pattern matches.
(axiom check-all-contracts-valid-step
(= (check-all-contracts-valid (?doc ?...rest))
(and (check-contract-valid ?doc) (check-all-contracts-valid (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "All contracts must be valid"))
; Derive: fold validity over the discovered contract docs. strict forces
; the list-tree-paths effect so the fold matches a concrete list of doc
; names, mirroring every other derive in this file (no-phantoms-ok,
; all-products-class-ok, all-omics-ok). The previous (contract-docs)
; passed a one-element list holding the unevaluated term instead.
(derive all-contracts-valid
(check-all-contracts-valid (strict contract-docs))
:using (contract-docs check-all-contracts-valid check-all-contracts-valid-base
check-all-contracts-valid-step check-contract-valid check-contract-valid-rule
contract-valid policy_rules.contract-valid-rule))
; ══════════════════════════════════════════════════════════
; 5. PHANTOM REFERENCES — business→tech existence
; ══════════════════════════════════════════════════════════
;
; Every dataset ID referenced by a business product must
; exist in the technical catalog. Extract all DS-NNNN IDs
; from business docs, check each against tech catalog text.
; All DS-NNNN identifiers mentioned anywhere in the business catalog.
(defterm business-ds-ids
(regex-match "DS-\\d+" (doc-text "business:*"))
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "all dataset IDs referenced in business catalog"))
(defterm check-not-phantom
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "single dataset existence check"))
; An ID "exists" iff it appears anywhere in the combined tech-catalog text.
(axiom check-not-phantom-rule
(= (check-not-phantom ?id)
(if (regex-match ?id all-tech-text) true false))
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "Dataset ID must appear in technical catalog"))
; Fold over all business-referenced IDs
; Standard splat fold: the recursion keeps the tail wrapped as (?...rest).
(defterm check-no-phantoms
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "no phantom references fold"))
(axiom check-no-phantoms-base
(= (check-no-phantoms (?id))
(check-not-phantom ?id))
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "Single ID: its own existence"))
(axiom check-no-phantoms-step
(= (check-no-phantoms (?id ?...rest))
(and (check-not-phantom ?id) (check-no-phantoms (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
:explanation "All referenced IDs must exist"))
; strict forces the regex-match effect so the fold sees a concrete ID list.
(derive no-phantoms-ok
(check-no-phantoms (strict business-ds-ids))
:using (business-ds-ids check-no-phantoms check-no-phantoms-base
check-no-phantoms-step check-not-phantom check-not-phantom-rule
all-tech-text))
; ══════════════════════════════════════════════════════════
; 6. CLASSIFICATION — product ≥ most restrictive source
; ══════════════════════════════════════════════════════════
;
; Dynamic cross-match: for each product, extract source DS-IDs,
; then for each DS-ID fold over contract-docs to find the covering
; contract and get its classification. Check class-ok per pair.
;
; Three nested folds:
; outer: products (from business-docs)
; middle: source DS-IDs (from product doc text)
; inner: contract-docs (find covering contract for DS-ID)
; Inner fold: given a DS-ID and a list of contract docs,
; find the covering contract's classification.
; Returns the classification string, or "internal" if no contract covers it.
(defterm find-ds-contract-class
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "find contract classification for a dataset by scanning contracts"))
; Base: last remaining contract — if its text mentions the DS-ID, resolve
; that contract's classification fact via ctr-fact; otherwise the dataset
; is treated as "internal".
(axiom find-ds-contract-class-base
(= (find-ds-contract-class ?ds-id (?ctr-doc))
(if (regex-match ?ds-id (doc-text ?ctr-doc))
(ctr-fact ?ctr-doc "-classification")
"internal"))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "Base: check last contract, fallback to internal"))
; Step: first contract (scanning left to right) that mentions the DS-ID wins.
(axiom find-ds-contract-class-step
(= (find-ds-contract-class ?ds-id (?ctr-doc ?...rest))
(if (regex-match ?ds-id (doc-text ?ctr-doc))
(ctr-fact ?ctr-doc "-classification")
(find-ds-contract-class ?ds-id (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "Step: if contract covers DS-ID return its class, else recurse"))
; Middle fold: for each source DS-ID, find contract class
; and check class-ok against the product's classification.
(defterm check-sources-class
:evidence (evidence "governance_policy"
:quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
:explanation "fold class-ok over source DS-IDs"))
(axiom check-sources-class-base
(= (check-sources-class (?ds-id) ?product-class ?ctr-docs)
(class-ok (find-ds-contract-class ?ds-id ?ctr-docs) ?product-class))
:evidence (evidence "governance_policy"
:quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
:explanation "Single source: find contract class, check against product"))
(axiom check-sources-class-step
(= (check-sources-class (?ds-id ?...rest) ?product-class ?ctr-docs)
(and (class-ok (find-ds-contract-class ?ds-id ?ctr-docs) ?product-class)
(check-sources-class (?...rest) ?product-class ?ctr-docs)))
:evidence (evidence "governance_policy"
:quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
:explanation "All sources must satisfy classification propagation"))
; Per product: resolve classification, extract source DS-IDs, check all
(defterm check-product-class
:evidence (evidence "governance_policy"
:quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
:explanation "per-product classification check"))
; The (util.concat "" ... "") wrapper with nested strict follows the
; protocol documented at the top of this file: inner strict forces the
; regex-match effect, util.concat normalizes the match list, outer strict
; forces the fold's argument before its patterns try to match.
(axiom check-product-class-rule
(= (check-product-class ?product-doc ?ctr-docs)
(strict (check-sources-class
(strict (util.concat "" (strict (regex-match "DS-\\d+" (doc-text ?product-doc))) ""))
(bp-fact ?product-doc "-classification")
?ctr-docs)))
:evidence (evidence "governance_policy"
:quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
:explanation "Extract product class and source IDs, cross-match against contracts"))
; Outer fold: over all business product docs
(defterm check-all-products-class
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "all products classification fold"))
(axiom check-all-products-class-base
(= (check-all-products-class (?doc) ?ctr-docs)
(check-product-class ?doc ?ctr-docs))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "Single product classification check"))
(axiom check-all-products-class-step
(= (check-all-products-class (?doc ?...rest) ?ctr-docs)
(and (check-product-class ?doc ?ctr-docs)
(check-all-products-class (?...rest) ?ctr-docs)))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "All products must satisfy classification propagation"))
; NOTE(review): this :using is far sparser than the section-3/4/7 derives —
; it omits the base/step axioms and the class-ok/ctr-fact/bp-fact rules.
; Confirm whether :using must enumerate them for the derivation to run.
(derive all-products-class-ok
(check-all-products-class (strict business-docs) (strict contract-docs))
:using (check-all-products-class business-docs contract-docs))
; ══════════════════════════════════════════════════════════
; 7. OMICS CLASSIFICATION — omics data must be restricted
; ══════════════════════════════════════════════════════════
;
; Per contract doc: if covered dataset names contain omics
; keywords (genomic, proteomic, transcriptomic, omics, CRISPR,
; RNA-seq, WGS), the contract's classification must be
; "restricted". Each contract doc is self-contained with
; both the covered dataset names and the classification.
(defterm omics-ok policy_rules.omics-ok
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "omics data must be classified restricted"))
(defterm check-contract-omics
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "per-contract omics classification check"))
; Case-insensitive keyword scan ((?i)) for genom/proteom/transcriptom/
; omics/crispr/rna.seq/wgs in the contract text; the contract's own
; classification fact is resolved via the ctr-fact helper.
(axiom check-contract-omics-rule
(= (check-contract-omics ?doc-name)
(omics-ok
(regex-match "(?i)genom|proteom|transcriptom|omics|crispr|rna.seq|wgs" (doc-text ?doc-name))
(ctr-fact ?doc-name "-classification")))
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "If contract covers omics data, classification must be restricted"))
; Fold over all contract docs
(defterm check-all-omics
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "all contracts omics classification fold"))
; Base: a one-element doc list reduces to the single doc's omics check.
(axiom check-all-omics-base
(= (check-all-omics (?doc))
(check-contract-omics ?doc))
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "Single contract omics check"))
; Step: recurse on (?...rest) — the tail kept wrapped as a LIST — so the
; base/step patterns keep matching, consistent with check-dept-contracts-step,
; check-no-phantoms-step, and check-dept-facts-step. The previous bare
; ?...rest splatted the tail into separate arguments, which neither
; fold pattern matches.
(axiom check-all-omics-step
(= (check-all-omics (?doc ?...rest))
(and (check-contract-omics ?doc) (check-all-omics (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
:explanation "All contracts with omics data must have restricted classification"))
; Fold the omics check over the discovered contract docs; strict forces
; the list-tree-paths effect first. The :using list also names the
; ctr-fact/util.concat machinery the per-contract rule expands to.
(derive all-omics-ok
(check-all-omics (strict contract-docs))
:using (contract-docs check-all-omics check-all-omics-base
check-all-omics-step check-contract-omics check-contract-omics-rule
ctr-fact ctr-fact-rule util.concat util.concat-one util.concat-two util.concat-many
util.cons-prepend util.cons-prepend-rule
omics-ok policy_rules.omics-ok-rule))
; ══════════════════════════════════════════════════════════
; 8. CROSS-CHECK — CSV discovery vs generated facts
; ══════════════════════════════════════════════════════════
;
; The generated src.* facts were extracted from the same raw
; documents. Cross-check: for each CSV row, resolve the
; corresponding fact by constructed name and verify it matches
; the positionally-extracted value from the CSV. This consumes
; the generated facts (making them non-dangling) and validates
; that the extraction pipeline is consistent.
; --- 8a. Technical catalog facts ---
; CSV row gives ?id = "DS-5385"; fact name is technical.ds_5385-source-type.
; Name construction and resolution are handled by the tech-fact helper.
(defterm check-row-fact
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "per-row cross-check: CSV vs generated fact"))
; Five positionally-extracted CSV columns are each compared (=) against
; the manifest fact with the matching suffix, resolved via tech-fact.
(axiom check-row-fact-rule
(= (check-row-fact (dx ?id ?name ?path ?table ?schema ?fmt ?cadence ?stype ?provider ?owner ?created ?rows))
(and (= ?stype (tech-fact ?id "-source-type"))
(= ?cadence (tech-fact ?id "-cadence"))
(= ?path (tech-fact ?id "-path"))
(= ?table (tech-fact ?id "-table"))
(= ?owner (tech-fact ?id "-owner"))))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "Every CSV field must match the corresponding generated fact"))
; Fold per department
(defterm check-dept-facts
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "department fact cross-check fold"))
(axiom check-dept-facts-base
(= (check-dept-facts (?row))
(check-row-fact ?row))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "Single row cross-check"))
; Step keeps the tail wrapped as (?...rest), as in the other folds.
(axiom check-dept-facts-step
(= (check-dept-facts (?row ?...rest))
(and (check-row-fact ?row) (check-dept-facts (?...rest))))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "All rows must match their facts"))
(derive discovery-facts-ok
(check-dept-facts (strict discovery-rows))
:using (discovery-rows check-dept-facts))
(derive translational-facts-ok
(check-dept-facts (strict translational-rows))
:using (translational-rows check-dept-facts))
(derive clinical-facts-ok
(check-dept-facts (strict clinical-rows))
:using (clinical-rows check-dept-facts))
(derive commercial-facts-ok
(check-dept-facts (strict commercial-rows))
:using (commercial-rows check-dept-facts))
(derive all-facts-ok
(all-true discovery-facts-ok translational-facts-ok
clinical-facts-ok commercial-facts-ok)
:using (discovery-facts-ok translational-facts-ok
clinical-facts-ok commercial-facts-ok all-true))
; --- 8b. Contract facts ---
; Doc name "contract:10x_genomics", fact name contracts.ctr-10x_genomics-status
; check-contract-fact: the generated status fact for one contract document
; must satisfy contract-valid.
(defterm check-contract-fact
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "per-contract cross-check: doc vs generated fact"))
; ctr-fact builds the fact name from the document name plus the "-status"
; suffix; contract-valid judges the resolved status value.
(axiom check-contract-fact-rule
(= (check-contract-fact ?doc-name)
(contract-valid
(ctr-fact ?doc-name "-status")))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "Contract status fact must match document extraction"))
; Fold over all contract docs
; Same base/step recursion shape as check-dept-facts above.
(defterm check-all-contract-facts
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "all contracts fact cross-check fold"))
(axiom check-all-contract-facts-base
(= (check-all-contract-facts (?doc))
(check-contract-fact ?doc))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "Single contract fact check"))
(axiom check-all-contract-facts-step
(= (check-all-contract-facts (?doc ?...rest))
(and (check-contract-fact ?doc) (check-all-contract-facts (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
:explanation "All contracts must match their facts"))
; Top-level result for section 8b: every loaded contract doc passes.
(derive all-contract-facts-ok
(check-all-contract-facts (strict contract-docs))
:using (contract-docs check-all-contract-facts check-contract-fact))
; --- 8c. Business catalog facts ---
; Doc name "business:genomic_variant_panel", fact name business.bp-genomic_variant_panel-classification
; Uses bp-fact helper (section 1) to construct fact name from doc name
; check-product-fact: the generated classification fact of one business
; product document must satisfy class-ok.
(defterm check-product-fact
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "per-product cross-check: doc vs generated fact"))
(axiom check-product-fact-rule
(= (check-product-fact ?product-doc)
(class-ok
(bp-fact ?product-doc "-classification")))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "Product classification fact must match document extraction"))
; Fold over all business product docs
; Same base/step recursion shape as the other section-8 folds.
(defterm check-all-product-facts
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "all products fact cross-check fold"))
(axiom check-all-product-facts-base
(= (check-all-product-facts (?doc))
(check-product-fact ?doc))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "Single product fact check"))
(axiom check-all-product-facts-step
(= (check-all-product-facts (?doc ?...rest))
(and (check-product-fact ?doc) (check-all-product-facts (?...rest))))
:evidence (evidence "governance_policy"
:quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
:explanation "All products must match their facts"))
; Top-level result for section 8c: every loaded business doc passes.
(derive all-product-facts-ok
(check-all-product-facts (strict business-docs))
:using (business-docs check-all-product-facts check-product-fact))
; ══════════════════════════════════════════════════════════
; 9. FINAL POLICY — combine all checks
; ══════════════════════════════════════════════════════════
; Total violation count over the four per-department contract checks.
(derive contract-violations
(count-violations discovery-contracts-ok translational-contracts-ok
clinical-contracts-ok commercial-contracts-ok)
:using (discovery-contracts-ok translational-contracts-ok
clinical-contracts-ok commercial-contracts-ok count-violations))
; policy-consistent: zero contract violations AND every section-level
; check (contracts, phantoms, classifications, omics, and the three
; section-8 fact cross-checks) holds.
(derive policy-consistent
(and (= contract-violations 0)
all-contracts-valid
no-phantoms-ok
all-products-class-ok
all-omics-ok
all-facts-ok
all-contract-facts-ok
all-product-facts-ok)
:using (contract-violations all-contracts-valid no-phantoms-ok
all-products-class-ok all-omics-ok
all-facts-ok all-contract-facts-ok all-product-facts-ok))
; Ground-truth expectation the derived result is compared against.
(fact expected-compliant true
:evidence (evidence "governance_protocol"
:quotes ("A dataset estate is **policy-consistent** when the total violation count across all rules is zero.")
:explanation "Policy says zero violations means compliant"))
; Final diff: reports any mismatch between the derived policy-consistent
; value and the expected-compliant ground truth.
(diff policy-check
:replace policy-consistent
:with expected-compliant)
{
"seed": 42,
"total_datasets": 68,
"total_contracts": 25,
"total_products": 20,
"corruptions": [
{
"corruption_type": "path_drift",
"layer": "technical",
"file": "translational.csv",
"field": "storage_path",
"old_value": "s3://seq-lake/rnaseq/expression",
"new_value": "s3://migrated-seq-lake/rnaseq/expression",
"description": "Storage path migrated for RNA-seq Expression"
},
{
"corruption_type": "path_drift",
"layer": "technical",
"file": "commercial.csv",
"field": "storage_path",
"old_value": "s3://commercial-lake/medical/publications",
"new_value": "s3://migrated-commercial-lake/medical/publications",
"description": "Storage path migrated for Medical Affairs Publications"
},
{
"corruption_type": "path_drift",
"layer": "technical",
"file": "discovery.csv",
"field": "storage_path",
"old_value": "s3://ref-data/chemspider/compounds",
"new_value": "s3://migrated-ref-data/chemspider/compounds",
"description": "Storage path migrated for ChemSpider Compound Registry"
},
{
"corruption_type": "path_drift",
"layer": "technical",
"file": "discovery.csv",
"field": "storage_path",
"old_value": "s3://discovery-lake/targets/validation",
"new_value": "s3://migrated-discovery-lake/targets/validation",
"description": "Storage path migrated for Target Validation Assays"
},
{
"corruption_type": "path_drift",
"layer": "technical",
"file": "clinical.csv",
"field": "storage_path",
"old_value": "s3://external-feeds/ema/eudravigilance",
"new_value": "s3://migrated-external-feeds/ema/eudravigilance",
"description": "Storage path migrated for EMA EudraVigilance Reports"
},
{
"corruption_type": "table_rename",
"layer": "technical",
"file": "discovery.csv",
"field": "table_name",
"old_value": "target_validation",
"new_value": "target_validation_v2",
"description": "Table renamed for Target Validation Assays"
},
{
"corruption_type": "table_rename",
"layer": "technical",
"file": "discovery.csv",
"field": "table_name",
"old_value": "ncbi_refseq",
"new_value": "ncbi_refseq_v2",
"description": "Table renamed for NCBI RefSeq Genome"
},
{
"corruption_type": "table_rename",
"layer": "technical",
"file": "commercial.csv",
"field": "table_name",
"old_value": "post_market_surveillance",
"new_value": "post_market_surveillance_v2",
"description": "Table renamed for Post-Market Surveillance"
},
{
"corruption_type": "cadence_mismatch",
"layer": "technical",
"file": "discovery.csv",
"field": "refresh_cadence",
"old_value": "quarterly",
"new_value": "daily",
"description": "Refresh cadence changed for ZINC20 Screening Library"
},
{
"corruption_type": "cadence_mismatch",
"layer": "technical",
"file": "commercial.csv",
"field": "refresh_cadence",
"old_value": "monthly",
"new_value": "weekly",
"description": "Refresh cadence changed for Medical Affairs Publications"
},
{
"corruption_type": "cadence_mismatch",
"layer": "technical",
"file": "translational.csv",
"field": "refresh_cadence",
"old_value": "weekly",
"new_value": "daily",
"description": "Refresh cadence changed for ELISA Biomarker Results"
},
{
"corruption_type": "cadence_mismatch",
"layer": "technical",
"file": "discovery.csv",
"field": "refresh_cadence",
"old_value": "monthly",
"new_value": "hourly",
"description": "Refresh cadence changed for Fragment Library Inventory"
},
{
"corruption_type": "contract_gap",
"layer": "contracts",
"file": "ncbi_clinvar.md",
"field": "file",
"old_value": "exists",
"new_value": "deleted",
"description": "Contract deleted for provider NCBI ClinVar covering 1 dataset"
},
{
"corruption_type": "contract_gap",
"layer": "contracts",
"file": "illumina.md",
"field": "file",
"old_value": "exists",
"new_value": "deleted",
"description": "Contract deleted for provider Illumina covering 1 dataset"
},
{
"corruption_type": "sla_mismatch",
"layer": "contracts",
"file": "meddra_msso.md",
"field": "refresh_sla",
"old_value": "quarterly",
"new_value": "weekly",
"description": "Contract SLA changed for MedDRA MSSO"
},
{
"corruption_type": "sla_mismatch",
"layer": "contracts",
"file": "ncbi.md",
"field": "refresh_sla",
"old_value": "quarterly",
"new_value": "daily",
"description": "Contract SLA changed for NCBI"
},
{
"corruption_type": "sla_mismatch",
"layer": "contracts",
"file": "flatiron_health.md",
"field": "refresh_sla",
"old_value": "monthly",
"new_value": "real-time",
"description": "Contract SLA changed for Flatiron Health"
},
{
"corruption_type": "retention_conflict",
"layer": "contracts",
"file": "royal_society_of_chemistry.md",
"field": "retention_limit",
"old_value": "365 days",
"new_value": "30 days",
"description": "Retention limit shortened for Royal Society of Chemistry"
},
{
"corruption_type": "retention_conflict",
"layer": "contracts",
"file": "10x_genomics.md",
"field": "retention_limit",
"old_value": "180 days",
"new_value": "60 days",
"description": "Retention limit shortened for 10x Genomics"
},
{
"corruption_type": "expired_contract",
"layer": "contracts",
"file": "fda.md",
"field": "expiry_date",
"old_value": "2026-11-02",
"new_value": "2020-05-20",
"description": "Contract expiry backdated for FDA"
},
{
"corruption_type": "expired_contract",
"layer": "contracts",
"file": "who_icd.md",
"field": "expiry_date",
"old_value": "2028-11-21",
"new_value": "2024-05-18",
"description": "Contract expiry backdated for WHO ICD"
},
{
"corruption_type": "phantom_reference",
"layer": "business",
"file": "patient_360_profile.md",
"field": "source_datasets",
"old_value": "none",
"new_value": "DS-9902",
"description": "Added phantom dataset reference to Patient 360 Profile"
},
{
"corruption_type": "phantom_reference",
"layer": "business",
"file": "genomic_variant_panel.md",
"field": "source_datasets",
"old_value": "none",
"new_value": "DS-9979",
"description": "Added phantom dataset reference to Genomic Variant Panel"
},
{
"corruption_type": "phantom_reference",
"layer": "business",
"file": "clinical_trial_operations_hub.md",
"field": "source_datasets",
"old_value": "none",
"new_value": "DS-9919",
"description": "Added phantom dataset reference to Clinical Trial Operations Hub"
},
{
"corruption_type": "phantom_reference",
"layer": "business",
"file": "biomarker_discovery_engine.md",
"field": "source_datasets",
"old_value": "none",
"new_value": "DS-9930",
"description": "Added phantom dataset reference to Biomarker Discovery Engine"
},
{
"corruption_type": "owner_drift",
"layer": "business",
"file": "real_world_evidence_platform.md",
"field": "owner",
"old_value": "David Osei",
"new_value": "Dr. Robert Kim",
"description": "Owner changed for Real-World Evidence Platform"
},
{
"corruption_type": "owner_drift",
"layer": "business",
"file": "target_identification_suite.md",
"field": "owner",
"old_value": "Dr. Wei Zhang",
"new_value": "Dr. Sarah Chen",
"description": "Owner changed for Target Identification Suite"
},
{
"corruption_type": "owner_drift",
"layer": "business",
"file": "safety_signal_dashboard.md",
"field": "owner",
"old_value": "Dr. Fatima Al-Said",
"new_value": "Dr. Amir Hassan",
"description": "Owner changed for Safety Signal Dashboard"
},
{
"corruption_type": "classification_conflict",
"layer": "business",
"file": "competitive_intelligence_feed.md",
"field": "classification",
"old_value": "confidential",
"new_value": "public",
"description": "Classification weakened for Competitive Intelligence Feed"
},
{
"corruption_type": "classification_conflict",
"layer": "business",
"file": "market_share_tracker.md",
"field": "classification",
"old_value": "restricted",
"new_value": "public",
"description": "Classification weakened for Market Share Tracker"
},
{
"corruption_type": "classification_conflict",
"layer": "business",
"file": "compound_optimization_suite.md",
"field": "classification",
"old_value": "restricted",
"new_value": "public",
"description": "Classification weakened for Compound Optimization Suite"
},
{
"corruption_type": "refresh_mismatch",
"layer": "business",
"file": "launch_analytics_dashboard.md",
"field": "refresh_frequency",
"old_value": "weekly",
"new_value": "daily",
"description": "Refresh frequency changed for Launch Analytics Dashboard"
},
{
"corruption_type": "refresh_mismatch",
"layer": "business",
"file": "safety_signal_dashboard.md",
"field": "refresh_frequency",
"old_value": "daily",
"new_value": "real-time",
"description": "Refresh frequency changed for Safety Signal Dashboard"
},
{
"corruption_type": "refresh_mismatch",
"layer": "business",
"file": "compound_optimization_suite.md",
"field": "refresh_frequency",
"old_value": "weekly",
"new_value": "quarterly",
"description": "Refresh frequency changed for Compound Optimization Suite"
}
],
"corruption_summary": {
"path_drift": 5,
"table_rename": 3,
"cadence_mismatch": 4,
"contract_gap": 2,
"sla_mismatch": 3,
"retention_conflict": 2,
"expired_contract": 2,
"phantom_reference": 4,
"owner_drift": 3,
"classification_conflict": 3,
"refresh_mismatch": 3
}
}"""
Synthetic data generator for the data_governance_pltg demo.
Two phases:
1. Generate a fully consistent biopharma data estate
- 4 technical catalog CSVs (discovery, translational, clinical, commercial)
- ~25-30 contract .md files (external providers only)
- ~60-80 business catalog .md product files
2. Inject inconsistencies (~15-20% of records) across all three layers,
logging every mutation to manifest.json
Run:
python generate.py # generate into resources/
python generate.py --clean # wipe resources/ and regenerate
"""
import argparse
import csv
import json
import random
import re
import shutil
import zlib
from dataclasses import asdict, dataclass
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List
# Master RNG seed — the manifest records "seed": 42 so runs are reproducible.
SEED = 42
# All generated artifacts (CSVs, contract/product .md files) land here.
RESOURCES = Path(__file__).parent / "resources"
# ── helpers ──────────────────────────────────────────────────────────
def _id(prefix: str, n: int) -> str:
return f"{prefix}-{n:04d}"
def _date(start: str = "2021-01-01", end: str = "2025-12-31") -> str:
    """Return a uniformly random ISO date between *start* and *end* (inclusive)."""
    lo = date.fromisoformat(start)
    hi = date.fromisoformat(end)
    offset = random.randint(0, (hi - lo).days)
    return (lo + timedelta(days=offset)).isoformat()
def _future_date() -> str:
    """Random ISO date between 2026-06-01 and 2029-12-31."""
    return _date(start="2026-06-01", end="2029-12-31")
def _past_date() -> str:
    """Random ISO date between 2020-01-01 and 2024-06-01."""
    return _date(start="2020-01-01", end="2024-06-01")
def _pick(choices):
    """Return one element of *choices* chosen uniformly at random.

    Draws the index with random.randrange, which consumes the module RNG
    exactly like random.choice does, so seeded runs are unchanged.
    """
    index = random.randrange(len(choices))
    return choices[index]
def _slug(name: str) -> str:
return name.lower().replace(" ", "_").replace("-", "_").replace("/", "_")
# ── domain knowledge ────────────────────────────────────────────────
# Controlled vocabularies shared by the generated catalogs and contracts.
CADENCES = ["real-time", "hourly", "daily", "weekly", "monthly", "quarterly"]
FORMATS = ["parquet", "csv", "json", "avro", "bam", "fastq", "vcf", "dicom", "sas7bdat"]
CLASSIFICATIONS = ["public", "internal", "confidential", "restricted"]
PERMITTED_USES = ["research-only", "internal-analytics", "commercial", "all"]
GEO_RESTRICTIONS = ["US-only", "EU-only", "global", "APAC-only"]
# Candidate owner names per department; _build_datasets picks one at random
# for each dataset record.
OWNERS_DISCOVERY = ["Dr. Elena Rossi", "Dr. James Okafor", "Dr. Wei Zhang", "Dr. Priya Sharma"]
OWNERS_TRANSLATIONAL = ["Dr. Sarah Chen", "Dr. Marcus Rivera", "Dr. Yuki Tanaka", "Dr. Amir Hassan"]
OWNERS_CLINICAL = ["Dr. Lisa Patel", "Dr. Robert Kim", "Dr. Fatima Al-Said", "Dr. Thomas Weber"]
OWNERS_COMMERCIAL = ["Jennifer Liu", "Michael Torres", "Anna Kowalski", "David Osei"]
# ── dataset templates ───────────────────────────────────────────────
@dataclass
class DatasetRecord:
    """One technical-catalog row: a physical dataset in the data platform."""

    dataset_id: str  # e.g. "DS-1234" (built by _id)
    name: str
    storage_path: str  # s3:// URI
    table_name: str
    schema_columns: str  # pipe-delimited column names
    format: str
    refresh_cadence: str
    source_type: str  # internal | external
    provider: str  # "internal" or vendor name
    owner: str
    created_date: str  # ISO date string
    row_count: int
    department: str = ""  # not written to CSV, used for bookkeeping
@dataclass
class ContractRecord:
    """One legal agreement with an external data provider (rendered as a .md file)."""

    contract_id: str
    provider: str
    effective_date: str  # ISO date
    expiry_date: str  # ISO date
    status: str
    covered_datasets: List[Dict]  # [{dataset_id, name, permitted_use}]
    refresh_sla: str
    retention_limit_days: int
    classification: str
    geo_restriction: str
    deidentification_required: bool
    audit_rights: str
@dataclass
class BusinessProduct:
    """One business-catalog entry: a consumer-facing data product (rendered as a .md file)."""

    product_id: str
    name: str
    owner: str
    classification: str
    refresh_frequency: str
    created_date: str  # ISO date
    description: str
    source_datasets: List[Dict]  # [{dataset_id, name, domain, source_type, storage_path, table_name}]
    retention_days: int  # max retention across source contracts (or 0 if all internal)
    completeness_pct: float
    last_validated: str
    known_limitations: str
    consumers: List[str]
    request_process: str
# ── phase 1: consistent generation ──────────────────────────────────
# Discovery-department dataset templates.  Internal entries are 7-tuples:
#   (name, storage_path, table_name, schema_columns, format, cadence, "internal")
# External entries append the provider name as an 8th element
# (see the unpacking logic in _build_datasets).
DISCOVERY_DATASETS = [
    (
        "HTS Primary Screen Library",
        "s3://discovery-lake/hts/primary_screen",
        "hts_primary_screen",
        "compound_id|smiles|activity_uM|target|assay_type|plate_id|well|z_score",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "HTS Confirmation Screen",
        "s3://discovery-lake/hts/confirmation",
        "hts_confirmation",
        "compound_id|smiles|ic50_nM|hill_slope|r_squared|assay_date",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "SAR Analysis Tables",
        "s3://discovery-lake/sar/analyses",
        "sar_analysis",
        "series_id|compound_id|activity|selectivity|admet_flags|iteration",
        "parquet",
        "daily",
        "internal",
    ),
    (
        "Target Validation Assays",
        "s3://discovery-lake/targets/validation",
        "target_validation",
        "target_id|gene_symbol|assay_type|knockdown_pct|cell_line|replicate",
        "csv",
        "monthly",
        "internal",
    ),
    (
        "CRISPR Screen Results",
        "s3://discovery-lake/crispr/screens",
        "crispr_screens",
        "guide_id|gene|log2fc|fdr|cell_line|library|screen_date",
        "parquet",
        "monthly",
        "internal",
    ),
    (
        "Protein Structure Models",
        "s3://discovery-lake/structures/models",
        "protein_structures",
        "pdb_id|target|resolution_A|method|ligand|binding_site_residues",
        "parquet",
        "quarterly",
        "internal",
    ),
    (
        "Fragment Library Inventory",
        "s3://discovery-lake/fragments/inventory",
        "fragment_library",
        "fragment_id|smiles|mw|clogp|hba|hbd|rotatable_bonds|stock_mg",
        "csv",
        "monthly",
        "internal",
    ),
    (
        "DMPK In Vitro ADME",
        "s3://discovery-lake/dmpk/in_vitro",
        "dmpk_adme",
        "compound_id|microsomal_clint|papp_ab|papp_ba|efflux_ratio|plasma_binding",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "Compound Selectivity Panel",
        "s3://discovery-lake/selectivity/panel",
        "selectivity_panel",
        "compound_id|kinase_panel_id|percent_inhibition|concentration_nM",
        "parquet",
        "monthly",
        "internal",
    ),
    (
        "Lead Optimization Tracker",
        "s3://discovery-lake/leads/tracker",
        "lead_optimization",
        "program_id|series|lead_compound|stage|potency_nM|selectivity_fold|pk_status",
        "csv",
        "daily",
        "internal",
    ),
    # External
    (
        "NCBI RefSeq Genome",
        "s3://ref-data/ncbi/refseq",
        "ncbi_refseq",
        "accession|organism|assembly|chromosome|seq_length|gene_count|release_date",
        "parquet",
        "quarterly",
        "external",
        "NCBI",
    ),
    (
        "Ensembl Gene Annotations",
        "s3://ref-data/ensembl/genes",
        "ensembl_genes",
        "ensembl_id|gene_symbol|biotype|chromosome|start|end|strand|description",
        "parquet",
        "quarterly",
        "external",
        "Ensembl/EMBL-EBI",
    ),
    (
        "PDB Protein Structures",
        "s3://ref-data/pdb/structures",
        "pdb_structures",
        "pdb_id|title|method|resolution|organism|chain_count|release_date",
        "parquet",
        "weekly",
        "external",
        "RCSB PDB",
    ),
    (
        "ChEMBL Bioactivity Data",
        "s3://ref-data/chembl/bioactivity",
        "chembl_bioactivity",
        "chembl_id|smiles|target_id|activity_type|value|units|assay_type|source",
        "parquet",
        "quarterly",
        "external",
        "EMBL-EBI ChEMBL",
    ),
    (
        "DrugBank Interactions",
        "s3://ref-data/drugbank/interactions",
        "drugbank_interactions",
        "drugbank_id|name|cas_number|mechanism|target_gene|pathway|interaction_type",
        "parquet",
        "quarterly",
        "external",
        "DrugBank",
    ),
    (
        "UniProt Protein Sequences",
        "s3://ref-data/uniprot/sequences",
        "uniprot_sequences",
        "uniprot_id|gene|organism|function|subcellular_location|sequence_length|reviewed",
        "parquet",
        "monthly",
        "external",
        "UniProt Consortium",
    ),
    (
        "ChemSpider Compound Registry",
        "s3://ref-data/chemspider/compounds",
        "chemspider_compounds",
        "csid|smiles|inchi|molecular_formula|mw|data_sources|synonyms",
        "csv",
        "monthly",
        "external",
        "Royal Society of Chemistry",
    ),
    (
        "ZINC20 Screening Library",
        "s3://ref-data/zinc20/library",
        "zinc20_library",
        "zinc_id|smiles|mw|logp|hba|hbd|charge|purchasability",
        "parquet",
        "quarterly",
        "external",
        "UCSF ZINC",
    ),
]
# Translational-department dataset templates.  Same tuple layout as
# DISCOVERY_DATASETS: 7-tuples for internal, 8-tuples (provider appended)
# for external sources.
TRANSLATIONAL_DATASETS = [
    (
        "WGS Germline Runs",
        "s3://seq-lake/wgs/germline",
        "wgs_germline",
        "sample_id|patient_id|flowcell|lane|read_count|mean_coverage|qc_pass|run_date",
        "bam",
        "daily",
        "internal",
    ),
    (
        "WGS Tumor Runs",
        "s3://seq-lake/wgs/tumor",
        "wgs_tumor",
        "sample_id|patient_id|tumor_purity|ploidy|coverage|variant_count|run_date",
        "bam",
        "daily",
        "internal",
    ),
    (
        "WES Exome Captures",
        "s3://seq-lake/wes/captures",
        "wes_captures",
        "sample_id|patient_id|capture_kit|on_target_pct|mean_coverage|run_date",
        "bam",
        "daily",
        "internal",
    ),
    (
        "RNA-seq Expression",
        "s3://seq-lake/rnaseq/expression",
        "rnaseq_expression",
        "sample_id|patient_id|gene_id|tpm|fpkm|raw_count|library_type|run_date",
        "parquet",
        "daily",
        "internal",
    ),
    (
        "scRNA-seq Cell Clusters",
        "s3://seq-lake/scrnaseq/clusters",
        "scrnaseq_clusters",
        "sample_id|cell_barcode|cluster_id|cell_type|umap_x|umap_y|n_genes|n_umi",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "ELISA Biomarker Results",
        "s3://assay-lake/elisa/results",
        "elisa_results",
        "sample_id|patient_id|analyte|concentration_pg_ml|cv_pct|plate_id|run_date",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Flow Cytometry Panels",
        "s3://assay-lake/flow/panels",
        "flow_cytometry",
        "sample_id|patient_id|panel|cd_marker|pct_positive|mfi|gate|run_date",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Mass Spec Proteomics",
        "s3://assay-lake/massspec/proteomics",
        "massspec_proteomics",
        "sample_id|patient_id|protein_id|abundance|peptide_count|coverage_pct|run_date",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "Multiplex Immunoassay",
        "s3://assay-lake/multiplex/results",
        "multiplex_immunoassay",
        "sample_id|patient_id|panel_name|analyte|concentration|unit|cv_pct",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Histopathology Slides",
        "s3://imaging-lake/pathology/slides",
        "histopathology_slides",
        "slide_id|patient_id|tissue_type|stain|magnification|scanner|file_size_gb|scan_date",
        "dicom",
        "daily",
        "internal",
    ),
    (
        "Radiology DICOM Archive",
        "s3://imaging-lake/radiology/dicom",
        "radiology_dicom",
        "study_id|patient_id|modality|body_part|series_count|slice_count|study_date",
        "dicom",
        "daily",
        "internal",
    ),
    (
        "Spatial Transcriptomics",
        "s3://seq-lake/spatial/visium",
        "spatial_transcriptomics",
        "sample_id|patient_id|spot_count|gene_count|tissue_type|resolution|run_date",
        "parquet",
        "monthly",
        "internal",
    ),
    # External
    (
        "ClinVar Variant Classifications",
        "s3://ref-data/clinvar/variants",
        "clinvar_variants",
        "variation_id|gene|hgvs|clinical_significance|review_status|condition|last_evaluated",
        "vcf",
        "monthly",
        "external",
        "NCBI ClinVar",
    ),
    (
        "gnomAD Population Frequencies",
        "s3://ref-data/gnomad/frequencies",
        "gnomad_frequencies",
        "variant_id|chromosome|position|ref|alt|af_global|af_afr|af_eas|af_nfe|filter",
        "vcf",
        "quarterly",
        "external",
        "Broad Institute gnomAD",
    ),
    (
        "COSMIC Somatic Mutations",
        "s3://ref-data/cosmic/somatic",
        "cosmic_somatic",
        "cosmic_id|gene|mutation_aa|mutation_cds|primary_site|histology|sample_count",
        "parquet",
        "quarterly",
        "external",
        "Wellcome Sanger COSMIC",
    ),
    (
        "Illumina TruSight Panels",
        "s3://vendor-data/illumina/trusight",
        "illumina_trusight",
        "panel_name|gene_count|region_count|total_bases|design_version|release_date",
        "csv",
        "quarterly",
        "external",
        "Illumina",
    ),
    (
        "10x Genomics References",
        "s3://vendor-data/10x/references",
        "tenx_references",
        "reference_name|species|genome_build|gene_count|version|release_date",
        "parquet",
        "quarterly",
        "external",
        "10x Genomics",
    ),
]
# Clinical-department dataset templates.  Same tuple layout as
# DISCOVERY_DATASETS: 7-tuples for internal, 8-tuples (provider appended)
# for external sources.
CLINICAL_DATASETS = [
    (
        "Phase I Safety Trials",
        "s3://clinical-lake/trials/phase1",
        "phase1_safety",
        "trial_id|patient_id|cohort|dose_level|dose_unit|ae_grade|ae_term|visit_date",
        "sas7bdat",
        "daily",
        "internal",
    ),
    (
        "Phase II Efficacy Trials",
        "s3://clinical-lake/trials/phase2",
        "phase2_efficacy",
        "trial_id|patient_id|arm|response|best_response|pfs_months|os_months|visit_date",
        "sas7bdat",
        "daily",
        "internal",
    ),
    (
        "Phase III Pivotal Trials",
        "s3://clinical-lake/trials/phase3",
        "phase3_pivotal",
        "trial_id|patient_id|site_id|arm|primary_endpoint|secondary_endpoint|status|date",
        "sas7bdat",
        "daily",
        "internal",
    ),
    (
        "Dose Escalation Records",
        "s3://clinical-lake/trials/dose_escalation",
        "dose_escalation",
        "trial_id|cohort|dose_mg|n_patients|dlt_count|mtd_reached|decision_date",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Patient Demographics",
        "s3://clinical-lake/patients/demographics",
        "patient_demographics",
        "patient_id|age|sex|race|ethnicity|country|site_id|enrollment_date",
        "parquet",
        "daily",
        "internal",
    ),
    (
        "Lab Results (Central Lab)",
        "s3://clinical-lake/labs/central",
        "central_lab_results",
        "patient_id|visit|test_name|result_value|unit|ref_low|ref_high|flag|collection_date",
        "csv",
        "daily",
        "internal",
    ),
    (
        "ECG Monitoring Data",
        "s3://clinical-lake/monitoring/ecg",
        "ecg_monitoring",
        "patient_id|visit|hr_bpm|qtcf_ms|pr_ms|qrs_ms|interpretation|recording_date",
        "csv",
        "daily",
        "internal",
    ),
    (
        "Adverse Event Reports",
        "s3://clinical-lake/safety/ae_internal",
        "ae_internal",
        "patient_id|trial_id|ae_term|soc|grade|serious|outcome|onset_date|resolve_date",
        "csv",
        "daily",
        "internal",
    ),
    (
        "Informed Consent Tracker",
        "s3://clinical-lake/regulatory/consent",
        "consent_tracker",
        "patient_id|trial_id|consent_version|signed_date|witness|amendments|withdrawal_date",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Sample Biobank LIMS",
        "s3://clinical-lake/biobank/lims",
        "biobank_lims",
        "sample_id|patient_id|sample_type|collection_date|storage_location|volume_ml|freeze_thaw_cycles",
        "csv",
        "daily",
        "internal",
    ),
    (
        "Vital Signs Monitoring",
        "s3://clinical-lake/monitoring/vitals",
        "vitals_monitoring",
        "patient_id|visit|sbp|dbp|heart_rate|temperature|weight_kg|height_cm|date",
        "csv",
        "daily",
        "internal",
    ),
    (
        "Concomitant Medications",
        "s3://clinical-lake/patients/conmeds",
        "concomitant_meds",
        "patient_id|medication|dose|route|frequency|start_date|end_date|indication",
        "csv",
        "daily",
        "internal",
    ),
    (
        "Site Performance Metrics",
        "s3://clinical-lake/operations/site_metrics",
        "site_metrics",
        "site_id|country|enrollment_target|enrolled|screen_fail_pct|query_rate|last_updated",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Data Monitoring Committee",
        "s3://clinical-lake/oversight/dmc",
        "dmc_reports",
        "trial_id|meeting_date|recommendation|unblinded|safety_signal|interim_analysis",
        "csv",
        "quarterly",
        "internal",
    ),
    # External
    (
        "FDA FAERS Adverse Events",
        "s3://external-feeds/fda/faers",
        "fda_faers",
        "report_id|drug_name|reaction|outcome|age|sex|reporter_type|receive_date",
        "csv",
        "quarterly",
        "external",
        "FDA",
    ),
    (
        "EMA EudraVigilance Reports",
        "s3://external-feeds/ema/eudravigilance",
        "ema_eudravigilance",
        "report_id|substance|reaction_meddra|serious|outcome|age_group|region|report_date",
        "csv",
        "quarterly",
        "external",
        "EMA",
    ),
    (
        "WHO VigiBase Global Reports",
        "s3://external-feeds/who/vigibase",
        "who_vigibase",
        "report_id|drug|reaction|country|age|sex|reporter|report_type|year",
        "csv",
        "quarterly",
        "external",
        "WHO Uppsala",
    ),
    (
        "MedDRA Terminology",
        "s3://ref-data/meddra/terms",
        "meddra_terms",
        "meddra_code|pt_name|hlt_name|hlgt_name|soc_name|version",
        "csv",
        "quarterly",
        "external",
        "MedDRA MSSO",
    ),
    (
        "ICD-10 Coding Reference",
        "s3://ref-data/icd10/codes",
        "icd10_codes",
        "icd_code|description|category|chapter|block|is_billable",
        "csv",
        "quarterly",
        "external",
        "WHO ICD",
    ),
]
# Commercial-department dataset templates.  Same tuple layout as
# DISCOVERY_DATASETS: 7-tuples for internal, 8-tuples (provider appended)
# for external sources.
COMMERCIAL_DATASETS = [
    (
        "Post-Market Surveillance",
        "s3://commercial-lake/surveillance/reports",
        "post_market_surveillance",
        "product_id|event_type|description|severity|reporter|report_date|country",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "Field Medical Insights",
        "s3://commercial-lake/medical/insights",
        "field_medical_insights",
        "insight_id|territory|hcp_specialty|topic|sentiment|action_needed|date",
        "csv",
        "weekly",
        "internal",
    ),
    (
        "KOL Engagement Tracker",
        "s3://commercial-lake/medical/kol_tracker",
        "kol_tracker",
        "kol_id|name|institution|specialty|tier|engagement_count|last_contact|region",
        "csv",
        "monthly",
        "internal",
    ),
    (
        "Market Access Analytics",
        "s3://commercial-lake/market_access/analytics",
        "market_access",
        "product_id|market|formulary_status|tier|payer_type|lives_covered|effective_date",
        "parquet",
        "monthly",
        "internal",
    ),
    (
        "Sales Territory Data",
        "s3://commercial-lake/sales/territories",
        "sales_territories",
        "territory_id|rep_id|region|product|scripts_trx|scripts_nrx|market_share_pct|period",
        "parquet",
        "weekly",
        "internal",
    ),
    (
        "Medical Affairs Publications",
        "s3://commercial-lake/medical/publications",
        "medical_publications",
        "pub_id|title|journal|impact_factor|pub_type|therapeutic_area|pub_date",
        "csv",
        "monthly",
        "internal",
    ),
    (
        "Launch Readiness Tracker",
        "s3://commercial-lake/launch/readiness",
        "launch_readiness",
        "product_id|market|milestone|status|owner|target_date|actual_date|risk_level",
        "csv",
        "weekly",
        "internal",
    ),
    # External
    (
        "Flatiron RWE Oncology",
        "s3://external-feeds/flatiron/oncology",
        "flatiron_oncology",
        "patient_token|diagnosis|stage|biomarkers|treatment_line|regimen|os_months|data_date",
        "parquet",
        "monthly",
        "external",
        "Flatiron Health",
    ),
    (
        "Tempus Genomic-Clinical",
        "s3://external-feeds/tempus/genomic_clinical",
        "tempus_genomic_clinical",
        "patient_token|tumor_type|panel|variants_detected|tmb|msi_status|treatment|data_date",
        "parquet",
        "monthly",
        "external",
        "Tempus Labs",
    ),
    (
        "IQVIA Prescription Data",
        "s3://external-feeds/iqvia/prescriptions",
        "iqvia_prescriptions",
        "product|ndc|channel|geography|trx_count|nrx_count|period|data_date",
        "sas7bdat",
        "weekly",
        "external",
        "IQVIA",
    ),
    (
        "Optum Claims and EHR",
        "s3://external-feeds/optum/claims_ehr",
        "optum_claims_ehr",
        "patient_token|claim_type|diagnosis_icd|procedure_cpt|ndc|paid_amount|service_date",
        "parquet",
        "monthly",
        "external",
        "Optum/UHG",
    ),
    (
        "MarketScan Commercial Claims",
        "s3://external-feeds/marketscan/claims",
        "marketscan_claims",
        "patient_token|age_group|plan_type|diagnosis|procedure|drug_ndc|paid|service_date",
        "sas7bdat",
        "monthly",
        "external",
        "Merative MarketScan",
    ),
    (
        "Symphony Health Rx Data",
        "s3://external-feeds/symphony/rx",
        "symphony_rx",
        "product|ndc|prescriber_id|pharmacy_id|quantity|days_supply|date",
        "csv",
        "weekly",
        "external",
        "Symphony Health",
    ),
    (
        "Definitive Healthcare HCP",
        "s3://external-feeds/defhc/hcp",
        "definitive_hcp",
        "npi|name|specialty|affiliation|address|prescribing_volume|tier|last_updated",
        "csv",
        "monthly",
        "external",
        "Definitive Healthcare",
    ),
]
# ── scale: expand template lists with combinatorial variations ──────
# Name suffixes used by _scale_templates to mint extra dataset variations
# (batches, cell lines, cohorts, phases, versions, processing stages).
_SUFFIXES = [
    "Batch A",
    "Batch B",
    "Batch C",
    "Batch D",
    "Batch E",
    "HepG2",
    "A549",
    "MCF7",
    "HEK293",
    "Jurkat",
    "US Cohort",
    "EU Cohort",
    "APAC Cohort",
    "LATAM Cohort",
    "Phase 1",
    "Phase 2",
    "v2",
    "v3",
    "Extended",
    "QC Filtered",
    "Normalized",
    "Imputed",
    "Annotated",
]
def _scale_templates(templates: list, scale: int) -> list:
    """Expand template list by scale factor using combinatorial suffixes.

    Scale 1 = original templates (no change).
    Scale N = original + (N-1) variations per template with suffixed
    names/paths/tables.
    """
    if scale <= 1:
        return templates
    import zlib  # local import: file-level import block is outside this section

    expanded = list(templates)
    suffixes = list(_SUFFIXES)
    for s in range(1, scale):
        for entry in templates:
            name = entry[0]
            path = entry[1]
            table = entry[2]
            # Suffix choice must be stable across runs: builtin hash() is
            # salted per process (PYTHONHASHSEED), which would break the
            # reproducibility of seeded generation — use a CRC instead.
            digest = zlib.crc32(name.encode("utf-8"))
            suffix = suffixes[(s - 1 + digest) % len(suffixes)]
            new_name = f"{name} — {suffix}"
            slug_suffix = _slug(suffix)
            new_path = f"{path}/{slug_suffix}"
            new_table = f"{table}_{slug_suffix}"
            new_entry = (new_name, new_path, new_table) + entry[3:]
            expanded.append(new_entry)
    return expanded
# Dataset IDs already issued — lets _build_datasets guarantee globally
# unique IDs even when its per-department number seeding collides.
_seen_ids: set = set()
def _build_datasets(template_list, department, owners) -> List[DatasetRecord]:
    """Materialize DatasetRecord rows from dataset templates.

    Templates are 7-tuples (internal datasets) or 8-tuples whose last
    element is the external provider name. Owner, creation date and row
    count are drawn from the seeded RNG helpers.
    """
    import zlib  # local import: file-level import block is outside this section

    records = []
    for entry in template_list:
        if len(entry) == 8:
            name, path, table, schema, fmt, cadence, src_type = entry[:7]
            provider = entry[7]
        else:
            name, path, table, schema, fmt, cadence, src_type = entry
            provider = "internal"
        # Seed the dataset number from a *stable* checksum: builtin hash()
        # is salted per process (PYTHONHASHSEED), which would make IDs —
        # and every file derived from them — differ between seeded runs.
        n = zlib.crc32(f"{department}|{name}".encode("utf-8")) % 9000 + 1000
        ds_id = _id("DS", n)
        # Ensure globally unique dataset IDs (checksum collisions across departments)
        while ds_id in _seen_ids:
            n += 1
            ds_id = _id("DS", n)
        _seen_ids.add(ds_id)
        records.append(
            DatasetRecord(
                dataset_id=ds_id,
                name=name,
                storage_path=path,
                table_name=table,
                schema_columns=schema,
                format=fmt,
                refresh_cadence=cadence,
                source_type=src_type,
                provider=provider,
                owner=_pick(owners),
                created_date=_date("2020-01-01", "2024-06-01"),
                row_count=random.randint(500, 50_000_000),
                department=department,
            )
        )
    return records
def generate_all_datasets(scale: int = 1) -> Dict[str, List[DatasetRecord]]:
    """Build every department's dataset records, scaled by *scale*."""
    dept_specs = {
        "discovery": (DISCOVERY_DATASETS, OWNERS_DISCOVERY),
        "translational": (TRANSLATIONAL_DATASETS, OWNERS_TRANSLATIONAL),
        "clinical": (CLINICAL_DATASETS, OWNERS_CLINICAL),
        "commercial": (COMMERCIAL_DATASETS, OWNERS_COMMERCIAL),
    }
    return {
        dept: _build_datasets(_scale_templates(templates, scale), dept, owners)
        for dept, (templates, owners) in dept_specs.items()
    }
# ── contracts ────────────────────────────────────────────────────────
def generate_contracts(all_datasets: Dict[str, List[DatasetRecord]]) -> List[ContractRecord]:
    """Create one data-sharing contract per external data provider.

    Each contract covers every external dataset that provider supplies.
    The refresh SLA matches the fastest cadence among the covered
    datasets, and any contract covering omics-style data (matched by
    name) is forced to "restricted" classification.
    """
    # Group external datasets by provider
    by_provider: Dict[str, List[DatasetRecord]] = {}
    for dept_datasets in all_datasets.values():
        for ds in dept_datasets:
            if ds.source_type == "external":
                by_provider.setdefault(ds.provider, []).append(ds)
    # Cadence ranking is loop-invariant — build it once, not per contract.
    cadence_order = {c: j for j, c in enumerate(CADENCES)}
    contracts = []
    for i, (provider, datasets) in enumerate(sorted(by_provider.items()), 1):
        # Pick a cadence SLA that matches the fastest dataset in the group
        fastest = min(datasets, key=lambda d: cadence_order.get(d.refresh_cadence, 99))
        effective = _date("2021-01-01", "2023-06-01")
        contracts.append(
            ContractRecord(
                contract_id=_id("CTR", i),
                provider=provider,
                effective_date=effective,
                expiry_date=_future_date(),
                status="active",
                covered_datasets=[
                    {"dataset_id": ds.dataset_id, "name": ds.name, "permitted_use": _pick(PERMITTED_USES)}
                    for ds in datasets
                ],
                refresh_sla=fastest.refresh_cadence,
                retention_limit_days=_pick([90, 180, 365, 730, 1825]),
                classification=(
                    "restricted"
                    if any(
                        re.search(r"(?i)genom|proteom|transcriptom|omics|crispr|rna.seq|wgs", ds.name)
                        for ds in datasets
                    )
                    else _pick(["confidential", "restricted"])
                ),
                geo_restriction=_pick(GEO_RESTRICTIONS),
                deidentification_required=_pick([True, False]),
                audit_rights=_pick(
                    [
                        "Annual audit with 30-day notice",
                        "Bi-annual audit with 60-day notice",
                        "Quarterly compliance review",
                        "On-demand audit with 14-day notice",
                    ]
                ),
            )
        )
    return contracts
# ── business products ────────────────────────────────────────────────
# Business-catalog product templates. Each entry is a 5-tuple:
# (name, description_template, department_sources, n_sources_range, consumers)
# where n_sources_range bounds how many source datasets the product draws
# from the listed department(s).
PRODUCT_TEMPLATES = [
    # (name, description_template, department_sources, n_sources_range, consumers)
    (
        "Genomic Variant Panel",
        "Integrated germline and somatic variant calls with population frequency annotations and clinical significance. Used by translational scientists for variant interpretation and biomarker discovery.",
        ["translational"],
        (2, 5),
        ["Translational Science", "Biomarker Team", "Clinical Genomics"],
    ),
    (
        "Patient 360 Profile",
        "Comprehensive patient view combining demographics, lab results, treatment history, and molecular profiling. Enables holistic patient stratification for trial enrollment and precision medicine.",
        ["clinical", "translational"],
        (3, 6),
        ["Clinical Operations", "Medical Affairs", "Translational Science"],
    ),
    (
        "Drug-Target Interaction Atlas",
        "Curated drug-target interactions combining internal screening data with public bioactivity databases. Supports target validation and lead identification.",
        ["discovery"],
        (2, 4),
        ["Discovery Chemistry", "Computational Biology", "Target Sciences"],
    ),
    (
        "Safety Signal Dashboard",
        "Aggregated adverse event data from internal trials and external pharmacovigilance databases. Powers real-time safety signal detection and regulatory reporting.",
        ["clinical"],
        (2, 5),
        ["Drug Safety", "Regulatory Affairs", "Medical Affairs"],
    ),
    (
        "Real-World Evidence Platform",
        "Integrated claims, EHR, and genomic-clinical data from multiple vendors. Supports health economics, comparative effectiveness, and label expansion studies.",
        ["commercial"],
        (2, 4),
        ["HEOR", "Medical Affairs", "Market Access"],
    ),
    (
        "Compound Optimization Suite",
        "End-to-end compound tracking from HTS hits through lead optimization. Includes ADME, selectivity, and PK data with SAR visualizations.",
        ["discovery"],
        (3, 5),
        ["Medicinal Chemistry", "DMPK", "Discovery Biology"],
    ),
    (
        "Biomarker Discovery Engine",
        "Multi-omic biomarker analysis combining proteomics, transcriptomics, and immunoassay results. Identifies predictive and prognostic biomarkers for clinical development.",
        ["translational"],
        (3, 5),
        ["Biomarker Team", "Translational Science", "Clinical Development"],
    ),
    (
        "Clinical Trial Operations Hub",
        "Centralized trial management data including site performance, enrollment tracking, consent management, and monitoring metrics.",
        ["clinical"],
        (3, 5),
        ["Clinical Operations", "CRO Partners", "Regulatory Affairs"],
    ),
    (
        "Competitive Intelligence Feed",
        "Market analytics combining prescription data, medical publications, and KOL engagement metrics for competitive landscape assessment.",
        ["commercial"],
        (2, 4),
        ["Commercial Strategy", "Medical Affairs", "Market Access"],
    ),
    (
        "Tumor Profiling Service",
        "Comprehensive tumor characterization combining WGS, RNA-seq, and IHC data with variant annotation from public databases.",
        ["translational"],
        (3, 5),
        ["Precision Oncology", "Clinical Genomics", "Translational Science"],
    ),
    (
        "Pharmacovigilance Warehouse",
        "Global adverse event aggregation from FDA FAERS, EMA EudraVigilance, and WHO VigiBase with MedDRA coding.",
        ["clinical"],
        (3, 5),
        ["Drug Safety", "Regulatory Affairs", "Pharmacovigilance"],
    ),
    (
        "Market Share Tracker",
        "Weekly prescription volume tracking across channels and geographies from multiple data vendors.",
        ["commercial"],
        (2, 4),
        ["Commercial Analytics", "Sales Operations", "Brand Teams"],
    ),
    (
        "Single-Cell Atlas",
        "Curated single-cell RNA-seq and spatial transcriptomics data for tissue-level gene expression profiling.",
        ["translational"],
        (2, 3),
        ["Computational Biology", "Translational Science", "Discovery Biology"],
    ),
    (
        "HCP Engagement Platform",
        "Integrated healthcare professional data combining engagement history, prescribing patterns, and KOL profiling.",
        ["commercial"],
        (2, 4),
        ["Medical Affairs", "Field Medical", "Commercial Operations"],
    ),
    (
        "Regulatory Submission Package",
        "Pre-assembled clinical data packages for regulatory submissions including safety, efficacy, and bioanalytical data.",
        ["clinical"],
        (3, 5),
        ["Regulatory Affairs", "Clinical Development", "Biostatistics"],
    ),
    (
        "Target Identification Suite",
        "Multi-modal target discovery combining CRISPR screens, protein structures, and interaction databases for novel target identification.",
        ["discovery"],
        (3, 5),
        ["Target Sciences", "Computational Biology", "Discovery Biology"],
    ),
    (
        "Fragment-Based Drug Design",
        "Fragment screening library data with structural biology and biophysical assay results for fragment-to-lead campaigns.",
        ["discovery"],
        (2, 3),
        ["Structural Biology", "Medicinal Chemistry", "Computational Chemistry"],
    ),
    (
        "Imaging Analytics Platform",
        "Integrated pathology and radiology imaging data with AI-derived features for disease characterization.",
        ["translational"],
        (2, 3),
        ["Digital Pathology", "Radiology", "Translational Science"],
    ),
    (
        "Clinical Biobank Inventory",
        "Sample tracking and biobank management data linking biospecimens to patient clinical data.",
        ["clinical"],
        (2, 3),
        ["Biobank Operations", "Translational Science", "Clinical Operations"],
    ),
    (
        "Launch Analytics Dashboard",
        "Market access, formulary status, and launch readiness metrics for commercial launch planning.",
        ["commercial"],
        (2, 3),
        ["Launch Team", "Market Access", "Commercial Strategy"],
    ),
]
def _scale_product_templates(templates: list, scale: int) -> list:
"""Expand product templates by scale factor."""
if scale <= 1:
return templates
expanded = list(templates)
region_suffixes = ["NA", "EMEA", "APAC", "LATAM", "Global"]
for s in range(1, scale):
for entry in templates:
name, desc, dept_sources, n_range, consumers = entry
suffix = region_suffixes[(s - 1 + hash(name)) % len(region_suffixes)]
new_name = f"{name} — {suffix}"
expanded.append((new_name, desc, dept_sources, n_range, consumers))
return expanded
def generate_business_products(
    all_datasets: Dict[str, List[DatasetRecord]],
    contracts: List[ContractRecord],
    scale: int = 1,
) -> List[BusinessProduct]:
    """Assemble business-catalog products from datasets and contracts.

    Each product template draws source datasets from its department(s);
    the product then inherits:
      * refresh frequency — fastest cadence among its sources,
      * classification    — strictest classification of any covering contract
                            (or "internal" when no contract applies),
      * retention         — minimum retention limit across covering contracts
                            (0 = unconstrained, internal-only sources).
    """
    products = []
    # Flatten datasets by department for easy lookup
    by_dept: Dict[str, List[DatasetRecord]] = {}
    for dept, ds_list in all_datasets.items():
        by_dept[dept] = ds_list
    # Build dataset_id → strictest contract classification
    classification_strength = {"restricted": 0, "confidential": 1, "internal": 2, "public": 3}
    ds_contract_class: Dict[str, str] = {}
    for ctr in contracts:
        for ds in ctr.covered_datasets:
            existing = ds_contract_class.get(ds["dataset_id"])
            if existing is None or classification_strength.get(ctr.classification, 99) < classification_strength.get(
                existing, 99
            ):
                ds_contract_class[ds["dataset_id"]] = ctr.classification
    # Build dataset_id → minimum contract retention once, up front.
    # (Most restrictive contract wins — can't keep data longer than any
    # contract allows. Previously rebuilt inside the per-product loop:
    # loop-invariant work costing O(products × contracts) for no benefit.)
    ds_contract_retention: Dict[str, int] = {}
    for ctr in contracts:
        for cds in ctr.covered_datasets:
            existing = ds_contract_retention.get(cds["dataset_id"])
            if existing is None or ctr.retention_limit_days < existing:
                ds_contract_retention[cds["dataset_id"]] = ctr.retention_limit_days
    # Cadence ranking is also loop-invariant.
    cadence_order = {c: j for j, c in enumerate(CADENCES)}
    dept_owners = {
        "discovery": OWNERS_DISCOVERY,
        "translational": OWNERS_TRANSLATIONAL,
        "clinical": OWNERS_CLINICAL,
        "commercial": OWNERS_COMMERCIAL,
    }
    limitations = [
        "Data completeness varies by site; some sites have >30-day reporting lag.",
        "External reference data refreshed quarterly; may not reflect latest releases.",
        "De-identification may exclude rare variants with low population frequency.",
        "Historical data prior to 2021 uses legacy schema and may have mapping gaps.",
        "Some assay results pending QC review and marked as provisional.",
        "Cross-vendor patient linkage is probabilistic with ~95% match rate.",
        "Imaging data limited to sites with compatible scanner hardware.",
        "Real-world evidence subject to selection bias inherent in claims data.",
    ]
    for i, (name, desc, dept_sources, n_range, consumers) in enumerate(
        _scale_product_templates(PRODUCT_TEMPLATES, scale), 1
    ):
        # Pick source datasets from the specified departments
        candidate_datasets = []
        for dept in dept_sources:
            candidate_datasets.extend(by_dept.get(dept, []))
        n_sources = min(random.randint(*n_range), len(candidate_datasets))
        selected = random.sample(candidate_datasets, n_sources)
        # Determine refresh as the fastest among sources
        fastest_source = min(selected, key=lambda d: cadence_order.get(d.refresh_cadence, 99))
        # Inherit strictest classification from source datasets' contracts
        # If no external sources, default to "internal"
        strictest_class = "internal"
        for ds in selected:
            if ds.dataset_id in ds_contract_class:
                ctr_class = ds_contract_class[ds.dataset_id]
                if classification_strength.get(ctr_class, 99) < classification_strength.get(strictest_class, 99):
                    strictest_class = ctr_class
        # Minimum retention across the contracts covering selected datasets;
        # 0 when no covering contract constrains retention.
        applicable = [
            ds_contract_retention[ds.dataset_id]
            for ds in selected
            if ds.dataset_id in ds_contract_retention
        ]
        min_retention = min(applicable) if applicable else 0
        primary_dept = dept_sources[0]
        products.append(
            BusinessProduct(
                product_id=_id("BP", i),
                name=name,
                owner=_pick(dept_owners[primary_dept]),
                classification=strictest_class,
                refresh_frequency=fastest_source.refresh_cadence,
                created_date=_date("2022-01-01", "2025-06-01"),
                description=desc,
                source_datasets=[
                    {
                        "dataset_id": ds.dataset_id,
                        "name": ds.name,
                        "domain": ds.department,
                        "source_type": ds.source_type,
                        "storage_path": ds.storage_path,
                        "table_name": ds.table_name,
                    }
                    for ds in selected
                ],
                retention_days=min_retention,
                completeness_pct=round(random.uniform(75.0, 99.5), 1),
                last_validated=_date("2025-01-01", "2025-12-31"),
                known_limitations=_pick(limitations),
                consumers=consumers,
                request_process=_pick(["self-serve", "approval-required", "restricted"]),
            )
        )
    return products
# ── writers ──────────────────────────────────────────────────────────
# Column order for the per-department technical catalog CSVs. Each name
# must match a DatasetRecord attribute — rows are built with getattr.
CSV_COLUMNS = [
    "dataset_id",
    "name",
    "storage_path",
    "table_name",
    "schema_columns",
    "format",
    "refresh_cadence",
    "source_type",
    "provider",
    "owner",
    "created_date",
    "row_count",
]
def write_technical_catalogs(all_datasets: Dict[str, List[DatasetRecord]]):
    """Write one CSV per department into resources/technical_catalog/."""
    out_dir = RESOURCES / "technical_catalog"
    for dept, records in all_datasets.items():
        dest = out_dir / f"{dept}.csv"
        with open(dest, "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=CSV_COLUMNS)
            writer.writeheader()
            writer.writerows(
                {col: getattr(rec, col) for col in CSV_COLUMNS} for rec in records
            )
def write_contracts(contracts: List[ContractRecord]):
    """Render each contract as a Markdown data-sharing agreement.

    One file per provider under resources/contracts/, named by the
    provider slug. The `- **Label**: value` lines are what the mutation
    helpers (_mutate_contract_field) later pattern-match on, so their
    exact format matters.
    """
    out = RESOURCES / "contracts"
    for ctr in contracts:
        path = out / f"{_slug(ctr.provider)}.md"
        # One markdown table row per covered dataset.
        covered_table = "\n".join(
            f"| {d['dataset_id']} | {d['name']} | {d['permitted_use']} |" for d in ctr.covered_datasets
        )
        content = f"""# Data Sharing Agreement: {ctr.provider}
- **Contract ID**: {ctr.contract_id}
- **Provider**: {ctr.provider}
- **Effective Date**: {ctr.effective_date}
- **Expiry Date**: {ctr.expiry_date}
- **Status**: {ctr.status}
## Covered Datasets
| dataset_id | name | permitted_use |
|---|---|---|
{covered_table}
## Terms
- **Refresh SLA**: {ctr.refresh_sla}
- **Retention Limit**: {ctr.retention_limit_days} days
- **Data Classification**: {ctr.classification}
- **Geographic Restriction**: {ctr.geo_restriction}
- **De-identification Required**: {"yes" if ctr.deidentification_required else "no"}
- **Audit Rights**: {ctr.audit_rights}
"""
        path.write_text(content)
def write_business_products(products: List[BusinessProduct]):
    """Render each business product as a Markdown catalog entry.

    One file per product under resources/business_catalog/, named by the
    product slug. The `- **Label**: value` lines are what the mutation
    helpers (_mutate_business_field) later pattern-match on.
    """
    out = RESOURCES / "business_catalog"
    for bp in products:
        path = out / f"{_slug(bp.name)}.md"
        source_table = "\n".join(
            f"| {s['dataset_id']} | {s['name']} | {s['domain']} | {s['source_type']} | {s['storage_path']} | {s['table_name']} |"
            for s in bp.source_datasets
        )
        consumers_str = ", ".join(bp.consumers)
        # retention_days == 0 means no covering contract constrained retention.
        retention_line = (
            f"- **Data Retention**: {bp.retention_days} days"
            if bp.retention_days > 0
            else "- **Data Retention**: unlimited (internal only)"
        )
        content = f"""# Data Product: {bp.name}
- **Product ID**: {bp.product_id}
- **Owner**: {bp.owner}
- **Classification**: {bp.classification}
- **Refresh Frequency**: {bp.refresh_frequency}
- **Created**: {bp.created_date}
{retention_line}
## Description
{bp.description}
## Source Datasets
| dataset_id | name | domain | source_type | storage_path | table_name |
|---|---|---|---|---|---|
{source_table}
## Data Quality
- **Completeness**: {bp.completeness_pct}%
- **Last Validated**: {bp.last_validated}
- **Known Limitations**: {bp.known_limitations}
## Access
- **Consumers**: {consumers_str}
- **Request Process**: {bp.request_process}
"""
        path.write_text(content)
# ── .pltg generation ─────────────────────────────────────────────────
# Output directory for the generated Parseltongue source modules.
SRC = Path(__file__).parent / "src"
def _pltg_escape(s: str) -> str:
return s.replace("\\", "\\\\").replace('"', '\\"')
def write_pltg_load_contracts(contracts: List[ContractRecord]):
    """Generate src/load_contracts.pltg — load-document for each contract."""
    header = ["; Auto-generated: load contract documents"]
    # Skip providers whose contract file is missing (e.g. deleted by the
    # contract-gap corruption).
    body = [
        f'(load-document "contract:{slug}" "../resources/contracts/{slug}.md")'
        for slug in (_slug(ctr.provider) for ctr in contracts)
        if (RESOURCES / "contracts" / f"{slug}.md").exists()
    ]
    (SRC / "load_contracts.pltg").write_text("\n".join(header + body) + "\n")
def write_pltg_load_business(products: List[BusinessProduct]):
    """Generate src/load_business.pltg — load-document for each product."""
    header = ["; Auto-generated: load business catalog documents"]
    body = [
        f'(load-document "business:{slug}" "../resources/business_catalog/{slug}.md")'
        for slug in (_slug(bp.name) for bp in products)
    ]
    (SRC / "load_business.pltg").write_text("\n".join(header + body) + "\n")
def write_pltg_load_technical(all_datasets: Dict[str, List[DatasetRecord]]):
    """Generate src/load_technical.pltg — load-document for each tech CSV."""
    header = ["; Auto-generated: load technical catalog CSV documents"]
    body = [
        f'(load-document "tech:{dept}" "../resources/technical_catalog/{dept}.csv")'
        for dept in sorted(all_datasets)
    ]
    (SRC / "load_technical.pltg").write_text("\n".join(header + body) + "\n")
def write_pltg_technical(all_datasets: Dict[str, List[DatasetRecord]]):
    """Generate src/technical.pltg — facts from technical catalog CSVs.

    Emits path / table / cadence / source-type / owner facts per dataset,
    each carrying an evidence quote taken from the dataset record so the
    checker can verify it against the loaded CSV document.
    """
    lines = ["; Auto-generated: technical catalog facts"]
    for dept, records in sorted(all_datasets.items()):
        doc_name = f"tech:{dept}"
        lines.append(f"\n; --- {dept} ---")
        for ds in records:
            # Lowercase the ID and swap dashes for underscores to form the
            # fact-symbol stem.
            safe = ds.dataset_id.lower().replace("-", "_")
            lines.extend(
                [
                    f'(fact {safe}-path "{_pltg_escape(ds.storage_path)}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(ds.storage_path)}")',
                    f' :explanation "Storage path for {_pltg_escape(ds.name)}"))',
                    "",
                    f'(fact {safe}-table "{_pltg_escape(ds.table_name)}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(ds.table_name)}")',
                    f' :explanation "Table name for {_pltg_escape(ds.name)}"))',
                    "",
                    f'(fact {safe}-cadence "{ds.refresh_cadence}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{ds.refresh_cadence}")',
                    f' :explanation "Refresh cadence for {_pltg_escape(ds.name)}"))',
                    "",
                    f'(fact {safe}-source-type "{ds.source_type}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{ds.source_type}")',
                    f' :explanation "Source type for {_pltg_escape(ds.name)}"))',
                    "",
                    f'(fact {safe}-owner "{_pltg_escape(ds.owner)}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(ds.owner)}")',
                    f' :explanation "Owner of {_pltg_escape(ds.name)}"))',
                    "",
                ]
            )
    (SRC / "technical.pltg").write_text("\n".join(lines) + "\n")
def write_pltg_contracts(contracts: List[ContractRecord]):
    """Generate src/contracts.pltg — facts from contract .md files.

    Per contract: SLA, retention, classification, expiry and status facts,
    plus a coverage fact and a permitted-use fact per covered dataset.
    Contracts whose .md file no longer exists (contract-gap corruption)
    are skipped so no fact references a missing document.
    """
    lines = ["; Auto-generated: contract facts"]
    for ctr in contracts:
        slug = _slug(ctr.provider)
        filepath = RESOURCES / "contracts" / f"{slug}.md"
        if not filepath.exists():
            continue
        safe = slug.replace("-", "_")
        doc_name = f"contract:{slug}"
        lines.extend(
            [
                f"\n; --- {ctr.provider} ---",
                f'(fact ctr-{safe}-sla "{ctr.refresh_sla}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{ctr.refresh_sla}")',
                f' :explanation "Refresh SLA for {_pltg_escape(ctr.provider)} contract"))',
                "",
                f'(fact ctr-{safe}-retention "{ctr.retention_limit_days} days"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{ctr.retention_limit_days} days")',
                f' :explanation "Retention limit for {_pltg_escape(ctr.provider)} contract"))',
                "",
                f'(fact ctr-{safe}-classification "{ctr.classification}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{ctr.classification}")',
                f' :explanation "Data classification for {_pltg_escape(ctr.provider)} contract"))',
                "",
                f'(fact ctr-{safe}-expiry "{ctr.expiry_date}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{ctr.expiry_date}")',
                f' :explanation "Expiry date for {_pltg_escape(ctr.provider)} contract"))',
                "",
                f'(fact ctr-{safe}-status "{ctr.status}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{ctr.status}")',
                f' :explanation "Contract status for {_pltg_escape(ctr.provider)}"))',
                "",
            ]
        )
        for ds in ctr.covered_datasets:
            ds_safe = ds["dataset_id"].lower().replace("-", "_")
            lines.extend(
                [
                    f'(fact ctr-{safe}-covers-{ds_safe} true',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(ds["dataset_id"])}")',
                    f' :explanation "{_pltg_escape(ctr.provider)} contract covers {_pltg_escape(ds["name"])}"))',
                    "",
                    f'(fact ctr-{safe}-use-{ds_safe} "{ds["permitted_use"]}"',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(ds["permitted_use"])}")',
                    f' :explanation "Permitted use for {_pltg_escape(ds["name"])} under {_pltg_escape(ctr.provider)}"))',
                    "",
                ]
            )
    (SRC / "contracts.pltg").write_text("\n".join(lines) + "\n")
def write_pltg_business(products: List[BusinessProduct]):
    """Generate src/business.pltg — facts from business catalog .md files.

    Per product: owner, classification and refresh facts; a retention fact
    only when a contract constrained retention (retention_days > 0); and
    one usage fact per source dataset.
    """
    lines = ["; Auto-generated: business catalog facts"]
    for bp in products:
        slug = _slug(bp.name)
        safe = slug.replace("-", "_")
        doc_name = f"business:{slug}"
        lines.extend(
            [
                f"\n; --- {bp.name} ---",
                f'(fact bp-{safe}-owner "{_pltg_escape(bp.owner)}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{_pltg_escape(bp.owner)}")',
                f' :explanation "Owner of {_pltg_escape(bp.name)}"))',
                "",
                f'(fact bp-{safe}-classification "{bp.classification}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{bp.classification}")',
                f' :explanation "Classification of {_pltg_escape(bp.name)}"))',
                "",
                f'(fact bp-{safe}-refresh "{bp.refresh_frequency}"',
                f' :evidence (evidence "{doc_name}"',
                f' :quotes ("{bp.refresh_frequency}")',
                f' :explanation "Refresh frequency of {_pltg_escape(bp.name)}"))',
                "",
            ]
        )
        if bp.retention_days > 0:
            lines.extend(
                [
                    f'(fact bp-{safe}-retention {bp.retention_days}',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{bp.retention_days} days")',
                    f' :explanation "Data retention for {_pltg_escape(bp.name)}"))',
                    "",
                ]
            )
        for src in bp.source_datasets:
            ds_safe = src["dataset_id"].lower().replace("-", "_")
            lines.extend(
                [
                    f'(fact bp-{safe}-uses-{ds_safe} true',
                    f' :evidence (evidence "{doc_name}"',
                    f' :quotes ("{_pltg_escape(src["dataset_id"])}")',
                    f' :explanation "{_pltg_escape(bp.name)} uses {_pltg_escape(src["name"])}"))',
                    "",
                ]
            )
    (SRC / "business.pltg").write_text("\n".join(lines) + "\n")
def write_pltg_manifest():
    """Generate src/manifest.pltg — aggregated imports of all generated fact modules.

    Scans for all generated .pltg files (which may be scattered across
    subdirectories) and emits a single manifest that imports them all.
    The checker imports manifest once and gets every fact available.
    """
    skip = {"manifest"}
    lines = [
        "; Auto-generated: aggregated fact imports",
        "; Collects all generated .pltg modules — do not hand-edit",
        "",
    ]
    # Collect all modules, but load documents (load_*) before facts
    # so that evidence verification can find the document text.
    load_modules = []
    fact_modules = []
    for pltg_file in sorted(SRC.rglob("*.pltg")):
        if pltg_file.stem in skip:
            continue
        rel = pltg_file.relative_to(SRC).with_suffix("")
        # Join path components explicitly: str(rel).replace("/", ".") would
        # leave Windows backslash separators untouched.
        module_path = ".".join(rel.parts)
        if pltg_file.stem.startswith("load_"):
            load_modules.append(module_path)
        else:
            fact_modules.append(module_path)
    for mod in load_modules + fact_modules:
        lines.append(f"(import (quote {mod}))")
    (SRC / "manifest.pltg").write_text("\n".join(lines) + "\n")
def write_all_pltg(
    all_datasets: Dict[str, List[DatasetRecord]],
    contracts: List[ContractRecord],
    products: List[BusinessProduct],
):
    """Write all generated .pltg files (loaders, facts, then the manifest)."""
    SRC.mkdir(parents=True, exist_ok=True)
    # Loader modules first, then fact modules, mirroring manifest ordering.
    for writer, payload in (
        (write_pltg_load_contracts, contracts),
        (write_pltg_load_business, products),
        (write_pltg_load_technical, all_datasets),
        (write_pltg_technical, all_datasets),
        (write_pltg_contracts, contracts),
        (write_pltg_business, products),
    ):
        writer(payload)
    write_pltg_manifest()
# ── phase 2: inject inconsistencies ─────────────────────────────────
@dataclass
class Corruption:
    """Audit record of a single injected inconsistency.

    inject_inconsistencies returns a list of these so the demo can check
    that every injected corruption is caught downstream.
    """

    corruption_type: str  # e.g. "path_drift", "sla_mismatch", "phantom_reference"
    layer: str  # governance layer mutated: "technical", "contracts", or "business"
    file: str  # file name (within its layer's directory) that was touched
    field: str  # logical field mutated, or "file" for whole-file operations
    old_value: str  # value before the mutation
    new_value: str  # value after the mutation
    description: str  # human-readable summary
def inject_inconsistencies(
    all_datasets: Dict[str, List[DatasetRecord]],
    contracts: List[ContractRecord],
    products: List[BusinessProduct],
    scale: int = 1,
) -> List[Corruption]:
    """Mutate the already-written resource files to create cross-layer drift.

    Twelve corruption families are injected across the technical, contract
    and business layers. Only datasets reachable from business products
    (and contracts covering them) are targeted, so every corruption is in
    principle detectable by the cross-layer checks. Returns the full list
    of Corruption audit records.

    NOTE(review): mutates files on disk in place — assumes the write_*
    functions have already run. The order of random.* calls below is part
    of the seeded-run contract; do not reorder sections.
    """
    corruptions: List[Corruption] = []

    def _n(base: int, pool_size: int) -> int:
        """Scale corruption count proportionally, capped by pool size."""
        return min(base * scale, pool_size)

    # Flatten all datasets
    flat_datasets = [ds for dept in all_datasets.values() for ds in dept]
    _external_datasets = [ds for ds in flat_datasets if ds.source_type == "external"]
    # Datasets reachable from business products — only corrupt what we can detect
    reachable_ds_ids = {src["dataset_id"] for bp in products for src in bp.source_datasets}
    reachable_datasets = [ds for ds in flat_datasets if ds.dataset_id in reachable_ds_ids]
    reachable_contracts = [
        c for c in contracts if any(ds["dataset_id"] in reachable_ds_ids for ds in c.covered_datasets)
    ]
    # ── 1. Path drift (technical layer) ──
    victims = random.sample(reachable_datasets, _n(5, len(reachable_datasets)))
    for ds in victims:
        old_path = ds.storage_path
        new_path = old_path.replace("s3://", "s3://migrated-")
        corruptions.append(
            Corruption(
                "path_drift",
                "technical",
                f"{ds.department}.csv",
                "storage_path",
                old_path,
                new_path,
                f"Storage path migrated for {ds.name}",
            )
        )
        _mutate_csv(ds.department, ds.dataset_id, "storage_path", new_path)
    # ── 2. Table rename (technical layer) ──
    victims = random.sample(reachable_datasets, _n(3, len(reachable_datasets)))
    for ds in victims:
        old_table = ds.table_name
        new_table = old_table + "_v2"
        corruptions.append(
            Corruption(
                "table_rename",
                "technical",
                f"{ds.department}.csv",
                "table_name",
                old_table,
                new_table,
                f"Table renamed for {ds.name}",
            )
        )
        _mutate_csv(ds.department, ds.dataset_id, "table_name", new_table)
    # ── 3. Cadence change (technical layer) ──
    victims = random.sample(reachable_datasets, _n(4, len(reachable_datasets)))
    for ds in victims:
        old_cadence = ds.refresh_cadence
        new_cadence = _pick([c for c in CADENCES if c != old_cadence])
        corruptions.append(
            Corruption(
                "cadence_mismatch",
                "technical",
                f"{ds.department}.csv",
                "refresh_cadence",
                old_cadence,
                new_cadence,
                f"Refresh cadence changed for {ds.name}",
            )
        )
        _mutate_csv(ds.department, ds.dataset_id, "refresh_cadence", new_cadence)
    # ── 4. Contract gap (delete contract for external dataset) ──
    if len(contracts) >= 3:
        victim_contracts = random.sample(contracts, _n(2, len(contracts)))
        for ctr in victim_contracts:
            filepath = RESOURCES / "contracts" / f"{_slug(ctr.provider)}.md"
            if filepath.exists():
                filepath.unlink()
                corruptions.append(
                    Corruption(
                        "contract_gap",
                        "contracts",
                        f"{_slug(ctr.provider)}.md",
                        "file",
                        "exists",
                        "deleted",
                        f"Contract deleted for provider {ctr.provider} covering {len(ctr.covered_datasets)} datasets",
                    )
                )
    # ── 5. SLA mismatch (contract layer) ──
    # "modifiable" excludes contracts whose files were deleted in section 4.
    modifiable = [c for c in reachable_contracts if (RESOURCES / "contracts" / f"{_slug(c.provider)}.md").exists()]
    if len(modifiable) >= 3:
        victims = random.sample(modifiable, _n(3, len(modifiable)))
        for ctr in victims:
            old_sla = ctr.refresh_sla
            new_sla = _pick([c for c in CADENCES if c != old_sla])
            corruptions.append(
                Corruption(
                    "sla_mismatch",
                    "contracts",
                    f"{_slug(ctr.provider)}.md",
                    "refresh_sla",
                    old_sla,
                    new_sla,
                    f"Contract SLA changed for {ctr.provider}",
                )
            )
            _mutate_contract_field(ctr.provider, "Refresh SLA", new_sla)
    # ── 6. Retention conflict (contract layer) ──
    if len(modifiable) >= 2:
        victims = random.sample(modifiable, _n(2, len(modifiable)))
        for ctr in victims:
            old_ret = str(ctr.retention_limit_days)
            new_ret = str(_pick([30, 60]))  # very short
            corruptions.append(
                Corruption(
                    "retention_conflict",
                    "contracts",
                    f"{_slug(ctr.provider)}.md",
                    "retention_limit",
                    f"{old_ret} days",
                    f"{new_ret} days",
                    f"Retention limit shortened for {ctr.provider}",
                )
            )
            _mutate_contract_field(ctr.provider, "Retention Limit", f"{new_ret} days")
    # ── 7. Expired contract (contract layer) ──
    if len(modifiable) >= 2:
        victims = random.sample(modifiable, _n(2, len(modifiable)))
        for ctr in victims:
            old_expiry = ctr.expiry_date
            new_expiry = _past_date()
            corruptions.append(
                Corruption(
                    "expired_contract",
                    "contracts",
                    f"{_slug(ctr.provider)}.md",
                    "expiry_date",
                    old_expiry,
                    new_expiry,
                    f"Contract expiry backdated for {ctr.provider}",
                )
            )
            # Keep the two fields consistent with each other.
            _mutate_contract_field(ctr.provider, "Expiry Date", new_expiry)
            _mutate_contract_field(ctr.provider, "Status", "expired")
    # ── 8. Phantom reference (business layer) ──
    victims = random.sample(products, _n(4, len(products)))
    for bp in victims:
        if bp.source_datasets:
            # IDs 9900-9999 are reserved for phantoms and never generated
            # by _build_datasets' checksum seeding path by construction.
            phantom_id = _id("DS", random.randint(9900, 9999))
            phantom_entry = {
                "dataset_id": phantom_id,
                "name": f"Phantom Dataset {phantom_id}",
                "domain": "unknown",
                "source_type": "internal",
                "storage_path": f"s3://phantom-lake/{phantom_id.lower()}",
                "table_name": f"phantom_{phantom_id.lower().replace('-', '_')}",
            }
            corruptions.append(
                Corruption(
                    "phantom_reference",
                    "business",
                    f"{_slug(bp.name)}.md",
                    "source_datasets",
                    "none",
                    phantom_id,
                    f"Added phantom dataset reference to {bp.name}",
                )
            )
            _add_phantom_source(bp.name, phantom_entry)
    # ── 9. Owner drift (business layer) ──
    victims = random.sample(products, _n(3, len(products)))
    for bp in victims:
        old_owner = bp.owner
        all_owners = OWNERS_DISCOVERY + OWNERS_TRANSLATIONAL + OWNERS_CLINICAL + OWNERS_COMMERCIAL
        new_owner = _pick([o for o in all_owners if o != old_owner])
        corruptions.append(
            Corruption(
                "owner_drift",
                "business",
                f"{_slug(bp.name)}.md",
                "owner",
                old_owner,
                new_owner,
                f"Owner changed for {bp.name}",
            )
        )
        _mutate_business_field(bp.name, "Owner", new_owner)
    # ── 10. Classification conflict (business layer) ──
    victims = random.sample(products, _n(3, len(products)))
    for bp in victims:
        old_class = bp.classification
        # Set to "public" — likely conflicts with restricted/confidential contracts
        new_class = "public" if old_class != "public" else "internal"
        corruptions.append(
            Corruption(
                "classification_conflict",
                "business",
                f"{_slug(bp.name)}.md",
                "classification",
                old_class,
                new_class,
                f"Classification weakened for {bp.name}",
            )
        )
        _mutate_business_field(bp.name, "Classification", new_class)
    # ── 11. Omics classification weakened (contract layer) ──
    # Contracts covering omics data must be "restricted" — weaken to "confidential"
    omics_contracts = [
        c
        for c in modifiable
        if c.classification == "restricted"
        and any(
            re.search(r"(?i)genom|proteom|transcriptom|omics|crispr|rna.seq|wgs", ds["name"])
            for ds in c.covered_datasets
        )
    ]
    for ctr in random.sample(omics_contracts, _n(2, len(omics_contracts))):
        corruptions.append(
            Corruption(
                "omics_classification_weakened",
                "contracts",
                f"{_slug(ctr.provider)}.md",
                "classification",
                "restricted",
                "confidential",
                f"Omics contract classification weakened for {ctr.provider}",
            )
        )
        _mutate_contract_field(ctr.provider, "Data Classification", "confidential")
    # ── 12. Refresh mismatch (business layer) ──
    victims = random.sample(products, _n(3, len(products)))
    for bp in victims:
        old_freq = bp.refresh_frequency
        new_freq = _pick([c for c in CADENCES if c != old_freq])
        corruptions.append(
            Corruption(
                "refresh_mismatch",
                "business",
                f"{_slug(bp.name)}.md",
                "refresh_frequency",
                old_freq,
                new_freq,
                f"Refresh frequency changed for {bp.name}",
            )
        )
        _mutate_business_field(bp.name, "Refresh Frequency", new_freq)
    return corruptions
# ── mutation helpers ─────────────────────────────────────────────────
def _mutate_csv(department: str, dataset_id: str, field: str, new_value: str):
    """Rewrite one field of one row in a technical-catalog CSV, in place.

    Reads ``resources/technical_catalog/{department}.csv``, sets ``field``
    to ``new_value`` on every row whose ``dataset_id`` matches, and writes
    the file back with the original header.
    """
    path = RESOURCES / "technical_catalog" / f"{department}.csv"
    rows = []
    # newline="" on BOTH handles, per the csv module docs — without it on
    # the read side, quoted fields containing newlines are mis-parsed.
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames
        for row in reader:
            if row["dataset_id"] == dataset_id:
                row[field] = new_value
            rows.append(row)
    with open(path, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)
def _mutate_contract_field(provider: str, field_label: str, new_value: str):
    """Rewrite a ``**Field**: value`` line in a provider's contract markdown.

    Silently no-ops when the contract file does not exist.
    """
    path = RESOURCES / "contracts" / f"{_slug(provider)}.md"
    if not path.exists():
        return
    text = path.read_text()
    # Module-level `re` is already imported (used elsewhere in this file);
    # the replacement is a callable so backslashes in new_value are taken
    # literally instead of being parsed as regex group escapes like \1.
    text = re.sub(
        rf"(\*\*{re.escape(field_label)}\*\*): .+",
        lambda m: f"{m.group(1)}: {new_value}",
        text,
    )
    path.write_text(text)
def _mutate_business_field(product_name: str, field_label: str, new_value: str):
    """Rewrite a ``**Field**: value`` line in a business-catalog markdown file.

    Silently no-ops when the product file does not exist.
    """
    path = RESOURCES / "business_catalog" / f"{_slug(product_name)}.md"
    if not path.exists():
        return
    text = path.read_text()
    # Module-level `re` is already imported (used elsewhere in this file);
    # the replacement is a callable so backslashes in new_value are taken
    # literally instead of being parsed as regex group escapes like \1.
    text = re.sub(
        rf"(\*\*{re.escape(field_label)}\*\*): .+",
        lambda m: f"{m.group(1)}: {new_value}",
        text,
    )
    path.write_text(text)
def _add_phantom_source(product_name: str, phantom: dict):
    """Append a phantom dataset row to a product's source-datasets table."""
    path = RESOURCES / "business_catalog" / f"{_slug(product_name)}.md"
    if not path.exists():
        return
    content = path.read_text()
    cells = (
        phantom["dataset_id"],
        phantom["name"],
        phantom["domain"],
        phantom["source_type"],
        phantom["storage_path"],
        phantom["table_name"],
    )
    new_row = "| " + " | ".join(str(c) for c in cells) + " |"
    # Splice the new row in just above the "## Data Quality" heading,
    # which immediately follows the source datasets table.
    content = content.replace("## Data Quality", f"{new_row}\n\n## Data Quality")
    path.write_text(content)
# ── main ─────────────────────────────────────────────────────────────
def main():
    """CLI entry point: generate the synthetic governance estate.

    Phase 1 writes a fully consistent baseline (technical catalog CSVs,
    contracts, business catalog, and generated .pltg modules). Phase 2,
    unless --consistent-only was passed, injects corruptions across the
    three layers and records every one in manifest.json.
    """
    parser = argparse.ArgumentParser(description="Generate synthetic data governance demo data")
    parser.add_argument("--clean", action="store_true", help="Wipe and regenerate resources/")
    parser.add_argument("--seed", type=int, default=SEED, help="Random seed")
    parser.add_argument("--scale", type=int, default=1, help="Scale factor (1=~68 datasets, 5=~340, 10=~680)")
    parser.add_argument(
        "--consistent-only", action="store_true", help="Generate only the consistent baseline (no corruptions)"
    )
    args = parser.parse_args()
    # Deterministic output for a given seed; reset the shared ID registry
    # so repeated invocations in one process don't collide.
    random.seed(args.seed)
    _seen_ids.clear()
    if args.clean:
        # Wipe and recreate each layer's directory.
        for sub in ["technical_catalog", "contracts", "business_catalog"]:
            d = RESOURCES / sub
            if d.exists():
                shutil.rmtree(d)
            d.mkdir(parents=True, exist_ok=True)
    else:
        # Keep existing files; just make sure the directories exist.
        for sub in ["technical_catalog", "contracts", "business_catalog"]:
            (RESOURCES / sub).mkdir(parents=True, exist_ok=True)
    # Phase 1: consistent baseline
    print("Phase 1: Generating consistent baseline...")
    all_datasets = generate_all_datasets(scale=args.scale)
    total_ds = sum(len(v) for v in all_datasets.values())
    write_technical_catalogs(all_datasets)
    print(f" Technical catalog: {total_ds} datasets across {len(all_datasets)} CSVs")
    contracts = generate_contracts(all_datasets)
    write_contracts(contracts)
    print(f" Contracts: {len(contracts)} agreements")
    products = generate_business_products(all_datasets, contracts, scale=args.scale)
    write_business_products(products)
    print(f" Business catalog: {len(products)} data products")
    write_all_pltg(all_datasets, contracts, products)
    print(" Generated .pltg modules in src/")
    if args.consistent_only:
        print("\nDone (consistent baseline only, no corruptions).")
        return
    # Phase 2: inject inconsistencies
    print("\nPhase 2: Injecting inconsistencies...")
    corruptions = inject_inconsistencies(all_datasets, contracts, products, scale=args.scale)
    # Re-generate .pltg after corruptions (reads mutated files)
    write_all_pltg(all_datasets, contracts, products)
    # Write manifest — the ground truth the consistency checker is scored against.
    manifest = {
        "seed": args.seed,
        "total_datasets": total_ds,
        "total_contracts": len(contracts),
        "total_products": len(products),
        "corruptions": [asdict(c) for c in corruptions],
        "corruption_summary": {},
    }
    # Tally corruption counts per type for the printed summary.
    for c in corruptions:
        manifest["corruption_summary"][c.corruption_type] = manifest["corruption_summary"].get(c.corruption_type, 0) + 1
    manifest_path = Path(__file__).parent / "manifest.json"
    manifest_path.write_text(json.dumps(manifest, indent=2))
    print(f" Injected {len(corruptions)} corruptions:")
    for ctype, count in sorted(manifest["corruption_summary"].items()):
        print(f" {ctype}: {count}")
    print(f"\n Manifest written to {manifest_path}")
    print("Done.")
# Script entry point: regenerate the synthetic governance estate.
if __name__ == "__main__":
    main()
; ==========================================================
; Biopharma Data Governance — Cross-Layer Consistency
; ==========================================================
;
; Three layers: technical catalog (CSVs), contracts (.md),
; business catalog (.md). Facts extracted per layer, then
; policy rules + checker derive compliance and catch drift.
;
; Generated .pltg modules live in src/ — run generate.py first.
; Hand-written: policy_rules.pltg, checker.pltg
; ==========================================================
(print "================================================================")
(print "Biopharma Data Governance — Cross-Layer Consistency")
(print "================================================================")

; --- Phase 0: Load source documents ---
(print "\n--- Phase 0: Load source documents ---")
; Technical catalogs — one CSV per department
(load-document "tech:discovery" "resources/technical_catalog/discovery.csv")
(load-document "tech:translational" "resources/technical_catalog/translational.csv")
(load-document "tech:clinical" "resources/technical_catalog/clinical.csv")
(load-document "tech:commercial" "resources/technical_catalog/commercial.csv")
; Contracts (generated — one per provider)
(import (quote src.load_contracts))
; Business catalog (generated — one per product)
(import (quote src.load_business))
(print " Documents loaded.")

; --- Phase 1: Load all facts via manifest ---
(print "\n--- Phase 1: Load facts (manifest) ---")
(import (quote src.manifest))
(print " All facts registered.")

; --- Phase 2: Policy rules + checker ---
(print "\n--- Phase 2: Policy compliance checks ---")
(import (quote checker))
(print " Checker complete.")

; --- Phase 3: Consistency report ---
; (consistency) prints the cross-layer drift report built by the checker.
(print "\n--- Phase 3: Consistency report ---")
(consistency)
(print "\n================================================================")
(print "================================================================")
"""
General-purpose operators for the data governance demo.
Four effects registered in the System engine env:
(csv-rows doc-name prefix) — parse loaded CSV doc → tagged row lists
(regex-match pattern text) — all regex matches → list of strings or false
(list-tree-paths pattern) — list loaded document names matching glob
(doc-text pattern) — concatenated text of all docs matching glob
Effects receive (system, *args) — the System auto-wraps them.
"""
import csv
import fnmatch
import io
import re
from parseltongue.core.atoms import Symbol
def csv_rows(system, doc_name, prefix):
    """(csv-rows doc-name prefix) → ((prefix col1 col2 ...) ...)

    Reads a loaded CSV document and returns each row as a tagged list.
    The prefix becomes a Symbol — e.g. (csv-rows "tech:clinical" "dx")
    returns ((dx "DS-7570" "Phase I Safety Trials" ...) ...).
    """
    name = str(doc_name)
    body = system.engine.documents.get(name, "")
    # Missing or empty document → no rows, never an error.
    if not body:
        return []
    tag = Symbol(str(prefix))
    reader = csv.DictReader(io.StringIO(body))
    return [[tag, *row.values()] for row in reader]
def regex_match(system, pattern, text):
    """(regex-match pattern text) → list of all matches, or false

    Variadic: returns ALL matches, not just the first.
    If the pattern has groups, returns the first group from each match.
    Otherwise returns the full match string from each.
    Returns false if no matches at all.

    (regex-match "DS-\\d+" "covers DS-1488 and DS-6408") → ("DS-1488" "DS-6408")
    (regex-match "(\\d+) days" "retain 90 days") → ("90")
    """
    pat = str(pattern)
    hay = str(text)
    try:
        found = list(re.finditer(pat, hay))
    except re.error:
        # Pattern contains unescaped regex metacharacters — retry as literal
        found = list(re.finditer(re.escape(pat), hay))
    if not found:
        return False
    # Group 1 when the pattern captures; the whole match otherwise.
    idx = 1 if (found[0].lastindex or 0) > 0 else 0
    return [m.group(idx) for m in found]
def list_tree_paths(system, pattern):
    """(list-tree-paths pattern) → ("doc-name1" "doc-name2" ...)

    Lists loaded document names matching a glob pattern.
    E.g. (list-tree-paths "tech:*") → ("tech:clinical" "tech:discovery" ...)
    """
    glob = str(pattern)
    # Sorted for deterministic output across runs.
    return [n for n in sorted(system.engine.documents) if fnmatch.fnmatch(n, glob)]
def doc_text(system, pattern):
    """(doc-text pattern) → concatenated text of all docs matching glob, or false

    Reads loaded documents matching a glob pattern and returns their
    concatenated content. Useful for cross-document regex searches.
    E.g. (regex-match "DS-1488" (doc-text "contract:*"))
    """
    glob = str(pattern)
    docs = system.engine.documents
    # Deterministic ordering: concatenate in sorted-name order.
    chunks = [docs[n] for n in sorted(docs) if fnmatch.fnmatch(n, glob)]
    if not chunks:
        return False
    return "\n".join(chunks)
def s(system, name):
    """(s name) → resolve string as symbol via engine._eval.

    Converts a string to a Symbol and evaluates it through the engine,
    so stain/vital instrumentation captures the resolution edge.
    """
    # Goes through engine._eval (not a plain env lookup) so the engine's
    # instrumentation records this resolution as a first-class eval step.
    return system.engine._eval(Symbol(str(name)), system.engine.env)
# Convenience: all effects as a dict for System(effects=...)
GOVERNANCE_EFFECTS = {
    "csv-rows": csv_rows,                # loaded CSV doc → tagged row lists
    "regex-match": regex_match,          # all regex matches, or false
    "list-tree-paths": list_tree_paths,  # glob over loaded document names
    "doc-text": doc_text,                # concatenated text of matching docs
    "s": s,                              # string → Symbol → engine eval
}
# Parseltongue bench config (auto-generated by pg-bench init)

# Languages the bench auto-detects in this demo tree.
[detect]
languages = ["csv", "html", "json", "parseltongue", "python", "shell"]

# File extensions included when indexing the demo tree.
[index]
extensions = [".bash", ".csv", ".htm", ".html", ".json", ".md", ".pg", ".pg.md", ".pgmd", ".pltg", ".py", ".pyi", ".sh", ".tsv", ".txt", ".zsh"]
; ==========================================================
; Data Governance Policy Rules
; ==========================================================
;
; Hand-written. Not generated. Grounded in governance_policy.md.
;
; Three layers:
;   1. Axioms — formal rules from the policy document
;   2. Per-item compliance predicates — rewrite axioms that
;      check one dataset/product against a rule
;   3. Variadic reductions — splat axioms that fold over
;      lists of compliance results
;
; The generator produces facts. These rules consume them.
; ==========================================================

; The policy document is loaded so every axiom's :evidence quote
; can be traced back to its source sentence.
(load-document "governance_policy" "resources/governance_policy.md")
; ══════════════════════════════════════════════════════════
; 1. AXIOMS — the policy itself
; ══════════════════════════════════════════════════════════
; Each axiom formalizes one numbered clause of governance_policy.md;
; the :evidence quote is that clause's exact sentence.

; 2.1 Commercial datasets require valid contract + use policy
(axiom commercial-requires-contract
  (implies (= ?source-type "commercial")
           (and (= ?has-contract true) (= ?has-use-policy true)))
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy.")
    :explanation "Commercial data needs contract + permitted use"))

; 2.1 Non-compliant commercial datasets must be quarantined
(axiom quarantine-uncovered
  (implies
    (and (= ?source-type "commercial") (= ?has-contract false))
    (= ?must-quarantine true))
  :evidence (evidence "governance_policy"
    :quotes ("A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.")
    :explanation "Uncovered commercial data is quarantined"))

; 2.2 Expired contracts are not valid
(axiom expired-not-valid
  (implies (= ?contract-status "expired")
           (= ?contract-valid false))
  :evidence (evidence "governance_policy"
    :quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
    :explanation "Expired = invalid"))

; 1.1 Classification propagates: product ≥ most restrictive source
(axiom classification-propagates
  (implies
    (and (= ?src-class "restricted") (= ?product-uses-src true))
    (= ?product-class "restricted"))
  :evidence (evidence "governance_policy"
    :quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
    :explanation "Restricted propagates upward"))

; 1.2 Omics data is always restricted
(axiom omics-is-restricted
  (implies (= ?is-omics true) (= ?ds-class "restricted"))
  :evidence (evidence "governance_policy"
    :quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
    :explanation "Omics = restricted"))

; 3.1 Technical cadence must not exceed contractual SLA
(axiom sla-must-align
  (implies (= ?tech-faster-than-sla true)
           (= ?sla-risk true))
  :evidence (evidence "governance_policy"
    :quotes ("If the technical cadence is faster than the contractual SLA, the platform may be pulling data more frequently than the provider guarantees, creating a reliability risk.")
    :explanation "Over-frequent pull = reliability risk"))

; 5.1 Phantom references
(axiom no-phantoms
  (implies (= ?exists-in-tech false)
           (= ?is-phantom true))
  :evidence (evidence "governance_policy"
    :quotes ("A reference to a non-existent dataset is a phantom reference and indicates catalog drift.")
    :explanation "Missing from tech catalog = phantom"))

; 4.1 Retention within contract limits
(axiom retention-within-limit
  (implies (= ?retention-exceeds true)
           (= ?retention-violation true))
  :evidence (evidence "governance_policy"
    :quotes ("Data products must not retain records longer than the shortest retention limit specified by any contract covering their source datasets.")
    :explanation "Retention must respect contract ceiling"))
; ══════════════════════════════════════════════════════════
; 2. PER-ITEM COMPLIANCE PREDICATES — rewrite axioms
; ══════════════════════════════════════════════════════════
;
; (contract-ok ?source-type ?has-contract ?has-use-policy) → bool
; (class-ok ?src-class ?product-class) → bool
;
; Generator emits derives that :bind to specific facts.

; contract-ok: commercial → needs contract + use policy; else exempt
(defterm contract-ok
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy.")
    :explanation "per-dataset contract compliance predicate"))
(axiom contract-ok-rule
  (= (contract-ok ?source-type ?has-contract ?has-use-policy)
     (if (= ?source-type "commercial")
         (and ?has-contract ?has-use-policy)
         true))
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy.")
    :explanation "Non-commercial datasets are exempt"))

; class-ok: product class must be ≥ source class
; One axiom per source classification level, from most to least strict.
(defterm class-ok
  :evidence (evidence "governance_policy"
    :quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
    :explanation "per-source classification compliance predicate"))
(axiom class-ok-restricted
  (= (class-ok "restricted" ?product-class)
     (= ?product-class "restricted"))
  :evidence (evidence "governance_policy"
    :quotes ("Any data product that consumes a restricted source dataset must itself be classified as restricted.")
    :explanation "Restricted source requires restricted product"))
(axiom class-ok-confidential
  (= (class-ok "confidential" ?product-class)
     (or (= ?product-class "restricted")
         (= ?product-class "confidential")))
  :evidence (evidence "governance_policy"
    :quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
    :explanation "Confidential source requires confidential or above"))
(axiom class-ok-internal
  (= (class-ok "internal" ?product-class) true)
  :evidence (evidence "governance_policy"
    :quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
    :explanation "Internal sources impose no upward constraint"))
(axiom class-ok-public
  (= (class-ok "public" ?product-class) true)
  :evidence (evidence "governance_policy"
    :quotes ("A product's effective classification is the most restrictive classification among all its source datasets.")
    :explanation "Public sources impose no upward constraint"))
; omics-ok: if data contains omics keywords, classification must be restricted
; ?has-omics is truthy (regex-match result) or false, ?classification is a string
(defterm omics-ok
  :evidence (evidence "governance_policy"
    :quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
    :explanation "omics data must be classified restricted"))
(axiom omics-ok-rule
  (= (omics-ok ?has-omics ?classification)
     (if ?has-omics
         (= ?classification "restricted")
         true))
  :evidence (evidence "governance_policy"
    :quotes ("All genomic, proteomic, and transcriptomic datasets — collectively \"omics data\" — are classified as restricted regardless of their origin.")
    :explanation "Omics data requires restricted classification; non-omics is exempt"))

; contract-valid: active → true, expired → false
(defterm contract-valid
  :evidence (evidence "governance_policy"
    :quotes ("An expired contract revokes all permitted uses for the datasets it covers.")
    :explanation "contract validity predicate"))
(axiom contract-valid-rule
  (= (contract-valid ?status)
     (= ?status "active"))
  :evidence (evidence "governance_policy"
    :quotes ("A contract is valid when its status is \"active\" and its expiry date has not passed." "An expired contract revokes all permitted uses for the datasets it covers.")
    :explanation "Only active contracts are valid"))

; cadence-rank: map cadence string to numeric rank (lower = faster)
; hourly=1, daily=2, weekly=3, monthly=4, quarterly=5
; NOTE: any unrecognized cadence falls through to rank 5 (slowest).
(defterm cadence-rank
  :evidence (evidence "governance_policy"
    :quotes ("the contractual refresh SLA for a dataset must be at least as frequent as the technical refresh cadence")
    :explanation "cadence frequency ranking for SLA comparison"))
(axiom cadence-rank-rule
  (= (cadence-rank ?c)
     (if (= ?c "hourly") 1
         (if (= ?c "daily") 2
             (if (= ?c "weekly") 3
                 (if (= ?c "monthly") 4
                     5)))))
  :evidence (evidence "governance_policy"
    :quotes ("the contractual refresh SLA for a dataset must be at least as frequent as the technical refresh cadence")
    :explanation "Ordered ranking: hourly(1) < daily(2) < weekly(3) < monthly(4) < quarterly(5)"))

; sla-ok: tech cadence must not be faster than contractual SLA
; True when tech rank >= SLA rank (i.e. same speed or slower)
(defterm sla-ok
  :evidence (evidence "governance_policy"
    :quotes ("If the technical cadence is faster than the contractual SLA, the platform may be pulling data more frequently than the provider guarantees, creating a reliability risk.")
    :explanation "SLA alignment predicate — tech cadence vs contract SLA"))
(axiom sla-ok-rule
  (= (sla-ok ?tech-cadence ?contract-sla)
     (>= (cadence-rank ?tech-cadence) (cadence-rank ?contract-sla)))
  :evidence (evidence "governance_policy"
    :quotes ("If the technical cadence is faster than the contractual SLA, the platform may be pulling data more frequently than the provider guarantees, creating a reliability risk.")
    :explanation "Tech cadence must not exceed (be faster than) contract SLA"))

; retention-ok: product retention must not exceed contract limit
; Both args are numbers (days). True when product ≤ contract.
(defterm retention-ok
  :evidence (evidence "governance_policy"
    :quotes ("Data products must not retain records longer than the shortest retention limit specified by any contract covering their source datasets.")
    :explanation "retention limit compliance predicate"))
(axiom retention-ok-rule
  (= (retention-ok ?product-days ?contract-days)
     (<= ?product-days ?contract-days))
  :evidence (evidence "governance_policy"
    :quotes ("Data products must not retain records longer than the shortest retention limit specified by any contract covering their source datasets.")
    :explanation "Product retention must not exceed contract ceiling"))
; ══════════════════════════════════════════════════════════
; 3. VARIADIC REDUCTIONS — splat axioms
; ══════════════════════════════════════════════════════════
; Each reduction pairs a single-item base case with a recursive
; step over the ?...rest splat.

; all-true: AND over N booleans
(defterm all-true
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
    :explanation "variadic AND — all items must be compliant"))
(axiom all-true-base
  (= (all-true ?x) ?x)
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
    :explanation "Single item: its own compliance"))
(axiom all-true-step
  (= (all-true ?x ?y ?...rest)
     (and ?x (all-true ?y ?...rest)))
  :evidence (evidence "governance_policy"
    :quotes ("Any dataset sourced from a commercial provider must be covered by a contract")
    :explanation "All items must pass"))

; count-violations: count how many are false
(defterm count-violations
  :evidence (evidence "governance_policy"
    :quotes ("A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.")
    :explanation "variadic violation counter"))
(axiom count-violations-base
  (= (count-violations ?x)
     (if ?x 0 1))
  :evidence (evidence "governance_policy"
    :quotes ("A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.")
    :explanation "0 if compliant, 1 if not"))
(axiom count-violations-step
  (= (count-violations ?x ?y ?...rest)
     (+ (if ?x 0 1) (count-violations ?y ?...rest)))
  :evidence (evidence "governance_policy"
    :quotes ("A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.")
    :explanation "Sum violations across all items"))

; any-true: OR over N booleans
(defterm any-true
  :evidence (evidence "governance_policy"
    :quotes ("Data products consuming datasets covered only by expired contracts must be flagged for review.")
    :explanation "variadic OR — any violation triggers flag"))
(axiom any-true-base
  (= (any-true ?x) ?x)
  :evidence (evidence "governance_policy"
    :quotes ("Data products consuming datasets covered only by expired contracts must be flagged for review.")
    :explanation "Single item check"))
(axiom any-true-step
  (= (any-true ?x ?y ?...rest)
     (or ?x (any-true ?y ?...rest)))
  :evidence (evidence "governance_policy"
    :quotes ("Data products consuming datasets covered only by expired contracts must be flagged for review.")
    :explanation "Any item triggers"))
# Enterprise Data Governance Policy
**Effective Date:** 2024-01-15
**Version:** 3.2
**Owner:** Chief Data Officer
## 1. Data Classification
All datasets in the enterprise data platform must carry one of four classification levels: **public**, **internal**, **confidential**, or **restricted**. Classification is assigned at ingestion and propagated through the lineage graph.
### 1.1 Classification Propagation
Any data product that consumes a restricted source dataset must itself be classified as restricted. A product's effective classification is the most restrictive classification among all its source datasets. A product classified below its effective classification is in violation.
### 1.2 Omics Data
All genomic, proteomic, and transcriptomic datasets — collectively "omics data" — are classified as restricted regardless of their origin. Omics data shall not appear as a source in any data product classified below restricted.
### 1.3 Patient-Level Data
Datasets containing patient-level records are classified as confidential at minimum. Patient-level data with genomic markers is classified as restricted.
## 2. Contract Requirements
### 2.1 Commercial Dataset Contracts
Any dataset sourced from a commercial provider must be covered by a contract with valid start and end dates and a specified permitted-use policy. A commercial dataset without a valid contract, or with an expired contract, is non-compliant and must be quarantined.
### 2.2 Contract Validity
A contract is valid when its status is "active" and its expiry date has not passed. An expired contract revokes all permitted uses for the datasets it covers. Data products consuming datasets covered only by expired contracts must be flagged for review.
### 2.3 Permitted Use
Each contract specifies permitted uses for the datasets it covers. A data product may only consume a dataset for a use that falls within the contract's permitted-use scope. Using a dataset outside its permitted use is a policy violation.
## 3. Refresh and SLA
### 3.1 SLA Alignment
The contractual refresh SLA for a dataset must be at least as frequent as the technical refresh cadence configured in the data platform. If the technical cadence is faster than the contractual SLA, the platform may be pulling data more frequently than the provider guarantees, creating a reliability risk.
### 3.2 Product Refresh
A data product's advertised refresh frequency must not exceed the slowest refresh cadence among its source datasets. Advertising a faster refresh than sources can deliver is misleading.
## 4. Retention
### 4.1 Retention Limits
Data products must not retain records longer than the shortest retention limit specified by any contract covering their source datasets. Where no contract specifies a limit, the enterprise default of 730 days applies.
### 4.2 Regulatory Hold
Retention limits may be extended by a documented regulatory hold. Without an active hold, exceeding the contractual retention limit is a violation.
## 5. Lineage and Referential Integrity
### 5.1 Source Existence
Every dataset referenced by a data product must exist in the technical catalog. A reference to a non-existent dataset is a phantom reference and indicates catalog drift.
### 5.2 Path Consistency
The storage path recorded in the business catalog for a source dataset must match the path in the technical catalog. Path divergence indicates that the business catalog is stale.
### 5.3 Table Consistency
The table name recorded in the business catalog for a source dataset must match the table name in the technical catalog. Table name divergence indicates a schema migration that was not propagated to the business catalog.
## 6. Ownership
### 6.1 Domain Alignment
The owner of a data product should belong to the department that owns the majority of the product's source datasets. Cross-department ownership requires documented approval from both department heads.
# Data Governance Registration Protocol
**Version:** 2.1
**Owner:** Data Platform Engineering
## Purpose
This protocol defines the naming convention and mandatory registration requirements for all datasets, contracts, and data products in the enterprise data platform. Every entity must be registered as a set of named facts following the conventions below. Automated compliance checks discover and validate registrations by parsing source catalogs and matching fact names against these patterns.
## 1. Dataset Registration
Every dataset in the technical catalog must produce the following facts, where `{id}` is the lowercase dataset identifier with hyphens replaced by underscores (e.g., DS-1234 becomes ds_1234):
| Fact Name Pattern | Type | Description |
|------------------------|--------|-----------------------------------|
| `{id}-path` | string | Storage path in the data platform |
| `{id}-cadence` | string | Refresh cadence (daily, weekly…) |
| `{id}-source-type` | string | "internal" or "commercial" |
| `{id}-table` | string | Table name in the warehouse |
| `{id}-owner` | string | Responsible person or team |
A dataset is **fully registered** when all five facts are present. A dataset with fewer than five facts is **partially registered** and must be remediated.
## 2. Contract Registration
Every provider contract must produce the following facts, where `{slug}` is the provider name slug (e.g., iqvia, flatiron_health):
| Fact Name Pattern | Type | Description |
|-------------------------------|--------|--------------------------------------|
| `ctr-{slug}-sla` | string | Contractual refresh SLA |
| `ctr-{slug}-retention` | string | Retention limit text |
| `ctr-{slug}-classification` | string | Data classification level |
| `ctr-{slug}-expiry` | string | Contract expiry date |
| `ctr-{slug}-status` | string | "active" or "expired" |
| `ctr-{slug}-covers-{ds_id}` | bool | Whether contract covers this dataset |
| `ctr-{slug}-use-{ds_id}` | string | Permitted use for this dataset |
A contract is **fully registered** when the first five facts are present. Coverage and use facts are per-dataset extensions.
## 3. Business Product Registration
Every business data product must produce the following facts, where `{slug}` is the product name slug:
| Fact Name Pattern | Type | Description |
|------------------------------|--------|--------------------------------------|
| `bp-{slug}-owner` | string | Product owner |
| `bp-{slug}-classification` | string | Product classification level |
| `bp-{slug}-refresh` | string | Advertised refresh frequency |
| `bp-{slug}-uses-{ds_id}` | bool | Whether product consumes this source |
A product is **fully registered** when the first three facts are present.
## 4. Compliance Discovery
The compliance checker discovers entities by parsing source documents:
- Technical catalog CSVs are parsed row-by-row; each row's `dataset_id` column identifies a dataset.
- Contract markdown files are parsed for provider metadata and covered-dataset tables.
- Business catalog markdown files are parsed for product metadata and source-dataset tables.
For each discovered entity, the checker constructs expected fact names from the patterns above and verifies their existence. Missing facts indicate incomplete registration.
## 5. Cross-Layer Validation
After verifying registration completeness, the checker applies policy rules (defined in the governance policy) to validate cross-layer consistency:
- Commercial datasets must have active contracts with specified use policies.
- Product classification must be at least as restrictive as its most restrictive source.
- Technical refresh cadence must not exceed contractual SLA.
- Business catalog paths and tables must match technical catalog.
- Product retention must not exceed contractual limits.
Violations are counted per rule. A dataset estate is **policy-consistent** when the total violation count across all rules is zero.
#!/bin/bash
# Data Governance demo visualization.
#
# Requires lib_paths=[parseltongue/core/] for correct module
# qualification — pg-bench provides this via Bench.STD_PATH.
# Without it, sub-module facts lack the "src.manifest." prefix
# and (s ...) resolution fails.
set -e

# Always run relative to this script's directory.
DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$DIR"

# Optional first argument: generator scale factor (default 1).
SCALE="${1:-1}"
EFFECTS="parseltongue.core.demos.data_governance_pltg.operators:GOVERNANCE_EFFECTS"
EVAL_CMD='(fmt "viz" (scope hologram (dissect (stain policy-check))))'
mkdir -p viz-results
# Kill any bench processes left behind, no matter how the script exits.
cleanup() {
    pkill -f 'pg-bench|bench_cli' 2>/dev/null || true
}
trap cleanup EXIT
# (Re)start the bench server against checker.pltg with the demo effects.
start_bench() {
    # Kill any previous bench instance and wipe its state directory.
    pkill -f 'pg-bench|bench_cli' 2>/dev/null || true
    sleep 1
    rm -rf .parseltongue-bench/
    # Background + disown the server, silencing its stderr. The original
    # `... &disown 2>/dev/null` attached the redirect to `disown`, leaving
    # the server's stderr on the terminal.
    pg-bench serve checker.pltg --effects "$EFFECTS" 2>/dev/null & disown
    pg-bench wait 2>/dev/null
}
# ── Phase 1: Clean data estate ──
# Consistent baseline first, so the "before" visualization shows zero drift.
echo "=== Phase 1: Generating consistent data estate (scale=$SCALE) ==="
python generate.py --clean --consistent-only --scale "$SCALE"
echo "Starting bench..."
start_bench
echo "Generating clean visualization..."
pg-bench eval "$EVAL_CMD" > viz-results/clean.html 2>/dev/null
open viz-results/clean.html
echo "Opened viz-results/clean.html"
echo ""
echo "=== Press Enter to inject corruptions ==="
read -r

# ── Phase 2: Corrupted data estate ──
# Regenerating without --consistent-only injects the corruptions.
echo "=== Phase 2: Injecting corruptions ==="
python generate.py --clean --scale "$SCALE"
echo "Restarting bench with corrupted data..."
start_bench
echo "Generating corrupted visualization..."
pg-bench eval "$EVAL_CMD" > viz-results/corrupt.html 2>/dev/null
open viz-results/corrupt.html
echo "Opened viz-results/corrupt.html"
echo ""
echo "=== Done ==="
echo "Clean: viz-results/clean.html"
echo "Corrupt: viz-results/corrupt.html"
; ==========================================================
; Utility axioms for data governance checker
; ==========================================================
;
; Provides the list/string helpers used by the consistency rules:
;   cons-prepend — prepend an element to a list
;   concat       — map prefix+suffix over list elements
;   resolve-all  — resolve a list of fact names via (s ...)
; ==========================================================
; ── cons-prepend: prepend element to list ──
; Declares the `cons-prepend` term; the single rewrite rule follows.
(defterm cons-prepend
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "prepend element to list"))
; (cons-prepend ?x (?...rest)) → (?x ?...rest): splice element ?x in
; front of the spread tail. Used by the recursive cases of `concat`
; and `resolve-all` below to rebuild their result lists.
(axiom cons-prepend-rule
(= (cons-prepend ?x (?...rest))
(?x ?...rest))
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "splice element into list head"))
; ── concat: prefix + list elements + suffix ──
; (concat "pre" ("a") "suf") → "preasuf"
; (concat "pre" ("a" "b") "suf") → ("preasuf" "prebsuf")
; Declares `concat` with three arity cases. Lists of length 1 and 2
; are special-cased because the recursive rule (concat-many) only
; matches patterns with at least three elements.
(defterm concat
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "map prefix+suffix over list elements"))
; Base case, one element: returns the bare string, not a list —
; e.g. (concat "pre" ("a") "suf") → "preasuf".
(axiom concat-one
(= (concat ?pre (?x) ?suf)
(+ (+ ?pre ?x) ?suf))
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "single-element list: return string"))
; Two elements: returns a two-element list of wrapped strings.
(axiom concat-two
(= (concat ?pre (?x ?y) ?suf)
((+ (+ ?pre ?x) ?suf) (+ (+ ?pre ?y) ?suf)))
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "two-element list: return pair"))
; 3+ elements: wrap the head, recurse on the tail, cons-prepend the
; wrapped head onto the recursive result.
; NOTE(review): `strict` presumably forces eager evaluation of its
; sub-term before the rewrite proceeds — confirm against the
; Parseltongue core evaluation semantics.
(axiom concat-many
(= (concat ?pre (?x ?y ?z ?...rest) ?suf)
(strict (cons-prepend (+ (+ ?pre ?x) ?suf) (strict (concat ?pre (?y ?z ?...rest) ?suf)))))
:evidence (evidence "governance_protocol"
:quotes ("Technical catalog CSVs are parsed row-by-row")
:explanation "3+ element list: prepend head, recurse on tail"))
; ── resolve-all: map s over a list of names ──
; (resolve-all ("name1" "name2")) → (val1 val2)
; Declares `resolve-all`: maps the (s ...) fact-resolution operator
; over a list of fact names. Mirrors the arity-case structure of
; `concat`: explicit 1- and 2-element cases, then a recursive case
; that requires at least three elements to match.
(defterm resolve-all
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "resolve list of fact names to their values"))
; One name: resolve it and wrap the value in a singleton list.
; (Unlike concat-one, this DOES keep the list wrapper.)
(axiom resolve-all-one
(= (resolve-all (?x))
((s ?x)))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "single name: resolve and wrap in list"))
; Two names: resolve both, preserving order.
(axiom resolve-all-two
(= (resolve-all (?x ?y))
((s ?x) (s ?y)))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "two names: resolve both"))
; 3+ names: resolve the head, then cons-prepend it onto the eagerly
; (`strict`) computed resolution of the tail.
(axiom resolve-all-many
(= (resolve-all (?x ?y ?z ?...rest))
(strict (cons-prepend (s ?x) (strict (resolve-all (?y ?z ?...rest))))))
:evidence (evidence "governance_protocol"
:quotes ("each row's `dataset_id` column identifies a dataset")
:explanation "3+ names: resolve head, prepend onto recursive resolve of tail"))