ADR-0237 (shipped)

Thread-Oriented Conversation Intelligence

Context

ADR-0236 wired realtime message indexing, but the current shape is still flat — individual messages with no durable thread grouping, no vault correlation, no gap detection, and no operator-facing synthesis at the conversation level.

The gateway can see raw messages, but it still can’t reliably answer:

  • What are people talking about that matters to active projects?
  • Which conversations should become vault notes, project updates, or decisions?
  • Which email threads need Joel’s reply and why?
  • What important topics are appearing in channels without any vault coverage?

Current implementation constraints

  • channel_messages exists but only stores flat message records.
  • channel-message-classify currently writes classification, topics, urgency, and summary, but not SKOS workload concepts.
  • channel-message-classify currently accepts slack, discord, and telegram messages, but not email, even though channel-message-ingest already ingests email.
  • email_threads already embeds subject + summary with MiniLM, and vault_notes already lives in the same 384-dim auto-embedding space.
  • Gateway context gathering benefits more from thread-level synthesis than from flat message excerpts, but no thread-level records exist yet.

Data landscape

| Collection | Docs | Embedding | Notes |
| --- | --- | --- | --- |
| channel_messages | 0 (new, ADR-0236) | None today | Flat messages, has thread_id, no embedding or concept facets |
| slack_messages | 265 | None | Stale backfill, has thread_ts |
| email_threads | 28 | 384d auto (MiniLM) | VIP pipeline, subject+summary embed |
| vault_notes | 3,826 | 384d auto (MiniLM) | Has project, tags fields |
| memory_observations | 18,854 | | Agent memory |
| docs_chunks_v2 | 223,165 | | PDF/docs corpus |

Embedding baseline: Typesense auto-embedding with ts/all-MiniLM-L12-v2 (384-dim). Ollama nomic-embed-text (768-dim) remains available for bulk/offline workflows but is not the runtime choice for this conversation surface.

Workload taxonomy baseline: joelclaw:scheme:workload:v1 from the SKOS taxonomy skill.

Decision

Build a three-layer conversation intelligence system:

  1. Messages remain canonical atomic records in channel_messages
  2. Threads become lightweight aggregate records in a new conversation_threads collection
  3. Vault correlation happens from message embeddings, not by duplicating raw message text into thread docs

This keeps ingestion cheap, thread views useful, and retrieval aligned with existing vault_notes embeddings.

Layer 1: Message-Level Classification + Embedding

Extend channel_messages with MiniLM embeddings and SKOS workload facets:

{ name: "embedding", type: "float[]", embed: { from: ["text"], model_config: MINI_LM_MODEL_CONFIG } },
{ name: "primary_concept_id", type: "string", facet: true, optional: true },
{ name: "concept_ids", type: "string[]", facet: true, optional: true },
{ name: "taxonomy_version", type: "string", facet: true, optional: true },
{ name: "concept_source", type: "string", facet: true, optional: true },

channel-message-classify is extended to:

  • accept email as a first-class channel_type
  • emit primary_concept_id
  • emit ordered concept_ids
  • emit taxonomy_version
  • emit concept_source
  • keep existing classification, topics, urgency, actionable, and summary

That puts channel_messages and vault_notes in the same embedding space, enabling direct similarity search without a separate projection layer.
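
As a rough illustration of that direct similarity search, the sketch below turns a stored message embedding into Typesense search parameters for a nearest-neighbor lookup in vault_notes. The helper name buildVaultNoteQuery, the default k, the distance cutoff, and the assumption that the vault_notes vector field is also called embedding are all hypothetical; only the vector_query parameter syntax comes from Typesense itself.

```typescript
// Hypothetical helper: builds Typesense search params for a nearest-neighbor
// lookup in vault_notes from an existing 384-dim message embedding.
interface VectorSearchParams {
  q: string;
  vector_query: string;
  per_page: number;
  exclude_fields: string;
}

const MINI_LM_DIMS = 384; // output size of ts/all-MiniLM-L12-v2

function buildVaultNoteQuery(
  embedding: number[],
  k = 5,                   // assumed default, not specified by the ADR
  distanceThreshold = 0.5, // assumed cosine-distance cutoff, tune per corpus
): VectorSearchParams {
  if (embedding.length !== MINI_LM_DIMS) {
    throw new Error(`expected ${MINI_LM_DIMS} dims, got ${embedding.length}`);
  }
  return {
    q: "*",
    // Assumes the vault_notes vector field is named "embedding".
    vector_query: `embedding:([${embedding.join(",")}], k:${k}, distance_threshold:${distanceThreshold})`,
    per_page: k,
    exclude_fields: "embedding", // keep the 384 floats out of each response hit
  };
}
```

Because the serialized vector is long, a query like this is normally sent through the multi_search endpoint as a POST body rather than as URL parameters.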

Layer 2: Thread Index

Create a new conversation_threads collection for aggregate thread metadata only:

interface ConversationThread {
  id: string;                     // "slack:{channel_id}:{thread_ts}" or "email:{conversation_id}"
  source: "slack" | "email";
  channel_id: string;
  channel_name: string;
  thread_id: string;
  participants: string[];
  message_count: number;
  first_message_at: number;
  last_message_at: number;
  status: "active" | "stale" | "resolved";
 
  primary_concept_id: string;
  concept_ids: string[];
  taxonomy_version: string;
 
  summary: string;
  related_projects: string[];
  related_contacts: string[];
  vault_gap: boolean;
  vault_gap_signal: string;
  urgency: "low" | "normal" | "high" | "critical";
  needs_joel: boolean;
  enriched_at: number;
 
  embedding: number[];            // auto-embed from summary
}

Rules:

  • conversation_threads does not duplicate raw message bodies
  • message lookup happens through channel_messages filtered by channel_id + thread_id
  • summary embedding comes from the thread summary field, not concatenated messages
  • aggregation is deterministic and cheap; LLM enrichment is debounced
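
The deterministic part of these rules can be sketched as a pure function over one thread's messages. This is a minimal illustration, not the shipped aggregate function: the sender and sent_at field names are assumptions, and the email branch assumes thread_id carries the conversation id.

```typescript
// Subset of a channel_messages record needed for aggregation.
// sender and sent_at are assumed field names.
interface ClassifiedMessage {
  source: "slack" | "email";
  channel_id: string;
  thread_id: string;
  sender: string;
  sent_at: number; // unix seconds
  concept_ids?: string[];
}

interface ThreadAggregate {
  id: string;
  participants: string[];
  message_count: number;
  first_message_at: number;
  last_message_at: number;
  concept_ids: string[];
}

// Builds the id formats listed on ConversationThread.id.
// Assumes thread_id holds the conversation id for email.
function threadKey(m: ClassifiedMessage): string {
  return m.source === "slack"
    ? `slack:${m.channel_id}:${m.thread_id}`
    : `email:${m.thread_id}`;
}

// Pure, LLM-free aggregation: same input messages always give the same record.
function aggregateThread(messages: ClassifiedMessage[]): ThreadAggregate {
  const head = messages[0];
  if (!head) throw new Error("thread has no messages");
  const participants = new Set<string>();
  const concepts = new Set<string>();
  let first = Infinity;
  let last = -Infinity;
  for (const m of messages) {
    participants.add(m.sender);
    for (const c of m.concept_ids ?? []) concepts.add(c);
    first = Math.min(first, m.sent_at);
    last = Math.max(last, m.sent_at);
  }
  return {
    id: threadKey(head),
    participants: Array.from(participants).sort(), // sorted so arrival order does not matter
    message_count: messages.length,
    first_message_at: first,
    last_message_at: last,
    concept_ids: Array.from(concepts).sort(),
  };
}
```

No message text is read or copied here, which keeps the aggregate record free of raw bodies.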

Layer 3: Vault Correlation

Thread enrichment works from classified message records:

  1. Fetch all messages for the thread from channel_messages
  2. For each message, vector-search vault_notes by message embedding
  3. Deduplicate nearest-note hits across the thread
  4. Derive related_projects from matched vault note project facets
  5. Derive related_contacts from vault/contact hits when present
  6. Mark vault_gap when the thread's messages consistently have no vault_notes hit within the similarity threshold
  7. Generate a one-sentence thread summary plus needs_joel, urgency, and vault_gap_signal

The LLM prompt should be concept-aware and thread-aware. It should summarize from classified messages plus nearest vault matches, not from unbounded raw text dumps.
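
Steps 3, 4, and 6 above might look roughly like the following. This is a sketch under stated assumptions: the VaultHit shape, the 0.5 distance cutoff, and the reading of "consistently miss" as "fewer than half of the messages have a close note" are all illustrative choices, not values from this ADR.

```typescript
// One nearest-neighbor hit from vault_notes for a single message (assumed shape).
interface VaultHit {
  note_id: string;
  project?: string;
  distance: number; // cosine distance, lower is closer
}

interface Correlation {
  related_projects: string[];
  matched_note_ids: string[];
  vault_gap: boolean;
}

// hitsPerMessage[i] holds the vault_notes hits for the i-th thread message.
function correlateThread(
  hitsPerMessage: VaultHit[][],
  maxDistance = 0.5,     // assumed similarity cutoff
  minCoveredRatio = 0.5, // assumed: a lower share of covered messages means a gap
): Correlation {
  const best = new Map<string, VaultHit>(); // note_id -> closest hit seen so far
  let covered = 0;
  for (const hits of hitsPerMessage) {
    const near = hits.filter((h) => h.distance <= maxDistance);
    if (near.length > 0) covered += 1;
    for (const h of near) {
      const prev = best.get(h.note_id);
      if (!prev || h.distance < prev.distance) best.set(h.note_id, h);
    }
  }
  // Deduplicated notes, closest first.
  const ranked = Array.from(best.values()).sort((a, b) => a.distance - b.distance);
  const projects = new Set<string>();
  for (const h of ranked) {
    if (h.project) projects.add(h.project);
  }
  const total = hitsPerMessage.length;
  return {
    related_projects: Array.from(projects),
    matched_note_ids: ranked.map((h) => h.note_id),
    vault_gap: total > 0 && covered / total < minCoveredRatio,
  };
}
```

Keeping this step separate from the LLM call means the prompt only needs the deduplicated matches and the gap flag, not the raw search responses.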

Event Flow

Event names follow the existing request/occurred convention — no imperative command events.

Message arrives (Slack/Front)

channel/message.received → channel-message-ingest
  → upsert to channel_messages
  → emit channel/message.classify.requested

channel-message-classify
  → classify message
  → write concepts + urgency + summary to channel_messages
  → emit conversation/thread.updated

conversation/thread.updated → conversation-thread-aggregate
  → aggregate participants, counts, timestamps, concept union
  → if enrichment threshold met: emit conversation/thread.enrichment.requested

conversation/thread.enrichment.requested → conversation-thread-enrich
  → vector search thread messages against vault_notes
  → compute project/contact matches and vault gap signal
  → summarize thread
  → update conversation_threads
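
The same chain can be written down as data, which may be handy for wiring tests. The event and function names below are taken from the flow above; the FLOW table and traceFlow helper are hypothetical and are not part of the system-bus package.

```typescript
// One hop in the event chain: which function handles an event and what it emits next.
interface FlowStep {
  handler: string;
  emits: string | null;
  conditional?: boolean; // true when the emit depends on a threshold
}

const FLOW: Record<string, FlowStep> = {
  "channel/message.received": {
    handler: "channel-message-ingest",
    emits: "channel/message.classify.requested",
  },
  "channel/message.classify.requested": {
    handler: "channel-message-classify",
    emits: "conversation/thread.updated",
  },
  "conversation/thread.updated": {
    handler: "conversation-thread-aggregate",
    emits: "conversation/thread.enrichment.requested",
    conditional: true, // only when the enrichment threshold is met
  },
  "conversation/thread.enrichment.requested": {
    handler: "conversation-thread-enrich",
    emits: null,
  },
};

// Follows the chain from a starting event and returns the handlers in order.
function traceFlow(startEvent: string): string[] {
  const handlers: string[] = [];
  const seen = new Set<string>(); // guards against accidental cycles
  let current: string | null = startEvent;
  while (current !== null && !seen.has(current)) {
    seen.add(current);
    const step: FlowStep | undefined = FLOW[current];
    if (!step) break;
    handlers.push(step.handler);
    current = step.emits;
  }
  return handlers;
}
```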

Debounce Rules

| Condition | Enrich? | Reason |
| --- | --- | --- |
| First message in new thread | Yes | Need initial thread record and summary |
| 5+ new messages since last enrichment | Yes | Context materially changed |
| 30+ minutes since last enrichment and new messages exist | Yes | Catch drift/late arrivals |
| 48h no activity | No enrich; mark stale | Preserve signal, skip wasted LLM work |
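
One way to read these rules as a single decision function is sketched below. The messages_at_last_enrich counter is an assumed bookkeeping field that the ConversationThread schema does not define, and checking staleness before the first-message rule is an assumption about precedence.

```typescript
type EnrichDecision = "enrich" | "skip" | "mark_stale";

interface ThreadCounters {
  message_count: number;
  last_message_at: number;          // unix seconds
  enriched_at?: number;             // unix seconds, undefined if never enriched
  messages_at_last_enrich?: number; // assumed bookkeeping field, not in the ADR schema
}

const STALE_AFTER_S = 48 * 60 * 60; // 48h without activity
const RE_ENRICH_AFTER_S = 30 * 60;  // 30 minutes since last enrichment
const RE_ENRICH_MESSAGES = 5;       // 5+ new messages since last enrichment

function decideEnrichment(t: ThreadCounters, now: number): EnrichDecision {
  // Assumed precedence: a thread that has gone quiet is never enriched.
  if (now - t.last_message_at >= STALE_AFTER_S) return "mark_stale";
  // First message in a new thread: no enrichment has happened yet.
  if (t.enriched_at === undefined) return "enrich";
  const newMessages = t.message_count - (t.messages_at_last_enrich ?? 0);
  if (newMessages >= RE_ENRICH_MESSAGES) return "enrich";
  if (newMessages > 0 && now - t.enriched_at >= RE_ENRICH_AFTER_S) return "enrich";
  return "skip";
}
```

Keeping the decision pure makes it usable from both the aggregate function and the stale sweep without either of them calling the LLM.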

Gateway Context Contract (ADR-0235 Follow-On)

The gateway should shift from flat message snippets toward thread-level retrieval.

Primary queries:

searchTypesense("conversation_threads", "*", {
  filter_by: "needs_joel:true || urgency:=[high,critical]",
  sort_by: "last_message_at:desc",
  per_page: "10",
});
 
searchTypesense("conversation_threads", "*", {
  filter_by: "vault_gap:true && status:active",
  sort_by: "last_message_at:desc",
  per_page: "5",
});

Operator-facing context format:

### Conversations Needing Attention
- 🔴 #lc-egghead: "Course creation broken for Alvaro" (creeland, 3 msgs) → 06-video-ingest
- 🟡 Email: "AI Hero 2026 Workspace" (Alex, Matt, 8 msgs) → needs reply
 
### Vault Gaps
- #lc-egghead thread about Rails transaction errors → no vault coverage
- Email about AI Hero close date → no decision captured
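
A formatter for the "Conversations Needing Attention" lines could look like the sketch below. The ADR shows the 🔴 and 🟡 markers without defining them, so the mapping from urgency to marker and the fallback text after the arrow are assumptions; formatAttentionLine is a hypothetical name.

```typescript
// Fields of a conversation_threads record that the line format uses.
interface ThreadLine {
  source: "slack" | "email";
  channel_name: string;
  summary: string;
  participants: string[];
  message_count: number;
  urgency: "low" | "normal" | "high" | "critical";
  needs_joel: boolean;
  related_projects: string[];
}

function formatAttentionLine(t: ThreadLine): string {
  // Assumed mapping: high and critical are red, everything else is yellow.
  const marker = t.urgency === "high" || t.urgency === "critical" ? "🔴" : "🟡";
  const where = t.source === "email" ? "Email" : `#${t.channel_name}`;
  const who = t.participants.concat(`${t.message_count} msgs`).join(", ");
  // Assumed fallback order: linked projects first, then the reply flag.
  let tail = "no linked project";
  if (t.related_projects.length > 0) {
    tail = t.related_projects.join(", ");
  } else if (t.needs_joel) {
    tail = "needs reply";
  }
  return `- ${marker} ${where}: "${t.summary}" (${who}) → ${tail}`;
}
```

Fed the two example threads above, this produces the same two lines shown under "Conversations Needing Attention".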

Non-Goals

  • Replacing channel_messages as the canonical atomic store
  • Copying full message transcripts into conversation_threads
  • Building a general-purpose CRM or ticketing system here
  • Solving Discord/Telegram threading semantics in the first cut
  • Moving runtime embeddings off the current Typesense MiniLM baseline

Implementation Plan

Required skills preflight

  • system-bus — existing channel intelligence and worker conventions
  • inngest-durable-functions — durable function design for new thread functions
  • inngest-steps — event chaining, step boundaries, and memoized enrichment flow
  • inngest-events — event naming and payload contracts
  • inngest-flow-control — debounce/throttle/concurrency rules for enrichment
  • inngest-middleware — keep gateway/telemetry context aligned
  • gateway — gateway context retrieval and operator-facing synthesis
  • skos-taxonomy — correct concept_ids / primary_concept_id contract
  • o11y-logging — no silent failure in enrichment/search paths

Affected paths

  • packages/system-bus/src/lib/typesense.ts
  • packages/system-bus/src/inngest/functions/channel-message-classify.ts
  • packages/system-bus/src/inngest/functions/channel-message-ingest.ts
  • packages/system-bus/src/inngest/functions/index.ts
  • packages/system-bus/src/inngest/functions/index.host.ts
  • packages/system-bus/src/inngest/functions/conversation-thread-aggregate.ts (new)
  • packages/system-bus/src/inngest/functions/conversation-thread-enrich.ts (new)
  • packages/system-bus/src/inngest/functions/conversation-thread-stale-sweep.ts (new)
  • pi/extensions/gateway/index.ts
  • docs/inngest-functions.md
  • docs/gateway.md

Ordered implementation steps

  1. Schema

    • Extend CHANNEL_MESSAGES_COLLECTION_SCHEMA with embedding and SKOS concept fields
    • Add CONVERSATION_THREADS_COLLECTION and schema to typesense.ts
    • Add helper(s) to ensure the new collection exists
  2. Classifier upgrade

    • Extend channel-message-classify.ts to accept email
    • Update the prompt/decoder to return SKOS workload concepts
    • Persist primary_concept_id, concept_ids, taxonomy_version, concept_source
    • Emit conversation/thread.updated after successful classification
  3. Thread aggregation

    • Add conversation-thread-aggregate.ts
    • Aggregate thread identity, participant list, message counts, timestamps, and concept union
    • Apply deterministic status rules (active, stale, resolved)
    • Decide whether enrichment should fire based on debounce thresholds
  4. Thread enrichment

    • Add conversation-thread-enrich.ts
    • Fetch thread messages from channel_messages
    • Search vault_notes using message embeddings
    • Compute related_projects, related_contacts, vault_gap, vault_gap_signal
    • Summarize the thread and set needs_joel / urgency
    • Write enriched fields back to conversation_threads
  5. Stale sweep

    • Add an hourly function to mark old inactive threads stale
    • Do not trigger enrichment during stale-only transitions
  6. Gateway retrieval

    • Update pi/extensions/gateway/index.ts to query conversation_threads
    • Prefer thread summaries and vault gaps over flat slack_messages excerpts
    • Keep the ADR-0235 demand-driven pattern intact: silent accumulation, surface on demand
  7. Backfill / migration

    • Backfill the existing slack_messages corpus into channel_messages
    • Re-run classification so concept fields and embeddings are populated
    • Build initial conversation_threads records from that backfill
    • Retire slack_messages from gateway retrieval once thread-based retrieval is verified
  8. Observability + docs

    • Emit OTEL for aggregate/enrich/stale-sweep success and failure paths
    • Update docs/inngest-functions.md and docs/gateway.md in the same change set

Verification

  • channel_messages stores MiniLM embeddings and SKOS concept facets in Typesense
  • channel-message-classify accepts email and persists workload concept metadata
  • conversation_threads exists and stores aggregate thread metadata without raw message duplication
  • thread enrichment performs vault-note similarity search and writes related_projects, vault_gap, and summary
  • gateway context retrieval uses conversation_threads for “needs Joel” and “vault gap” surfaces
  • backfilled Slack history can produce thread records without manual hand-editing
  • OTEL shows successful and failed aggregate/enrichment runs distinctly
  • docs are updated alongside implementation

Consequences

  • Gateway answers shift from flat message excerpts to conversation-level signal
  • Per-message embeddings enable vault correlation without a separate sync layer
  • SKOS concept fields make channel traffic facetable by workload concept
  • channel_messages becomes the canonical runtime message store; slack_messages becomes migration-only legacy data
  • Thread-level enrichment adds modest cost, but debounce rules keep it bounded
  • Vault gap detection becomes a continuous operational signal instead of an occasional manual review task

More Information

  • 2026-03-28 — Backend thread pipeline shipped in babf57dd (channel_messages embeddings + concepts, conversation_threads, aggregate/enrich/stale-sweep functions).
  • 2026-03-28 — Gateway demand-driven context switched to prefer conversation_threads for project momentum, relationship threads, and momentum risks.