Aggregated stream

We will reuse the message store from the previous chapter, but write to a new set of streams in it:

import random

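# Write 20 messages with random payloads, spread over up to five streams
# named "aggregate-me-0" to "aggregate-me-4".
for _ in range(20):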
    n = random.randint(0, 200)
    stream = f"aggregate-me-{n % 5}"
    message_store.write(stream=stream, message=EventA(num=n))

An aggregated stream needs a partitioner: an object whose get_partition method maps each message to a partition number.

from depeche_db import StoredMessage


class NumMessagePartitioner:
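    def get_partition(self, message: StoredMessage[EventA | EventB]) -> int:
        # EventA messages are spread over partitions 0, 1 and 2 based on num.
        if isinstance(message.message, EventA):
            return message.message.num % 3
        # Any other message type goes to partition 0.
        return 0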
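
To see how this partitioner distributes messages, the following sketch runs the same logic without a database. The `EventA`, `EventB` and `FakeStoredMessage` classes here are simplified, hypothetical stand-ins for the real types from the previous chapter and from depeche_db; only the partitioning rule is taken from the example above.

```python
from dataclasses import dataclass
from typing import Union


# Hypothetical stand-ins so the sketch runs without depeche_db or a database.
@dataclass
class EventA:
    num: int


@dataclass
class EventB:
    text: str


@dataclass
class FakeStoredMessage:
    message: Union[EventA, EventB]


class NumMessagePartitioner:
    def get_partition(self, message: FakeStoredMessage) -> int:
        # EventA messages are spread over partitions 0, 1 and 2.
        if isinstance(message.message, EventA):
            return message.message.num % 3
        # Everything else lands in partition 0.
        return 0


partitioner = NumMessagePartitioner()
partitions = [
    partitioner.get_partition(FakeStoredMessage(EventA(num=n))) for n in (3, 7, 200)
]
other = partitioner.get_partition(FakeStoredMessage(EventB(text="hello")))
print(partitions, other)
```

Note that the partition depends only on the message content, not on the origin stream: a message written to `aggregate-me-0` can end up in any of the three partitions.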

Now we can put together the aggregated stream.

aggregated_stream = message_store.aggregated_stream(
    name="example_docs_aggregate_me2",
    partitioner=NumMessagePartitioner(),
    stream_wildcards=["aggregate-me-%"],
)
aggregated_stream.projector.update_full()

Whenever we call update_full, all new messages in the origin streams are appended to the relevant partition of the aggregated stream, in the right order. We will not have to call this manually, though: the Executor can do it for us.
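
As a mental model only, the update can be pictured as: filter the messages by stream name, ask the partitioner for a partition, and append at the next free position in that partition. The sketch below is a toy in-memory version of that idea, not the library's implementation; `toy_update`, the prefix match standing in for the wildcard, and the tuple layout are all assumptions made for illustration.

```python
from collections import defaultdict

# (stream, num) pairs in the order they were written to the message store.
origin_messages = [
    ("aggregate-me-0", 10),
    ("aggregate-me-2", 7),
    ("other-stream", 4),
    ("aggregate-me-1", 11),
    ("aggregate-me-3", 13),
]


def toy_update(messages, prefix="aggregate-me-", partition_count=3):
    """Toy model: filter by stream name, pick a partition, append in order."""
    partitions = defaultdict(list)
    for stream, num in messages:
        if not stream.startswith(prefix):
            continue  # stream does not match the (simplified) wildcard
        partition = num % partition_count
        position = len(partitions[partition])  # next free position in this partition
        partitions[partition].append((position, stream, num))
    return dict(partitions)


result = toy_update(origin_messages)
for partition, entries in sorted(result.items()):
    print(partition, entries)
```

In this toy model, positions are counted per partition and start at 0, and messages from different origin streams keep their relative write order within a partition.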

We can read from the aggregated stream directly:

print(next(aggregated_stream.read(partition=2)))
#  AggregatedStreamMessage(
#      partition=2,
#      position=0,
#      message_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af")
#  )

An AggregatedStreamMessage carries only minimal metadata about the message in the context of the aggregated stream; as printed above, that is the partition, the position, and the message ID. It does not contain the original message. To get that, we need to use the message store reader.
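
The two-step lookup (read a pointer from the aggregated stream, then resolve its message ID against the message store) can be illustrated with a small in-memory sketch. `Pointer` and the `store` dictionary are hypothetical stand-ins; in the library the second step goes through the message store reader rather than a dictionary.

```python
import uuid
from dataclasses import dataclass


# Hypothetical stand-in for the metadata object read from the aggregated stream.
@dataclass(frozen=True)
class Pointer:
    partition: int
    position: int
    message_id: uuid.UUID


# Hypothetical in-memory "message store": message ID -> original payload.
store = {}
message_id = uuid.uuid4()
store[message_id] = {"type": "EventA", "num": 8}

# The aggregated stream only holds the pointer, not the payload.
pointer = Pointer(partition=2, position=0, message_id=message_id)

# Step two: resolve the pointer against the store to get the original message.
original = store[pointer.message_id]
print(original)
```

Keeping only the ID in the aggregated stream means the original message is stored once, however many aggregated streams refer to it.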

Usually, though, we will not read the aggregated stream directly; instead, we will consume it through a subscription. We will get to that in the next chapter.