Subscription
Given the aggregated stream from the previous chapter, we can put together a subscription.
You can read from a subscription directly. Whenever get_next_messages
emits
a message, it will update the position of the subscription, so that the next
call will return the next message.
The emitted message is wrapped in a SubscriptionMessage
object which contains
the metadata about the message in the context of the subscription/aggregated stream.
for message in subscription.get_next_messages(count=1):
print(message)
# SubscriptionMessage(
# partition=2,
# position=0,
# stored_message=StoredMessage(
# message_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af"),
# stream="aggregate-me-1",
# version=1,
# message=EventA(
# event_id=UUID("1f804185-e63d-462e-b996-d6f16e5ff8af"),
# happened_at=datetime.datetime(2023, 10, 5, 20, 3, 26, 658725),
# num=176,
# ),
# global_position=4,
# ),
# )
Reading from a subscription directly is not the most common use case though.
In order to continously handle messages on a subscription we create a
MessageHandlerRegister
and pass this in when we create the subscription.
On the MessageHandlerRegister
we register a handler for the
message type(s) we are interested in.
You can register multiple handlers for different message types but the handled
message types must not overlap. Given your message type E
, you can request
SubscriptionMessage[E]
, StoredMessage[E]
or E
as the type of the
argument to the handler by using type hints.
from depeche_db import SubscriptionMessage, MessageHandlerRegister
handlers = MessageHandlerRegister[EventA | EventB]()
@handlers.register
def handle_event_a(msg: SubscriptionMessage[EventA]):
real_message = msg.stored_message.message
print(f"num={real_message.num} (partition {msg.partition} at {msg.position})")
Now we can create a new subscription with these handlers.
subscription = aggregated_stream.subscription(
name="sub_example_docs_with_handlers",
handlers=handlers,
)
Running run_once
will read the unprocessed messages from the subscription and call
the registered handlers (if any).
subscription.runner.run_once()
# num=111 (partition 0 at 0)
# num=199 (partition 1 at 0)
# num=166 (partition 1 at 1)
# num=0 (partition 0 at 1)
# num=152 (partition 2 at 0)
# num=172 (partition 1 at 2)
# num=12 (partition 0 at 2)
# ...
Running run_once
will read the unprocessed messages from the subscription and call
the registered handlers (if any).
In a real application, we would not call run_once
directly, but we would use
the Executor
to do it for us.
A subscription by default starts at the beginning of the stream. If we want to
change this behaviour, we can pass in a SubscriptionStartPosition
object when we
create the subscription. This object can be one of the following:
from datetime import timezone
from depeche_db import StartAtNextMessage, StartAtPointInTime
subscription_next = aggregated_stream.subscription(
name="sub_example_docs_aggregate_me_next", start_point=StartAtNextMessage()
)
subscription_point_in_time = aggregated_stream.subscription(
name="sub_example_docs_aggregate_me_next",
start_point=StartAtPointInTime(
datetime(2023, 10, 5, 14, 0, 0, 0, tzinfo=timezone.utc)
),
)