Skip to content

Event Sourcing tools

These classes are provided for reference only. They are fairly straightforward, and you will probably implement your own versions rather than use them directly.

depeche_db.event_sourcing.EventSourcedAggregateRoot

Bases: ABC, Generic[ID, E]

Source code in depeche_db/event_sourcing/aggregate_root.py
class EventSourcedAggregateRoot(_abc.ABC, Generic[ID, E]):
    """Abstract base for aggregates whose state is driven by events.

    Subclasses implement :meth:`get_id` and :meth:`_apply`, and may override
    :meth:`_check_invariants`. Every event passed to :meth:`apply` is recorded
    and bumps an internal version counter by one.
    """

    def __init__(self):
        # Events recorded so far, in the order they were applied.
        self._events: list[E] = []
        # Incremented once for every recorded event.
        self._version = 0

    @property
    def events(self) -> list[E]:
        """Return a shallow copy of the recorded events.

        A copy is returned so callers cannot alter the aggregate's internal
        event list by mutating the result.
        """
        return list(self._events)

    @_abc.abstractmethod
    def get_id(self) -> ID:
        """Return the identifier of this aggregate."""
        raise NotImplementedError

    def _add_event(self, event: E) -> None:
        """Record ``event`` and increment the version counter."""
        self._version += 1
        self._events.append(event)

    def apply(self, event: E) -> None:
        """Apply ``event`` to the aggregate and record it.

        The event is first handed to :meth:`_apply`, then
        :meth:`_check_invariants` runs, and only afterwards is the event
        recorded. If either step raises, the event is not recorded (any state
        change already made by ``_apply`` is not rolled back here).
        """
        self._apply(event)
        self._check_invariants()
        self._add_event(event)

    @_abc.abstractmethod
    def _apply(self, event: E) -> None:
        """Update the aggregate's state for ``event``; subclass hook."""
        raise NotImplementedError

    def _check_invariants(self) -> None:
        """Hook called after ``_apply``; does nothing by default."""
        pass

    def __eq__(self, other: object) -> bool:
        """Compare by identifier and version.

        Two aggregate roots are equal when both ``get_id()`` and the internal
        version counter match. For any other type ``NotImplemented`` is
        returned so Python can fall back to the reflected comparison (and
        ultimately to ``False``) instead of raising.

        Note: because ``__eq__`` is defined without ``__hash__``, instances
        are not hashable.
        """
        if not isinstance(other, EventSourcedAggregateRoot):
            # Return the sentinel rather than raising NotImplementedError:
            # raising would make `agg == 42` or `agg in some_list` crash.
            return NotImplemented
        return self.get_id() == other.get_id() and self._version == other._version

depeche_db.event_sourcing.Repo

Bases: ABC, Generic[OBJ, ID]

A repository is a collection of objects that can be queried and persisted.

This is an abstract base class that defines the interface that all repositories implement.

add(entity) abstractmethod

Add a new entity to the repository.

get(id) abstractmethod

Get an entity from the repository by its ID.

save(entity, expected_version) abstractmethod

Save an existing entity to the repository.


depeche_db.event_sourcing.EventStoreRepo

Bases: Generic[E, OBJ, ID], Repo[OBJ, ID]

Source code in depeche_db/event_sourcing/repository.py
class EventStoreRepo(Generic[E, OBJ, ID], Repo[OBJ, ID]):
    """Repository that reads and writes objects through a message store.

    Each object is mapped to a stream named ``"<stream_prefix>-<object id>"``.
    """

    def __init__(
        self,
        event_store: MessageStore[E],
        constructor: Callable[[], OBJ],
        stream_prefix: str,
    ):
        """Keep the store, the object factory and the stream name prefix.

        Args:
            event_store: Message store used for both writing and reading.
            constructor: Zero-argument callable, passed on to the
                ``ReadRepository`` used by :meth:`get`.
            stream_prefix: Prefix placed before the object's id in the
                stream name.
        """
        self._stream_prefix = stream_prefix
        self._constructor = constructor
        self._event_store = event_store

    def add(self, obj: OBJ) -> MessagePosition:
        """Persist a new object by saving it with ``expected_version=0``."""
        position = self.save(obj, expected_version=0)
        return position

    def save(self, obj: OBJ, expected_version: int) -> MessagePosition:
        """Hand the object's events to the store's ``synchronize`` call.

        Args:
            obj: Object providing ``get_id()`` and ``events``.
            expected_version: Forwarded unchanged to ``synchronize``.

        Returns:
            Whatever ``synchronize`` returns.
        """
        stream_name = f"{self._stream_prefix}-{obj.get_id()}"
        pending_events = obj.events
        return self._event_store.synchronize(
            stream=stream_name,
            messages=pending_events,
            expected_version=expected_version,
        )

    def get(self, id: ID) -> OBJ:
        """Load the object with the given id via a ``ReadRepository``.

        A reader is opened on the store for the duration of the lookup and
        released when the ``with`` block exits.
        """
        with self._event_store.reader() as store_reader:
            read_repo = ReadRepository[E, OBJ, ID](
                event_store_reader=store_reader,
                constructor=self._constructor,
                stream_prefix=self._stream_prefix,
            )
            return read_repo.get(id)