Aggregated stream
Aggregated streams are our main way to read and consume messages from multiple streams. An aggregated stream contains all the messages from the matched streams, partitioned according to a partition scheme. See data model for more details.
We will use the same message store as in the previous chapter, but we will create a new set of streams within it:
import random

for _ in range(20):
    n = random.randint(0, 200)
    stream = f"aggregate-me-{n % 5}"
    message_store.write(stream=stream, message=EventA(num=n))
For our aggregated stream, we need to prepare a partition function (or rather, a class with a get_partition method).
from depeche_db import StoredMessage

class NumMessagePartitioner:
    def get_partition(self, message: StoredMessage[EventA | EventB]) -> int:
        if isinstance(message.message, EventA):
            return message.message.num % 3
        return 0
Now we can put together the aggregated stream.
aggregated_stream = message_store.aggregated_stream(
    name="example_docs_aggregate_me2",
    partitioner=NumMessagePartitioner(),
    stream_wildcards=["aggregate-me-%"],
)
aggregated_stream.projector.update_full()
Whenever we call update_full, all new messages in the origin streams will be appended to the relevant partition of the aggregated stream in the right order. We will not have to call this manually though: we can use the Executor to do it for us.
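A minimal sketch of that wiring, assuming DB_DSN holds the same PostgreSQL DSN used for the message store (you would typically run the executor in a dedicated process):

from depeche_db import Executor

executor = Executor(db_dsn=DB_DSN)
executor.register(aggregated_stream.projector)
executor.run()  # blocks and keeps the aggregated stream up to date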
We can read from the aggregated stream directly:
print(next(aggregated_stream.read(partition=2)))
# AggregatedStreamMessage(
#     partition=2,
#     position=0,
#     message_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af")
# )
The AggregatedStreamMessage object contains minimal metadata about the message in the context of the aggregated stream. It does not contain the original message though. To get that, we need to use the message store reader.
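As a sketch of that lookup, assuming the message store reader is used as a context manager and exposes a get_message_by_id method:

aggregated_message = next(aggregated_stream.read(partition=2))
with message_store.reader() as message_reader:
    # Resolve the aggregated stream entry back to the original stored message.
    print(message_reader.get_message_by_id(aggregated_message.message_id))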
Usually though we will not read the aggregated stream directly, but rather use a reader or a subscription to consume it. We will cover subscriptions in the next chapter.
reader = aggregated_stream.reader()
reader.start()
print(next(reader.get_messages(timeout=1)))
reader.stop()
# SubscriptionMessage(
#     partition=2,
#     position=0,
#     stored_message=StoredMessage(
#         message_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af"),
#         stream="aggregate-me-1",
#         version=1,
#         message=EventA(
#             event_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af"),
#             happened_at=datetime.datetime(2023, 10, 5, 20, 3, 26, 658725),
#             num=176,
#         ),
#         global_position=4,
#     ),
# )
The AggregatedStreamReader will read the messages from the aggregated stream and record its position in the stream while doing so. If you specify a timeout, it will wait that long for new messages before returning. The next call to get_messages will only return new messages that have been written to the stream since the last call.
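A typical consumption loop might look like this sketch; because the reader records its position, each call to get_messages picks up where the previous one left off:

reader = aggregated_stream.reader()
reader.start()
try:
    while True:
        # Returns after at most 5 seconds if no new messages arrive.
        for message in reader.get_messages(timeout=5):
            print(message.stored_message.message)
finally:
    reader.stop()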
aggregated_stream.reader() takes an optional start_point argument, which specifies where to start reading from. See the subscription chapter for more details on this.
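For example, to skip the existing backlog and only receive messages written from now on, something like the following should work (assuming the StartAtNextMessage start point described in the subscription chapter is exported by depeche_db):

from depeche_db import StartAtNextMessage

# Start at the next message instead of replaying the whole stream.
reader = aggregated_stream.reader(start_point=StartAtNextMessage())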
The main use case of an AggregatedStreamReader is streaming information based on messages. E.g. we can use it to implement a GraphQL subscription that is used by a UI to live-update. There is also an asynchronous version of the reader: aggregated_stream.async_reader().
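A sketch of that live-update use case, assuming the async reader mirrors the synchronous API with awaitable start/stop and an async get_messages iterator; push_update is a hypothetical callback that forwards messages to connected clients:

async def live_updates():
    reader = aggregated_stream.async_reader()
    await reader.start()
    try:
        while True:
            async for message in reader.get_messages(timeout=5):
                await push_update(message)  # hypothetical: push to UI subscribers
    finally:
        await reader.stop()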
The readers use PostgreSQL's LISTEN/NOTIFY mechanism to get notified of new messages. The synchronous version of the reader starts a new thread to listen for notifications; the asynchronous version uses an async listener.