
Configuration

All configuration is passed through the Defacto constructor. There are no config files or environment variables.

Constructor parameters

d = Defacto(
    config,
    database="postgresql://user:pass@host:5432/mydb",
    batch_size=100,
    workers=1,
    shard_id=0,
    total_shards=4,
    namespace="defacto",
    kafka={"bootstrap_servers": "localhost:9092", "topic": "entity-state"},
    cold_ledger="s3://bucket/cold/",
    dead_letter={"type": "file", "path": "/var/log/defacto/dead_letter.jsonl"},
    log_level="INFO",
    log_format="console",
)

config

The first positional argument. Accepts three formats:

| Format | Example | Description |
|---|---|---|
| Directory path | "my-project/" | Loads YAML from entities/, sources/, schemas/ subdirectories |
| Dict | {"entities": {...}, "sources": {...}} | Definitions as a Python dict |
| Definitions object | Definitions.from_directory("...") | Pre-loaded definitions |
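The dict format can be sketched as below. The inner structure (entity names, keys, source fields) is illustrative only, not Defacto's actual definition schema; consult your entity and source YAML for the real shape.

```python
# A minimal definitions dict, passed as the first positional argument.
# Keys "entities" and "sources" come from the docs; everything nested
# inside them is a hypothetical example.
config = {
    "entities": {
        "customer": {"keys": ["email"]},
    },
    "sources": {
        "crm": {"entity": "customer"},
    },
}
```

The same structure could instead live on disk as YAML under entities/ and sources/, loaded by passing the directory path.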

database

Database URL for the ledger, identity, and state history backends.

| URL | Backend |
|---|---|
| (omitted) | SQLite, auto-created in .defacto/ inside the project directory |
| "postgresql://user:pass@host:5432/db" | Postgres |

SQLite is single-writer and intended for development. Postgres supports concurrent access and is required for sharding.

batch_size

Number of events per processing batch when using process=True. Events are buffered until the batch is full, then processed together. Larger batches reduce per-event overhead but increase latency.

Default: 100
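The buffer-then-flush behavior described above can be sketched as follows. The BatchBuffer class is illustrative, not Defacto's internal API; it just shows the batch_size trade-off: events wait in the buffer (latency) until a full batch amortizes per-event overhead (throughput).

```python
class BatchBuffer:
    """Buffer events and process them together once batch_size is reached."""

    def __init__(self, batch_size, flush_fn):
        self.batch_size = batch_size
        self.flush_fn = flush_fn
        self._pending = []

    def add(self, event):
        self._pending.append(event)
        if len(self._pending) >= self.batch_size:
            self.flush()

    def flush(self):
        # Also called at end of input, so a partial final batch is not lost.
        if self._pending:
            self.flush_fn(self._pending)
            self._pending = []


batches = []
buf = BatchBuffer(batch_size=3, flush_fn=batches.append)
for event in range(7):
    buf.add(event)
buf.flush()  # flush the final partial batch
# batches == [[0, 1, 2], [3, 4, 5], [6]]
```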

workers

Size of the Rust thread pool for normalization and interpretation. In practice, increasing this has minimal effect because I/O dominates pipeline time, not computation.

Default: 1

shard_id and total_shards

Enable sharding. Each process owns a deterministic subset of entities.

d = Defacto("defs/", database="postgresql://...", shard_id=0, total_shards=4)

shard_id must be between 0 and total_shards - 1. All shards must use the same total_shards value. Entity assignment is via SHA-256 hash of the entity ID.

Both must be set together, or neither.
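The assignment rule can be sketched with hashlib. The exact byte handling below (digest truncation, byte order) is an assumption, but any deterministic SHA-256 scheme behaves the same way: each entity ID maps to exactly one shard, and every process computes the same mapping without coordination.

```python
import hashlib


def shard_for(entity_id: str, total_shards: int) -> int:
    """Deterministically map an entity ID to a shard via SHA-256.

    Illustrative only -- Defacto's real implementation may truncate or
    order the digest bytes differently.
    """
    digest = hashlib.sha256(entity_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % total_shards


# Same ID, same shard -- on every process, every run.
assert shard_for("customer-42", 4) == shard_for("customer-42", 4)
assert 0 <= shard_for("customer-42", 4) < 4
```

This is why all shards must agree on total_shards: changing it changes the modulus, and with it every entity's assignment.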

namespace

Postgres schema prefix for all tables. Useful for running multiple independent defacto environments on the same database (e.g., staging and production).

Default: "defacto"

Infrastructure tables live in the {namespace} schema; state history tables live in the {namespace}_{version} schema.

kafka

Enables Kafka publishing. Snapshots are published to the topic after each batch, partitioned by entity ID for ordering guarantees.

kafka={
    "bootstrap_servers": "localhost:9092",
    "topic": "entity-state",
}

When Kafka is configured, defacto uses KafkaPublisher instead of InlinePublisher. Messages are compressed with lz4 and include a header indicating whether the message is a snapshot or a tombstone.

cold_ledger

S3 path for Delta Lake cold storage. Enables the TieredLedger, which combines hot Postgres with cold Delta Lake for cost-effective long-term retention.

cold_ledger="s3://my-bucket/defacto/cold/"

Requires the deltalake package: pip install defacto[tiered].

dead_letter

Where to route events that fail processing. Failed events are captured in result objects regardless of this setting. The dead letter sink provides a durable secondary destination.

# File-based (JSONL, one line per failure)
dead_letter={"type": "file", "path": "/var/log/defacto/dead_letter.jsonl"}

# Kafka topic
dead_letter={"type": "kafka", "bootstrap_servers": "localhost:9092", "topic": "dead-letter"}

Default: no dead letter sink (failures only in result objects).
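With the file sink, each failure becomes one JSON object per line, so the file can be inspected or replayed with stdlib tools alone. The record fields below (event, error) are an assumed payload shape for illustration, not a documented schema:

```python
import json
import tempfile
from pathlib import Path

# Write two hypothetical failure records the way a JSONL sink would.
path = Path(tempfile.mkdtemp()) / "dead_letter.jsonl"
failures = [
    {"event": {"id": "e1"}, "error": "missing required field"},
    {"event": {"id": "e2"}, "error": "schema mismatch"},
]
with path.open("a") as f:
    for record in failures:
        f.write(json.dumps(record) + "\n")

# Replay: one json.loads per line recovers every failed event.
replayed = [json.loads(line) for line in path.read_text().splitlines()]
assert [r["event"]["id"] for r in replayed] == ["e1", "e2"]
```

Appending (mode "a") means restarts never truncate earlier failures.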

log_level

Controls the verbosity of defacto's logging output.

| Level | What's logged |
|---|---|
| DEBUG | Everything, including per-batch details |
| INFO | Operation completions (ingest, build, merge, erase) |
| WARNING | Definition warnings, validation issues |
| ERROR | Storage failures, delivery errors |

Default: "INFO"

log_format

| Format | Description |
|---|---|
| "console" | Human-readable, fixed-width columns |
| "json" | Structured JSON, one object per line; all context fields included |

Default: "console"

JSON format is intended for log aggregation systems (Datadog, Elasticsearch, CloudWatch). Console format is for local development.
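A quick way to see the difference: a JSON-formatted line is machine-parseable with json.loads, while a console line is column-aligned text. The field names below (timestamp, level, event, batch) are assumptions for illustration, not Defacto's documented log schema:

```python
import json

# A hypothetical one-line JSON log record, as an aggregator would receive it.
line = (
    '{"timestamp": "2024-01-01T00:00:00Z", "level": "INFO", '
    '"event": "ingest complete", "batch": 7}'
)

record = json.loads(line)
assert record["level"] == "INFO"
assert record["batch"] == 7  # context fields survive as typed values
```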

Consumer configuration

consumer = Defacto.consumer(
    kafka={"bootstrap_servers": "localhost:9092"},
    database="postgresql://...",
    store="postgresql://analytics-db/...",
    batch_size=1000,
    batch_timeout_ms=5000,
    dead_letter=None,
    log_level="INFO",
    log_format="console",
)

| Parameter | Default | Description |
|---|---|---|
| kafka | required | Kafka connection config |
| database | required | Shared database (reads definitions and version info) |
| store | required | State history destination (where SCD tables are written) |
| batch_size | 1000 | Messages per write batch |
| batch_timeout_ms | 5000 | Max wait time before flushing a partial batch |
| dead_letter | None | Same format as above |
| log_level | "INFO" | Same as above |
| log_format | "console" | Same as above |

Optional dependencies

Defacto's core depends only on ibis-framework[duckdb] and pyyaml. Backend-specific features require extras.

pip install defacto[postgres] # psycopg for Postgres
pip install defacto[kafka] # confluent-kafka
pip install defacto[tiered] # deltalake for Delta Lake cold storage
pip install defacto[networkx] # networkx + scipy for graph analytics
pip install defacto[neo4j] # neo4j driver
pip install defacto[s3] # boto3 for S3 access
pip install defacto[all] # everything