Subscription
depeche_db.SubscriptionFactory(stream)
Bases: Generic[E]
This factory is accessible on the aggregated stream:
stream: AggregatedStream = ...
subscription = stream.subscription(
    name="subscription_name",
    handlers=...,
    call_middleware=...,
    error_handler=...,
    state_provider=...,
    lock_provider=...,
)
__call__(name, handlers=None, batch_size=None, call_middleware=None, error_handler=None, state_provider=None, lock_provider=None, start_point=None)
Create a subscription.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the subscription, needs to be a valid Python identifier | required |
handlers | Optional[MessageHandlerRegisterProtocol[E]] | Handlers to be called when a message is received, defaults to an empty register | None |
batch_size | Optional[int] | Number of messages to read at once, defaults to 10, read more here | None |
call_middleware | Optional[CallMiddleware] | A middleware to customize the call to the handlers | None |
error_handler | Optional[SubscriptionErrorHandler] | A handler for errors raised by the handlers, defaults to a handler that will exit the subscription | None |
state_provider | Optional[SubscriptionStateProvider] | Provider for the subscription state, defaults to a PostgreSQL provider | None |
lock_provider | Optional[LockProvider] | Provider for the locks, defaults to a PostgreSQL provider | None |
start_point | Optional[SubscriptionStartPoint] | The start point for the subscription, defaults to the beginning of the stream | None |
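Since the subscription name must be a valid Python identifier, it can be worth checking a name before passing it to the factory. The helper below is a hypothetical pre-check, not part of depeche_db; it also rejects Python keywords, which is an assumption stricter than what the table states.

```python
import keyword


def is_valid_subscription_name(name: str) -> bool:
    """Hypothetical pre-check: True if `name` is a non-keyword Python identifier."""
    # str.isidentifier() accepts keywords such as "class", so exclude them explicitly.
    return name.isidentifier() and not keyword.iskeyword(name)


print(is_valid_subscription_name("subscription_name"))
print(is_valid_subscription_name("my-subscription"))
print(is_valid_subscription_name("2nd_subscription"))
```

Names containing hyphens or starting with a digit fail the check, so snake_case names are a safe choice.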
depeche_db.Subscription(name, stream, message_handler, batch_size=None, state_provider=None, lock_provider=None, start_point=None)
Bases: Generic[E]
A subscription is a way to read messages from an aggregated stream.
Read more about the subscription in the concepts section.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the subscription, needs to be a valid Python identifier | required |
stream | AggregatedStream[E] | Stream to read from | required |
message_handler | SubscriptionMessageHandler[E] | Handler for the messages | required |
batch_size | Optional[int] | Number of messages to read at once, defaults to 10, read more here | None |
state_provider | Optional[SubscriptionStateProvider] | Provider for the subscription state, defaults to a PostgreSQL provider | None |
lock_provider | Optional[LockProvider] | Provider for the locks, defaults to a PostgreSQL provider | None |
start_point | Optional[SubscriptionStartPoint] | The start point for the subscription, defaults to the beginning of the stream | None |
depeche_db.CallMiddleware
Bases: Generic[E]
Call middleware is a protocol that is used to wrap a call to a handler.
Typical implementation:
class MyCallMiddleware(CallMiddleware):
    def __init__(self, some_dependency):
        self.some_dependency = some_dependency

    def call(self, handler, message):
        # or use a DI container here
        handler(message, some_dependency=self.some_dependency)
call(handler, message)
Calls a handler with a given message.
The type of the message depends on the type annotation of the handler function. See MessageHandlerRegister for more details.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handler | Callable | Handler | required |
message | Union[SubscriptionMessage[E], StoredMessage[E], E] | Message to be passed to the handler | required |
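As a further illustration of wrapping a handler call, here is a minimal sketch of a middleware that logs and counts each call before delegating. The class name, the logger name, and the `calls` counter are made up for this example; the class only matches the documented `call(handler, message)` shape structurally and does not import depeche_db, so that it runs on its own.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("subscription.middleware")  # hypothetical logger name


class LoggingCallMiddleware:
    """Sketch: provides the documented `call(handler, message)` method."""

    def __init__(self) -> None:
        self.calls = 0

    def call(self, handler: Callable[..., Any], message: Any) -> None:
        # Record the call, then delegate to the handler unchanged.
        self.calls += 1
        logger.info("calling %s", getattr(handler, "__name__", repr(handler)))
        handler(message)


# Usage with a plain list as the "handler" target.
received = []
middleware = LoggingCallMiddleware()
middleware.call(received.append, {"event": "account_created"})
```

In a real setup the instance would presumably be passed as `call_middleware` when creating the subscription.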
depeche_db.SubscriptionErrorHandler
Bases: Generic[E]
Subscription error handler is a protocol that is used to handle errors that occur during message processing.
handle_error(error: Exception, message: SubscriptionMessage[E]) -> ErrorAction
Handles an error that occurred during message processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error | Exception | Error | required |
message | SubscriptionMessage[E] | Message that was being processed when the error occurred | required |
Returns:
Type | Description |
---|---|
ErrorAction | Action to be taken |
depeche_db.ErrorAction
Bases: Enum
Error action is an action that is taken when an error occurs during message processing.
Attributes:
Name | Type | Description |
---|---|---|
IGNORE | | Ignore the error and continue processing. |
EXIT | | Exit processing. |
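To show how `handle_error` and `ErrorAction` fit together, the sketch below ignores one class of errors and exits on everything else. The `ErrorAction` enum defined here is a local stand-in with arbitrary values so the example runs without depeche_db, and `IgnoreValidationErrors` is a hypothetical name; real code would presumably import `ErrorAction` from `depeche_db` instead.

```python
import enum
from typing import Any


class ErrorAction(enum.Enum):
    # Stand-in for depeche_db.ErrorAction; the member values are arbitrary.
    IGNORE = "ignore"
    EXIT = "exit"


class IgnoreValidationErrors:
    """Sketch: skip messages that fail with ValueError, exit on anything else."""

    def handle_error(self, error: Exception, message: Any) -> ErrorAction:
        if isinstance(error, ValueError):
            return ErrorAction.IGNORE
        return ErrorAction.EXIT


handler = IgnoreValidationErrors()
print(handler.handle_error(ValueError("bad payload"), message=None))
print(handler.handle_error(RuntimeError("db down"), message=None))
```

Deciding by exception type keeps the policy explicit: only errors known to be safe to skip are ignored.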
depeche_db.ExitSubscriptionErrorHandler
depeche_db.LogAndIgnoreSubscriptionErrorHandler(subscription_name)
depeche_db.SubscriptionMessageHandler(handler_register, error_handler=None, call_middleware=None)
Bases: Generic[E]
Handles messages
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handler_register | MessageHandlerRegisterProtocol[E] | The handler register to use | required |
error_handler | Optional[SubscriptionErrorHandler] | A handler for errors raised by the handlers, defaults to a handler that will exit the subscription | None |
call_middleware | Optional[CallMiddleware] | The middleware to call before calling the handler | None |
depeche_db.SubscriptionRunner(subscription, message_handler, batch_size=None)
Bases: Generic[E]
Handles messages from a subscription using a handler
The batch_size argument controls how many messages to handle in each batch. If not provided, the default is 10. A larger batch size results in fewer round trips to the database, but also makes it more likely that messages from different partitions are processed out of the order defined by their global_position in the message store.
A batch size of 1 ensures that messages are processed in the order of their global_position.
Messages in the same partition will always be processed in order.
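The ordering trade-off can be illustrated with a toy model. The function below is not depeche_db's algorithm; it assumes, purely for illustration, that a runner repeatedly picks the partition whose next message has the lowest global_position and then handles up to `batch_size` messages from that partition.

```python
from collections import deque
from typing import Deque, Dict, List


def processing_order(partitions: Dict[int, List[int]], batch_size: int) -> List[int]:
    """Toy model (an assumption, not the library's implementation)."""
    queues: Dict[int, Deque[int]] = {p: deque(msgs) for p, msgs in partitions.items()}
    order: List[int] = []
    while any(queues.values()):
        # Pick the partition holding the oldest unprocessed message.
        partition = min((p for p in queues if queues[p]), key=lambda p: queues[p][0])
        # Handle one batch from that partition before looking at the others.
        for _ in range(min(batch_size, len(queues[partition]))):
            order.append(queues[partition].popleft())
    return order


# global_position values per partition; each partition is already ordered.
partitions = {0: [1, 3, 5], 1: [2, 4, 6]}
print(processing_order(partitions, batch_size=1))  # [1, 2, 3, 4, 5, 6]
print(processing_order(partitions, batch_size=3))  # [1, 3, 5, 2, 4, 6]
```

In both runs each partition's own messages stay in order; only the interleaving across partitions changes with the batch size.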
Implements: RunOnNotification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subscription | Subscription[E] | The subscription to handle | required |
message_handler | SubscriptionMessageHandler | The handler to use | required |
batch_size | Optional[int] | The number of messages to handle in each batch, defaults to 10 | None |
depeche_db.LockProvider
Bases: Protocol
Lock provider is a protocol that is used to lock and unlock resources.
lock(name: str) -> bool
Locks resource with a given name. Returns True if the resource was locked.
This method must not block!
unlock(name: str)
Unlocks resource with a given name.
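A minimal in-memory sketch of the two methods, useful for tests within a single process, might look as follows. `InMemoryLockProvider` is a hypothetical name and not part of depeche_db; it does not coordinate across processes the way the default PostgreSQL provider presumably does.

```python
import threading
from typing import Set


class InMemoryLockProvider:
    """Sketch of the documented lock/unlock protocol for a single process."""

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects the set, held only briefly
        self._held: Set[str] = set()

    def lock(self, name: str) -> bool:
        # Never waits for the named resource: report failure immediately.
        with self._guard:
            if name in self._held:
                return False
            self._held.add(name)
            return True

    def unlock(self, name: str) -> None:
        with self._guard:
            self._held.discard(name)


provider = InMemoryLockProvider()
first = provider.lock("subscription_name")   # resource is free
second = provider.lock("subscription_name")  # already held, returns at once
provider.unlock("subscription_name")
third = provider.lock("subscription_name")   # free again after unlock
```

Returning False instead of waiting is what satisfies the "must not block" requirement.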
depeche_db.SubscriptionStateProvider
Bases: Protocol
Subscription state provider is a protocol that is used to store and read subscription state.
initialize(subscription_name: str)
Marks subscription state as initialized.
initialized(subscription_name: str) -> bool
Returns True if the subscription state was already initialized.
read(subscription_name: str) -> SubscriptionState
session(**kwargs) -> SubscriptionStateProvider
Returns a session for the subscription state provider.
This can be used to run the state reads/updates in a transaction provided by the client.
See DbSubscriptionStateProvider for an example implementation.
store(subscription_name: str, partition: int, position: int)
Stores subscription state for a given partition.
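The protocol's methods can be sketched with an in-memory implementation, for example for unit tests. Everything here is hypothetical: `InMemorySubscriptionStateProvider` is not part of depeche_db, and the `SubscriptionState` dataclass is a local stand-in whose `positions` field (partition to position) is an assumption about the real type's shape.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Set


@dataclass
class SubscriptionState:
    # Hypothetical stand-in: maps partition number to the last stored position.
    positions: Dict[int, int] = field(default_factory=dict)


class InMemorySubscriptionStateProvider:
    def __init__(self) -> None:
        self._initialized: Set[str] = set()
        self._positions: Dict[str, Dict[int, int]] = {}

    def initialize(self, subscription_name: str) -> None:
        self._initialized.add(subscription_name)

    def initialized(self, subscription_name: str) -> bool:
        return subscription_name in self._initialized

    def store(self, subscription_name: str, partition: int, position: int) -> None:
        self._positions.setdefault(subscription_name, {})[partition] = position

    def read(self, subscription_name: str) -> SubscriptionState:
        # Return a copy so callers cannot mutate the stored state.
        return SubscriptionState(dict(self._positions.get(subscription_name, {})))

    def session(self, **kwargs: Any) -> "InMemorySubscriptionStateProvider":
        # There is no transaction to join in memory, so return the provider itself.
        return self


state_provider = InMemorySubscriptionStateProvider()
state_provider.initialize("subscription_name")
state_provider.store("subscription_name", partition=0, position=41)
state_provider.store("subscription_name", partition=0, position=42)
print(state_provider.read("subscription_name"))
```

A database-backed provider would use `session(**kwargs)` to bind reads and updates to a client-provided transaction, as the description of `session` notes.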
depeche_db.SubscriptionStartPoint
Defines the start point of a subscription.
init_state(subscription_name, stream, state_provider)
Initializes subscription state (if not yet initialized).
depeche_db.StartAtNextMessage
depeche_db.StartAtPointInTime(point_in_time)
Bases: SubscriptionStartPoint
Starts consuming messages from a point in time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
point_in_time | datetime | The point in time to start consuming messages from. The point in time must be timezone aware. | required |
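Because `point_in_time` must be timezone aware, a short sketch of building and checking such a value with the standard library may help. The helper `is_timezone_aware` is a hypothetical name; its check follows the definition of aware objects in the Python `datetime` documentation.

```python
from datetime import datetime, timezone


def is_timezone_aware(value: datetime) -> bool:
    """True if `value` carries a usable UTC offset."""
    return value.tzinfo is not None and value.tzinfo.utcoffset(value) is not None


aware = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
naive = datetime(2024, 1, 1, 12, 0)  # no tzinfo, so not suitable here

print(is_timezone_aware(aware))
print(is_timezone_aware(naive))
```

A value like `aware` is the kind of argument `StartAtPointInTime(point_in_time)` expects.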