Self-Healing Probes

The entire analysis flow — loading documents, checking consistency, diagnosing failures, and recovering — is written in Parseltongue DSL. Python only registers primitive effects and feeds the script. Effects become the instruction set for self-healing probes.

Files

"""
Demo: Self-Healing Probes

Scenario: The entire analysis flow — loading documents, checking
consistency, diagnosing failures, and recovering — is written in
Parseltongue DSL. Python only registers primitive effects and feeds
the script. Effects become the instruction set for self-healing probes.

Effects (the instruction set):
  (load-data name path)          — load a document
  (check-diff name)              — evaluate a diff, true if consistent
  (check-consistency)            — full system consistency report
  (snapshot name)                — save a fact's current value
  (patch-fact name value document quote explanation)
                                 — overwrite a fact, citing new evidence
  (rollback name)                — restore a fact from snapshot

The DSL script uses (if (not (check-diff ...)) ...) to
conditionally patch or rollback.

Usage:
    python -m parseltongue.core.demos.self_healing.demo
"""

import logging
import os
import sys

from parseltongue.core import System, load_source
from parseltongue.core.atoms import Evidence

# Directory next to this module that holds the demo's documents;
# load_data() resolves every script-supplied path against it.
RESOURCE_DIR = os.path.join(os.path.dirname(__file__), "resources")

# ── Primitive Effects ────────────────────────────────────────

# Saved fact state keyed by fact name: snapshot() stores a (wff, origin)
# pair here and rollback() pops it again, so each snapshot is single-use.
_snapshots: dict[str, tuple] = {}


def load_data(system, name, path):
    """Effect ``(load-data name path)``: register a resource file as a document.

    Args:
        system: Object exposing ``load_document(name, path)``.
        name: Document name; converted with ``str()`` before use.
        path: File path relative to ``RESOURCE_DIR``; converted with ``str()``.

    Returns:
        Always ``True``.
    """
    # Script paths are always interpreted relative to the resources directory.
    target = os.path.join(RESOURCE_DIR, str(path))
    doc_name = str(name)
    system.load_document(doc_name, target)
    print(f"  [load-data] '{name}' ← {path}")
    return True


def check_diff(system, name):
    """Effect ``(check-diff name)``: evaluate a named diff and report agreement.

    Args:
        system: Object exposing ``eval_diff(name)``.
        name: Diff name; converted with ``str()`` before the lookup.

    Returns:
        ``True`` when the diff result's ``values_diverge`` attribute is falsy,
        ``False`` otherwise.
    """
    outcome = system.eval_diff(str(name))
    print(f"  [check-diff] {outcome}")
    diverged = outcome.values_diverge
    return not diverged


def check_consistency(system):
    """Effect ``(check-consistency)``: print and return the consistency verdict.

    Args:
        system: Object exposing ``consistency()``.

    Returns:
        The ``consistent`` attribute of the report returned by
        ``system.consistency()``.
    """
    summary = system.consistency()
    print(f"  [consistency] {summary}")
    verdict = summary.consistent
    return verdict


def snapshot(system, name):
    """Effect ``(snapshot name)``: remember a fact's current value and origin.

    The ``(wff, origin)`` pair of ``system.facts[name]`` is stored in the
    module-level ``_snapshots`` dict, replacing any earlier entry for the
    same name.

    Args:
        system: Object exposing a ``facts`` mapping.
        name: Fact name; converted with ``str()`` before use.

    Returns:
        Always ``True``.

    Raises:
        KeyError: If ``system.facts`` is a plain mapping and has no such fact.
    """
    key = str(name)
    fact = system.facts[key]
    _snapshots[key] = fact.wff, fact.origin
    print(f"  [snapshot] Saved {key} = {fact.wff}")
    return True


def patch_fact(system, name, value, document, quote, explanation):
    """Effect ``(patch-fact ...)``: overwrite a fact, attaching fresh evidence.

    Args:
        system: Object exposing ``set_fact(name, value, evidence)``.
        name: Fact name; converted with ``str()``.
        value: New value, passed through to ``set_fact`` unchanged.
        document: Name of the document the evidence cites; ``str()``-converted.
        quote: Single supporting quote; ``str()``-converted.
        explanation: Free-text rationale; ``str()``-converted.

    Returns:
        Always ``True``.
    """
    # Normalise every textual argument up front, before any output is produced.
    fact_name = str(name)
    source_doc = str(document)
    cited_text = str(quote)
    rationale = str(explanation)

    print(f"  [patch-fact] {fact_name} ← {value}")
    system.set_fact(
        fact_name,
        value,
        Evidence(
            document=source_doc,
            quotes=[cited_text],
            explanation=rationale,
            verify_manual=True,
        ),
    )
    return True


def rollback(system, name):
    """Effect ``(rollback name)``: restore a fact from its saved snapshot.

    The snapshot entry is removed from ``_snapshots`` as it is applied, so a
    second rollback of the same name needs a new ``snapshot`` first.

    Args:
        system: Object exposing ``set_fact(name, value, origin)``.
        name: Fact name; converted with ``str()`` before use.

    Returns:
        Always ``True``.

    Raises:
        KeyError: If no snapshot is stored for ``name``.
    """
    key = str(name)
    saved_value, saved_origin = _snapshots.pop(key)
    system.set_fact(key, saved_value, saved_origin)
    print(f"  [rollback] {key} ← {saved_value} (restored)")
    return True


# ── The Script ───────────────────────────────────────────────

# Parseltongue DSL program that main() feeds to load_source(). It is a raw
# string, so the \" sequences reach the DSL parser with their backslashes.
# Steps: load the document, declare two facts, define a diff between them,
# report consistency, snapshot one fact, then run the same patch-or-rollback
# conditional twice with a consistency report after each.
# rollback() pops the snapshot, so only one rollback can follow the single
# (snapshot ...) call below.
# NOTE(review): the first (check-diff ...) passes the bare symbol
# hash-consistency while the second passes the string "hash-consistency";
# check_diff() applies str() to both — confirm the DSL yields the same name.
SELF_HEAL_SCRIPT = r"""
; ── Load source ──
(load-data "auth_module" "auth_module.py.txt")

; ── Extract facts ──
(fact hash-algorithm "sha256"
  :evidence (evidence "auth_module"
    :quotes ("HASH_ALGORITHM = \"sha256\"")
    :explanation "Token hashing algorithm"))

(fact session-hash-algorithm "md5"
  :evidence (evidence "auth_module"
    :quotes ("hashlib.md5(f\"{user_id}:{now}\".encode()).hexdigest()")
    :explanation "Session ID uses MD5"))

; ── Detect divergence ──
(diff hash-consistency
    :replace hash-algorithm
    :with session-hash-algorithm)

; ── Initial state ──
(check-consistency)

; ── If divergent: snapshot + patch.  If consistent: rollback. ──
(snapshot "session-hash-algorithm")

(if (not (check-diff hash-consistency))
    (patch-fact "session-hash-algorithm" "sha256"
        "auth_module"
        "HASH_ALGORITHM = \"sha256\""
        "Remediation probe: session hash should match token hash")
    (rollback "session-hash-algorithm"))

; ── After patch ──
(check-consistency)

; ── Same conditional again: now consistent → rollback ──
(if (not (check-diff "hash-consistency"))
    (patch-fact "session-hash-algorithm" "sha256"
        "auth_module"
        "HASH_ALGORITHM = \"sha256\""
        "Remediation probe: session hash should match token hash")
    (rollback "session-hash-algorithm"))

; ── Back to original ──
(check-consistency)
"""


def main():
    """Run the self-healing demo end to end.

    Sends WARNING-and-above records from the ``parseltongue`` logger to
    stdout, builds a ``System`` with the six primitive effects registered
    under their DSL names, executes ``SELF_HEAL_SCRIPT`` through
    ``load_source``, and prints the system before and after the run.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    # Send library warnings to stdout so they interleave with the effect output.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter("  [%(levelname)s] %(message)s"))
    library_log = logging.getLogger("parseltongue")
    library_log.setLevel(logging.WARNING)
    library_log.addHandler(stdout_handler)

    print(heavy_rule)
    print("Parseltongue — Self-Healing Probes")
    print(heavy_rule)

    # DSL effect name → Python callable implementing it.
    effect_table = {
        "load-data": load_data,
        "check-diff": check_diff,
        "check-consistency": check_consistency,
        "snapshot": snapshot,
        "patch-fact": patch_fact,
        "rollback": rollback,
    }
    system = System(overridable=True, effects=effect_table)

    print(system)
    print(f"\n{light_rule}\n")

    load_source(system, SELF_HEAL_SCRIPT)

    print(f"\n{heavy_rule}")
    print(f"Final: {system}")


if __name__ == "__main__":
    main()
"""Authentication module for the user service.

Handles token generation, validation, and session management.
Tokens expire after 3600 seconds (1 hour) by default.
"""

import hashlib
import time
from typing import Optional

# Lifetime added to created_at in create_session(); also the default max_age
# of validate_token().
TOKEN_EXPIRY = 3600  # seconds
# Session cap mentioned in create_session()'s docstring; no code shown in this
# module reads it.
MAX_SESSIONS = 5
# Declared hash algorithm name. NOTE(review): the functions shown here call
# hashlib.sha256 / hashlib.md5 directly rather than reading this constant.
HASH_ALGORITHM = "sha256"


def generate_token(user_id: str, secret: str) -> str:
    """Build an authentication token for a user.

    The token is the SHA-256 hex digest of ``"<user_id>:<timestamp>:<secret>"``,
    where the timestamp is the current Unix time truncated to whole seconds.

    Args:
        user_id: The unique user identifier.
        secret: The application secret key.

    Returns:
        A 64-character hexadecimal SHA-256 digest.
    """
    issued_at = int(time.time())
    material = "{}:{}:{}".format(user_id, issued_at, secret)
    hasher = hashlib.sha256(material.encode())
    return hasher.hexdigest()


def validate_token(token: str, user_id: str, secret: str, max_age: int = TOKEN_EXPIRY) -> bool:
    """Check whether a token looks valid for the given user.

    As written, the check is purely structural: both ``token`` and
    ``user_id`` must be non-empty and the token must be 64 characters long.
    ``secret`` and ``max_age`` are accepted but not consulted.

    Args:
        token: The token string to validate.
        user_id: The user who owns the token.
        secret: The application secret key.
        max_age: Maximum token age in seconds. Defaults to TOKEN_EXPIRY.

    Returns:
        True if both inputs are non-empty and the token has the expected
        length, False otherwise.
    """
    if token and user_id:
        # A real implementation would look the token up in a token store;
        # here only the SHA-256 hex length is checked.
        return len(token) == 64
    return False


def create_session(user_id: str, metadata: Optional[dict] = None) -> dict:
    """Create a new session for the user.

    If the user already has MAX_SESSIONS active sessions, the oldest
    session is revoked before creating a new one.

    Args:
        user_id: The user identifier.
        metadata: Optional session metadata (IP, user agent, etc).

    Returns:
        A dict with session_id, user_id, created_at, and expires_at fields.
    """
    # NOTE(review): this body reads neither `metadata` nor MAX_SESSIONS, so the
    # revoke-oldest behaviour described in the docstring is not implemented here.
    now = int(time.time())  # current Unix time, whole seconds
    return {
        # Session id is the MD5 hex digest of "<user_id>:<now>"; HASH_ALGORITHM
        # is not consulted.
        "session_id": hashlib.md5(f"{user_id}:{now}".encode()).hexdigest(),
        "user_id": user_id,
        "created_at": now,
        # Sessions share the token lifetime.
        "expires_at": now + TOKEN_EXPIRY,
    }


def revoke_session(session_id: str) -> bool:
    """Revoke the session identified by ``session_id``.

    This is a stub: no session store is consulted and the result is
    unconditionally True.

    Args:
        session_id: Identifier of the session to revoke (currently unused).

    Returns:
        True if the session was found and revoked, False if not found.
        As written, always True.
    """
    # A real implementation would delete the entry from the session store.
    revoked = True
    return revoked

Output

Click "Run in browser" to execute this demo with Pyodide.