Skip to content

MessageStore

depeche_db.MessageStore(name, engine, serializer)

Bases: Generic[E]

Message store.

Parameters:

Name Type Description Default
name str

A valid python identifier which is used as a prefix for the database objects that are created.

required
engine Engine

A SQLAlchemy engine.

required
serializer MessageSerializer

A serializer for the messages.

required

Attributes:

Name Type Description
aggregated_stream AggregatedStreamFactory

A factory for aggregated streams.

read(stream)

Read all messages from a stream.

Parameters:

Name Type Description Default
stream str

The name of the stream.

required

Returns:

Type Description
Iterator[StoredMessage[E]]

Iterator[StoredMessage]: An iterator over the messages.

reader(conn=None)

Get a reader for the store.

You can give a connection to use for the read as conn. If you don't give a connection, a new connection will be used (and discarded after the reader context has been left).

Example usage:

with store.reader() as reader:
    message = reader.get_message_id(some_id)

Parameters:

Name Type Description Default
conn Optional[SAConnection]

A database connection.

None

Yields:

Name Type Description
MessageStoreReader MessageStoreReader[E]

A reader for the store.

synchronize(stream, expected_version, messages, conn=None)

Synchronize a stream with a sequence of messages.

Given a stream and a sequence of messages, this method will write the messages to the stream that are not already in it.

Optimistic concurrency control must used to ensure that the stream is not modified by another process between reading the last message and writing the new message. You have to give expected_version. If the stream has been modified, a OptimisticConcurrencyError will be raised.

You can give a connection to use for the write as conn. If you don't give a connection, a new connection will be created and the write will be committed. You can use this to write messages and other data in a single transaction. Therefore, if you give a connection, you have to commit the transaction yourself.

Parameters:

Name Type Description Default
stream str

The name of the stream to which the message should be written.

required
expected_version int

The expected version of the stream.

required
messages Sequence[MessageProtocol]

The messages that should be in the stream after the synchronization.

required
conn Optional[SAConnection]

A database connection.

None

Returns:

Name Type Description
MessagePosition MessagePosition

The position of the last message in the stream.

write(stream, message, expected_version=None, conn=None)

Write a message to the store.

Optimistic concurrency control can used to ensure that the stream is not modified by another process between reading the last message and writing the new message. You have to give expected_version to use it. If the stream has been modified, a OptimisticConcurrencyError will be raised.

You can give a connection to use for the write as conn. If you don't give a connection, a new connection will be created and the write will be committed. You can use this to write messages and other data in a single transaction. Therefore, if you give a connection, you have to commit the transaction yourself.

Parameters:

Name Type Description Default
stream str

The name of the stream to which the message should be written.

required
message MessageProtocol

The message to write.

required
expected_version Optional[int]

The expected version of the stream.

None
conn Optional[SAConnection]

A database connection.

None

Returns:

Name Type Description
MessagePosition MessagePosition

The position of the last message in the stream.


depeche_db.MessageStoreReader(conn, storage, serializer)

Bases: Generic[E]

Message store reader.

get_message_by_id(message_id)

Returns a message by ID.

Parameters:

Name Type Description Default
message_id UUID

Message ID.

required

get_messages_by_ids(message_ids)

Returns multiple messages by IDs.

Parameters:

Name Type Description Default
message_ids Sequence[UUID]

Message IDs.

required

read(stream)

Returns all messages from a stream.

Parameters:

Name Type Description Default
stream str

Stream name

required

read_wildcard(stream_wildcard)

Returns all messages from streams that match the wildcard.

Use like syntax to match multiple streams:

  • stream-% - match all streams that start with stream-
  • % - match all streams
  • %-% - match all streams that contain -

Parameters:

Name Type Description Default
stream_wildcard str

Stream name wildcard

required

depeche_db.MessageSerializer

Bases: Protocol, Generic[M]

Message serializer is a protocol that is used to serialize and deserialize messages.

The following must be true for any serializer:

  • deserialize(serialize(message)) == message
  • type(deserialize(serialize(message))) is type(message)
  • serialize(deserialize(data)) == data

deserialize(message: dict) -> M

Deserializes message from a dictionary.

serialize(message: M) -> dict

Serializes message to a dictionary. The dictionary must be JSON serializable.