AggregatedStream
depeche_db.AggregatedStreamFactory(store)
Bases: Generic[E]
This factory is accessible on the message store:
store = MessageStore(...)
stream = store.aggregated_stream(
name="stream_name",
partitioner=...,
stream_wildcards=["stream_%"]
)
__call__(name, partitioner, stream_wildcards, update_batch_size=None)
Create an aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
The name of the stream, needs to be a valid python identifier |
required |
partitioner |
MessagePartitioner[E]
|
A partitioner for the stream |
required |
stream_wildcards |
List[str]
|
A list of stream wildcards |
required |
update_batch_size |
Optional[int]
|
The batch size for updating the stream |
None
|
depeche_db.AggregatedStream(name, store, partitioner, stream_wildcards, update_batch_size=None)
Bases: Generic[E]
AggregatedStream aggregates multiple streams into one (partitioned) stream.
Read more about aggregated streams under Data Model.
The update_batch_size
argument can be used to control the batch size of the
update process. Higher numbers will result in less database roundtrips but
also in higher memory usage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str
|
Stream name, needs to be a valid python identifier |
required |
store |
MessageStore[E]
|
Message store |
required |
partitioner |
MessagePartitioner[E]
|
Message partitioner |
required |
stream_wildcards |
List[str]
|
List of stream wildcards |
required |
update_batch_size |
Optional[int]
|
Batch size for updating the stream, defaults to 100 |
None
|
Attributes:
Name | Type | Description |
---|---|---|
name |
str
|
Stream name |
projector |
StreamProjector
|
Stream projector |
subscription |
SubscriptionFactory
|
Factory to create subscriptions on this stream |
async_reader(start_point=None)
Get an async reader for the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start_point |
Optional[SubscriptionStartPoint]
|
Start point for the reader |
None
|
get_migration_ddl(name)
classmethod
DDL Script to migrate from <=0.8.0
get_partition_statistics(position_limits=None, result_limit=None, conn=None)
Get partition statistics for deciding which partitions to read from. This is used by subscriptions.
global_position_to_positions(global_position)
Get the positions for each partition at a given global position.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
global_position |
int
|
Global position |
required |
Returns:
Type | Description |
---|---|
Dict[int, int]
|
A dictionary mapping partition numbers to positions |
migrate_db_objects(name, conn)
classmethod
Migrate from <=0.8.0
read(partition, conn=None)
Read all messages from a partition of the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition |
int
|
Partition number |
required |
conn |
Optional[SAConnection]
|
Optional connection to use for reading. If not provided, a new connection will be created. |
None
|
read_slice(partition, start, count, conn=None)
Read a slice of messages from a partition of the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition |
int
|
Partition number |
required |
start |
int
|
Start position |
required |
count |
int
|
Number of messages to read |
required |
conn |
Optional[SAConnection]
|
Optional connection to use for reading. If not provided, a new connection will be created. |
None
|
reader(start_point=None)
Get a reader for the aggregated stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start_point |
Optional[SubscriptionStartPoint]
|
Start point for the reader |
None
|
time_to_positions(time)
Get the positions for each partition at a given time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time |
datetime
|
Time to get positions for (must be timezone aware) |
required |
Returns:
Type | Description |
---|---|
Dict[int, int]
|
A dictionary mapping partition numbers to positions |
truncate(conn)
Truncate aggregated stream.
depeche_db.StreamProjector(stream, partitioner, stream_wildcards, batch_size=None)
Bases: Generic[E]
Stream projector is responsible for updating an aggregated stream.
The update process is locked to prevent concurrent updates. Thus, it is fine to run the projector in multiple processes.
Implements: RunOnNotification
notification_channel: str
property
Returns the notification channel name for this projector.
run(budget=None)
Runs the projector once.
stop()
No-Op on this class.
update_full(budget=None)
Updates the projection from the last known position to the current position.
depeche_db.MessagePartitioner
Bases: Protocol
, Generic[E]
Message partitioner is a protocol that is used to determine partition number for a message.
get_partition(message: StoredMessage[E]) -> int
Returns partition number for a message. The partition number must be a positive integer. The partition number must be deterministic for a given message.
depeche_db.AggregatedStreamReader(stream, start_point=None)
Bases: Generic[E]
get_messages(timeout=0)
On the first call, get all messages in the stream after the start_point. On subsequent calls, get all messages in the stream after the last returned message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout |
int
|
Only wait for this many seconds when there are no new messages. |
0
|
start()
Start the notification listener.
This method should be called before calling get_messages.
The notification listener will listen for notifications on the notification channel of the stream. In order to do so, it creates a a connection to the database and listens for notifications on a separate thread. The connection is closed when the notification listener is stopped.
stop()
Stop the notification listener.
depeche_db.AsyncAggregatedStreamReader(stream, start_point=None)
Bases: Generic[E]
get_messages(timeout=0)
async
On the first call, get all messages in the stream after the start_point. On subsequent calls, get all messages in the stream after the last returned message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout |
int
|
Only wait for this many seconds when there are no new messages. |
0
|
start()
async
Start the notification listener.
stop()
async
Stop the notification listener.