
AggregatedStream

depeche_db.AggregatedStreamFactory(store)

Bases: Generic[E]

This factory is accessible on the message store:

store = MessageStore(...)
stream = store.aggregated_stream(
    name="stream_name",
    partitioner=...,
    stream_wildcards=["stream_%"]
)

__call__(name, partitioner, stream_wildcards, update_batch_size=None)

Create an aggregated stream.

Parameters:

name (str, required): The name of the stream; must be a valid Python identifier.
partitioner (MessagePartitioner[E], required): A partitioner for the stream.
stream_wildcards (List[str], required): A list of stream wildcards.
update_batch_size (Optional[int], default None): The batch size for updating the stream.
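The wildcard in the example above ("stream_%") looks like a SQL LIKE pattern. This section does not say exactly how wildcards are matched. The sketch below assumes LIKE semantics: % matches any run of characters and _ matches exactly one character. The helpers like_to_regex and matching_streams are hypothetical. Under this assumption, the underscore in "stream_%" is itself a single-character wildcard, so "streamXc" also matches.

```python
import re
from typing import List


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern (no ESCAPE handling) into a compiled regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches any sequence, including an empty one
        elif ch == "_":
            parts.append(".")  # _ matches exactly one character
        else:
            parts.append(re.escape(ch))  # everything else matches literally
    return re.compile("".join(parts), re.DOTALL)


def matching_streams(stream_names: List[str], wildcards: List[str]) -> List[str]:
    """Return the stream names matched by at least one wildcard (hypothetical helper)."""
    regexes = [like_to_regex(w) for w in wildcards]
    return [name for name in stream_names if any(r.fullmatch(name) for r in regexes)]


streams = ["stream_a", "stream_b", "streamXc", "other_a"]
print(matching_streams(streams, ["stream_%"]))  # ['stream_a', 'stream_b', 'streamXc']
```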

depeche_db.AggregatedStream(name, store, partitioner, stream_wildcards, update_batch_size=None)

Bases: Generic[E]

AggregatedStream aggregates multiple streams into one (partitioned) stream.

Read more about aggregated streams under Data Model.

The update_batch_size argument controls the batch size of the update process. Higher values result in fewer database round trips but also in higher memory usage.
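The sketch below illustrates this trade-off with a simplified model. It assumes one database round trip per batch and that each batch is held in memory at once. It is not depeche_db's actual update loop, and batched and roundtrips are hypothetical helpers.

```python
import math
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))  # at most batch_size items held in memory
        if not batch:
            return
        yield batch


def roundtrips(total_messages: int, batch_size: int) -> int:
    """Number of batches, i.e. round trips under the one-trip-per-batch assumption."""
    return math.ceil(total_messages / batch_size)


for size in (10, 100, 1000):
    print(size, roundtrips(2500, size))
# 10 250
# 100 25
# 1000 3
print(sum(1 for _ in batched(range(2500), 1000)))  # 3
```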

Parameters:

name (str, required): Stream name; must be a valid Python identifier.
store (MessageStore[E], required): Message store.
partitioner (MessagePartitioner[E], required): Message partitioner.
stream_wildcards (List[str], required): List of stream wildcards.
update_batch_size (Optional[int], default None): Batch size for updating the stream; defaults to 100 if left as None.

Attributes:

name (str): Stream name.
projector (StreamProjector): Stream projector.
subscription (SubscriptionFactory): Factory to create subscriptions on this stream.

get_migration_ddl(name) classmethod

DDL script for migrating from version 0.8.0 or earlier.

get_partition_statistics(position_limits=None, result_limit=None)

Get partition statistics for deciding which partitions to read from. This is used by subscriptions.

global_position_to_positions(global_position)

Get the positions for each partition at a given global position.

Parameters:

global_position (int, required): Global position.

Returns:

Dict[int, int]: A dictionary mapping partition numbers to positions.
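This section does not define exactly what the returned positions are. The in-memory sketch below rests on two assumptions. First, each aggregated-stream entry carries a global position, a partition number and a position within that partition. Second, the method reports the highest partition position reached at or before the given global position. The Row type and this function are hypothetical; the real method queries the database.

```python
from typing import Dict, Iterable, NamedTuple


class Row(NamedTuple):
    """Hypothetical entry of an aggregated stream."""

    global_position: int
    partition: int
    position: int  # position within the partition


def global_position_to_positions(rows: Iterable[Row], global_position: int) -> Dict[int, int]:
    """For each partition, the highest partition position at or before global_position."""
    result: Dict[int, int] = {}
    for row in rows:
        if row.global_position <= global_position:
            current = result.get(row.partition)
            if current is None or row.position > current:
                result[row.partition] = row.position
    return result


rows = [
    Row(global_position=1, partition=0, position=0),
    Row(global_position=2, partition=1, position=0),
    Row(global_position=3, partition=0, position=1),
    Row(global_position=5, partition=1, position=1),
]
print(global_position_to_positions(rows, 3))  # {0: 1, 1: 0}
```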

migrate_db_objects(name, conn) classmethod

Migrate database objects from version 0.8.0 or earlier.

read(partition)

Read all messages from a partition of the aggregated stream.

Parameters:

partition (int, required): Partition number.

read_slice(partition, start, count)

Read a slice of messages from a partition of the aggregated stream.

Parameters:

partition (int, required): Partition number.
start (int, required): Start position.
count (int, required): Number of messages to read.
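read_slice lets a consumer page through a partition instead of loading it all at once. The sketch below shows such a paging loop against InMemoryPartitionedStream, a hypothetical stand-in with the same read_slice(partition, start, count) parameters. Two things are assumptions: that start is a zero-based position within the partition, and that a slice past the end yields nothing. The return type of the real method is not documented in this section.

```python
from typing import Dict, Iterator, List


class InMemoryPartitionedStream:
    """Hypothetical stand-in exposing read_slice(partition, start, count)."""

    def __init__(self, partitions: Dict[int, List[str]]) -> None:
        self._partitions = partitions

    def read_slice(self, partition: int, start: int, count: int) -> Iterator[str]:
        # Assumption: `start` is a zero-based position within the partition.
        yield from self._partitions.get(partition, [])[start:start + count]


def read_in_pages(stream: InMemoryPartitionedStream, partition: int, page_size: int) -> Iterator[List[str]]:
    """Read a whole partition page by page, advancing `start` by each page's length."""
    start = 0
    while True:
        page = list(stream.read_slice(partition, start, page_size))
        if not page:
            return  # an empty slice means the end of the partition was reached
        yield page
        start += len(page)


stream = InMemoryPartitionedStream({0: ["m1", "m2", "m3", "m4", "m5"]})
for page in read_in_pages(stream, partition=0, page_size=2):
    print(page)
# ['m1', 'm2']
# ['m3', 'm4']
# ['m5']
```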

time_to_positions(time)

Get the positions for each partition at a given time.

Parameters:

time (datetime, required): Time to get positions for; must be timezone-aware.

Returns:

Dict[int, int]: A dictionary mapping partition numbers to positions.
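In Python's datetime module, a value is aware when its tzinfo is set and tzinfo.utcoffset() returns an offset for it. Otherwise it is naive. The guard below, require_aware, is a hypothetical helper you might call before passing a value to time_to_positions. A value built from datetime.now(timezone.utc) passes the check, and a naive datetime does not.

```python
from datetime import datetime, timedelta, timezone


def require_aware(time: datetime) -> datetime:
    """Raise ValueError if `time` is naive (no tzinfo, or no UTC offset)."""
    if time.tzinfo is None or time.tzinfo.utcoffset(time) is None:
        raise ValueError("time must be timezone aware")
    return time


one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
require_aware(one_hour_ago)  # passes: carries tzinfo=UTC

try:
    require_aware(datetime(2024, 1, 1, 12, 0))  # naive datetime
except ValueError as exc:
    print(exc)  # time must be timezone aware
```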

truncate(conn)

Truncate the aggregated stream.


depeche_db.StreamProjector(stream, partitioner, stream_wildcards, batch_size=None)

Bases: Generic[E]

Stream projector is responsible for updating an aggregated stream.

The update process is locked to prevent concurrent updates, so it is safe to run the projector in multiple processes.
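This section does not describe how the lock is implemented. As an in-process analogy only, the sketch below uses threading.Lock with a non-blocking acquire. A caller that finds the lock held skips its run instead of duplicating work. Separate processes would need a shared lock instead, for example a PostgreSQL advisory lock; whether and how depeche_db uses one is not stated here. LockedUpdater is a hypothetical name.

```python
import threading


class LockedUpdater:
    """Hypothetical updater: only one caller performs an update at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.updates = 0

    def run(self) -> bool:
        # Non-blocking acquire: if another caller holds the lock, skip this run
        # instead of waiting, because that caller is already doing the work.
        if not self._lock.acquire(blocking=False):
            return False
        try:
            self.updates += 1  # stand-in for projecting new messages
            return True
        finally:
            self._lock.release()


updater = LockedUpdater()
print(updater.run())  # True
with updater._lock:  # simulate another worker in the middle of an update
    print(updater.run())  # False
print(updater.updates)  # 1
```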

Implements: RunOnNotification

notification_channel: str property

Returns the notification channel name for this projector.

run()

Runs the projector once.

stop()

No-op for this class.

update_full()

Updates the projection from the last known position to the current position.
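The RunOnNotification members listed above (notification_channel, run and stop) suggest a driver that calls run() whenever a notification arrives on the matching channel. The sketch below restates that shape as a local typing.Protocol and dispatches a list of channel names to handlers. The Protocol definition, CountingHandler and dispatch are assumptions for illustration. They are not depeche_db's own executor.

```python
from typing import Dict, Iterable, List, Protocol


class RunOnNotification(Protocol):
    """Assumed shape, restating the members listed above."""

    @property
    def notification_channel(self) -> str: ...

    def run(self) -> None: ...

    def stop(self) -> None: ...


class CountingHandler:
    """Hypothetical handler that only counts how often it was run."""

    def __init__(self, channel: str) -> None:
        self._channel = channel
        self.runs = 0

    @property
    def notification_channel(self) -> str:
        return self._channel

    def run(self) -> None:
        self.runs += 1

    def stop(self) -> None:
        pass  # nothing to clean up, similar to StreamProjector.stop()


def dispatch(notifications: Iterable[str], handlers: List[RunOnNotification]) -> None:
    """Run every handler subscribed to each incoming channel, then stop all handlers."""
    by_channel: Dict[str, List[RunOnNotification]] = {}
    for handler in handlers:
        by_channel.setdefault(handler.notification_channel, []).append(handler)
    for channel in notifications:
        for handler in by_channel.get(channel, []):
            handler.run()
    for handler in handlers:
        handler.stop()


handler = CountingHandler("projector_channel")
dispatch(["projector_channel", "unrelated", "projector_channel"], [handler])
print(handler.runs)  # 2
```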


depeche_db.MessagePartitioner

Bases: Protocol, Generic[E]

Message partitioner is a protocol used to determine the partition number for a message.

get_partition(message: StoredMessage[E]) -> int

Returns the partition number for a message. The partition number must be a non-negative integer and must be deterministic for a given message.
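A partitioner only needs a get_partition method with this signature. The sketch below sends all events of one account to the same partition, returning values from 0 to num_partitions - 1. StoredMessage here is a hypothetical stand-in for depeche_db's type and assumes the wrapped event is exposed as .message. AccountEvent and AccountPartitioner are illustrative names.

```python
import zlib
from dataclasses import dataclass
from typing import Generic, TypeVar

E = TypeVar("E")


@dataclass
class StoredMessage(Generic[E]):
    """Hypothetical stand-in; assumes the wrapped event is exposed as `.message`."""

    stream: str
    message: E


@dataclass
class AccountEvent:
    account_id: str
    amount: int


class AccountPartitioner:
    """Sends all events of one account to the same partition."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions

    def get_partition(self, message: StoredMessage[AccountEvent]) -> int:
        # zlib.crc32 gives the same value in every process, unlike the built-in
        # hash() for str, which is randomized per interpreter run (PYTHONHASHSEED).
        key = message.message.account_id.encode("utf-8")
        return zlib.crc32(key) % self.num_partitions


partitioner = AccountPartitioner(num_partitions=4)
deposit = StoredMessage("account-42", AccountEvent("42", 10))
withdrawal = StoredMessage("account-42", AccountEvent("42", -5))
print(partitioner.get_partition(deposit) == partitioner.get_partition(withdrawal))  # True
```

Deriving the partition from a stable key keeps every message of one account in a single partition, which preserves their relative order for subscribers reading that partition.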