Vault Event Sourcing

Audit log and rollback system for vault mutations.

Status: Implemented (v3)


Problem

DM agent tools modify vault state (PC, timeline, threads) during gameplay. If a session is deleted or rewound, these changes persist as orphaned state.

Solution

Track all vault mutations as events tied to messages. On session delete or message rewind, replay inverse operations to restore previous state.


Architecture

Hybrid Approach

The vault files remain the source of truth (Obsidian compatibility). Events serve as an audit log enabling undo operations.

Tool Call → Log Event (pending) → Write to Vault → Mark Event (applied)
Session Delete → Read Events → Apply Inverse Operations → Vault Restored

Core Principles

  1. Two-phase write: Log intent before mutation, mark applied after
  2. Conflict-aware rollback: Compare-and-swap semantics (only revert if current == after)
  3. Stable markers: Use HTML comment markers with ULID for timeline/thread identification
  4. Best-effort recovery: Skip conflicts, report failures, continue processing
  5. Deterministic ordering: Events ordered by (created_at DESC, id DESC) for tie-breaking

Event Types

| Event Type | Tool | Stable ID |
|---|---|---|
| pc_updated | update_pc | PC entity ID (e.g., jake) |
| timeline_appended | add_timeline_events | ULID (e.g., 01JGXK...) |
| thread_added | add_thread | ULID (e.g., 01JGXK...) |
| thread_resolved | resolve_thread | Thread ULID |

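ID Generation: All timeline-event and thread marker IDs are server-generated ULIDs, never model-generated. ULIDs sort lexicographically by creation time (millisecond precision), and their 80 random bits make collisions extremely unlikely. The id column of rpg.vault_events is a separate UUID primary key (see the schema below).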


Database Schema

CREATE TABLE rpg.vault_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID NOT NULL REFERENCES rpg.sessions(id) ON DELETE CASCADE,
    message_id UUID REFERENCES rpg.chat_messages(id) ON DELETE SET NULL,

    -- Event classification
    event_type TEXT NOT NULL,
    entity_type TEXT,           -- 'pc', 'timeline', 'threads'
    entity_id TEXT NOT NULL,    -- ULID for timeline/threads, entity ID for PC

    -- Two-phase commit status
    status TEXT NOT NULL DEFAULT 'pending',

    -- Event data with before/after state for rollback
    payload JSONB NOT NULL,

    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT valid_event_type CHECK (event_type IN (
        'pc_updated', 'timeline_appended', 'thread_added', 'thread_resolved'
    )),
    CONSTRAINT valid_status CHECK (status IN ('pending', 'applied', 'failed'))
);

-- Rollback queries: newest-first per session with deterministic tie-break
CREATE INDEX idx_vault_events_session_order
    ON rpg.vault_events(session_id, created_at DESC, id DESC);

-- Message-based rollback (rewind feature)
CREATE INDEX idx_vault_events_message_order
    ON rpg.vault_events(message_id, created_at DESC, id DESC)
    WHERE message_id IS NOT NULL;

-- Find pending events (cleanup/recovery)
CREATE INDEX idx_vault_events_pending
    ON rpg.vault_events(status, created_at)
    WHERE status = 'pending';

Stable Markers

Line numbers drift when files are edited. Use HTML comment markers with ULIDs instead.

Timeline Markers

When appending to timeline.md, wrap with sentinels:

## Day 5

### Morning
<!-- rpg:event:01JGXKAB1234567890ABCDEF:begin -->
- Jake met with Marlena at The Salty Sigil
- Discussed infiltration plan for the salon
<!-- rpg:event:01JGXKAB1234567890ABCDEF:end -->

### Afternoon
<!-- rpg:event:01JGXKCD5678901234GHIJKL:begin -->
- Jake traveled to the Gilded Quill
<!-- rpg:event:01JGXKCD5678901234GHIJKL:end -->

Insertion rules:

  1. Find the ## Day {day} section (create if missing)
  2. Find the ### {time_of_day} subsection within the day (create if missing)
  3. Insert a marker block containing ONLY the new bullet lines
  4. The marker wraps the inserted content, not existing content
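The insertion rules above can be sketched as a pure string transformation. This is an illustrative sketch, not the code in lib/ops/timeline.py: the function name insert_timeline_block is hypothetical, and it assumes headers match exactly (## Day {day}, ### {time_of_day}) and that a missing day is appended at the end of the file.

```python
def insert_timeline_block(text: str, day: int, time_of_day: str,
                          bullets: list[str], event_ulid: str) -> str:
    """Insert a marker-wrapped bullet block under ## Day / ### time_of_day."""
    begin = f"<!-- rpg:event:{event_ulid}:begin -->"
    end = f"<!-- rpg:event:{event_ulid}:end -->"
    block = [begin, *(f"- {b}" for b in bullets), end]

    lines = text.splitlines()
    day_header = f"## Day {day}"
    sub_header = f"### {time_of_day}"

    # Rule 1: find the day section, creating it at end of file if missing.
    if day_header not in lines:
        if lines and lines[-1] != "":
            lines.append("")
        lines.append(day_header)
    day_idx = lines.index(day_header)
    # The day section runs until the next level-2 header or end of file.
    day_end = next((i for i in range(day_idx + 1, len(lines))
                    if lines[i].startswith("## ")), len(lines))

    # Rule 2: find the time-of-day subsection, creating it if missing.
    sub_idx = next((i for i in range(day_idx + 1, day_end)
                    if lines[i] == sub_header), None)
    if sub_idx is None:
        new_lines = ["", sub_header, *block]
        insert_at = day_end
    else:
        new_lines = block
        # The subsection ends at the next header inside the day.
        insert_at = next((i for i in range(sub_idx + 1, day_end)
                          if lines[i].startswith("#")), day_end)
    # Step back over blank lines so the block follows existing content.
    while insert_at > day_idx + 1 and lines[insert_at - 1] == "":
        insert_at -= 1

    # Rules 3-4: the markers wrap only the new bullet lines.
    lines[insert_at:insert_at] = new_lines
    return "\n".join(lines) + "\n"
```

Because existing lines are never rewritten, only shifted, earlier marker blocks keep their content even though their line numbers change.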

Thread Markers

When adding to open-threads.md, include thread ULID:

## High Priority

<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->
### The Fading Muffle (URGENT)
- The stolen Vintner's Guild token has a scrying anchor
- **Status:** Clock is ticking
- **Next step:** Execute infiltration before it fails

<!-- rpg:thread:01JGXKGH3456789012STUVWX -->
### Silent Circle Salon (TONIGHT)
- Infiltrate as Courier Valen

Thread block boundary rules (for removal):

  1. Find the marker line <!-- rpg:thread:{ulid} -->
  2. The block ends at the FIRST of:
       • Next <!-- rpg:thread: marker
       • Next ## section header (level 2)
       • End of file
  3. Remove the marker line and all lines up to the end boundary (exclusive of the boundary)
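The boundary rules above can be sketched as follows. The function name remove_thread_block and its return shape are assumptions made for this example, not the project's API.

```python
def remove_thread_block(text: str, thread_ulid: str) -> tuple[str, bool]:
    """Remove a thread block by marker. Returns (new_text, removed)."""
    marker = f"<!-- rpg:thread:{thread_ulid} -->"
    lines = text.splitlines(keepends=True)

    # Rule 1: locate the marker line.
    start = next((i for i, line in enumerate(lines)
                  if line.strip() == marker), None)
    if start is None:
        return text, False

    # Rule 2: the block ends at the next thread marker, the next
    # level-2 header, or end of file, whichever comes first.
    end = len(lines)
    for i in range(start + 1, len(lines)):
        if lines[i].startswith(("<!-- rpg:thread:", "## ")):
            end = i
            break

    # Rule 3: remove the marker line up to (not including) the boundary.
    del lines[start:end]
    return "".join(lines), True
```

Note that "### " thread titles do not match the "## " prefix check, so a thread's own heading is never mistaken for a section boundary.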

Benefits:

  • Survives hand-edits in Obsidian
  • Survives reformatting
  • Invisible when rendered
  • Unique, searchable anchor
  • ULID prevents collisions


Event Payloads

pc_updated

{
  "entity_id": "jake",
  "changes": {
    "location": {"before": "the-salty-sigil", "after": "gilded-quill"},
    "hp_current": {"before": 13, "after": 10},
    "gold": {"before": 50, "after": 45}
  }
}

Inverse: For each field where current == after, set to before.

timeline_appended

{
  "entity_id": "01JGXKAB1234567890ABCDEF",
  "day": 5,
  "time_of_day": "Morning",
  "events": [
    {"text": "Jake met with Marlena", "time": "Morning"}
  ],
  "marker_begin": "<!-- rpg:event:01JGXKAB1234567890ABCDEF:begin -->",
  "marker_end": "<!-- rpg:event:01JGXKAB1234567890ABCDEF:end -->",
  "file_line_range": {"start": 156, "end": 160}
}

Inverse algorithm:

  1. Find and remove the content between marker_begin and marker_end (most stable)
  2. Fallback: if the markers are missing, remove lines file_line_range.start through file_line_range.end
  3. Fallback: if neither works, report the failure in RollbackResult.failures
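A minimal sketch of this fallback chain is shown below. It assumes file_line_range is 1-indexed and inclusive, which the payload does not state, and it only checks that the range is within bounds. The function name remove_timeline_block and the returned method labels are hypothetical.

```python
import re


def remove_timeline_block(text: str, payload: dict) -> tuple[str, str]:
    """Return (new_text, method) where method is 'marker', 'line_range' or 'failed'."""
    begin, end = payload["marker_begin"], payload["marker_end"]

    # 1. Preferred: remove everything from the begin marker to the end marker.
    #    MULTILINE lets ^ match at line starts; DOTALL lets .*? span lines.
    pattern = re.compile(
        rf"^{re.escape(begin)}[ \t]*\n.*?^{re.escape(end)}[ \t]*\n?",
        re.MULTILINE | re.DOTALL,
    )
    new_text, count = pattern.subn("", text, count=1)
    if count:
        return new_text, "marker"

    # 2. Fallback: recorded line range (assumed 1-indexed, inclusive).
    rng = payload.get("file_line_range")
    lines = text.splitlines(keepends=True)
    if rng and 1 <= rng["start"] <= rng["end"] <= len(lines):
        del lines[rng["start"] - 1:rng["end"]]
        return "".join(lines), "line_range"

    # 3. Nothing matched: the caller records this in RollbackResult.failures.
    return text, "failed"
```

A bounds check alone cannot tell whether the file was edited since the event was logged, so a real implementation would likely want a stronger validity test before trusting the line range.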

thread_added

{
  "entity_id": "01JGXKEF9012345678MNOPQR",
  "name": "The Missing Artifact",
  "priority": "high",
  "description": "Find the stolen crown before the ritual",
  "marker": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->"
}

Inverse: Remove block from marker to next boundary (see boundary rules above).

thread_resolved

{
  "entity_id": "01JGXKEF9012345678MNOPQR",
  "name": "Rescue Gareth",
  "resolution": "Jake found Gareth in Sunfall Hills gully",
  "marker": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->",
  "previous_state": {
    "section": "## High Priority",
    "content": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->\n### Rescue Gareth\n- Gareth walked into the hills..."
  }
}

Inverse:

  1. Remove the resolved entry from ## Completed/Resolved
  2. Re-insert previous_state.content into previous_state.section


Two-Phase Write Pattern

Logging the event before touching the vault avoids orphan mutations, where the vault is written but no event is logged.

async def update_pc_with_logging(
    vault: Vault,
    session_id: UUID,
    message_id: UUID,
    changes: dict,
) -> PCState:
    pc = _get_active_pc(vault)

    # 1. Capture before-state
    before = {field: pc.frontmatter.get(field) for field in changes.keys()}

    # 2. Log event as PENDING
    event_id = await event_logger.log_event(
        session_id=session_id,
        message_id=message_id,
        event_type="pc_updated",
        entity_id=pc.id,
        payload={
            "entity_id": pc.id,  # read back by _inverse_pc_update
            "changes": {
                field: {"before": before[field], "after": value}
                for field, value in changes.items()
                if before[field] != value
            },
        },
        status="pending",
    )

    try:
        # 3. Apply vault mutation
        vault.update_entity(EntityType.PC, pc.id, set_values=changes)

        # 4. Mark event as APPLIED
        await event_logger.mark_applied(event_id)

    except Exception as e:
        # Mark as failed, vault may be in inconsistent state
        await event_logger.mark_failed(event_id, str(e))
        raise

    return get_pc_state(vault)

Rollback only processes status='applied' events.
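The status transitions can be exercised without a database. The sketch below uses a hypothetical in-memory log (InMemoryEventLog and two_phase_write are illustration-only names) to show that a failed mutation leaves a 'failed' event, which rollback then ignores.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class InMemoryEventLog:
    """Hypothetical stand-in for the event table, for illustration only."""
    events: list[dict] = field(default_factory=list)

    def log_pending(self, event_type: str, payload: dict) -> int:
        self.events.append(
            {"event_type": event_type, "payload": payload, "status": "pending"}
        )
        return len(self.events) - 1

    def set_status(self, event_id: int, status: str) -> None:
        self.events[event_id]["status"] = status

    def applied(self) -> list[dict]:
        # Rollback only ever looks at these.
        return [e for e in self.events if e["status"] == "applied"]


def two_phase_write(log: InMemoryEventLog, event_type: str, payload: dict,
                    mutate: Callable[[], None]) -> None:
    event_id = log.log_pending(event_type, payload)   # phase 1: log intent
    try:
        mutate()                                      # write to the vault
    except Exception:
        log.set_status(event_id, "failed")
        raise
    log.set_status(event_id, "applied")               # phase 2: confirm
```

If the process dies between the two phases, the event stays 'pending', which is the case the cleanup job described later handles.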


Rollback Semantics

Conflict-Aware (Compare-and-Swap)

Rollback is conditional: only revert if current value matches the event's after value.

def _inverse_pc_update(self, payload: dict, vault: Vault) -> FieldRollbackResult:
    pc = vault.get_entity(EntityType.PC, payload["entity_id"])
    reverted = []
    skipped = []
    set_values = {}

    for field, change in payload["changes"].items():
        current = pc.frontmatter.get(field)
        if current == change["after"]:
            # Current matches expected - safe to revert
            set_values[field] = change["before"]
            reverted.append(field)
        else:
            # Changed by another session or hand-edit - skip
            skipped.append({
                "field": field,
                "expected": change["after"],
                "current": current,
                "reason": "current != after"
            })

    if set_values:
        vault.update_entity(EntityType.PC, payload["entity_id"], set_values=set_values)

    return FieldRollbackResult(reverted=reverted, skipped=skipped)

Ordering for Rollback

Events are processed newest-first with deterministic ordering:

SELECT * FROM rpg.vault_events
WHERE session_id = $1 AND status = 'applied'
ORDER BY created_at DESC, id DESC

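The id DESC tie-break keeps the ordering deterministic even when two events share the same created_at timestamp. Because id is a random UUID (gen_random_uuid()), the order among tied events is stable but not chronological.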
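For in-memory tests, the same ordering can be reproduced in Python. This is a sketch; rollback_order is a hypothetical helper, and events are assumed to be dicts with created_at and id keys.

```python
from datetime import datetime, timezone
from uuid import UUID


def rollback_order(events: list[dict]) -> list[dict]:
    """Mirror ORDER BY created_at DESC, id DESC."""
    # Sorting on the (created_at, id) tuple in reverse applies DESC to both
    # keys; uuid.UUID objects compare by their 128-bit integer value.
    return sorted(events, key=lambda e: (e["created_at"], e["id"]), reverse=True)
```

PostgreSQL compares uuid values byte by byte, which should agree with Python's integer comparison of UUID objects, so both orderings are expected to match.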

Rewind Semantics

rollback_from_message(message_id) undoes events from message N onwards (inclusive), newest-first.

  • "Rewind to before message N" = undo N and everything after
  • Events are processed in reverse chronological order
  • Each event is independently conflict-checked
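The selection step can be sketched as below. It assumes the caller can supply the session's message IDs in chronological order (message_order); that parameter, the function name events_to_rewind, and the decision to ignore events whose message_id is NULL are all assumptions of this example.

```python
def events_to_rewind(events: list[dict], message_order: list,
                     from_message_id) -> list[dict]:
    """Pick applied events attached to message N or later, newest first."""
    rank = {message_id: i for i, message_id in enumerate(message_order)}
    cutoff = rank[from_message_id]
    selected = [
        e for e in events
        if e["status"] == "applied"
        # Events with an unknown or NULL message_id rank below every cutoff.
        and rank.get(e["message_id"], -1) >= cutoff
    ]
    return sorted(selected, key=lambda e: (e["created_at"], e["id"]), reverse=True)
```

Each selected event would then go through the same per-event conflict check as a full session rollback.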

RollbackResult

Machine-readable result for debugging and audit UI:

@dataclass
class RollbackResult:
    events_seen: int
    events_reversed: int
    skipped_conflicts: list[dict]  # Events skipped due to current != after
    failures: list[dict]           # Events that failed to rollback
    success: bool                  # True if no failures (skipped conflicts are OK)

Example:

{
  "events_seen": 18,
  "events_reversed": 14,
  "skipped_conflicts": [
    {
      "event_id": "evt_abc123",
      "event_type": "pc_updated",
      "reason": "current != after",
      "field": "location",
      "expected": "gilded-quill",
      "current": "the-salty-sigil"
    }
  ],
  "failures": [
    {
      "event_id": "evt_def456",
      "event_type": "timeline_appended",
      "reason": "marker not found, line_range invalid"
    }
  ],
  "success": false
}
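A result like this can be produced by a best-effort loop that never aborts on a single bad event. The sketch below is illustrative only: reverse_one is a hypothetical callback that returns a list of conflict dicts (empty when the event was fully reversed) and raises on failure, and counting an event as reversed only when it has no conflicts is an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RollbackResult:
    events_seen: int = 0
    events_reversed: int = 0
    skipped_conflicts: list[dict] = field(default_factory=list)
    failures: list[dict] = field(default_factory=list)
    success: bool = True


def rollback_events(events: list[dict],
                    reverse_one: Callable[[dict], list[dict]]) -> RollbackResult:
    result = RollbackResult()
    for event in events:
        result.events_seen += 1
        try:
            conflicts = reverse_one(event)
        except Exception as exc:
            # Record the failure and keep going with the remaining events.
            result.failures.append({
                "event_id": event["id"],
                "event_type": event["event_type"],
                "reason": str(exc),
            })
            continue
        if conflicts:
            result.skipped_conflicts.extend(conflicts)
        else:
            result.events_reversed += 1
    # Skipped conflicts are acceptable; only failures make the run unsuccessful.
    result.success = not result.failures
    return result
```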


Pending Event Cleanup

Policy (v1 - no inference)

On startup and periodically (every 5 minutes):

  1. Find status='pending' events older than 2 minutes
  2. Mark as status='failed' with error='timeout_pending'
  3. Log warning with event details

Do not attempt to verify if mutation was applied. That's a rabbit hole for v1.

async def cleanup_stale_pending(cutoff_minutes: int = 2):
    """Mark old pending events as failed."""
    # Assumes an asyncpg-style connection: fetch() returns the RETURNING rows,
    # and the cutoff is bound as $1 instead of being placed inside the literal.
    rows = await conn.fetch("""
        UPDATE rpg.vault_events
        SET status = 'failed',
            payload = payload || '{"error": "timeout_pending"}'::jsonb
        WHERE status = 'pending'
          AND created_at < NOW() - make_interval(mins => $1)
        RETURNING id
    """, cutoff_minutes)

    if rows:
        logger.warning(f"Marked {len(rows)} stale pending events as failed")

Pending events are never rolled back. Only applied events are considered.


Implementation

1. Event Logger

# api/vault/events.py

class VaultEventLogger:
    """Logs vault mutations for audit and rollback."""

    async def log_event(
        self,
        session_id: UUID,
        message_id: UUID,
        event_type: str,
        entity_id: str,
        payload: dict,
        status: str = "pending",
    ) -> UUID:
        """Log a vault event. Returns event ID."""

    async def mark_applied(self, event_id: UUID) -> None:
        """Mark event as successfully applied."""

    async def mark_failed(self, event_id: UUID, error: str) -> None:
        """Mark event as failed with error message."""

    async def get_session_events(
        self,
        session_id: UUID,
        status: str = "applied",
    ) -> list[VaultEvent]:
        """Get events for a session, ordered by (created_at DESC, id DESC)."""

2. Rollback Engine

# api/vault/rollback.py

class VaultRollback:
    """Rolls back vault state by reversing events."""

    async def rollback_session(
        self,
        session_id: UUID,
        vault: Vault,
    ) -> RollbackResult:
        """Rollback all applied events for a session (newest first)."""

    async def rollback_from_message(
        self,
        session_id: UUID,
        message_id: UUID,
        vault: Vault,
    ) -> RollbackResult:
        """Rollback events from message N onwards (inclusive, newest first)."""

    def _inverse_pc_update(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
        """Apply inverse of PC update with conflict detection."""

    def _inverse_timeline_append(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
        """Remove timeline entries by marker (fallback to line range)."""

    def _inverse_thread_added(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
        """Remove thread block by marker."""

    def _inverse_thread_resolved(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
        """Unresolve thread, restore to previous section."""

3. Delete Session Hook

async def delete_session(session_id: UUID, vault: Vault):
    rollback = VaultRollback(database_url)

    # 1. Rollback vault changes
    result = await rollback.rollback_session(session_id, vault)

    logger.info(
        f"Rollback complete: {result.events_reversed}/{result.events_seen} reversed, "
        f"{len(result.skipped_conflicts)} conflicts, {len(result.failures)} failures"
    )

    if result.failures:
        logger.warning(f"Rollback failures: {result.failures}")

    # 2. Delete session (cascades to messages and events)
    await store.delete_session(session_id)

    return result

API Integration

Session Delete with Rollback

Wire rollback into the session delete endpoint to restore vault state:

# api/routes/chat.py

from api.vault_rollback import create_rollback_engine
from api.vault_events import get_vault_event_store

@router.delete("/sessions/{session_id}")
async def delete_session(campaign_id: str, session_id: UUID):
    vault_path = settings.campaigns_path / campaign_id
    store = await get_chat_store()

    # 1. Rollback vault changes
    event_store = await get_vault_event_store(store._pool)
    rollback = await create_rollback_engine(vault_path, event_store)
    result = await rollback.rollback_session(session_id)

    logger.info(
        f"Rollback: {result.events_reversed}/{result.events_seen} reversed, "
        f"{len(result.skipped_conflicts)} conflicts"
    )

    if result.failures:
        logger.warning(f"Rollback failures: {result.failures}")

    # 2. Delete session (cascades to messages and events)
    await store.delete_session(session_id)

    return {
        "deleted": True,
        "rollback": {
            "events_seen": result.events_seen,
            "events_reversed": result.events_reversed,
            "conflicts": len(result.skipped_conflicts),
            "failures": len(result.failures),
        }
    }

Message Rewind

Rollback from a specific message onwards (for "undo" feature):

@router.delete("/sessions/{session_id}/messages/{message_id}")
async def rewind_to_message(campaign_id: str, session_id: UUID, message_id: UUID):
    vault_path = settings.campaigns_path / campaign_id
    store = await get_chat_store()

    # 1. Rollback vault changes from message onwards
    event_store = await get_vault_event_store(store._pool)
    rollback = await create_rollback_engine(vault_path, event_store)
    result = await rollback.rollback_from_message(session_id, message_id)

    # 2. Delete messages from message_id onwards
    await store.delete_messages_from(session_id, message_id)

    return {
        "rewound_to": str(message_id),
        "rollback": {
            "events_seen": result.events_seen,
            "events_reversed": result.events_reversed,
        }
    }

Implementation Status

Completed

  • [x] rpg.vault_events table with two-phase write schema
  • [x] ULID markers in lib/ops/timeline.py
  • [x] ULID markers in lib/ops/threads.py
  • [x] VaultEventStore (api/vault_events.py) - two-phase write
  • [x] VaultOps (api/vault_ops.py) - wrapper with event logging
  • [x] VaultRollback (api/vault_rollback.py) - rollback engine
  • [x] Pending event cleanup on startup (api/main.py)

Wired

  • [x] Wire rollback to DELETE /sessions/{session_id}
  • [x] Wire rollback to DELETE /sessions/{session_id}/messages/{message_id}

Future

  • [ ] PC update event logging (requires tool refactor)
  • [ ] Audit view in chat UI
  • [ ] Branching/forking sessions

Implementation Details

File Structure

api/
├── vault_events.py      # VaultEventStore - two-phase write to DB
├── vault_ops.py         # VaultOps - wraps lib/ops with event logging
├── vault_rollback.py    # VaultRollback - rollback engine
└── routes/chat.py       # Endpoints with rollback wiring

lib/
├── models.py            # PatchResult with data field for metadata
└── ops/
    ├── timeline.py      # ULID markers for timeline entries
    └── threads.py       # ULID markers for threads

supabase/migrations/
└── 005_vault_events.sql # Database schema + cleanup function

Key Components

1. lib/ops/timeline.py - Timeline with ULID Markers

Refactored to use v3 schema with time_of_day anchoring:

def apply_append_timeline_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
    """
    V3 Schema: day, time_of_day, events

    Inserts events under ### {time_of_day} within ## Day {day},
    wrapped with ULID marker comments for deterministic rollback.
    """
    day = op["day"]
    time_of_day = op["time_of_day"]  # e.g., "Morning", "Afternoon"
    events = op["events"]

    # Generate server-side ULID
    event_ulid = str(ulid.new())
    marker_begin = f"<!-- rpg:event:{event_ulid}:begin -->"
    marker_end = f"<!-- rpg:event:{event_ulid}:end -->"

    # Build bullet lines from events
    bullet_lines = [f"- {e['text']}" for e in events]
    insert_block = "\n".join([marker_begin, *bullet_lines, marker_end])

    # Insert under ### {time_of_day} within ## Day {day}
    # Creates sections if they don't exist

    return PatchResult(
        op_type="append_timeline",
        path="canon/timeline.md",
        description=f"Day {day} {time_of_day}: +{len(events)} events",
        success=True,
        data={
            "event_ulid": event_ulid,
            "marker_begin": marker_begin,
            "marker_end": marker_end,
            "day": day,
            "time_of_day": time_of_day,
            "line_range": (start_line, end_line),
        },
    )

2. lib/ops/threads.py - Threads with ULID Markers

def apply_add_threads_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
    """
    Adds threads with ULID marker comments.

    Format in open-threads.md:
    <!-- rpg:thread:01JGXK... -->
    ### Thread Name
    - Description
    - **Status:** New
    """
    threads_added = []
    for thread in op["threads"]:
        name = thread["name"]
        description = thread["description"]
        thread_ulid = str(ulid.new())
        marker = f"<!-- rpg:thread:{thread_ulid} -->"

        thread_block = f"\n{marker}\n### {name}\n- {description}\n"
        # Insert into appropriate priority section

        threads_added.append({
            "ulid": thread_ulid,
            "name": name,
            "marker": marker,
        })

    return PatchResult(
        op_type="add_threads",
        success=True,
        data={"threads": threads_added},
    )

def apply_resolve_thread_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
    """
    Resolves thread by ULID marker (preferred) or name (fallback).
    Moves to ## Completed section with strikethrough.
    """
    # Find by marker first: <!-- rpg:thread:{ulid} -->
    # Fallback to name: ### {name}
    # Move to Completed section as ### ~~{name}~~

3. api/vault_events.py - Event Store

class VaultEventStore:
    """Two-phase write event store."""

    async def log_pending(
        self,
        session_id: UUID,
        event_type: EventType,  # pc_updated, timeline_appended, thread_added, thread_resolved
        entity_id: str,
        payload: dict,
        message_id: Optional[UUID] = None,
    ) -> VaultEvent:
        """Phase 1: Log event as 'pending' BEFORE mutation."""

    async def mark_applied(self, event_id: UUID) -> bool:
        """Phase 2: Mark as 'applied' AFTER successful mutation."""

    async def mark_failed(self, event_id: UUID, error: str) -> bool:
        """Mark as 'failed' if mutation fails."""

    async def get_session_events(self, session_id: UUID, status: str = None) -> list[VaultEvent]:
        """Get events ordered by (created_at DESC, id DESC)."""

    async def cleanup_stale_pending(self, age_hours: int = 1) -> int:
        """Mark old pending events as failed (crash recovery)."""

4. api/vault_ops.py - Operations Wrapper

class VaultOps:
    """Wraps lib/ops functions with automatic event logging."""

    def __init__(self, vault_path: Path, session_id: UUID, event_store: VaultEventStore):
        self.vault_path = vault_path
        self.session_id = session_id
        self.event_store = event_store

    async def append_timeline(self, day: int, time_of_day: str, events: list) -> PatchResult:
        """
        1. Log pending event
        2. Call apply_append_timeline_op()
        3. Update event with ULID and markers from result
        4. Mark applied
        """

    async def add_threads(self, threads: list) -> PatchResult:
        """Same two-phase pattern for threads."""

    async def resolve_thread(self, name: str = None, thread_ulid: str = None) -> PatchResult:
        """Resolve by ULID or name."""

5. api/vault_rollback.py - Rollback Engine

@dataclass
class RollbackResult:
    success: bool
    events_seen: int = 0
    events_reversed: int = 0
    skipped_conflicts: list[str] = field(default_factory=list)
    failures: list[str] = field(default_factory=list)

class VaultRollback:
    """Reverses vault mutations using event history."""

    async def rollback_session(self, session_id: UUID, dry_run: bool = False) -> RollbackResult:
        """Rollback all applied events for a session."""
        events = await self.event_store.get_session_events(session_id, status="applied")
        return await self._rollback_events(events, dry_run)

    async def rollback_from_message(
        self, session_id: UUID, message_id: UUID, dry_run: bool = False
    ) -> RollbackResult:
        """Rollback events from message onwards (rewind)."""
        events = await self.event_store.get_events_from_message(session_id, message_id)
        return await self._rollback_events(events, dry_run)

    def _reverse_timeline_append(self, event: VaultEvent, dry_run: bool) -> bool:
        """Remove marker block: <!-- rpg:event:{ulid}:begin --> ... <!-- rpg:event:{ulid}:end -->"""
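        # Uses regex to find and remove the entire block.
        # The pattern needs re.MULTILINE (so ^ matches at line starts) and
        # re.DOTALL (so .*? spans lines); the markers come from the event payload.
        pattern = rf"^{re.escape(marker_begin)}\s*\n.*?^{re.escape(marker_end)}\s*\n?"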

    def _reverse_thread_add(self, event: VaultEvent, dry_run: bool) -> bool:
        """Remove thread block from marker to next boundary."""
        # Boundary = next marker OR next ## header OR EOF

    def _reverse_thread_resolve(self, event: VaultEvent, dry_run: bool) -> bool:
        """Remove from Completed section. Note: original content NOT restored in v1."""

    def _reverse_pc_update(self, event: VaultEvent, dry_run: bool) -> bool:
        """Compare-and-swap: only revert if current == after."""

6. Database Migration (005_vault_events.sql)

-- Table
CREATE TABLE rpg.vault_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID NOT NULL REFERENCES rpg.sessions(id) ON DELETE CASCADE,
    message_id UUID REFERENCES rpg.chat_messages(id) ON DELETE SET NULL,
    event_type TEXT NOT NULL,
    entity_type TEXT,
    entity_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for rollback queries
CREATE INDEX idx_vault_events_session_order
    ON rpg.vault_events(session_id, created_at DESC, id DESC);

-- Cleanup function
CREATE OR REPLACE FUNCTION rpg.cleanup_stale_pending_events(age interval)
RETURNS integer AS $$
DECLARE
    cleaned integer;
BEGIN
    WITH updated AS (
        UPDATE rpg.vault_events
        SET status = 'failed',
            payload = payload || '{"error": "timeout_pending"}'::jsonb
        WHERE status = 'pending'
          AND created_at < NOW() - age
        RETURNING id
    )
    SELECT count(*) INTO cleaned FROM updated;
    RETURN cleaned;
END;
$$ LANGUAGE plpgsql;

API Endpoints

| Endpoint | Method | Rollback Behavior |
|---|---|---|
| /campaigns/{id}/chat/sessions/{session_id} | DELETE | Rolls back all session events, then deletes |
| /campaigns/{id}/chat/sessions/{session_id}/messages/{message_id} | DELETE | Rolls back from message onwards, then deletes messages |

Response includes rollback stats:

{
  "success": true,
  "deleted_session_id": "abc-123",
  "rollback": {
    "events_seen": 5,
    "events_reversed": 5,
    "conflicts": 0,
    "failures": 0
  }
}

Commits

| Commit | Description |
|---|---|
| 2d80ec4 | feat: v3 ULID markers for vault ops |
| e20576b | feat: vault event logging layer (two-phase write) |
| 2907196 | feat: vault rollback engine |
| c4f32c0 | feat: wire rollback to session/message delete endpoints |

Testing Plan

Unit Tests

Timeline Operations (test_timeline_ops.py)

def test_append_timeline_creates_markers():
    """Verify ULID markers wrap inserted content."""
    result = apply_append_timeline_op({
        "day": 1,
        "time_of_day": "Morning",
        "events": [{"text": "Hero woke up"}],
    }, vault_path)

    assert result.success
    assert result.data["event_ulid"]
    assert "<!-- rpg:event:" in result.data["marker_begin"]

    content = (vault_path / "canon/timeline.md").read_text()
    assert result.data["marker_begin"] in content
    assert result.data["marker_end"] in content
    assert "- Hero woke up" in content

def test_append_timeline_creates_day_section():
    """New day section created if missing."""

def test_append_timeline_creates_time_subsection():
    """New ### Morning subsection created if missing."""

def test_append_timeline_preserves_existing_content():
    """Existing entries not modified."""

def test_append_multiple_events_single_marker():
    """Multiple events wrapped in single marker block."""

Thread Operations (test_thread_ops.py)

def test_add_thread_with_marker():
    """Thread prefixed with ULID marker."""
    result = apply_add_threads_op({
        "threads": [{"name": "Find the key", "priority": "high", "description": "Search the dungeon"}]
    }, vault_path)

    content = (vault_path / "canon/open-threads.md").read_text()
    assert "<!-- rpg:thread:" in content
    assert "### Find the key" in content

def test_add_thread_correct_priority_section():
    """Thread inserted into correct ## Priority section."""

def test_resolve_thread_by_ulid():
    """Resolve finds thread by ULID marker."""

def test_resolve_thread_by_name_fallback():
    """Resolve finds thread by name if no ULID."""

def test_resolve_moves_to_completed():
    """Resolved thread appears in ## Completed with strikethrough."""

Rollback Operations (test_rollback.py)

def test_rollback_timeline_removes_marker_block():
    """Timeline rollback removes content between markers."""
    # 1. Append timeline event
    # 2. Verify markers in file
    # 3. Rollback
    # 4. Verify markers and content removed

def test_rollback_thread_removes_to_boundary():
    """Thread rollback removes from marker to next boundary."""
    # Add two threads, remove first, verify second intact

def test_rollback_respects_thread_boundaries():
    """Removing thread A doesn't affect thread B below it."""

def test_rollback_handles_missing_markers():
    """Gracefully fails if markers deleted by hand-edit."""

def test_rollback_pc_compare_and_swap():
    """PC rollback only reverts if current == after."""
    # 1. Update PC location A -> B
    # 2. Hand-edit to C
    # 3. Rollback should skip (conflict)

def test_rollback_pc_success():
    """PC rollback reverts when values match."""

def test_rollback_ordering_newest_first():
    """Events processed in reverse chronological order."""

def test_rollback_deterministic_tiebreak():
    """Same timestamp events ordered by id DESC."""

Integration Tests

Event Logging (test_event_logging.py)

@pytest.mark.asyncio
async def test_two_phase_write_success():
    """Event goes pending -> applied on success."""
    ops = await create_vault_ops(vault_path, session_id, pool)

    result = await ops.append_timeline(day=1, time_of_day="Morning", events=[...])

    events = await event_store.get_session_events(session_id, status="applied")
    assert len(events) == 1
    assert events[0].event_type == "timeline_appended"

@pytest.mark.asyncio
async def test_two_phase_write_failure():
    """Event stays pending/failed on mutation error."""

@pytest.mark.asyncio
async def test_cleanup_stale_pending():
    """Old pending events marked as failed."""

Session Delete (test_session_delete.py)

@pytest.mark.asyncio
async def test_session_delete_rolls_back_timeline():
    """Deleting session removes timeline entries."""
    # 1. Create session
    # 2. Append timeline events
    # 3. Verify events in file
    # 4. Delete session
    # 5. Verify events removed from file

@pytest.mark.asyncio
async def test_session_delete_rolls_back_threads():
    """Deleting session removes threads."""

@pytest.mark.asyncio
async def test_session_delete_returns_rollback_stats():
    """Response includes events_seen, events_reversed."""

@pytest.mark.asyncio
async def test_session_delete_with_conflicts():
    """Conflicts reported but don't block deletion."""

Message Rewind (test_message_rewind.py)

@pytest.mark.asyncio
async def test_rewind_rolls_back_from_message():
    """Events from message onwards are reversed."""
    # 1. Send 3 messages, each creating timeline events
    # 2. Rewind to message 2
    # 3. Verify message 2 and 3 events removed
    # 4. Message 1 events still present

@pytest.mark.asyncio
async def test_rewind_deletes_messages():
    """Messages from rewind point deleted."""

End-to-End Tests

Full Session Lifecycle (test_e2e_session.py)

@pytest.mark.asyncio
async def test_full_session_lifecycle():
    """
    1. Create campaign and session
    2. Chat: "I explore the forest"
       - DM adds timeline event
       - DM adds thread "Mysterious sounds"
    3. Chat: "I investigate the sounds"
       - DM updates timeline
       - DM resolves thread
    4. Delete session
    5. Verify vault is back to original state
    """

@pytest.mark.asyncio
async def test_interleaved_sessions():
    """
    1. Session A: adds timeline event
    2. Session B: adds timeline event
    3. Delete Session A
    4. Verify Session B's event still present
    """

Obsidian Compatibility (test_obsidian_compat.py)

def test_markers_survive_obsidian_formatting():
    """Markers preserved after Obsidian auto-format."""

def test_markers_invisible_in_preview():
    """HTML comments don't render in Obsidian preview."""

def test_rollback_after_hand_edit():
    """Rollback works after user adds content above markers."""

Load Tests

@pytest.mark.asyncio
async def test_concurrent_timeline_appends():
    """Multiple concurrent appends don't corrupt file."""

@pytest.mark.asyncio
async def test_rollback_many_events():
    """Rollback handles 100+ events efficiently."""

Test Fixtures

@pytest.fixture
def temp_vault(tmp_path):
    """Create temporary vault with standard structure."""
    vault = tmp_path / "test-campaign"
    (vault / "canon").mkdir(parents=True)

    # Create timeline.md
    (vault / "canon/timeline.md").write_text("""---
title: Timeline
---

# Timeline

## Day 1

### Morning
- Campaign begins
""")

    # Create open-threads.md
    (vault / "canon/open-threads.md").write_text("""---
title: Open Threads
---

# Open Threads

## High Priority

_No active threads._

## Medium Priority

_No active threads._

## Completed

_None yet._
""")

    return vault

@pytest.fixture
async def db_pool():
    """Create test database connection pool."""

@pytest.fixture
async def event_store(db_pool):
    """Create VaultEventStore with test pool."""

Test Execution

# Run all vault event tests
pytest tests/vault/ -v

# Run with coverage
pytest tests/vault/ --cov=api/vault --cov=lib/ops --cov-report=html

# Run specific test category
pytest tests/vault/test_rollback.py -v

# Run integration tests (requires DB)
pytest tests/vault/ -m integration -v

CI Pipeline

# .github/workflows/test.yml
vault-tests:
  runs-on: ubuntu-latest
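  services:
    postgres:
      image: postgres:15
      env:
        POSTGRES_DB: rpg_test
        POSTGRES_USER: test
        POSTGRES_PASSWORD: test
      # Publish the port so psql and the tests can reach localhost:5432
      ports:
        - 5432:5432
      # Wait until Postgres accepts connections before running steps
      options: >-
        --health-cmd pg_isready
        --health-interval 10s
        --health-timeout 5s
        --health-retries 5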
  steps:
    - uses: actions/checkout@v4
    - name: Setup Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.11'
    - name: Install dependencies
      run: pip install -r requirements.txt -r requirements-test.txt
    - name: Run migrations
      env:
        PGPASSWORD: test  # psql reads the password from the environment
      run: |
        psql -h localhost -U test -d rpg_test -f supabase/migrations/001_chat_history.sql
        psql -h localhost -U test -d rpg_test -f supabase/migrations/005_vault_events.sql
    - name: Run tests
      run: pytest tests/vault/ -v --cov

Edge Cases

Interleaved Sessions

Multiple active sessions modifying vault concurrently.

Handled by: Compare-and-swap rollback. Each event is independently checked. Session A's rollback won't clobber Session B's changes if B wrote after A.
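A small worked example of this case, using plain dicts instead of vault files. The helper revert_fields and the location value "docks" are hypothetical and exist only for this illustration.

```python
def revert_fields(current: dict, changes: dict) -> tuple[list[str], list[str]]:
    """Compare-and-swap: revert a field only if it still holds the 'after' value."""
    reverted, skipped = [], []
    for name, change in changes.items():
        if current.get(name) == change["after"]:
            current[name] = change["before"]
            reverted.append(name)
        else:
            skipped.append(name)
    return reverted, skipped


# Session A moved the PC and spent gold; Session B then moved the PC again.
pc = {"location": "docks", "gold": 45}  # state after A, then B
session_a_changes = {
    "location": {"before": "the-salty-sigil", "after": "gilded-quill"},
    "gold": {"before": 50, "after": 45},
}

# Deleting Session A restores gold but leaves B's location untouched.
reverted, skipped = revert_fields(pc, session_a_changes)
```

After the call, gold is back to 50 while location stays at Session B's value, and location is reported as a skipped conflict rather than an error.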

Obsidian Hand-Edits

User edits vault files directly in Obsidian.

Handled by:

  • Markers survive hand-edits (HTML comments are preserved)
  • Compare-and-swap detects when values don't match
  • Skipped conflicts are reported, not errors

Pending Events (Crash Recovery)

Server crashes between log and apply.

Handled by: Cleanup job marks stale pending as failed. No inference attempted.

Markers Deleted

User removes HTML comment markers from vault files.

Handled by: Fallback chain:

  1. Try marker-based removal
  2. Fallback to line range (if still valid)
  3. Report failure in RollbackResult.failures


Merge Checklist

  • [ ] Markers use server-generated ULID; collisions impossible
  • [ ] Thread rollback boundary rules implemented + tested
  • [ ] Timeline insertion uses day/time anchors; marker wraps only inserted lines
  • [ ] Rollback queries order by (created_at DESC, id DESC)
  • [ ] Rollback ignores non-applied events
  • [ ] Pending cleanup job marks stale pending as failed (no inference)

Critical Tests

  1. Interleaved PC updates: A then B change same field; deleting A should skip conflict
  2. Obsidian edit drift: Insert extra lines above timeline marker; rollback still finds marker
  3. Marker deleted: Rollback produces failure entry, continues with other events
  4. Thread boundary: Removing thr_X doesn't remove thr_Y below it
  5. Rewind ordering: Two events same created_at; ordering deterministic via id

Future Enhancements

Branching

Create alternate timelines by:

  1. Rollback to message N
  2. Fork session with new ID
  3. Continue from that point

Audit View

UI to see what changed in each message:

Message: "I go to the Gilded Quill"
  → location: the-salty-sigil → gilded-quill
  → timeline: +2 events (01JGXKAB...)

Powered by RollbackResult structure.

entity_created Event

When DM invents new NPCs/locations, log creation as event so delete/rewind can remove them:

{
  "event_type": "entity_created",
  "entity_type": "npc",
  "entity_id": "new-merchant",
  "payload": {"path": "canon/npcs/new-merchant.md"}
}

PC Revision Tracking (Optional)

For stronger conflict detection, add _revision to PC frontmatter:

hp_current: 13
location: gilded-quill
_revision: 42