Semantic hooks let you react to what Khora extracts, in real time, as documents are ingested or recalls run. Subscribe to an event type, optionally attach a filter, and your callback fires when a matching event passes. Use them for notifications, dashboards, or downstream pipelines.
from khora.hooks import SemanticFilter

async def on_entity(event):
    print(f"New entity: {event.data.get('name')} ({event.data.get('entity_type')})")

# Fire on every new entity:
kb.subscribe("entity.created", on_entity)

# Or only on organizations:
kb.subscribe("entity.created", on_entity,
             filter=SemanticFilter(name="orgs", entity_types=["ORGANIZATION"]))
subscribe(event_type, callback, filter=None) returns a subscription id. unsubscribe(id) removes it. kb.hooks exposes the underlying dispatcher.
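
To make the subscribe/unsubscribe lifecycle concrete, here is a minimal sketch using a toy in-memory dispatcher. `ToyDispatcher`, its `emit` method, the `sub-N` id format, and the plain-callable filter are all hypothetical stand-ins for illustration; they are not Khora's dispatcher, and only the `subscribe(event_type, callback, filter=None)` shape is taken from the text above.

```python
import asyncio
import itertools
from types import SimpleNamespace


class ToyDispatcher:
    """Hypothetical stand-in that mimics the subscribe/unsubscribe contract."""

    def __init__(self):
        self._subs = {}
        self._ids = itertools.count(1)

    def subscribe(self, event_type, callback, filter=None):
        # Register the callback and hand back an id the caller can keep.
        sub_id = f"sub-{next(self._ids)}"
        self._subs[sub_id] = (event_type, callback, filter)
        return sub_id

    def unsubscribe(self, sub_id):
        # Removing an unknown id is a no-op in this sketch.
        self._subs.pop(sub_id, None)

    async def emit(self, event_type, data):
        event = SimpleNamespace(type=event_type, data=data)
        for wanted, callback, accept in list(self._subs.values()):
            if wanted == event_type and (accept is None or accept(event)):
                await callback(event)


seen = []


async def on_entity(event):
    seen.append(event.data["name"])


async def demo():
    hooks = ToyDispatcher()
    # Assumption: a plain predicate plays the role of a SemanticFilter here.
    sub_id = hooks.subscribe(
        "entity.created", on_entity,
        filter=lambda e: e.data.get("entity_type") == "ORGANIZATION",
    )
    await hooks.emit("entity.created", {"name": "Acme", "entity_type": "ORGANIZATION"})
    await hooks.emit("entity.created", {"name": "Ada", "entity_type": "PERSON"})  # filtered out
    hooks.unsubscribe(sub_id)
    await hooks.emit("entity.created", {"name": "Globex", "entity_type": "ORGANIZATION"})  # no subscriber left
    return sub_id


sub_id = asyncio.run(demo())
```

Keeping the returned id is the only handle you have for removing the subscription later, so store it wherever the subscriber's lifetime is managed.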

The 3-level filter cascade

Filters are evaluated cheapest-first. Each level only runs if the previous passed, so you pay LLM cost only for genuinely ambiguous cases:
Level | Mechanism | Cost | What it does
----- | --------- | ---- | ------------
0 | Type pre-filter + structural match DSL | Free | Match on entity/relationship types or patterns over event.data
1 | Embedding similarity | Sub-millisecond | Cosine gate against the filter’s description
2 | LLM yes/no | Per-call LLM | Nano-model adjudication for ambiguous events (default OFF)
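
The cheapest-first ordering can be sketched as a short-circuiting loop. This is an illustrative model only, not Khora's implementation: `run_cascade`, the three gate functions, and the precomputed `similarity` field are hypothetical, and the sketch simplifies by sending every Level 1 survivor to Level 2 rather than only the ambiguous ones.

```python
def run_cascade(event_data, levels):
    """Run checks in order; stop at the first failure.

    Returns (matched, trace) where trace lists the levels that actually ran.
    """
    trace = []
    for name, check in levels:
        trace.append(name)
        if not check(event_data):
            return False, trace
    return True, trace


llm_calls = 0


def type_gate(data):
    # Level 0: a plain comparison, no model involved.
    return data["entity_type"] == "ORGANIZATION"


def similarity_gate(data):
    # Level 1: assumes a similarity score was already computed for the event.
    return data["similarity"] >= 0.5


def llm_gate(data):
    # Level 2: stand-in for the paid LLM call; we only count invocations.
    global llm_calls
    llm_calls += 1
    return True


levels = [("level0", type_gate), ("level1", similarity_gate), ("level2", llm_gate)]

wrong_type = run_cascade({"entity_type": "PERSON", "similarity": 0.9}, levels)
off_topic = run_cascade({"entity_type": "ORGANIZATION", "similarity": 0.2}, levels)
relevant = run_cascade({"entity_type": "ORGANIZATION", "similarity": 0.9}, levels)
```

Of the three events, only the last one reaches the LLM stand-in, which is the point of the ordering: the expensive check sees only what the cheap checks could not reject.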

Level 0: types and the match DSL

Type filters need no computation. The match field adds EventBridge-style structural patterns over event.data (pure data, no code execution):
SemanticFilter(
    name="high_confidence_acme",
    entity_types=["ORGANIZATION"],
    match={
        "name": [{"prefix": "Acme"}],
        "confidence": [{"numeric": [">=", 0.8]}],
    },
)
Operators include prefix, suffix, equals-ignore-case, wildcard, numeric, anything-but, exists, and contains-all. A top-level $or expresses disjunction; all other keys combine with AND. Nested dot-notation is intentionally unsupported, so flatten anything you want to match into top-level keys of event.data before it is emitted.
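
To show how such a pattern evaluates, here is a small matcher for a subset of the operators (prefix, suffix, equals-ignore-case, numeric, anything-but, exists) plus top-level `$or`. It is a sketch under stated assumptions, not Khora's evaluator: the function names are hypothetical, the semantics follow the usual EventBridge conventions (each key lists alternatives, any of which may match), and wildcard and contains-all are left out because their exact behavior is not described here.

```python
import operator

_NUMERIC_OPS = {
    "<": operator.lt, "<=": operator.le, "=": operator.eq,
    ">": operator.gt, ">=": operator.ge,
}
_MISSING = object()  # sentinel: the key is absent from event.data


def _matches_condition(value, cond):
    """Check one alternative from a key's list against one value."""
    if not isinstance(cond, dict):
        return value is not _MISSING and value == cond  # literal equality
    op, arg = next(iter(cond.items()))
    if op == "exists":
        return (value is not _MISSING) == bool(arg)
    if value is _MISSING:
        return False
    if op == "prefix":
        return isinstance(value, str) and value.startswith(arg)
    if op == "suffix":
        return isinstance(value, str) and value.endswith(arg)
    if op == "equals-ignore-case":
        return isinstance(value, str) and value.lower() == arg.lower()
    if op == "anything-but":
        banned = arg if isinstance(arg, list) else [arg]
        return value not in banned
    if op == "numeric":
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        # arg is a flat list of (symbol, bound) pairs, e.g. [">=", 0.8].
        pairs = zip(arg[0::2], arg[1::2])
        return all(_NUMERIC_OPS[sym](value, bound) for sym, bound in pairs)
    raise ValueError(f"unsupported operator in this sketch: {op}")


def matches(data, pattern):
    """AND across keys, OR across each key's alternatives."""
    for key, conditions in pattern.items():
        if key == "$or":
            if not any(matches(data, alt) for alt in conditions):
                return False
            continue
        value = data.get(key, _MISSING)
        if not any(_matches_condition(value, c) for c in conditions):
            return False
    return True


pattern = {
    "name": [{"prefix": "Acme"}],
    "confidence": [{"numeric": [">=", 0.8]}],
}
either = {"$or": [{"name": [{"suffix": "Inc"}]}, {"source": [{"exists": True}]}]}

hit = matches({"name": "Acme Corp", "confidence": 0.92}, pattern)
low_confidence = matches({"name": "Acme Corp", "confidence": 0.5}, pattern)
or_hit = matches({"name": "Globex", "source": "email"}, either)
or_miss = matches({"name": "Globex"}, either)
```

Because the matcher reads only top-level keys, it also illustrates why values have to be flattened into event.data before they can be matched.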

Level 1: embedding pre-screen

Give the filter a description and Khora compares its embedding against the entity/relationship embedding (binary-quantized Hamming, then cosine on survivors):
SemanticFilter(name="ai_research",
               description="Research related to AI and machine learning",
               similarity_threshold=0.5)
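
The two-stage comparison can be sketched as follows. Only the stage order (Hamming on binary-quantized vectors, then cosine on survivors) comes from the text; the sign-based quantization, the `max_hamming` cutoff, and every function name below are assumptions made for illustration.

```python
import math


def binarize(vec):
    """Sign-quantize a vector: one bit per dimension, packed into an int."""
    bits = 0
    for i, x in enumerate(vec):
        if x > 0:
            bits |= 1 << i
    return bits


def hamming(a, b):
    """Number of differing bits between two packed bit strings."""
    return bin(a ^ b).count("1")


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def passes_level1(filter_vec, entity_vec, similarity_threshold=0.5, max_hamming=None):
    # Assumed cutoff: reject when more than half of the sign bits disagree.
    if max_hamming is None:
        max_hamming = len(filter_vec) // 2
    # Stage 1: cheap bitwise screen.
    if hamming(binarize(filter_vec), binarize(entity_vec)) > max_hamming:
        return False
    # Stage 2: exact cosine, computed only for survivors.
    return cosine(filter_vec, entity_vec) >= similarity_threshold


filter_vec = [0.9, 0.1, -0.3, 0.4]
close = passes_level1(filter_vec, [0.8, 0.2, -0.1, 0.5])
opposite = passes_level1(filter_vec, [-0.9, -0.1, 0.3, -0.4])
near_in_bits_only = passes_level1(filter_vec, [0.1, 0.9, -0.1, -0.9])
```

The third vector differs from the filter in a single sign bit, so it survives the Hamming screen, but its cosine similarity is negative and the second stage rejects it.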

Level 2: LLM evaluation

Level 2 is OFF by default (set KHORA_HOOKS_LLM_EVALUATION_ENABLED=true to enable it) and costs real money: one nano-LLM call for each event that passes the earlier levels. It only fires when the filter supplies examples; without them the LLM has no calibration and the level is skipped.
SemanticFilter(
    name="strategic_mentions",
    description="Any mention of a strategic business decision",
    entity_types=["EVENT", "CONCEPT"],
    examples=["The board approved a $50M acquisition.", "Q3 strategy: pivot to enterprise."],
    anti_examples=["Lunch was tasty today."],
    llm_confidence_threshold=0.5,
)

Event types

Subscribe to any of these stable string event types (canonical enum: EventType in khora.core.models.event):
Group | Events
----- | ------
Entity | entity.created, entity.updated, entity.merged, entity.deleted
Relationship | relationship.created, relationship.updated, relationship.deleted
Chunk | chunk.created, chunk.embedded, chunk.entities_resolved
Document | document.created/updated/deleted, document.processed, document.failed
Episode / Namespace | episode.*, namespace.*
Sync / Recall | sync.started/completed/failed/checkpoint, recall.requested/results_ready/completed

Co-occurrence filtering

A single entity.created event carries one entity, so the match DSL can’t express “alert when X and Y appear in the same chunk.” Subscribe to chunk.entities_resolved instead. It fires once per chunk with the full set under event.data["entity_ids"] and entity_names_by_type, and you do the set check in your callback:
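async def flag_cooccurrence(event):
    # alert() is a placeholder for your own async notifier.
    by_type = event.data.get("entity_names_by_type", {})
    # Fire only when Acme and a security-related concept share a chunk.
    if "Acme" in by_type.get("ORGANIZATION", []) and \
       any("security" in c.lower() for c in by_type.get("CONCEPT", [])):
        await alert(event.data["chunk_id"])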

kb.subscribe("chunk.entities_resolved", flag_cooccurrence)

Use case: a proactive agent that watches its own memory

Give an agent a long-term memory and the obvious design is a poll loop: every few seconds the agent re-asks its memory “has anything relevant to my goal shown up?” That’s the wrong shape for a memory that’s constantly being written to. It’s polling a firehose. Semantic hooks invert it. The agent registers its interest once (a standing subscription describing what would matter to it), and Khora calls back the instant ingestion produces a matching entity or relationship. The agent’s memory stops being something it has to interrogate and becomes something that notifies it.
from khora.hooks import SemanticFilter

# The agent is tracking one goal: renewal risk for a key account.
async def on_signal(event):
    # Fires the moment a matching fact lands — wake the agent to act on it.
    await agent.handle_signal(event.data)

kb.subscribe(
    "entity.created", on_signal,
    filter=SemanticFilter(
        name="acme_renewal_risk",
        description="Churn, dissatisfaction, or escalation signals about Acme Corp",
        entity_types=["ORGANIZATION", "PERSON", "EVENT"],
    ),
)
# The agent keeps ingesting email, tickets, and call notes; on_signal fires
# only when something about Acme's renewal risk actually arrives.
Why a hook beats a poll loop here:
  • No polling: less cost, simpler architecture. A poll re-runs a search over the entire memory on every tick, whether or not anything changed, paying for embeddings and graph traversal again and again, and forcing you to build the scaffolding around it: a scheduler, a “what’s new since last time?” cursor, and de-duplication so the agent doesn’t re-fire on facts it already handled. A hook removes all of it. The filter is evaluated once per new fact, at ingest time, and its first level (type / structural match) is free. Cost tracks new information, not corpus-size × poll-frequency, and there’s no loop, no cursor, no dedupe bookkeeping to maintain.
  • Quick reaction. A poll can only react on its next tick, so your worst-case latency is the whole poll interval. The agent is always a beat behind. A hook fires inline, the instant the matching entity or relationship is created, so the agent responds in real time. For anything where lateness costs you (a renewal slipping, a risk signal, a competitor move), that gap is the difference between acting and reacting.

Cost controls

Level 2 ships three layers of protection, all OpenTelemetry-instrumented:
  1. Default-OFF gate: Level 1 is final unless you explicitly enable LLM evaluation.
  2. Token budgets: rolling-hour caps, per-namespace (llm_max_tokens_per_namespace_per_hour, default 10000) and per-subscription (default 0 = off; set it so one noisy filter can’t drain the namespace). On breach, the batch fails open (preserves the Level 1 match) and emits khora.hooks.llm.throttled_total.
  3. Decision cache + coalescing: identical events short-circuit the LLM (TTL + LRU cache keyed on a hashed event summary; a burst of 50 identical events → 1 LLM call).
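
The budget and cache layers above can be modeled in a few lines. This is a sketch under stated assumptions rather than Khora's code: `RollingTokenBudget`, `DecisionCache`, `adjudicate`, the SHA-256 cache key, and the fixed `est_tokens` estimate are all hypothetical, and the sketch is sequential, so it shows cache hits for repeated events but not the coalescing of concurrent in-flight calls.

```python
import hashlib
import json
from collections import OrderedDict, deque


class RollingTokenBudget:
    """Caps tokens spent inside a sliding window (one hour by default)."""

    def __init__(self, max_tokens, window_seconds=3600.0):
        self.max_tokens, self.window = max_tokens, window_seconds
        self._spend = deque()  # (timestamp, tokens), oldest first
        self._total = 0

    def try_spend(self, tokens, now):
        # Drop spend records that have aged out of the window.
        while self._spend and now - self._spend[0][0] >= self.window:
            self._total -= self._spend.popleft()[1]
        if self._total + tokens > self.max_tokens:
            return False
        self._spend.append((now, tokens))
        self._total += tokens
        return True


class DecisionCache:
    """TTL + LRU cache keyed on a hash of the event summary."""

    def __init__(self, max_entries=1024, ttl_seconds=300.0):
        self.max_entries, self.ttl = max_entries, ttl_seconds
        self._entries = OrderedDict()  # key -> (expires_at, decision)

    @staticmethod
    def key_for(summary):
        blob = json.dumps(summary, sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()

    def get(self, key, now):
        entry = self._entries.get(key)
        if entry is None or entry[0] <= now:
            self._entries.pop(key, None)  # missing or expired
            return None
        self._entries.move_to_end(key)  # mark as recently used
        return entry[1]

    def put(self, key, decision, now):
        self._entries[key] = (now + self.ttl, decision)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict least recently used


def adjudicate(summary, level1_match, llm, budget, cache, est_tokens, now):
    key = cache.key_for(summary)
    cached = cache.get(key, now)
    if cached is not None:
        return cached  # identical event: no LLM call
    if not budget.try_spend(est_tokens, now):
        return level1_match  # fail open: keep the Level 1 result
    decision = llm(summary)
    cache.put(key, decision, now)
    return decision


llm_calls = []


def fake_llm(summary):
    llm_calls.append(summary)
    return summary["name"].startswith("Acme")


budget = RollingTokenBudget(max_tokens=300)
cache = DecisionCache()
acme = {"type": "entity.created", "name": "Acme Corp"}
globex = {"type": "entity.created", "name": "Globex"}

burst = [adjudicate(acme, True, fake_llm, budget, cache, 200, now=0.0) for _ in range(50)]
# A second distinct event would exceed the 300-token cap, so Level 1 stands.
throttled = adjudicate(globex, True, fake_llm, budget, cache, 200, now=1.0)
calls_while_throttled = len(llm_calls)
# Once the first spend ages out of the window, the LLM is consulted again.
later = adjudicate(globex, True, fake_llm, budget, cache, 200, now=3601.0)
```

In this run the burst of 50 identical events costs a single LLM call, the throttled event keeps its Level 1 match without spending tokens, and the same event is adjudicated by the LLM stand-in once the window has rolled over.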
Tune everything through KHORA_HOOKS_* env vars (ENABLED, FILTER_MODEL default gpt-4.1-nano, DEFAULT_SIMILARITY_THRESHOLD, MAX_CONCURRENT_CALLBACKS, CALLBACK_TIMEOUT_SECONDS, the LLM/budget/cache knobs).

Examples

See the whole pipeline in runnable tutorials: remember, recall, abstain, forget.

Ingestion

Where extraction events fire during the write path.