Skip to main content
Everything on this page is part of Khora’s stable public API, pinned by __all__ in khora/__init__.py. Additive changes land in minor releases; breaking changes require a major bump. Private imports (khora.engines.*, khora.query.engine, khora.pipelines.flows) are not stable.
from khora import (
    Khora, KhoraConfig, KhoraError, SearchMode,
    RememberResult, RecallResult, BatchResult, BatchHandle, DocumentResult, Stats, LLMUsage,
    DocumentSource, EventType, SemanticFilter, context_text,
    create_engine, list_engines, register_engine,
    ExpertiseConfig, EntityTypeConfig, RelationshipTypeConfig,
)

Khora

The primary facade. Delegates to a pluggable engine (default vectorcypher).
Khora(
    database_url: str | KhoraConfig | None = None,
    *,
    engine: str = "vectorcypher",
    graph_url: str | None = None,
    embedding_model: str = "text-embedding-3-small",
    engine_kwargs: dict[str, Any] | None = None,
    run_migrations: bool = False,
)
Pass a PostgreSQL URL or a full KhoraConfig, or nothing, to read KHORA_DATABASE_URL / KHORA_NEO4J_URL. run_migrations=True runs Alembic under an advisory lock on connect. Credential fields are pydantic.SecretStr (rendered as '**********', call .get_secret_value() to read).
async with Khora(...) as kb:      # connect() / disconnect() automatic
    ...

# or manually:
kb = Khora(...); await kb.connect()
try: ...
finally: await kb.disconnect()

Namespaces

ns = await kb.create_namespace(*, config_overrides=None)            # MemoryNamespace
ns = await kb.get_namespace(namespace_id: UUID)                     # | None
ns = await kb.get_namespace_by_stable_id(namespace_id: str | UUID)  # stable-id lookup
create_namespace is keyword-only, no positional name. Use ns.namespace_id (the stable public id) everywhere below, not the row-level ns.id. See Namespaces & isolation.

Writing

remember

result: RememberResult = await kb.remember(
    content: str,
    *,
    namespace: str | UUID,
    title: str = "", source: str = "", source_type: str = "library",
    source_name: str | None = None, source_url: str | None = None,
    metadata: dict | None = None,
    entity_types: list[str],            # required
    relationship_types: list[str],      # required
    expertise: ExpertiseConfig | None = None,
    chunk_strategy: ChunkStrategy | None = None,   # "fixed" | "semantic" | "recursive" | "conversation"
    external_id: str | None = None,     # None or non-blank ≤512 chars
    session_id: UUID | None = None,
)
Ingests through the three-phase pipeline (see Ingestion). session_id propagates to the document and its chunks for session-scoped recall and forget_session.

remember_batch

result: BatchResult = await kb.remember_batch(
    documents: list[dict],
    *,
    namespace: str | UUID,
    max_concurrent: int = 10,
    deduplicate: bool = True,
    infer_relationships: bool = True,
    on_progress: Callable[[int, int], None] | None = None,
    entity_types: list[str], relationship_types: list[str],
    expertise: ExpertiseConfig | None = None,
    chunk_strategy: ChunkStrategy | None = None,
    # ... shared provenance kwargs as remember()
)
Concurrent ingestion with cross-document dedup. Each dict accepts the same per-document fields as remember(), and per-doc values override the top-level kwargs.

submit_batch

handle: BatchHandle = await kb.submit_batch(
    documents: list[dict],
    *,
    on_result: Callable[[int, int, DocumentResult], None],
    namespace: str | UUID,
    entity_types: list[str], relationship_types: list[str],
    max_concurrent: int = 20,
    reprocess_archived: bool = False,
    session_id: UUID | None = None,
    # ... shared provenance kwargs
)
Deferred ingestion: stages every doc as PENDING, returns immediately. Requires kb.start_pending_processor() (after connect()) or it raises. await handle.wait() blocks until every document’s on_result has fired.

Reading

recall

result: RecallResult = await kb.recall(
    query: str,
    *,
    namespace: str | UUID,
    limit: int = 10,
    mode: SearchMode = SearchMode.HYBRID,
    min_similarity: float = 0.0,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
)
start_time / end_time are honored on all three engines (both naive or both aware). Fusion weights, reranking, HyDE, and recency are global (KhoraConfig.query / KHORA_QUERY_*). There’s no config= kwarg. See Retrieval.

context_text

from khora import context_text
text: str = context_text(result: RecallResult, *, max_chunks: int = 5)
Renders a RecallResult into a flat LLM-context string (chunks grouped by document title, then --- Entities --- and --- Relationships --- sections).

Entity & document reads

entity   = await kb.get_entity(entity_id, namespace=ns.namespace_id)        # Entity | None
document = await kb.get_document(document_id, namespace=ns.namespace_id)     # Document | None
entities = await kb.list_entities(namespace=ns.namespace_id, entity_type=None, limit=100)
related  = await kb.find_related_entities(entity_id, namespace=ns.namespace_id, max_depth=2)
stats    = await kb.stats(namespace=ns.namespace_id)                        # Stats
namespace is required on these (accepts str | UUID). Cross-namespace ids resolve to None / empty rather than leaking the foreign row. The isolation contract holds at every layer (see Namespaces & isolation).

Deleting

removed: bool = await kb.forget(document_id: UUID, *, namespace: str | UUID)
deleted: int  = await kb.forget_session(namespace_id: UUID, session_id: UUID)
forget_session cascade-deletes every document tagged with session_id (chunks via FK cascade, graph cleanup via the engine). For TTL cleanup, the opt-in helper khora.gc.expire_sessions(*, kb, before, namespace_id=None) calls forget_session for each session whose newest document predates before. Khora runs no scheduler. Call it from your own loop.

Result types

All result types are frozen, slotted dataclasses. RememberResult: document_id, namespace_id, chunks_created, entities_extracted, relationships_created, metadata, llm_usage. BatchResult: total / processed / skipped / failed, chunks / entities / relationships, metadata, llm_usage. BatchHandle: id, total, and await handle.wait(). DocumentResult (per-doc on_result payload): document_id, namespace_id, success, error, per-doc counts, llm_usage, skipped. Stats: documents / chunks / entities / relationships, last_activity_at.

RecallResult

FieldTypeNotes
querystrThe original query
namespace_idUUIDNamespace searched
chunkslist[RecallChunk]Scored chunks (score is a typed field)
entitieslist[RecallEntity]Scored entities with provenance ids
relationshipslist[RecallRelationship]Connections between entities from VectorCypher’s graph traversal
documentslist[DocumentProjection]Deduplicated source docs
engine_infodictEngine telemetry: always carries "engine", plus max_raw_vector_score
usagelist[LLMUsage]Tokens spent during recall
Producer invariant: every chunks[i].document_id and every id in entities[i].source_document_ids / relationships[i].source_document_ids appears in documents[]. RecallChunk carries id, document_id, content, score, occurred_at, connected_entity_ids, chunker_info.

SearchMode

SearchMode.VECTOR    # pgvector / HNSW only
SearchMode.GRAPH     # Cypher / graph traversal only
SearchMode.KEYWORD   # BM25 / full-text only
SearchMode.HYBRID    # vector + graph + keyword, fused via RRF (default)
SearchMode.ALL       # every available channel

Engines

from khora import create_engine, list_engines, register_engine

list_engines()                                # registered engine names
register_engine("my_engine", "my.module", "MyEngineClass")   # lazy registration
vectorcypher is the default and the engine these docs cover. Prefer the engine= argument to Khora(...) over create_engine directly. Custom engines must implement the full MemoryEngineProtocol. See VectorCypher.

Expertise

ExpertiseConfig (a stable public API) defines a domain ontology, with entity/relationship types plus a system prompt, correlation rules, and inference rules. See Expertise & ontologies for the full guide:
from khora import ExpertiseConfig, EntityTypeConfig, RelationshipTypeConfig

expertise = ExpertiseConfig(
    name="medical_research",
    entity_types=[EntityTypeConfig(name="DRUG", description="..."), ...],
    relationship_types=[RelationshipTypeConfig(name="TREATS", description="..."), ...],
)
await kb.remember(content, namespace=ns.namespace_id, expertise=expertise,
                  entity_types=expertise.get_entity_type_names(),
                  relationship_types=expertise.get_relationship_type_names())

Hooks & errors

kb.subscribe(event_type, callback, filter=None) / kb.unsubscribe(id) / kb.hooks. See Semantic hooks. All domain errors subclass KhoraError. Catch it at system boundaries.
from khora import KhoraError
try:
    await kb.remember(...)
except KhoraError as exc:
    ...
input

Ingestion

What remember() / remember_batch() / submit_batch() do under the hood.
search

Retrieval

What recall() does and how to read a RecallResult.