
API Reference

Complete reference for the public Python API.

Defacto

The main entry point. Handles ingestion, building, querying, and lifecycle operations.

Constructor

from defacto import Defacto

d = Defacto(config, **kwargs)

config can be a directory path (string), a definitions dict, or a Definitions object. Defacto auto-detects the type.

| Parameter | Type | Default | Description |
|---|---|---|---|
| config | str, dict, or Definitions | required | YAML directory path, definitions dict, or Definitions object |
| database | str | SQLite (auto) | Database URL or file path |
| batch_size | int | 100 | Events per processing batch |
| workers | int | 1 | Rust thread pool size |
| shard_id | int | None | Shard index (0-based) |
| total_shards | int | None | Total number of shards |
| namespace | str | "defacto" | Postgres schema prefix |
| kafka | dict | None | {"bootstrap_servers": "...", "topic": "..."} |
| cold_ledger | str | None | S3 path for Delta Lake cold storage |
| dead_letter | dict | None | {"type": "file", "path": "..."} or {"type": "kafka", "topic": "..."} |
| log_level | str | "INFO" | DEBUG, INFO, WARNING, or ERROR |
| log_format | str | "console" | "console" or "json" |
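The config auto-detection described above can be sketched as follows. This is a hypothetical illustration of the dispatch, not the actual defacto internals; the helper name and return labels are invented for the example:

```python
from pathlib import Path


def detect_config_kind(config):
    """Sketch of the auto-detection: strings and paths are treated as YAML
    directories, dicts as inline definitions, and anything else as a
    Definitions object (hypothetical helper, not part of the defacto API)."""
    if isinstance(config, (str, Path)):
        return "yaml_directory"
    if isinstance(config, dict):
        return "definitions_dict"
    return "definitions_object"
```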

Ingestion

ingest()

result = d.ingest(source, events, process=None)
| Parameter | Type | Description |
|---|---|---|
| source | str | Source name (must match a source definition) |
| events | list[dict] | Raw event dicts |
| process | bool, str, or None | True: process inline. None: append only. A string: process using that definition version |

Returns IngestResult.
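For large backfills it can help to split the event list into chunks sized to batch_size before calling ingest(). This is a stdlib-only sketch; pre-chunking and the chunk size are assumptions for illustration, not defacto requirements:

```python
def chunked(events, size=100):
    """Yield successive slices of `events` no larger than `size`.
    Hypothetical helper for feeding large backfills to d.ingest()
    in pieces; not part of the defacto API."""
    for start in range(0, len(events), size):
        yield events[start:start + size]


# Usage sketch (assumes an existing Defacto instance `d`):
# for batch in chunked(raw_events, size=100):
#     d.ingest("app", batch, process=True)
```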

Building

build()

result = d.build(version=None, full=False, from_raw=False)
| Parameter | Type | Default | Description |
|---|---|---|---|
| version | str | active version | Definition version to build |
| full | bool | False | Force a full rebuild |
| from_raw | bool | False | Force re-normalization from raw input |

Returns BuildResult.
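The relationship between these flags and the mode reported in BuildResult can be sketched like this. The decision logic is inferred from the parameter and mode names and is purely illustrative; the real selection is internal to build():

```python
def sketch_build_mode(full=False, from_raw=False, pending_events=0):
    """Hypothetical mapping from build() flags to the modes listed
    under BuildResult (SKIP, INCREMENTAL, FULL, FULL_RENORMALIZE).
    Not the actual implementation."""
    if from_raw:
        return "FULL_RENORMALIZE"
    if full:
        return "FULL"
    if pending_events == 0:
        return "SKIP"
    return "INCREMENTAL"
```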

build_status()

status = d.build_status(version=None)

Returns BuildStatus with cursor, pending_events, dirty, last_build_time, last_build_mode.

Lifecycle

tick()

result = d.tick(version=None, as_of=None)

Evaluates time rules for all entities. Returns TickResult.

merge()

result = d.merge(from_entity_id, into_entity_id, reason="")

Merges two entities. Returns MergeResult.

erase()

result = d.erase(entity_id)

Permanently deletes an entity and cascades through merge chains. Returns EraseResult.
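"Cascades through merge chains" means that erasing an entity also covers every entity that was, transitively, merged into it. A self-contained sketch of that closure over a {loser_id: winner_id} mapping (the mapping shape is an assumption for illustration):

```python
def merge_cascade(entity_id, merges):
    """Return the set of entity ids an erase() of `entity_id` would cover:
    the entity itself plus everything transitively merged into it.
    Illustrative sketch only; merges is a {loser_id: winner_id} dict."""
    erased = {entity_id}
    grew = True
    while grew:
        grew = False
        for loser, winner in merges.items():
            if winner in erased and loser not in erased:
                erased.add(loser)
                grew = True
    return erased
```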

redact()

count = d.redact(entity_id)

Redacts sensitive fields in an entity's events. Returns the number of events redacted.

Queries

table()

df = d.table("customer").execute()

Returns a DefactoTable with current state (where valid_to is null).

history()

df = d.history("customer").execute()
df = d.history("customer").as_of("2024-01-15").execute()

Returns a DefactoTable with full SCD Type 2 history.
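Under SCD Type 2, .as_of() keeps the row whose validity window contains the timestamp. A self-contained sketch of that filter over plain dicts; the valid_from/valid_to column names echo the valid_to mentioned under table(), and the half-open window convention is an assumption:

```python
from datetime import date


def rows_as_of(rows, ts):
    """Point-in-time filter over SCD Type 2 rows: keep rows whose
    [valid_from, valid_to) window contains ts. valid_to=None means
    the row is current. Sketch of what .as_of() does, not its code."""
    return [
        r for r in rows
        if r["valid_from"] <= ts and (r["valid_to"] is None or ts < r["valid_to"])
    ]


# A two-row history for one hypothetical customer:
history = [
    {"id": "c1", "plan": "free", "valid_from": date(2024, 1, 1), "valid_to": date(2024, 1, 10)},
    {"id": "c1", "plan": "pro", "valid_from": date(2024, 1, 10), "valid_to": None},
]
```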

tables()

collection = d.tables("customer", "order")
collection = d.tables() # all entity types

Returns a TableCollection.

query()

df = d.query("SELECT * FROM customer_history WHERE mrr > 100").execute()

Raw SQL with validation against entity definitions. Returns a DefactoTable.

timeline()

timeline = d.timeline(entity_id)

Returns a Timeline with chronological state change entries.

assert_entity()

d.assert_entity(entity_id, state="active", plan="pro", mrr=99.0)

Raises AssertionError if the entity doesn't match. For testing and CI.
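The field-by-field check behind assert_entity() can be sketched as follows. This is a hypothetical reimplementation over a plain dict, for illustration only:

```python
def assert_matches(entity, **expected):
    """Compare each expected keyword against the entity's fields and raise
    AssertionError on the first mismatch (sketch of assert_entity()'s
    comparison; not the defacto implementation)."""
    for key, want in expected.items():
        got = entity.get(key)
        if got != want:
            raise AssertionError(f"{key}: expected {want!r}, got {got!r}")
```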

Definition management

d.definitions

d.definitions.versions() # list all registered versions
d.definitions.active() # current active version name
d.definitions.get(version) # definitions dict for a version
d.definitions.register(version, definitions)
d.definitions.activate(version)
d.definitions.draft(version, based_on=None)

DefinitionsDraft

draft = d.definitions.draft("v2", based_on="v1")
draft.add_property("customer", "ltv", {"type": "number", "default": 0})
draft.add_state("customer", "suspended", {...})
draft.add_transition("customer", "active", "suspended", "suspend", {...})
draft.add_handler("customer", "active", "downgrade", {...})
draft.update_identity("customer", {"email": {"match": "exact"}})
draft.remove_property("customer", "old_field")
draft.remove_state("customer", "deprecated_state")
draft.validate() # returns ValidationResult
draft.diff() # dict of changes vs base version
draft.impact() # predicted build mode
draft.register() # commit to database
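The shape of the change summary that draft.diff() returns is not specified above; a plausible sketch, comparing two definitions dicts at the top level (the key names "added"/"removed"/"changed" are assumptions, not the documented format):

```python
def diff_definitions(base, new):
    """Hypothetical sketch of a draft.diff()-style summary: top-level keys
    added, removed, or changed between two definitions dicts."""
    added = sorted(new.keys() - base.keys())
    removed = sorted(base.keys() - new.keys())
    changed = sorted(k for k in base.keys() & new.keys() if base[k] != new[k])
    return {"added": added, "removed": removed, "changed": changed}
```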

Inspection

d.ledger

d.ledger.count() # total events
d.ledger.count(source="app") # events from a specific source
d.ledger.events_for(entity_id) # events for a specific entity

d.identity

d.identity.lookup(hint_value) # entity ID for a hint
d.identity.hints(entity_id) # all hints for an entity

Standalone validation

from defacto import validate_definitions

result = validate_definitions(definitions_dict)
result.valid # bool
result.errors # list[str]
result.warnings # list[str]

close()

d.close()

Flushes pending events and releases all connections. Also works as a context manager:

with Defacto("definitions/") as d:
    d.ingest("app", events, process=True)

DefactoTable

Returned by table(), history(), and query(). Wraps an Ibis expression. All Ibis methods (filter, select, mutate, order_by, group_by, join) pass through and return new DefactoTable instances.

| Method | Description |
|---|---|
| .execute() | Execute and return a pandas DataFrame |
| .as_of(timestamp) | Filter to state at a point in time (use on history(), not table()) |
| .resolve_merges() | Include pre-merge history from absorbed entities |
| .to_pandas() | Same as .execute() |
| .to_pyarrow() | Execute and return a PyArrow Table |
| .to_parquet(path) | Export to a Parquet file |
| .to_csv(path) | Export to a CSV file |
| .sql() | Return the generated SQL without executing |

TableCollection

Returned by tables(). Iterable, indexable.

| Method | Description |
|---|---|
| collection["customer"] | Get a specific entity table |
| for name, table in collection | Iterate over entity tables |
| .history() | Return a new collection with full SCD history |
| .to_parquet(dir) | Export to a directory (one file per entity type) |
| .to_csv(dir) | Export to a CSV directory |
| .to_duckdb(path) | Export to a DuckDB file |
| .to_pandas() | Dict of DataFrames |
| .to_networkx() | NetworkX DiGraph |
| .to_graph_json() | {nodes, edges} for D3/Cytoscape |
| .to_neo4j(url, auth=(user, pass)) | Export to Neo4j |

Result types

All results are frozen dataclasses (immutable).

IngestResult

| Field | Type | Description |
|---|---|---|
| events_ingested | int | Events accepted |
| events_failed | int | Events that failed normalization |
| duplicates_skipped | int | Events rejected by dedup |
| failures | list[EventFailure] | Details for each failed event |
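A frozen dataclass rejects attribute assignment after construction. An illustrative stand-in mirroring the IngestResult fields above shows the behavior; this is a sketch of the pattern, not the real class:

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class IngestResultSketch:
    """Stand-in with the IngestResult fields from the table above.
    frozen=True makes every attribute read-only, as with the real
    result types (illustrative only)."""
    events_ingested: int = 0
    events_failed: int = 0
    duplicates_skipped: int = 0
    failures: list = field(default_factory=list)
```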

BuildResult

| Field | Type | Description |
|---|---|---|
| mode | str | SKIP, INCREMENTAL, FULL, FULL_RENORMALIZE, or FULL_WITH_IDENTITY_RESET |
| events_processed | int | Events interpreted |
| entities_created | int | New entities |
| entities_updated | int | Existing entities with state changes |
| merges_detected | int | Identity merges during build |
| late_arrivals | int | Events with timestamps before the watermark |
| failures | list[EventFailure] | Events that failed interpretation |

TickResult

| Field | Type | Description |
|---|---|---|
| effects_produced | int | Time rule effects that fired |
| entities_affected | int | Entities with state changes |
| transitions | int | State transitions produced |

MergeResult

| Field | Type | Description |
|---|---|---|
| from_entity_id | str | Entity merged away (loser) |
| into_entity_id | str | Entity merged into (winner) |
| events_reassigned | int | Events moved from loser to winner |
| entities_rebuilt | int | Entities rebuilt after the merge |

EraseResult

| Field | Type | Description |
|---|---|---|
| entity_id | str | Entity erased |
| entities_erased | int | Total entities erased (including the merge cascade) |
| events_deleted | int | Ledger rows deleted |

Timeline

| Field | Type | Description |
|---|---|---|
| entity_id | str | Entity this timeline is for |
| entity_type | str | Entity type |
| entries | list[TimelineEntry] | Chronological entries |

TimelineEntry

| Field | Type | Description |
|---|---|---|
| timestamp | datetime | When the event occurred |
| event_type | str | Normalized event type |
| event_id | str | Unique event identifier |
| effects | list[str] | Human-readable effect descriptions |
| state_before | str or None | State before this event |
| state_after | str or None | State after this event |

BuildStatus

| Field | Type | Description |
|---|---|---|
| cursor | int | Last processed sequence number |
| pending_events | int | Events not yet processed |
| dirty | bool | Whether a previous build was interrupted |
| last_build_time | str or None | ISO 8601 timestamp of the last build |
| last_build_mode | str or None | Mode of the last build |

Error types

All errors inherit from DefactoError.

| Error | When |
|---|---|
| ConfigError | Bad connection string, missing parameters |
| DefinitionError | Invalid YAML, schema violations |
| IngestError | All events in a batch fail normalization |
| ValidationError | Schema validation failure |
| BuildError | Interpretation failure |
| StorageError | Database read/write failure |
| IdentityError | Merge conflict, cache corruption |
| NotFoundError | Entity or version doesn't exist |
| ConsumerError | Kafka consumer failure |
| QueryError | Invalid SQL reference |
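Because every error inherits from DefactoError, one except clause can catch any of them. A minimal sketch of the hierarchy using the names from the table (only the shared base class is stated above; any intermediate grouping would be an assumption, so the sketch keeps the hierarchy flat):

```python
class DefactoError(Exception):
    """Base class for all defacto errors, as stated above."""

class ConfigError(DefactoError): pass
class DefinitionError(DefactoError): pass
class IngestError(DefactoError): pass
class ValidationError(DefactoError): pass
class BuildError(DefactoError): pass
class StorageError(DefactoError): pass
class IdentityError(DefactoError): pass
class NotFoundError(DefactoError): pass
class ConsumerError(DefactoError): pass
class QueryError(DefactoError): pass


# A single `except DefactoError` clause catches any subclass:
try:
    raise NotFoundError("entity missing")
except DefactoError as err:
    caught = type(err).__name__
```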