AggregatedStream
depeche_db.AggregatedStreamFactory(store)
Bases: Generic[E]
This factory is accessible on the message store:
store = MessageStore(...)
stream = store.aggregated_stream(
    name="stream_name",
    partitioner=...,
    stream_wildcards=["stream_%"],
)
__call__(name, partitioner, stream_wildcards, update_batch_size=None)
Create an aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the stream; must be a valid Python identifier | required |
partitioner | MessagePartitioner[E] | A partitioner for the stream | required |
stream_wildcards | List[str] | A list of stream wildcards | required |
update_batch_size | Optional[int] | The batch size for updating the stream | None |
depeche_db.AggregatedStream(name, store, partitioner, stream_wildcards, update_batch_size=None)
Bases: Generic[E]
AggregatedStream aggregates multiple streams into one (partitioned) stream.
Read more about aggregated streams under Data Model.
The update_batch_size argument controls the batch size of the update process.
Higher values result in fewer database round trips but also in higher memory usage.
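The trade-off can be made concrete with a little arithmetic. The sketch below assumes, purely for illustration, that each batch costs one database round trip; `estimated_roundtrips` is a hypothetical helper and not part of depeche_db, and the real number of queries per batch may differ.

```python
import math


def estimated_roundtrips(message_count: int, batch_size: int) -> int:
    """Number of batches needed to process `message_count` messages.

    Assumption: one database round trip per batch.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return math.ceil(message_count / batch_size)


# 10,000 pending messages with two different batch sizes
small_batches = estimated_roundtrips(10_000, 100)
large_batches = estimated_roundtrips(10_000, 1_000)
print(small_batches, large_batches)
```

Under this assumption a tenfold larger batch needs a tenth of the round trips, while each batch holds ten times as many messages in memory at once.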
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Stream name; must be a valid Python identifier | required |
store | MessageStore[E] | Message store | required |
partitioner | MessagePartitioner[E] | Message partitioner | required |
stream_wildcards | List[str] | List of stream wildcards | required |
update_batch_size | Optional[int] | Batch size for updating the stream, defaults to 100 | None |
Attributes:
Name | Type | Description |
---|---|---|
name | str | Stream name |
projector | StreamProjector | Stream projector |
subscription | SubscriptionFactory | Factory to create subscriptions on this stream |
get_migration_ddl(name)
classmethod
DDL script to migrate from versions <=0.8.0.
get_partition_statistics(position_limits=None, result_limit=None)
Get partition statistics for deciding which partitions to read from. This is used by subscriptions.
global_position_to_positions(global_position)
Get the positions for each partition at a given global position.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
global_position | int | Global position | required |
Returns:
Type | Description |
---|---|
Dict[int, int] | A dictionary mapping partition numbers to positions |
migrate_db_objects(name, conn)
classmethod
Migrate database objects from versions <=0.8.0.
read(partition)
Read all messages from a partition of the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition | int | Partition number | required |
read_slice(partition, start, count)
Read a slice of messages from a partition of the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition | int | Partition number | required |
start | int | Start position | required |
count | int | Number of messages to read | required |
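A partition can be walked in fixed-size chunks by calling read_slice repeatedly. The following is a minimal sketch, not library code: `iter_partition_in_slices` and `FakeStream` are hypothetical names, and the loop assumes that positions within a partition are contiguous, so the next slice starts right after the last message returned. The fake stream treats `start` as a zero-based offset, which is also an assumption.

```python
from typing import Any, Dict, Iterator, List


def iter_partition_in_slices(
    stream: Any, partition: int, start: int = 0, count: int = 100
) -> Iterator[List[Any]]:
    """Yield lists of messages from one partition, `count` messages at a time."""
    while True:
        batch = list(stream.read_slice(partition=partition, start=start, count=count))
        if not batch:
            return
        yield batch
        # Assumption: positions are contiguous within a partition.
        start += len(batch)


class FakeStream:
    """Hypothetical in-memory stand-in, used only to exercise the helper."""

    def __init__(self, partitions: Dict[int, List[Any]]) -> None:
        self._partitions = partitions

    def read_slice(self, partition: int, start: int, count: int) -> Iterator[Any]:
        return iter(self._partitions[partition][start:start + count])


fake = FakeStream({0: list("abcdefg")})
slices = list(iter_partition_in_slices(fake, partition=0, count=3))
print(slices)
```

With a real AggregatedStream, the same helper would be called with the stream object in place of `fake`, provided the assumptions above hold.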
time_to_positions(time)
Get the positions for each partition at a given time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time | datetime | Time to get positions for (must be timezone aware) | required |
Returns:
Type | Description |
---|---|
Dict[int, int] | A dictionary mapping partition numbers to positions |
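Because the time argument must be timezone aware, it can help to check a datetime before passing it in. The sketch below uses only the standard library; `is_timezone_aware` is a hypothetical helper, and the commented-out call assumes `stream` is an existing AggregatedStream.

```python
from datetime import datetime, timezone


def is_timezone_aware(value: datetime) -> bool:
    """True if `value` carries a usable UTC offset."""
    return value.tzinfo is not None and value.tzinfo.utcoffset(value) is not None


aware = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
naive = datetime(2024, 1, 1, 12, 0)

print(is_timezone_aware(aware), is_timezone_aware(naive))

# positions = stream.time_to_positions(aware)  # `stream` is assumed to exist
```

A naive datetime such as `datetime.now()` fails this check; `datetime.now(timezone.utc)` produces an aware value.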
truncate(conn)
Truncate aggregated stream.
depeche_db.StreamProjector(stream, partitioner, stream_wildcards, batch_size=None)
Bases: Generic[E]
Stream projector is responsible for updating an aggregated stream.
The update process is locked to prevent concurrent updates. Thus, it is fine to run the projector in multiple processes.
Implements: RunOnNotification
notification_channel: str
property
Returns the notification channel name for this projector.
run()
Runs the projector once.
stop()
No-Op on this class.
update_full()
Updates the projection from the last known position to the current position.
depeche_db.MessagePartitioner
Bases: Protocol, Generic[E]
Message partitioner is a protocol used to determine the partition number for a message.
get_partition(message: StoredMessage[E]) -> int
Returns the partition number for a message. The partition number must be a positive integer and must be deterministic for a given message.
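One possible way to satisfy this protocol is to hash a stable attribute of the message. The sketch below is an illustration under stated assumptions, not the library's own partitioner: `StreamNameHashPartitioner` and `FakeStoredMessage` are hypothetical, and the example assumes the stored message exposes its origin stream name as a `stream` attribute.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class FakeStoredMessage:
    """Hypothetical stand-in for StoredMessage; only `stream` is modelled."""

    stream: str


class StreamNameHashPartitioner:
    """Maps each origin stream name to a partition in 1..partition_count."""

    def __init__(self, partition_count: int = 8) -> None:
        if partition_count <= 0:
            raise ValueError("partition_count must be positive")
        self.partition_count = partition_count

    def get_partition(self, message: FakeStoredMessage) -> int:
        # sha256 is stable across processes, unlike the built-in hash() for str.
        digest = hashlib.sha256(message.stream.encode("utf-8")).digest()
        # Add 1 so the result is always a positive integer.
        return int.from_bytes(digest[:8], "big") % self.partition_count + 1


partitioner = StreamNameHashPartitioner(partition_count=4)
partition = partitioner.get_partition(FakeStoredMessage(stream="stream_1"))
print(partition)
```

Hashing the stream name keeps all messages of one origin stream in the same partition. The built-in `hash()` is avoided because its result for strings varies between interpreter runs, which would break the determinism requirement.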