Vault Event Sourcing¶
Audit log and rollback system for vault mutations.
Status: Implemented (v3)
Problem¶
DM agent tools modify vault state (PC, timeline, threads) during gameplay. If a session is deleted or rewound, these changes persist as orphaned state.
Solution¶
Track all vault mutations as events tied to messages. On session delete or message rewind, replay inverse operations to restore previous state.
Architecture¶
Hybrid Approach¶
The vault files remain the source of truth (Obsidian compatibility). Events serve as an audit log enabling undo operations.
Tool Call → Log Event (pending) → Write to Vault → Mark Event (applied)
↓
Session Delete → Read Events → Apply Inverse Operations → Vault Restored
Core Principles¶
- Two-phase write: Log intent before mutation, mark applied after
- Conflict-aware rollback: Compare-and-swap semantics (only revert if current == after)
- Stable markers: Use HTML comment markers with ULID for timeline/thread identification
- Best-effort recovery: Skip conflicts, report failures, continue processing
- Deterministic ordering: Events ordered by (created_at DESC, id DESC) for tie-breaking
Event Types¶
| Event Type | Tool | Stable ID |
|---|---|---|
| pc_updated | update_pc | PC entity ID (e.g., jake) |
| timeline_appended | add_timeline_events | ULID (e.g., 01JGXK...) |
| thread_added | add_thread | ULID (e.g., 01JGXK...) |
| thread_resolved | resolve_thread | Thread ULID |
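ID Generation: All marker IDs (the entity_id of timeline and thread events) are server-generated ULIDs, never model-generated. A ULID is a 48-bit millisecond timestamp followed by 80 random bits, so ULIDs sort lexicographically by creation time and collisions are practically negligible.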
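The ULID layout can be sketched with the standard library alone. This is a minimal illustration, not the project's generator (the implementation calls `ulid.new()` from a ULID package); `new_ulid` and `CROCKFORD` are hypothetical names introduced here.

```python
import os
import time
from typing import Optional

# Crockford base32 alphabet used by the ULID spec (omits I, L, O, U).
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def new_ulid(timestamp_ms: Optional[int] = None) -> str:
    """Return a 26-character ULID: 48-bit ms timestamp + 80 random bits."""
    if timestamp_ms is None:
        timestamp_ms = time.time_ns() // 1_000_000
    randomness = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    value = (timestamp_ms << 80) | randomness  # 128 bits in total
    chars = []
    for _ in range(26):  # 26 * 5 = 130 bits; the top 2 bits are always zero
        chars.append(CROCKFORD[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))
```

Because the timestamp occupies the most significant bits and the alphabet is in ascending ASCII order, comparing two ULIDs as strings compares their creation times first.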
Database Schema¶
CREATE TABLE rpg.vault_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES rpg.sessions(id) ON DELETE CASCADE,
message_id UUID REFERENCES rpg.chat_messages(id) ON DELETE SET NULL,
-- Event classification
event_type TEXT NOT NULL,
entity_type TEXT, -- 'pc', 'timeline', 'threads'
entity_id TEXT NOT NULL, -- ULID for timeline/threads, entity ID for PC
-- Two-phase commit status
status TEXT NOT NULL DEFAULT 'pending',
-- Event data with before/after state for rollback
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT valid_event_type CHECK (event_type IN (
'pc_updated', 'timeline_appended', 'thread_added', 'thread_resolved'
)),
CONSTRAINT valid_status CHECK (status IN ('pending', 'applied', 'failed'))
);
-- Rollback queries: newest-first per session with deterministic tie-break
CREATE INDEX idx_vault_events_session_order
ON rpg.vault_events(session_id, created_at DESC, id DESC);
-- Message-based rollback (rewind feature)
CREATE INDEX idx_vault_events_message_order
ON rpg.vault_events(message_id, created_at DESC, id DESC)
WHERE message_id IS NOT NULL;
-- Find pending events (cleanup/recovery)
CREATE INDEX idx_vault_events_pending
ON rpg.vault_events(status, created_at)
WHERE status = 'pending';
Stable Markers¶
Line numbers drift when files are edited. Use HTML comment markers with ULIDs instead.
Timeline Markers¶
When appending to timeline.md, wrap with sentinels:
## Day 5
### Morning
<!-- rpg:event:01JGXKAB1234567890ABCDEF:begin -->
- Jake met with Marlena at The Salty Sigil
- Discussed infiltration plan for the salon
<!-- rpg:event:01JGXKAB1234567890ABCDEF:end -->
### Afternoon
<!-- rpg:event:01JGXKCD5678901234GHIJKL:begin -->
- Jake traveled to the Gilded Quill
<!-- rpg:event:01JGXKCD5678901234GHIJKL:end -->
Insertion rules:
1. Find ## Day {day} section (create if missing)
2. Find ### {time_of_day} subsection within day (create if missing)
3. Insert marker block containing ONLY the new bullet lines
4. Marker wraps the inserted content, not existing content
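The insertion rules above can be sketched as a pure function over the file text. This is an assumption-laden illustration: `insert_timeline_block` is a hypothetical helper, and placing a missing day section at the end of the file is a choice made here, not something the rules specify.

```python
def insert_timeline_block(text: str, day: int, time_of_day: str,
                          bullets: list, event_id: str) -> str:
    """Insert a marker-wrapped bullet block under ### {time_of_day} in ## Day {day}."""
    lines = text.splitlines()
    day_header = f"## Day {day}"
    sub_header = f"### {time_of_day}"
    block = [f"<!-- rpg:event:{event_id}:begin -->",
             *[f"- {b}" for b in bullets],
             f"<!-- rpg:event:{event_id}:end -->"]

    # Rule 1: find the day section, creating it at the end of the file if missing.
    if day_header not in lines:
        lines.append(day_header)
    day_idx = lines.index(day_header)
    # The day section ends at the next level-2 header or at end of file.
    day_end = next((i for i in range(day_idx + 1, len(lines))
                    if lines[i].startswith("## ")), len(lines))

    # Rule 2: find the subsection within the day, creating it if missing.
    sub_idx = next((i for i in range(day_idx + 1, day_end)
                    if lines[i] == sub_header), None)
    if sub_idx is None:
        lines.insert(day_end, sub_header)
        sub_idx = day_end
        day_end += 1
    # The subsection ends at the next level-3 header within the day.
    sub_end = next((i for i in range(sub_idx + 1, day_end)
                    if lines[i].startswith("### ")), day_end)

    # Rules 3-4: the marker block wraps only the new bullet lines.
    lines[sub_end:sub_end] = block
    return "\n".join(lines) + "\n"
```

Existing bullets are never moved inside the markers, so a later rollback removes only what this call inserted.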
Thread Markers¶
When adding to open-threads.md, include thread ULID:
## High Priority
<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->
### The Fading Muffle (URGENT)
- The stolen Vintner's Guild token has a scrying anchor
- **Status:** Clock is ticking
- **Next step:** Execute infiltration before it fails
<!-- rpg:thread:01JGXKGH3456789012STUVWX -->
### Silent Circle Salon (TONIGHT)
- Infiltrate as Courier Valen
Thread block boundary rules (for removal):
1. Find marker line <!-- rpg:thread:{ulid} -->
2. Block ends at the FIRST of:
- Next <!-- rpg:thread: marker
- Next ## section header (level 2)
- End of file
3. Remove: marker line + all lines until end boundary (exclusive of boundary)
Benefits:
- Survives hand-edits in Obsidian
- Survives reformatting
- Invisible when rendered
- Unique, searchable anchor
- ULID prevents collisions
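The thread block boundary rules above translate into a short line scan. A minimal sketch, assuming the marker sits on its own line; `remove_thread_block` is a hypothetical name, not the function in lib/ops/threads.py.

```python
def remove_thread_block(text: str, thread_id: str) -> tuple:
    """Remove a thread block by marker. Returns (new_text, removed)."""
    marker = f"<!-- rpg:thread:{thread_id} -->"
    lines = text.splitlines()
    start = next((i for i, line in enumerate(lines)
                  if line.strip() == marker), None)
    if start is None:
        return text, False  # marker missing: the caller reports a failure

    # The block ends at the first of: next thread marker, next level-2
    # header, or end of file. The boundary line itself is kept.
    end = len(lines)
    for i in range(start + 1, len(lines)):
        if lines[i].startswith("<!-- rpg:thread:") or lines[i].startswith("## "):
            end = i
            break

    del lines[start:end]
    return "\n".join(lines) + "\n", True
```

Level-3 headers such as `### Thread Name` do not match the `## ` prefix check, so a thread's own title is removed with its block rather than treated as a boundary.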
Event Payloads¶
pc_updated¶
{
"entity_id": "jake",
"changes": {
"location": {"before": "the-salty-sigil", "after": "gilded-quill"},
"hp_current": {"before": 13, "after": 10},
"gold": {"before": 50, "after": 45}
}
}
Inverse: For each field where current == after, set to before.
timeline_appended¶
{
"entity_id": "01JGXKAB1234567890ABCDEF",
"day": 5,
"time_of_day": "Morning",
"events": [
{"text": "Jake met with Marlena", "time": "Morning"}
],
"marker_begin": "<!-- rpg:event:01JGXKAB1234567890ABCDEF:begin -->",
"marker_end": "<!-- rpg:event:01JGXKAB1234567890ABCDEF:end -->",
"file_line_range": {"start": 156, "end": 160}
}
Inverse algorithm:
1. Find content between marker_begin and marker_end (most stable)
2. Fallback: Remove lines start to end if markers missing
3. Fallback: Report failure in RollbackResult.failures
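The fallback chain above can be sketched as follows. Two points are assumptions made for this illustration: `file_line_range` is treated as 1-based and inclusive, and the range is considered valid only if it still holds exactly the logged bullet lines. `remove_timeline_block` is a hypothetical name.

```python
def remove_timeline_block(text: str, payload: dict) -> tuple:
    """Return (new_text, outcome); outcome is 'marker', 'line_range', or 'failed'."""
    lines = text.splitlines()
    begin, end = payload["marker_begin"], payload["marker_end"]

    # Step 1: marker-based removal (most stable).
    if begin in lines and end in lines:
        b, e = lines.index(begin), lines.index(end)
        if b < e:
            del lines[b:e + 1]
            return "\n".join(lines) + "\n", "marker"

    # Step 2: line-range fallback, accepted only if the range still
    # contains exactly the bullets recorded in the event.
    rng = payload.get("file_line_range")
    expected = [f"- {ev['text']}" for ev in payload["events"]]
    if rng:
        s, e = rng["start"] - 1, rng["end"]
        if lines[s:e] == expected:
            del lines[s:e]
            return "\n".join(lines) + "\n", "line_range"

    # Step 3: nothing safe to remove; the caller records a failure.
    return text, "failed"
```

Checking the range contents before deleting keeps a stale range from removing unrelated lines after a hand-edit.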
thread_added¶
{
"entity_id": "01JGXKEF9012345678MNOPQR",
"name": "The Missing Artifact",
"priority": "high",
"description": "Find the stolen crown before the ritual",
"marker": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->"
}
Inverse: Remove block from marker to next boundary (see boundary rules above).
thread_resolved¶
{
"entity_id": "01JGXKEF9012345678MNOPQR",
"name": "Rescue Gareth",
"resolution": "Jake found Gareth in Sunfall Hills gully",
"marker": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->",
"previous_state": {
"section": "## High Priority",
"content": "<!-- rpg:thread:01JGXKEF9012345678MNOPQR -->\n### Rescue Gareth\n- Gareth walked into the hills..."
}
}
Inverse:
1. Remove resolved entry from ## Completed/Resolved
2. Re-insert previous_state.content into previous_state.section
Two-Phase Write Pattern¶
Avoid orphan mutations (vault written but event not logged).
async def update_pc_with_logging(
    vault: Vault,
    session_id: UUID,
    message_id: UUID,
    changes: dict,
) -> PCState:
    pc = _get_active_pc(vault)
    # 1. Capture before-state
    before = {field: pc.frontmatter.get(field) for field in changes.keys()}
    # 2. Log event as PENDING. The payload carries entity_id because the
    #    inverse operation reads payload["entity_id"].
    event_id = await event_logger.log_event(
        session_id=session_id,
        message_id=message_id,
        event_type="pc_updated",
        entity_id=pc.id,
        payload={
            "entity_id": pc.id,
            "changes": {
                field: {"before": before[field], "after": value}
                for field, value in changes.items()
                if before[field] != value  # record only fields that actually change
            },
        },
        status="pending",
    )
    try:
        # 3. Apply vault mutation
        vault.update_entity(EntityType.PC, pc.id, set_values=changes)
        # 4. Mark event as APPLIED
        await event_logger.mark_applied(event_id)
    except Exception as e:
        # Mark as failed, vault may be in inconsistent state
        await event_logger.mark_failed(event_id, str(e))
        raise
    return get_pc_state(vault)
Rollback only processes status='applied' events.
Rollback Semantics¶
Conflict-Aware (Compare-and-Swap)¶
Rollback is conditional: only revert if current value matches the event's after value.
def _inverse_pc_update(self, payload: dict, vault: Vault) -> FieldRollbackResult:
pc = vault.get_entity(EntityType.PC, payload["entity_id"])
reverted = []
skipped = []
set_values = {}
for field, change in payload["changes"].items():
current = pc.frontmatter.get(field)
if current == change["after"]:
# Current matches expected - safe to revert
set_values[field] = change["before"]
reverted.append(field)
else:
# Changed by another session or hand-edit - skip
skipped.append({
"field": field,
"expected": change["after"],
"current": current,
"reason": "current != after"
})
if set_values:
vault.update_entity(EntityType.PC, payload["entity_id"], set_values=set_values)
return FieldRollbackResult(reverted=reverted, skipped=skipped)
Ordering for Rollback¶
Events are processed newest-first with deterministic ordering:
SELECT * FROM rpg.vault_events
WHERE session_id = $1 AND status = 'applied'
ORDER BY created_at DESC, id DESC
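The id DESC tie-break keeps the ordering deterministic even when two events share the same created_at. Because id is a random UUID (gen_random_uuid()), the tie-break is stable across runs but does not reflect insertion order.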
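The same ordering can be reproduced in memory, for example in unit tests. This is a sketch under the assumption that Python's `UUID` comparison agrees with PostgreSQL's byte-wise UUID ordering; `Event` and `rollback_order` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID


@dataclass(frozen=True)
class Event:
    id: UUID
    created_at: datetime


def rollback_order(events: list) -> list:
    """Sort newest-first, breaking created_at ties by id (both descending)."""
    return sorted(events, key=lambda e: (e.created_at, e.id), reverse=True)
```

Sorting on the tuple `(created_at, id)` with `reverse=True` mirrors `ORDER BY created_at DESC, id DESC`, so the result does not depend on the input order.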
Rewind Semantics¶
rollback_from_message(message_id) undoes events from message N onwards (inclusive), newest-first.
- "Rewind to before message N" = undo N and everything after
- Events are processed in reverse chronological order
- Each event is independently conflict-checked
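Selecting the events for a rewind can be sketched as below. The source does not say how "message N onwards" is resolved, so this illustration assumes the session's message IDs are available in chronological order; `events_from_message` and the dict-shaped events are hypothetical.

```python
def events_from_message(message_ids_in_order: list, events: list,
                        from_message: str) -> list:
    """Return applied events tied to from_message or any later message, newest first."""
    start = message_ids_in_order.index(from_message)
    affected = set(message_ids_in_order[start:])  # N and everything after
    selected = [
        e for e in events
        if e["message_id"] in affected and e["status"] == "applied"
    ]
    # Same ordering as session rollback: created_at DESC, id DESC.
    return sorted(selected, key=lambda e: (e["created_at"], e["id"]), reverse=True)
```

Events attached to earlier messages are left alone, and pending or failed events are excluded just as they are for a full session rollback.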
RollbackResult¶
Machine-readable result for debugging and audit UI:
@dataclass
class RollbackResult:
events_seen: int
events_reversed: int
skipped_conflicts: list[dict] # Events skipped due to current != after
failures: list[dict] # Events that failed to rollback
success: bool # True if no failures (skipped conflicts are OK)
Example:
{
"events_seen": 18,
"events_reversed": 14,
"skipped_conflicts": [
{
"event_id": "evt_abc123",
"event_type": "pc_updated",
"reason": "current != after",
"field": "location",
"expected": "gilded-quill",
"current": "the-salty-sigil"
}
],
"failures": [
{
"event_id": "evt_def456",
"event_type": "timeline_appended",
"reason": "marker not found, line_range invalid"
}
],
"success": false
}
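How such a result might be accumulated under the best-effort policy is sketched below. The `invert` callback is a hypothetical stand-in for the per-event inverse operations: it returns `None` on success, returns a conflict dict when the event is skipped, and raises on failure.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class RollbackResult:
    events_seen: int = 0
    events_reversed: int = 0
    skipped_conflicts: list = field(default_factory=list)
    failures: list = field(default_factory=list)
    success: bool = True


def rollback_events(events: list,
                    invert: Callable[[dict], Optional[dict]]) -> RollbackResult:
    """Apply invert to each event, continuing past conflicts and failures."""
    result = RollbackResult(events_seen=len(events))
    for event in events:
        base = {"event_id": event["id"], "event_type": event["event_type"]}
        try:
            conflict = invert(event)
        except Exception as exc:
            # A failed inverse is recorded and processing continues.
            result.failures.append({**base, "reason": str(exc)})
            continue
        if conflict is None:
            result.events_reversed += 1
        else:
            result.skipped_conflicts.append({**base, **conflict})
    # Skipped conflicts are acceptable; only failures clear the success flag.
    result.success = not result.failures
    return result
```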
Pending Event Cleanup¶
Policy (v1 - no inference)¶
On startup and periodically (every 5 minutes):
- Find status='pending' events older than 2 minutes
- Mark as status='failed' with error='timeout_pending'
- Log warning with event details
Do not attempt to verify if mutation was applied. That's a rabbit hole for v1.
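async def cleanup_stale_pending(cutoff_minutes: int = 2) -> int:
    """Mark old pending events as failed. Returns the number of events marked."""
    # Assumes an asyncpg-style connection: fetch() returns the RETURNING rows,
    # whereas execute() returns only a status string.
    # The interval is built from the bound parameter with make_interval();
    # a placeholder inside a quoted INTERVAL literal would not be substituted.
    rows = await conn.fetch("""
        UPDATE rpg.vault_events
        SET status = 'failed',
            payload = payload || '{"error": "timeout_pending"}'::jsonb
        WHERE status = 'pending'
          AND created_at < NOW() - make_interval(mins => $1)
        RETURNING id
    """, cutoff_minutes)
    if rows:
        logger.warning(f"Marked {len(rows)} stale pending events as failed")
    return len(rows)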
Pending events are never rolled back. Only applied events are considered.
Implementation¶
1. Event Logger¶
# api/vault/events.py
class VaultEventLogger:
"""Logs vault mutations for audit and rollback."""
async def log_event(
self,
session_id: UUID,
message_id: UUID,
event_type: str,
entity_id: str,
payload: dict,
status: str = "pending",
) -> UUID:
"""Log a vault event. Returns event ID."""
async def mark_applied(self, event_id: UUID) -> None:
"""Mark event as successfully applied."""
async def mark_failed(self, event_id: UUID, error: str) -> None:
"""Mark event as failed with error message."""
async def get_session_events(
self,
session_id: UUID,
status: str = "applied",
) -> list[VaultEvent]:
"""Get events for a session, ordered by (created_at DESC, id DESC)."""
2. Rollback Engine¶
# api/vault/rollback.py
class VaultRollback:
"""Rolls back vault state by reversing events."""
async def rollback_session(
self,
session_id: UUID,
vault: Vault,
) -> RollbackResult:
"""Rollback all applied events for a session (newest first)."""
async def rollback_from_message(
self,
session_id: UUID,
message_id: UUID,
vault: Vault,
) -> RollbackResult:
"""Rollback events from message N onwards (inclusive, newest first)."""
def _inverse_pc_update(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
"""Apply inverse of PC update with conflict detection."""
def _inverse_timeline_append(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
"""Remove timeline entries by marker (fallback to line range)."""
def _inverse_thread_added(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
"""Remove thread block by marker."""
def _inverse_thread_resolved(self, event: VaultEvent, vault: Vault) -> EventRollbackResult:
"""Unresolve thread, restore to previous section."""
3. Delete Session Hook¶
async def delete_session(session_id: UUID, vault: Vault):
rollback = VaultRollback(database_url)
# 1. Rollback vault changes
result = await rollback.rollback_session(session_id, vault)
logger.info(
f"Rollback complete: {result.events_reversed}/{result.events_seen} reversed, "
f"{len(result.skipped_conflicts)} conflicts, {len(result.failures)} failures"
)
if result.failures:
logger.warning(f"Rollback failures: {result.failures}")
# 2. Delete session (cascades to messages and events)
await store.delete_session(session_id)
return result
API Integration¶
Session Delete with Rollback¶
Wire rollback into the session delete endpoint to restore vault state:
# api/routes/chat.py
from api.vault_rollback import create_rollback_engine
from api.vault_events import get_vault_event_store
@router.delete("/sessions/{session_id}")
async def delete_session(campaign_id: str, session_id: UUID):
vault_path = settings.campaigns_path / campaign_id
store = await get_chat_store()
# 1. Rollback vault changes
event_store = await get_vault_event_store(store._pool)
rollback = await create_rollback_engine(vault_path, event_store)
result = await rollback.rollback_session(session_id)
logger.info(
f"Rollback: {result.events_reversed}/{result.events_seen} reversed, "
f"{len(result.skipped_conflicts)} conflicts"
)
if result.failures:
logger.warning(f"Rollback failures: {result.failures}")
# 2. Delete session (cascades to messages and events)
await store.delete_session(session_id)
return {
"deleted": True,
"rollback": {
"events_seen": result.events_seen,
"events_reversed": result.events_reversed,
"conflicts": len(result.skipped_conflicts),
"failures": len(result.failures),
}
}
Message Rewind¶
Rollback from a specific message onwards (for "undo" feature):
@router.delete("/sessions/{session_id}/messages/{message_id}")
async def rewind_to_message(campaign_id: str, session_id: UUID, message_id: UUID):
vault_path = settings.campaigns_path / campaign_id
store = await get_chat_store()
# 1. Rollback vault changes from message onwards
event_store = await get_vault_event_store(store._pool)
rollback = await create_rollback_engine(vault_path, event_store)
result = await rollback.rollback_from_message(session_id, message_id)
# 2. Delete messages from message_id onwards
await store.delete_messages_from(session_id, message_id)
return {
"rewound_to": str(message_id),
"rollback": {
"events_seen": result.events_seen,
"events_reversed": result.events_reversed,
}
}
Implementation Status¶
Completed¶
- [x] rpg.vault_events table with two-phase write schema
- [x] ULID markers in lib/ops/timeline.py
- [x] ULID markers in lib/ops/threads.py
- [x] VaultEventStore (api/vault_events.py) - two-phase write
- [x] VaultOps (api/vault_ops.py) - wrapper with event logging
- [x] VaultRollback (api/vault_rollback.py) - rollback engine
- [x] Pending event cleanup on startup (api/main.py)
Wired¶
- [x] Wire rollback to DELETE /sessions/{session_id}
- [x] Wire rollback to DELETE /sessions/{session_id}/messages/{message_id}
Future¶
- [ ] PC update event logging (requires tool refactor)
- [ ] Audit view in chat UI
- [ ] Branching/forking sessions
Implementation Details¶
File Structure¶
api/
├── vault_events.py # VaultEventStore - two-phase write to DB
├── vault_ops.py # VaultOps - wraps lib/ops with event logging
├── vault_rollback.py # VaultRollback - rollback engine
└── routes/chat.py # Endpoints with rollback wiring
lib/
├── models.py # PatchResult with data field for metadata
└── ops/
├── timeline.py # ULID markers for timeline entries
└── threads.py # ULID markers for threads
supabase/migrations/
└── 005_vault_events.sql # Database schema + cleanup function
Key Components¶
1. lib/ops/timeline.py - Timeline with ULID Markers¶
Refactored to use v3 schema with time_of_day anchoring:
def apply_append_timeline_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
"""
V3 Schema: day, time_of_day, events
Inserts events under ### {time_of_day} within ## Day {day},
wrapped with ULID marker comments for deterministic rollback.
"""
day = op["day"]
time_of_day = op["time_of_day"] # e.g., "Morning", "Afternoon"
events = op["events"]
# Generate server-side ULID
event_ulid = str(ulid.new())
marker_begin = f"<!-- rpg:event:{event_ulid}:begin -->"
marker_end = f"<!-- rpg:event:{event_ulid}:end -->"
# Build bullet lines from events
bullet_lines = [f"- {e['text']}" for e in events]
insert_block = "\n".join([marker_begin, *bullet_lines, marker_end])
# Insert under ### {time_of_day} within ## Day {day}
# Creates sections if they don't exist
return PatchResult(
op_type="append_timeline",
path="canon/timeline.md",
description=f"Day {day} {time_of_day}: +{len(events)} events",
success=True,
data={
"event_ulid": event_ulid,
"marker_begin": marker_begin,
"marker_end": marker_end,
"day": day,
"time_of_day": time_of_day,
"line_range": (start_line, end_line),
},
)
2. lib/ops/threads.py - Threads with ULID Markers¶
def apply_add_threads_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
    """
    Adds threads with ULID marker comments.
    Format in open-threads.md:
    <!-- rpg:thread:01JGXK... -->
    ### Thread Name
    - Description
    - **Status:** New
    """
    threads_added = []
    for thread in op["threads"]:
        name = thread["name"]
        description = thread["description"]
        # Generate server-side ULID
        thread_ulid = str(ulid.new())
        marker = f"<!-- rpg:thread:{thread_ulid} -->"
        thread_block = f"\n{marker}\n### {name}\n- {description}\n"
        # Insert into appropriate priority section
        threads_added.append({
            "ulid": thread_ulid,
            "name": name,
            "marker": marker,
        })
    return PatchResult(
        op_type="add_threads",
        success=True,
        data={"threads": threads_added},
    )
def apply_resolve_thread_op(op: dict, vault_path: Path, dry_run: bool = False) -> PatchResult:
"""
Resolves thread by ULID marker (preferred) or name (fallback).
Moves to ## Completed section with strikethrough.
"""
# Find by marker first: <!-- rpg:thread:{ulid} -->
# Fallback to name: ### {name}
# Move to Completed section as ### ~~{name}~~
3. api/vault_events.py - Event Store¶
class VaultEventStore:
"""Two-phase write event store."""
async def log_pending(
self,
session_id: UUID,
event_type: EventType, # pc_updated, timeline_appended, thread_added, thread_resolved
entity_id: str,
payload: dict,
message_id: Optional[UUID] = None,
) -> VaultEvent:
"""Phase 1: Log event as 'pending' BEFORE mutation."""
async def mark_applied(self, event_id: UUID) -> bool:
"""Phase 2: Mark as 'applied' AFTER successful mutation."""
async def mark_failed(self, event_id: UUID, error: str) -> bool:
"""Mark as 'failed' if mutation fails."""
async def get_session_events(self, session_id: UUID, status: str = None) -> list[VaultEvent]:
"""Get events ordered by (created_at DESC, id DESC)."""
async def cleanup_stale_pending(self, age_hours: int = 1) -> int:
"""Mark old pending events as failed (crash recovery)."""
4. api/vault_ops.py - Operations Wrapper¶
class VaultOps:
"""Wraps lib/ops functions with automatic event logging."""
def __init__(self, vault_path: Path, session_id: UUID, event_store: VaultEventStore):
self.vault_path = vault_path
self.session_id = session_id
self.event_store = event_store
async def append_timeline(self, day: int, time_of_day: str, events: list) -> PatchResult:
"""
1. Log pending event
2. Call apply_append_timeline_op()
3. Update event with ULID and markers from result
4. Mark applied
"""
async def add_threads(self, threads: list) -> PatchResult:
"""Same two-phase pattern for threads."""
async def resolve_thread(self, name: str = None, thread_ulid: str = None) -> PatchResult:
"""Resolve by ULID or name."""
5. api/vault_rollback.py - Rollback Engine¶
@dataclass
class RollbackResult:
success: bool
events_seen: int = 0
events_reversed: int = 0
skipped_conflicts: list[str] = field(default_factory=list)
failures: list[str] = field(default_factory=list)
class VaultRollback:
"""Reverses vault mutations using event history."""
async def rollback_session(self, session_id: UUID, dry_run: bool = False) -> RollbackResult:
"""Rollback all applied events for a session."""
events = await self.event_store.get_session_events(session_id, status="applied")
return await self._rollback_events(events, dry_run)
async def rollback_from_message(self, session_id: UUID, message_id: UUID, dry_run: bool = False) -> RollbackResult:
"""Rollback events from message onwards (rewind)."""
events = await self.event_store.get_events_from_message(session_id, message_id)
return await self._rollback_events(events, dry_run)
def _reverse_timeline_append(self, event: VaultEvent, dry_run: bool) -> bool:
"""Remove marker block: <!-- rpg:event:{ulid}:begin --> ... <!-- rpg:event:{ulid}:end -->"""
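# Uses regex to find and remove the entire block; the pattern must be applied
# with re.MULTILINE | re.DOTALL so that ^ matches at line starts and .*? spans lines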
pattern = rf"^{re.escape(marker_begin)}\s*\n.*?^{re.escape(marker_end)}\s*\n?"
def _reverse_thread_add(self, event: VaultEvent, dry_run: bool) -> bool:
"""Remove thread block from marker to next boundary."""
# Boundary = next marker OR next ## header OR EOF
def _reverse_thread_resolve(self, event: VaultEvent, dry_run: bool) -> bool:
"""Remove from Completed section. Note: original content NOT restored in v1."""
def _reverse_pc_update(self, event: VaultEvent, dry_run: bool) -> bool:
"""Compare-and-swap: only revert if current == after."""
6. Database Migration (005_vault_events.sql)¶
-- Table
CREATE TABLE rpg.vault_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES rpg.sessions(id) ON DELETE CASCADE,
message_id UUID REFERENCES rpg.chat_messages(id) ON DELETE SET NULL,
event_type TEXT NOT NULL,
entity_type TEXT,
entity_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for rollback queries
CREATE INDEX idx_vault_events_session_order
ON rpg.vault_events(session_id, created_at DESC, id DESC);
-- Cleanup function
CREATE OR REPLACE FUNCTION rpg.cleanup_stale_pending_events(age interval)
RETURNS integer AS $$
DECLARE
cleaned integer;
BEGIN
WITH updated AS (
UPDATE rpg.vault_events
SET status = 'failed',
payload = payload || '{"error": "timeout_pending"}'::jsonb
WHERE status = 'pending'
AND created_at < NOW() - age
RETURNING id
)
SELECT count(*) INTO cleaned FROM updated;
RETURN cleaned;
END;
$$ LANGUAGE plpgsql;
API Endpoints¶
| Endpoint | Method | Rollback Behavior |
|---|---|---|
| /campaigns/{id}/chat/sessions/{session_id} | DELETE | Rolls back all session events, then deletes |
| /campaigns/{id}/chat/sessions/{session_id}/messages/{message_id} | DELETE | Rolls back from message onwards, then deletes messages |
Response includes rollback stats:
{
"success": true,
"deleted_session_id": "abc-123",
"rollback": {
"events_seen": 5,
"events_reversed": 5,
"conflicts": 0,
"failures": 0
}
}
Commits¶
| Commit | Description |
|---|---|
| 2d80ec4 | feat: v3 ULID markers for vault ops |
| e20576b | feat: vault event logging layer (two-phase write) |
| 2907196 | feat: vault rollback engine |
| c4f32c0 | feat: wire rollback to session/message delete endpoints |
Testing Plan¶
Unit Tests¶
Timeline Operations (test_timeline_ops.py)¶
def test_append_timeline_creates_markers():
"""Verify ULID markers wrap inserted content."""
result = apply_append_timeline_op({
"day": 1,
"time_of_day": "Morning",
"events": [{"text": "Hero woke up"}],
}, vault_path)
assert result.success
assert result.data["event_ulid"]
assert "<!-- rpg:event:" in result.data["marker_begin"]
content = (vault_path / "canon/timeline.md").read_text()
assert result.data["marker_begin"] in content
assert result.data["marker_end"] in content
assert "- Hero woke up" in content
def test_append_timeline_creates_day_section():
"""New day section created if missing."""
def test_append_timeline_creates_time_subsection():
"""New ### Morning subsection created if missing."""
def test_append_timeline_preserves_existing_content():
"""Existing entries not modified."""
def test_append_multiple_events_single_marker():
"""Multiple events wrapped in single marker block."""
Thread Operations (test_thread_ops.py)¶
def test_add_thread_with_marker():
"""Thread prefixed with ULID marker."""
result = apply_add_threads_op({
"threads": [{"name": "Find the key", "priority": "high", "description": "Search the dungeon"}]
}, vault_path)
content = (vault_path / "canon/open-threads.md").read_text()
assert "<!-- rpg:thread:" in content
assert "### Find the key" in content
def test_add_thread_correct_priority_section():
"""Thread inserted into correct ## Priority section."""
def test_resolve_thread_by_ulid():
"""Resolve finds thread by ULID marker."""
def test_resolve_thread_by_name_fallback():
"""Resolve finds thread by name if no ULID."""
def test_resolve_moves_to_completed():
"""Resolved thread appears in ## Completed with strikethrough."""
Rollback Operations (test_rollback.py)¶
def test_rollback_timeline_removes_marker_block():
"""Timeline rollback removes content between markers."""
# 1. Append timeline event
# 2. Verify markers in file
# 3. Rollback
# 4. Verify markers and content removed
def test_rollback_thread_removes_to_boundary():
"""Thread rollback removes from marker to next boundary."""
# Add two threads, remove first, verify second intact
def test_rollback_respects_thread_boundaries():
"""Removing thread A doesn't affect thread B below it."""
def test_rollback_handles_missing_markers():
"""Gracefully fails if markers deleted by hand-edit."""
def test_rollback_pc_compare_and_swap():
"""PC rollback only reverts if current == after."""
# 1. Update PC location A -> B
# 2. Hand-edit to C
# 3. Rollback should skip (conflict)
def test_rollback_pc_success():
"""PC rollback reverts when values match."""
def test_rollback_ordering_newest_first():
"""Events processed in reverse chronological order."""
def test_rollback_deterministic_tiebreak():
"""Same timestamp events ordered by id DESC."""
Integration Tests¶
Event Logging (test_event_logging.py)¶
@pytest.mark.asyncio
async def test_two_phase_write_success():
"""Event goes pending -> applied on success."""
ops = await create_vault_ops(vault_path, session_id, pool)
result = await ops.append_timeline(day=1, time_of_day="Morning", events=[...])
events = await event_store.get_session_events(session_id, status="applied")
assert len(events) == 1
assert events[0].event_type == "timeline_appended"
@pytest.mark.asyncio
async def test_two_phase_write_failure():
"""Event stays pending/failed on mutation error."""
@pytest.mark.asyncio
async def test_cleanup_stale_pending():
"""Old pending events marked as failed."""
Session Delete (test_session_delete.py)¶
@pytest.mark.asyncio
async def test_session_delete_rolls_back_timeline():
"""Deleting session removes timeline entries."""
# 1. Create session
# 2. Append timeline events
# 3. Verify events in file
# 4. Delete session
# 5. Verify events removed from file
@pytest.mark.asyncio
async def test_session_delete_rolls_back_threads():
"""Deleting session removes threads."""
@pytest.mark.asyncio
async def test_session_delete_returns_rollback_stats():
"""Response includes events_seen, events_reversed."""
@pytest.mark.asyncio
async def test_session_delete_with_conflicts():
"""Conflicts reported but don't block deletion."""
Message Rewind (test_message_rewind.py)¶
@pytest.mark.asyncio
async def test_rewind_rolls_back_from_message():
"""Events from message onwards are reversed."""
# 1. Send 3 messages, each creating timeline events
# 2. Rewind to message 2
# 3. Verify message 2 and 3 events removed
# 4. Message 1 events still present
@pytest.mark.asyncio
async def test_rewind_deletes_messages():
"""Messages from rewind point deleted."""
End-to-End Tests¶
Full Session Lifecycle (test_e2e_session.py)¶
@pytest.mark.asyncio
async def test_full_session_lifecycle():
"""
1. Create campaign and session
2. Chat: "I explore the forest"
- DM adds timeline event
- DM adds thread "Mysterious sounds"
3. Chat: "I investigate the sounds"
- DM updates timeline
- DM resolves thread
4. Delete session
5. Verify vault is back to original state
"""
@pytest.mark.asyncio
async def test_interleaved_sessions():
"""
1. Session A: adds timeline event
2. Session B: adds timeline event
3. Delete Session A
4. Verify Session B's event still present
"""
Obsidian Compatibility (test_obsidian_compat.py)¶
def test_markers_survive_obsidian_formatting():
"""Markers preserved after Obsidian auto-format."""
def test_markers_invisible_in_preview():
"""HTML comments don't render in Obsidian preview."""
def test_rollback_after_hand_edit():
"""Rollback works after user adds content above markers."""
Load Tests¶
@pytest.mark.asyncio
async def test_concurrent_timeline_appends():
"""Multiple concurrent appends don't corrupt file."""
@pytest.mark.asyncio
async def test_rollback_many_events():
"""Rollback handles 100+ events efficiently."""
Test Fixtures¶
@pytest.fixture
def temp_vault(tmp_path):
"""Create temporary vault with standard structure."""
vault = tmp_path / "test-campaign"
(vault / "canon").mkdir(parents=True)
# Create timeline.md
(vault / "canon/timeline.md").write_text("""---
title: Timeline
---
# Timeline
## Day 1
### Morning
- Campaign begins
""")
# Create open-threads.md
(vault / "canon/open-threads.md").write_text("""---
title: Open Threads
---
# Open Threads
## High Priority
_No active threads._
## Medium Priority
_No active threads._
## Completed
_None yet._
""")
return vault
@pytest.fixture
async def db_pool():
"""Create test database connection pool."""
@pytest.fixture
async def event_store(db_pool):
"""Create VaultEventStore with test pool."""
Test Execution¶
# Run all vault event tests
pytest tests/vault/ -v
# Run with coverage
pytest tests/vault/ --cov=api/vault --cov=lib/ops --cov-report=html
# Run specific test category
pytest tests/vault/test_rollback.py -v
# Run integration tests (requires DB)
pytest tests/vault/ -m integration -v
CI Pipeline¶
# .github/workflows/test.yml
vault-tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_DB: rpg_test
POSTGRES_USER: test
POSTGRES_PASSWORD: test
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt -r requirements-test.txt
- name: Run migrations
run: |
psql -h localhost -U test -d rpg_test -f supabase/migrations/001_chat_history.sql
psql -h localhost -U test -d rpg_test -f supabase/migrations/005_vault_events.sql
- name: Run tests
run: pytest tests/vault/ -v --cov
Edge Cases¶
Interleaved Sessions¶
Multiple active sessions modifying vault concurrently.
Handled by: Compare-and-swap rollback. Each event is independently checked. Session A's rollback won't clobber Session B's changes if B wrote after A.
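A worked example of the interleaved case, as a sketch over a plain dict standing in for PC frontmatter. `revert_fields` is a hypothetical helper that applies the same current == after check to each field.

```python
def revert_fields(state: dict, changes: dict) -> tuple:
    """Revert each field only if its current value equals the logged 'after'."""
    reverted, skipped = [], []
    for name, change in changes.items():
        if state.get(name) == change["after"]:
            state[name] = change["before"]
            reverted.append(name)
        else:
            skipped.append(name)  # overwritten since; leave it alone
    return reverted, skipped


# Session A moved the PC and spent gold (50 -> 45).
session_a_changes = {
    "location": {"before": "the-salty-sigil", "after": "gilded-quill"},
    "gold": {"before": 50, "after": 45},
}
# Session B then spent more gold (45 -> 40), so the vault now holds 40.
vault_state = {"location": "gilded-quill", "gold": 40}

# Deleting Session A reverts location but skips gold, preserving B's write.
reverted, skipped = revert_fields(vault_state, session_a_changes)
```

After the call, `location` is back to `the-salty-sigil` while `gold` remains at Session B's value of 40 and is reported as a skipped conflict.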
Obsidian Hand-Edits¶
User edits vault files directly in Obsidian.
Handled by:
- Markers survive hand-edits (HTML comments are preserved)
- Compare-and-swap detects when values don't match
- Skipped conflicts are reported, not errors
Pending Events (Crash Recovery)¶
Server crashes between log and apply.
Handled by: Cleanup job marks stale pending as failed. No inference attempted.
Markers Deleted¶
User removes HTML comment markers from vault files.
Handled by: Fallback chain:
1. Try marker-based removal
2. Fallback to line range (if still valid)
3. Report failure in RollbackResult.failures
Merge Checklist¶
- [ ] Markers use server-generated ULID; collisions practically impossible
- [ ] Thread rollback boundary rules implemented + tested
- [ ] Timeline insertion uses day/time anchors; marker wraps only inserted lines
- [ ] Rollback queries order by (created_at DESC, id DESC)
- [ ] Rollback ignores non-applied events
- [ ] Pending cleanup job marks stale pending as failed (no inference)
Critical Tests¶
- Interleaved PC updates: A then B change same field; deleting A should skip conflict
- Obsidian edit drift: Insert extra lines above timeline marker; rollback still finds marker
- Marker deleted: Rollback produces failure entry, continues with other events
- Thread boundary: Removing thr_X doesn't remove thr_Y below it
- Rewind ordering: Two events same created_at; ordering deterministic via id
Future Enhancements¶
Branching¶
Create alternate timelines by:
1. Rollback to message N
2. Fork session with new ID
3. Continue from that point
Audit View¶
UI to see what changed in each message:
Message: "I go to the Gilded Quill"
→ location: the-salty-sigil → gilded-quill
→ timeline: +2 events (01JGXKAB...)
Powered by RollbackResult structure.
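One way the audit lines might be produced is sketched below. It renders directly from event payloads, which is an assumption of this illustration since the audit view is not yet built; `audit_lines` is a hypothetical name.

```python
def audit_lines(event_type: str, payload: dict) -> list:
    """Render one event as human-readable audit lines."""
    if event_type == "pc_updated":
        # One line per changed field: before -> after.
        return [
            f"→ {name}: {change['before']} → {change['after']}"
            for name, change in payload["changes"].items()
        ]
    if event_type == "timeline_appended":
        short_id = payload["entity_id"][:8]
        return [f"→ timeline: +{len(payload['events'])} events ({short_id}...)"]
    # Thread events: fall back to the thread name, then the entity ID.
    label = payload.get("name", payload.get("entity_id"))
    return [f"→ {event_type}: {label}"]
```

Grouping the output of this function by message_id would give the per-message view shown above.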
entity_created Event¶
When DM invents new NPCs/locations, log creation as event so delete/rewind can remove them:
{
"event_type": "entity_created",
"entity_type": "npc",
"entity_id": "new-merchant",
"payload": {"path": "canon/npcs/new-merchant.md"}
}
PC Revision Tracking (Optional)¶
For stronger conflict detection, add _revision to PC frontmatter: