Subscription

depeche_db.SubscriptionFactory(stream)

Bases: Generic[E]

This factory is accessible on the aggregated stream:

stream: AggregatedStream = ...
subscription = stream.subscription(
    name="subscription_name",
    handlers=...,
    call_middleware=...,
    error_handler=...,
    state_provider=...,
    lock_provider=...,
)

__call__(name, handlers=None, batch_size=None, call_middleware=None, error_handler=None, state_provider=None, lock_provider=None, start_point=None)

Create a subscription.

Parameters:

Name | Type | Description | Default
name | str | The name of the subscription; must be a valid Python identifier | required
handlers | Optional[MessageHandlerRegisterProtocol[E]] | Handlers to be called when a message is received; defaults to an empty register | None
batch_size | Optional[int] | Number of messages to read at once; defaults to 10 (see SubscriptionRunner for the ordering trade-off) | None
call_middleware | Optional[CallMiddleware] | A middleware to customize the call to the handlers | None
error_handler | Optional[SubscriptionErrorHandler] | A handler for errors raised by the handlers; defaults to a handler that exits the subscription | None
state_provider | Optional[SubscriptionStateProvider] | Provider for the subscription state; defaults to a PostgreSQL provider | None
lock_provider | Optional[LockProvider] | Provider for the locks; defaults to a PostgreSQL provider | None
start_point | Optional[SubscriptionStartPoint] | The start point for the subscription; defaults to the beginning of the stream | None
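
The name constraint can be checked before a subscription is created. The sketch below is only an illustration: the helper name is hypothetical, and whether the library applies exactly this rule (in particular the rejection of reserved words) is an assumption.

```python
import keyword


def is_valid_subscription_name(name: str) -> bool:
    """Return True if `name` is usable as a Python identifier."""
    # str.isidentifier() also accepts reserved words such as "class",
    # so keywords are rejected separately. This stricter rule is an
    # assumption, not documented library behaviour.
    return name.isidentifier() and not keyword.iskeyword(name)


print(is_valid_subscription_name("order_projection"))
print(is_valid_subscription_name("order-projection"))
```

Names that contain hyphens, spaces, or a leading digit fail the check, so an underscore-separated name is the safe choice.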

depeche_db.Subscription(name, stream, message_handler, batch_size=None, state_provider=None, lock_provider=None, start_point=None)

Bases: Generic[E]

A subscription is a way to read messages from an aggregated stream.

Read more about the subscription in the concepts section.

Parameters:

Name | Type | Description | Default
name | str | Name of the subscription; must be a valid Python identifier | required
stream | AggregatedStream[E] | Stream to read from | required
message_handler | SubscriptionMessageHandler[E] | Handler for the messages | required
batch_size | Optional[int] | Number of messages to read at once; defaults to 10 (see SubscriptionRunner for the ordering trade-off) | None
state_provider | Optional[SubscriptionStateProvider] | Provider for the subscription state; defaults to a PostgreSQL provider | None
lock_provider | Optional[LockProvider] | Provider for the locks; defaults to a PostgreSQL provider | None
start_point | Optional[SubscriptionStartPoint] | The start point for the subscription; defaults to the beginning of the stream | None

depeche_db.CallMiddleware

Bases: Generic[E]

Call middleware is a protocol that is used to wrap a call to a handler.

Typical implementation:

from depeche_db import CallMiddleware


class MyCallMiddleware(CallMiddleware):
    def __init__(self, some_dependency):
        self.some_dependency = some_dependency

    def call(self, handler, message):
        # Pass the dependency by hand, or resolve it from a DI container here
        handler(message, some_dependency=self.some_dependency)

call(handler, message)

Calls a handler with a given message.

The type of the message depends on the type annotation of the handler function. See MessageHandlerRegister for more details.

Parameters:

Name | Type | Description | Default
handler | Callable | Handler to call | required
message | Union[SubscriptionMessage[E], StoredMessage[E], E] | Message to be passed to the handler | required
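
As a variation on the typical implementation, a middleware can inspect each handler's signature and pass only the dependencies that handler declares. The following is a minimal sketch under stated assumptions: `InjectingCallMiddleware` and the handler functions are hypothetical names, the class only mirrors the documented `call(handler, message)` shape, and it does not import or subclass the real `CallMiddleware` so that it runs on its own.

```python
import inspect
from typing import Any, Callable, Dict


class InjectingCallMiddleware:
    """Passes only the dependencies that a handler declares as parameters."""

    def __init__(self, dependencies: Dict[str, Any]):
        self.dependencies = dependencies

    def call(self, handler: Callable, message: Any) -> None:
        params = inspect.signature(handler).parameters
        # Skip the first parameter: it receives the message itself.
        wanted = list(params)[1:]
        kwargs = {n: self.dependencies[n] for n in wanted if n in self.dependencies}
        handler(message, **kwargs)


received = []


def handle_order(message, mailer):
    received.append((message, mailer))


def handle_audit(message):
    received.append((message, None))


middleware = InjectingCallMiddleware({"mailer": "smtp-client", "clock": "utc-clock"})
middleware.call(handle_order, "order-created")
middleware.call(handle_audit, "order-created")
print(received)
```

Handlers that need no dependencies stay plain one-argument functions, while the others name what they need in their signature.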

depeche_db.SubscriptionErrorHandler

Bases: Generic[E]

Subscription error handler is a protocol that is used to handle errors that occur during message processing.

handle_error(error: Exception, message: SubscriptionMessage[E]) -> ErrorAction

Handles an error that occurred during message processing.

Parameters:

Name | Type | Description | Default
error | Exception | Error raised while the message was being processed | required
message | SubscriptionMessage[E] | Message that was being processed when the error occurred | required

Returns:

Type | Description
ErrorAction | Action to be taken


depeche_db.ErrorAction

Bases: Enum

Error action is an action that is taken when an error occurs during message processing.

Attributes:

Name | Type | Description
IGNORE | | Ignore the error and continue processing.
EXIT | | Exit processing.
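
A custom error handler can choose between the two actions per error. The sketch below is illustrative only: `SkipValidationErrors` is a hypothetical class, and `ErrorAction` is redefined locally as a stand-in (with arbitrary member values) so the example runs without the library installed. Real code would import `ErrorAction` from `depeche_db` instead.

```python
from enum import Enum


class ErrorAction(Enum):
    # Local stand-in mirroring the two documented members of
    # depeche_db.ErrorAction; the values here are arbitrary.
    IGNORE = "ignore"
    EXIT = "exit"


class SkipValidationErrors:
    """Ignores ValueError and exits the subscription on anything else."""

    def __init__(self) -> None:
        self.skipped = []

    def handle_error(self, error: Exception, message) -> ErrorAction:
        if isinstance(error, ValueError):
            # Remember the message so it can be inspected later.
            self.skipped.append(message)
            return ErrorAction.IGNORE
        return ErrorAction.EXIT


handler = SkipValidationErrors()
print(handler.handle_error(ValueError("bad payload"), "message-1"))
print(handler.handle_error(RuntimeError("database down"), "message-2"))
```

Returning EXIT for unexpected errors keeps the conservative default, and IGNORE is reserved for failures that are known to be safe to skip.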


depeche_db.ExitSubscriptionErrorHandler

Bases: SubscriptionErrorHandler

Exit the subscription on error


depeche_db.LogAndIgnoreSubscriptionErrorHandler(subscription_name)

Bases: SubscriptionErrorHandler

Log the error and ignore the message


depeche_db.SubscriptionMessageHandler(handler_register, error_handler=None, call_middleware=None)

Bases: Generic[E]

Handles messages using the handlers in the given handler register.

Parameters:

Name | Type | Description | Default
handler_register | MessageHandlerRegisterProtocol[E] | The handler register to use | required
error_handler | Optional[SubscriptionErrorHandler] | A handler for errors raised by the handlers; defaults to a handler that exits the subscription | None
call_middleware | Optional[CallMiddleware] | The middleware to call before calling the handler | None

depeche_db.SubscriptionRunner(subscription, message_handler, batch_size=None)

Bases: Generic[E]

Handles messages from a subscription using a handler

The batch_size argument controls how many messages to handle in each batch. If not provided, the default is 10. A larger batch size results in fewer round trips to the database, but also makes it more likely that messages from different partitions are processed out of the order defined by their global_position in the message store.

A batch size of 1 ensures that messages are processed in the order of their global_position. Messages in the same partition are always processed in order, regardless of the batch size.
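
The trade-off can be illustrated with a toy model. This sketch is not the library's algorithm: it assumes that each batch is read from a single partition and that the partition with the lowest next global_position is picked first. The function name and the sample positions are made up for the example.

```python
from typing import Dict, List


def processing_order(partitions: Dict[int, List[int]], batch_size: int) -> List[int]:
    """Return the global positions in the order a toy runner would handle them."""
    pending = {p: list(positions) for p, positions in partitions.items() if positions}
    handled: List[int] = []
    while pending:
        # Assumption: pick the partition whose next message has the
        # lowest global_position, then read one batch from it.
        partition = min(pending, key=lambda p: pending[p][0])
        batch = pending[partition][:batch_size]
        rest = pending[partition][batch_size:]
        handled.extend(batch)
        if rest:
            pending[partition] = rest
        else:
            del pending[partition]
    return handled


# Two partitions; the numbers are global positions.
partitions = {0: [1, 3, 5], 1: [2, 4, 6]}
print(processing_order(partitions, batch_size=1))  # [1, 2, 3, 4, 5, 6]
print(processing_order(partitions, batch_size=3))  # [1, 3, 5, 2, 4, 6]
```

In both runs the positions within each partition stay in order. Only the interleaving across partitions changes with the batch size.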

Implements: RunOnNotification

Parameters:

Name | Type | Description | Default
subscription | Subscription[E] | The subscription to handle | required
message_handler | SubscriptionMessageHandler | The handler to use | required
batch_size | Optional[int] | The number of messages to handle in each batch; defaults to 10 | None

depeche_db.LockProvider

Bases: Protocol

Lock provider is a protocol that is used to lock and unlock resources.

lock(name: str) -> bool

Locks the resource with the given name. Returns True if the lock was acquired, False otherwise. This method must not block!

unlock(name: str)

Unlocks the resource with the given name.
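
To make the non-blocking contract concrete, here is a minimal in-memory sketch. `InMemoryLockProvider` is a hypothetical class that only mirrors the two documented methods; it works within a single process and is meant for tests or illustration, not as a replacement for the PostgreSQL provider.

```python
import threading
from typing import Set


class InMemoryLockProvider:
    """Single-process lock provider; lock() returns immediately."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._held: Set[str] = set()

    def lock(self, name: str) -> bool:
        # The guard protects only the short set lookup, so a caller
        # never waits for a held resource: it just gets False back.
        with self._guard:
            if name in self._held:
                return False
            self._held.add(name)
            return True

    def unlock(self, name: str) -> None:
        with self._guard:
            # discard() makes unlocking an unheld name a no-op.
            self._held.discard(name)


provider = InMemoryLockProvider()
print(provider.lock("subscription-partition-0"))
print(provider.lock("subscription-partition-0"))
provider.unlock("subscription-partition-0")
print(provider.lock("subscription-partition-0"))
```

A second attempt on a held name returns False at once, which lets the caller move on to other work instead of waiting.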


depeche_db.SubscriptionStateProvider

Bases: Protocol

Subscription state provider is a protocol that is used to store and read subscription state.

initialize(subscription_name: str)

Marks subscription state as initialized.

initialized(subscription_name: str) -> bool

Returns True if the subscription state was already initialized.

read(subscription_name: str) -> SubscriptionState

Reads subscription state.

Returns:

Type | Description
SubscriptionState | Subscription state

session(**kwargs) -> SubscriptionStateProvider

Returns a session for the subscription state provider.

This can be used to run the state reads/updates in a transaction provided by the client.

See DbSubscriptionStateProvider for an example implementation.

store(subscription_name: str, partition: int, position: int)

Stores subscription state for a given partition.
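
The protocol's methods can be sketched with an in-memory implementation. Everything named here is an assumption made for illustration: `InMemorySubscriptionStateProvider` is hypothetical, and the `SubscriptionState` dataclass is a local stand-in whose single `positions` field (partition to position) may not match the library's real type.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class SubscriptionState:
    # Hypothetical stand-in: maps partition number to last stored position.
    positions: Dict[int, int] = field(default_factory=dict)


class InMemorySubscriptionStateProvider:
    def __init__(self) -> None:
        self._positions: Dict[str, Dict[int, int]] = {}
        self._initialized: Set[str] = set()

    def initialize(self, subscription_name: str) -> None:
        self._initialized.add(subscription_name)

    def initialized(self, subscription_name: str) -> bool:
        return subscription_name in self._initialized

    def store(self, subscription_name: str, partition: int, position: int) -> None:
        # State is kept per subscription and per partition.
        self._positions.setdefault(subscription_name, {})[partition] = position

    def read(self, subscription_name: str) -> SubscriptionState:
        # Return a copy so callers cannot mutate the stored state.
        return SubscriptionState(dict(self._positions.get(subscription_name, {})))

    def session(self, **kwargs) -> "InMemorySubscriptionStateProvider":
        # There is no transaction to bind to in memory, so return self.
        return self


provider = InMemorySubscriptionStateProvider()
provider.initialize("order_projection")
provider.store("order_projection", partition=0, position=42)
print(provider.read("order_projection"))
```

A database-backed provider would presumably use the keyword arguments of session() to bind reads and writes to a caller-supplied transaction; the in-memory version has nothing to bind to.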


depeche_db.SubscriptionStartPoint

Defines the start point of a subscription.

init_state(subscription_name, stream, state_provider)

Initializes subscription state (if not yet initialized).


depeche_db.StartAtNextMessage

Bases: SubscriptionStartPoint

Starts consuming messages from the next message in the stream.


depeche_db.StartAtPointInTime(point_in_time)

Bases: SubscriptionStartPoint

Starts consuming messages from a point in time.

Parameters:

Name | Type | Description | Default
point_in_time | datetime | The point in time to start consuming messages from; must be timezone-aware | required
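
Because the point in time must be timezone-aware, it can help to check a datetime before passing it in. The sketch uses only the standard library; the helper name is hypothetical, and the awareness rule it applies is the one from the Python datetime documentation, not necessarily the check the library performs.

```python
from datetime import datetime, timezone


def is_timezone_aware(value: datetime) -> bool:
    # A datetime is aware when tzinfo is set and yields a UTC offset.
    return value.tzinfo is not None and value.tzinfo.utcoffset(value) is not None


naive = datetime(2024, 1, 1, 12, 0)
aware = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

print(is_timezone_aware(naive))
print(is_timezone_aware(aware))

# A naive value known to be in UTC can be made aware by attaching the zone.
fixed = naive.replace(tzinfo=timezone.utc)
print(is_timezone_aware(fixed))
```

Note that replace(tzinfo=...) only labels the value with a zone and does not convert the clock time, so it is appropriate only when the naive value is already known to be in that zone.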