Thursday, 19 February 2015

Experimenting with event sourcing 3

Third and last article on this topic. You can find the others here and there.

I left off with a working implementation of the reading ability for the PersonRegistry. It was straightforward but not efficient. The idea to improve it is to make the EventTimeLine trigger a write into a more read-friendly data structure, such as a dictionary. To do so, we can simply implement the Observer pattern: the registry declares itself as a subscriber to the timeline so that it is notified of every event.

class PersonRegistry:
    """Provide services to deal with people"""

    def __init__(self, timeline):
        """Initialize timeline event"""
        self.timeline = timeline
        timeline.add_subscriber(self)
        self._current_id = 0
        # _registry will allow me to store 
        # the state of the system in order to
        # simplify querying
        self._registry = {}

    …

    def notify(self, data):
        """Get notified that something happened in the 
        system. It should be called by an event emitter
        only to update internal data representation."""

        # data is the event data, sent back by the EventTimeLine

        if 'personId' in data:
            person_id = data['personId']
            if data['type'] == EventTimeLine.PERSON_CREATION:
                self._registry[person_id] = {
                    'name': data['name'],
                    'address': data['address'],
                    'status': data['status']
                }
            if data['type'] == EventTimeLine.PERSON_STATUS_CHANGE:
                self._registry[person_id]['status'] = data['newStatus']

The EventTimeLine also allows subscription and notifies its listeners on every event.

import datetime


class EventTimeLine:

    PERSON_CREATION = 1
    PERSON_STATUS_CHANGE = 2

    def __init__(self):
        """Initialize the inner event list"""
        self.event_list = []
        # Subscribers are kept in a set: it prevents multiple notifications
        # if a subscriber is added several times.
        self._subscribers = set()

    def add_event(self, event_data):
        """Add an event in the event list."""
        event_data['_datetime'] = datetime.datetime.today()
        self.event_list.append(event_data)
        # Notify all the listeners
        self._notify_all(event_data)

    def _notify_all(self, event_data):
        """Forward the event to every subscriber."""
        for subs in self._subscribers:
            subs.notify(event_data)
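
The add_subscriber method is not shown above. Here is a minimal sketch of how the subscription side could look, assuming add_subscriber simply adds the listener to the set. The CountingSubscriber class is hypothetical and only exists to make the behaviour visible; the real code in the repository may differ.

```python
import datetime


class EventTimeLine:
    """Trimmed-down timeline, just enough to show subscription."""

    def __init__(self):
        self.event_list = []
        self._subscribers = set()

    def add_subscriber(self, subscriber):
        # Assumed implementation: adding to a set is idempotent,
        # so registering the same listener twice has no effect.
        self._subscribers.add(subscriber)

    def add_event(self, event_data):
        event_data['_datetime'] = datetime.datetime.today()
        self.event_list.append(event_data)
        for subscriber in self._subscribers:
            subscriber.notify(event_data)


class CountingSubscriber:
    """Hypothetical listener that only records what it receives."""

    def __init__(self):
        self.received = []

    def notify(self, data):
        self.received.append(data)


timeline = EventTimeLine()
listener = CountingSubscriber()
timeline.add_subscriber(listener)
timeline.add_subscriber(listener)  # second registration is a no-op
timeline.add_event({'type': 1, 'personId': 1})
print(len(listener.received))  # 1
```

Because the subscribers live in a set, they must be hashable. Plain Python objects are hashable by default, so any object with a notify method can be registered.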

And… it works! You can find the whole code in commit 5488ee5 on GitHub. Yet we can go a bit further and adopt CQRS, simply by splitting the PersonRegistry service into a service for commands (still PersonRegistry) and a service for queries (PersonRegistryReader). The latter subscribes to the EventTimeLine and carries the get_person_by_id method. This is a big change since it modifies the API, but it is acceptable to me (I'm the only user BTW :p).

class PersonRegistryReader:
    """PersonRegistry aimed at read access"""

    def __init__(self):
        """Initialize the read registry"""
        self._registry = {}

    def notify(self, data):

        if 'personId' in data:
            person_id = data['personId']
            if data['type'] == EventTimeLine.PERSON_CREATION:
                self._registry[person_id] = {
                    'name': data['name'],
                    'address': data['address'],
                    'status': data['status']
                }
            if data['type'] == EventTimeLine.PERSON_STATUS_CHANGE:
                self._registry[person_id]['status'] = data['newStatus']

    def get_person_by_id(self, demanded_id):
        """Retrieve a person from it's identifier in the system."""
        stored_person = self._registry[demanded_id]
        returned_person = Person(
            stored_person['status'],
            Address(
                stored_person['address']['street'],
                stored_person['address']['city']
            ),
            Name(
                stored_person['name']['firstname'],
                stored_person['name']['lastname']
            )
        )
        return returned_person

    def __hash__(self):
        return id(self)
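
To see how the two sides fit together, here is a self-contained sketch of the wiring. Everything in it is a simplified stand-in: Timeline, Commands and Reader are hypothetical names, the create_person and change_status methods are assumptions about what the command side could look like, and the reader returns plain dicts instead of Person objects. Only the event keys (type, personId, name, address, status, newStatus) come from the code above.

```python
PERSON_CREATION = 1
PERSON_STATUS_CHANGE = 2


class Timeline:
    """Stand-in for EventTimeLine: stores events, notifies subscribers."""

    def __init__(self):
        self.event_list = []
        self._subscribers = set()

    def add_subscriber(self, subscriber):
        self._subscribers.add(subscriber)

    def add_event(self, event_data):
        self.event_list.append(event_data)
        for subscriber in self._subscribers:
            subscriber.notify(event_data)


class Commands:
    """Write side: only emits events, keeps no queryable state."""

    def __init__(self, timeline):
        self._timeline = timeline
        self._current_id = 0

    def create_person(self, name, address, status):
        self._current_id += 1
        self._timeline.add_event({
            'type': PERSON_CREATION,
            'personId': self._current_id,
            'name': name,
            'address': address,
            'status': status,
        })
        return self._current_id

    def change_status(self, person_id, new_status):
        self._timeline.add_event({
            'type': PERSON_STATUS_CHANGE,
            'personId': person_id,
            'newStatus': new_status,
        })


class Reader:
    """Read side: builds a dictionary from the events it receives."""

    def __init__(self):
        self._registry = {}

    def notify(self, data):
        if data['type'] == PERSON_CREATION:
            self._registry[data['personId']] = {
                'name': data['name'],
                'address': data['address'],
                'status': data['status'],
            }
        elif data['type'] == PERSON_STATUS_CHANGE:
            self._registry[data['personId']]['status'] = data['newStatus']

    def get_person_by_id(self, person_id):
        return self._registry[person_id]


timeline = Timeline()
reader = Reader()
timeline.add_subscriber(reader)
commands = Commands(timeline)

new_id = commands.create_person(
    {'firstname': 'Jane', 'lastname': 'Doe'},
    {'street': '1 Example Street', 'city': 'Paris'},
    'active',  # placeholder status value
)
commands.change_status(new_id, 'inactive')
print(reader.get_person_by_id(new_id)['status'])  # inactive
```

The command side never touches the reader directly: the timeline is the only link between the two, which is what makes it possible to split them further later on.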

Extracting it was easy: it is just a cut and paste from PersonRegistry. Two remarks:

  • I keep the Person value object structure that I use for commands, but I really don't have to. I could also return a dict directly, or a JSON string. This is a strength of the CQRS approach: the two domain models can even differ.
  • I did not mention it, but I've been TDDing this example. I focused on keeping the tests green as long as possible. To do so, I keep the existing API alive while I'm building the new one. I branch when it's done, then clean up while the tests stay green. I want to be able to ship at any time.
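
As an illustration of the first remark, here is a sketch of a reader that hands back a JSON string instead of a Person object. JsonPersonReader is a hypothetical class that is not in the repository, and only the creation event is handled to keep it short.

```python
import json

PERSON_CREATION = 1  # same value as EventTimeLine.PERSON_CREATION


class JsonPersonReader:
    """Hypothetical read model that serves JSON instead of value objects."""

    def __init__(self):
        self._registry = {}

    def notify(self, data):
        if data.get('type') == PERSON_CREATION and 'personId' in data:
            self._registry[data['personId']] = {
                'name': data['name'],
                'address': data['address'],
                'status': data['status'],
            }

    def get_person_by_id(self, demanded_id):
        # The stored dict is already JSON-friendly, so no mapping is needed
        return json.dumps(self._registry[demanded_id], sort_keys=True)


json_reader = JsonPersonReader()
json_reader.notify({
    'type': PERSON_CREATION,
    'personId': 1,
    'name': {'firstname': 'Jane', 'lastname': 'Doe'},
    'address': {'street': '1 Example Street', 'city': 'Paris'},
    'status': 'active',  # placeholder status value
})
print(json_reader.get_person_by_id(1))
```

The command side is untouched by this choice, since it never sees what the reader returns.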

The creation and injection of all these objects is done in the web app. In the last commits, I added an event listener that produces logs from the timeline, and versioning for persons, just for fun and because it was easy.
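
A logging listener of this kind can be very small. The sketch below is not the code from the repository: LoggingListener and the 'timeline' logger name are assumptions, and it relies only on the standard logging module and on the notify convention used by the other subscribers.

```python
import logging


class LoggingListener:
    """Hypothetical subscriber that writes every event to a logger."""

    def __init__(self, logger=None):
        if logger is None:
            logger = logging.getLogger('timeline')
        self._logger = logger

    def notify(self, data):
        # One log line per event; the event dict is logged as-is
        self._logger.info('event type=%s data=%r', data.get('type'), data)


logging.basicConfig(level=logging.INFO)
log_listener = LoggingListener()
log_listener.notify({'type': 1, 'personId': 1})
```

Registering it is then one more add_subscriber call on the timeline, next to the reader.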

To conclude, I took great pleasure in building this example and getting a sense of what event sourcing means. I wanted to show that it is actually a simple yet very powerful concept. I also wanted to propose a way to build apps incrementally, by making some compromises at first (see previous article) and then continuously improving without breaking anything. Everything was done in iterations of 30 minutes to 1 hour. Finally, I wanted to show that programming or design concepts can be expressed using only language capabilities and data structures, without introducing a fancy NoSQL system or AMQP message queues hosted in the cloud and blah blah blah.

Of course, the example is really simple and there are ways to improve it: physically separate the query and command models, externalize the EventTimeLine, add more controls, etc. But I'll leave it that way. That is just tooling, and it would make the architecture too complex for the example.

I hope you enjoyed reading these articles!