MessageStore
depeche_db.MessageStore(name, engine, serializer)
Bases: Generic[E]
Message store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | A valid Python identifier, used as a prefix for the database objects that are created. | required |
engine | Engine | A SQLAlchemy engine. | required |
serializer | MessageSerializer | A serializer for the messages. | required |
Attributes:
Name | Type | Description |
---|---|---|
aggregated_stream | AggregatedStreamFactory | A factory for aggregated streams. |
read(stream)
Read all messages from a stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | The name of the stream. | required |
Returns:
Type | Description |
---|---|
Iterator[StoredMessage[E]] | An iterator over the messages. |
reader(conn=None)
Get a reader for the store.
You can pass a connection to use for the read as conn. If you don't, a new connection is opened and discarded once the reader context has been left.
Example usage:
    with store.reader() as reader:
        message = reader.get_message_by_id(some_id)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
conn | Optional[SAConnection] | A database connection. | None |
Yields:
Name | Type | Description |
---|---|---|
MessageStoreReader | MessageStoreReader[E] | A reader for the store. |
synchronize(stream, expected_version, messages, conn=None)
Synchronize a stream with a sequence of messages.
Given a stream and a sequence of messages, this method writes those messages that are not already in the stream.
Optimistic concurrency control is used to ensure that the stream is not modified by another process between reading the last message and writing the new messages. You have to give expected_version. If the stream has been modified, an OptimisticConcurrencyError is raised.
You can pass a connection to use for the write as conn. If you don't, a new connection is created and the write is committed. Passing a connection lets you write messages and other data in a single transaction; in that case, you have to commit the transaction yourself.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | The name of the stream to which the message should be written. | required |
expected_version | int | The expected version of the stream. | required |
messages | Sequence[MessageProtocol] | The messages that should be in the stream after the synchronization. | required |
conn | Optional[SAConnection] | A database connection. | None |
Returns:
Name | Type | Description |
---|---|---|
MessagePosition | MessagePosition | The position of the last message in the stream. |
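The "write only what is missing" behaviour described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not the library's implementation: the stream is a plain list of message IDs, the version of a stream is assumed to be its message count, and `VersionConflict` is a hypothetical stand-in for OptimisticConcurrencyError.

```python
from typing import List, Sequence


class VersionConflict(Exception):
    """Hypothetical stand-in for OptimisticConcurrencyError in this toy model."""


def synchronize(stored: List[str], expected_version: int, messages: Sequence[str]) -> int:
    """Append the messages that are missing from `stored`; return the new version."""
    # Assumption: the version of a stream is the number of messages in it.
    if len(stored) != expected_version:
        raise VersionConflict(f"expected {expected_version}, found {len(stored)}")
    known = set(stored)
    for message_id in messages:
        if message_id not in known:
            stored.append(message_id)
            known.add(message_id)
    return len(stored)


stream = ["m1", "m2"]
version = synchronize(stream, expected_version=2, messages=["m1", "m2", "m3"])
# Only "m3" was missing, so the stream now holds three messages.
```

In this model, calling `synchronize` again with the same messages and the new version changes nothing, which is what makes the operation safe to repeat.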
write(stream, message, expected_version=None, conn=None)
Write a message to the store.
Optimistic concurrency control can be used to ensure that the stream is not modified by another process between reading the last message and writing the new message. To use it, give expected_version. If the stream has been modified, an OptimisticConcurrencyError is raised.
You can pass a connection to use for the write as conn. If you don't, a new connection is created and the write is committed. Passing a connection lets you write messages and other data in a single transaction; in that case, you have to commit the transaction yourself.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | The name of the stream to which the message should be written. | required |
message | MessageProtocol | The message to write. | required |
expected_version | Optional[int] | The expected version of the stream. | None |
conn | Optional[SAConnection] | A database connection. | None |
Returns:
Name | Type | Description |
---|---|---|
MessagePosition | MessagePosition | The position of the last message in the stream. |
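To illustrate the optimistic concurrency check, here is a minimal in-memory model of a versioned write. It does not use depeche_db: `InMemoryStore` and `VersionConflict` are hypothetical names, the model returns a plain integer rather than a MessagePosition, and it assumes a stream's version equals its message count, which may differ from how the library numbers versions.

```python
from typing import Dict, List, Optional


class VersionConflict(Exception):
    """Hypothetical stand-in for OptimisticConcurrencyError in this toy model."""


class InMemoryStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[str]] = {}

    def write(self, stream: str, message: str, expected_version: Optional[int] = None) -> int:
        messages = self._streams.setdefault(stream, [])
        # Assumption: a stream's version is its message count (0 when empty).
        if expected_version is not None and expected_version != len(messages):
            raise VersionConflict(
                f"expected version {expected_version}, found {len(messages)}"
            )
        messages.append(message)
        return len(messages)


store = InMemoryStore()
store.write("account-1", "opened", expected_version=0)

# Two writers both read the stream at version 1; only the first write succeeds.
store.write("account-1", "deposit", expected_version=1)
try:
    store.write("account-1", "withdrawal", expected_version=1)
    conflict = False
except VersionConflict:
    conflict = True

# Without expected_version the check is skipped and the write always appends.
final_version = store.write("account-1", "note")
```

The losing writer would typically re-read the stream, re-apply its decision against the new state, and retry with the updated version.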
depeche_db.MessageStoreReader(conn, storage, serializer)
Bases: Generic[E]
Message store reader.
get_message_by_id(message_id)
Returns a message by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_id | UUID | Message ID. | required |
get_messages_by_ids(message_ids)
Returns multiple messages by IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_ids | Sequence[UUID] | Message IDs. | required |
read(stream)
Returns all messages from a stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | str | Stream name | required |
read_wildcard(stream_wildcard)
Returns all messages from streams that match the wildcard.
Use LIKE syntax to match multiple streams:
stream-% - match all streams that start with stream-
% - match all streams
%-% - match all streams that contain -
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream_wildcard | str | Stream name wildcard | required |
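To get a feel for which stream names a wildcard selects, the sketch below emulates SQL LIKE matching in plain Python. It is an approximation for experimenting, not what the library runs: the real matching is assumed to happen in the database, `like_matches` is a hypothetical helper, and escape sequences are ignored. Note that in standard LIKE syntax `_` also matches any single character, which matters if stream names contain underscores.

```python
import re


def like_matches(pattern: str, stream: str) -> bool:
    """Return True if `stream` matches the SQL LIKE `pattern` (no escape handling)."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, stream, flags=re.DOTALL) is not None


streams = ["stream-1", "stream-2", "other-1", "plain"]

starts_with = [s for s in streams if like_matches("stream-%", s)]
everything = [s for s in streams if like_matches("%", s)]
with_dash = [s for s in streams if like_matches("%-%", s)]
```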
depeche_db.MessageSerializer
Bases: Protocol, Generic[M]
Message serializer is a protocol that is used to serialize and deserialize messages.
The following must be true for any serializer:
deserialize(serialize(message)) == message
type(deserialize(serialize(message))) is type(message)
serialize(deserialize(data)) == data
deserialize(message: dict) -> M
Deserializes message from a dictionary.
serialize(message: M) -> dict
Serializes message to a dictionary. The dictionary must be JSON serializable.
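The three round-trip rules above are easy to check mechanically. The following sketch shows a hand-written serializer with the same two methods and a helper that asserts the rules. `AccountOpened`, `AccountOpenedSerializer`, and `check_round_trip` are hypothetical names for illustration; the message class deliberately leaves out whatever else the library's MessageProtocol requires.

```python
import json
from dataclasses import dataclass
from uuid import UUID


@dataclass(frozen=True)
class AccountOpened:  # hypothetical message type
    account_id: UUID
    owner: str


class AccountOpenedSerializer:
    """Provides the serialize/deserialize pair described by the protocol."""

    def serialize(self, message: AccountOpened) -> dict:
        # UUID objects are not JSON serializable, so store them as strings.
        return {"account_id": str(message.account_id), "owner": message.owner}

    def deserialize(self, message: dict) -> AccountOpened:
        return AccountOpened(
            account_id=UUID(message["account_id"]),
            owner=message["owner"],
        )


def check_round_trip(serializer, message) -> None:
    """Assert the three serializer rules for one message."""
    data = serializer.serialize(message)
    json.dumps(data)  # raises TypeError if the dict is not JSON serializable
    restored = serializer.deserialize(data)
    assert restored == message
    assert type(restored) is type(message)
    assert serializer.serialize(restored) == data


check_round_trip(
    AccountOpenedSerializer(),
    AccountOpened(UUID("12345678-1234-5678-1234-567812345678"), "alice"),
)
```

Running a check like this over a sample of every message type in a test suite catches fields that silently change type on the way through JSON, such as UUIDs or datetimes.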