Showing posts with the label event sourcing example.

Thursday, 19 February 2015

Experimenting with event sourcing 3

Third and last article on this topic. You can find the others here and there.

I left off with a working implementation of reads for the PersonRegistry. It was straightforward but not efficient. The idea for improving it is to make the EventTimeLine trigger a write into a more read-friendly data structure, such as a dictionary. To do so, we can simply implement the Observer pattern: the registry declares itself as a subscriber to the timeline in order to be notified of every event.

class PersonRegistry:
    """Provide services to deal with people"""

    def __init__(self, timeline):
        """Initialize timeline event"""
        self.timeline = timeline
        timeline.add_subscriber(self)
        self._current_id = 0
        # _registry will allow me to store 
        # the state of the system in order to
        # simplify querying
        self._registry = {}

    …

    def notify(self, data):
        """Get notified that something happened in the
        system. It should be called by an event emitter
        only, to update the internal data representation."""

        # data is the event data, sent back by the EventTimeLine

        if 'personId' in data:
            person_id = data['personId']
            if data['type'] == EventTimeLine.PERSON_CREATION:
                self._registry[person_id] = {
                    'name': data['name'],
                    'address': data['address'],
                    'status': data['status']
                }
            elif data['type'] == EventTimeLine.PERSON_STATUS_CHANGE:
                self._registry[person_id]['status'] = data['newStatus']

The EventTimeLine also allows subscription and notifies its listeners on every event.

import datetime


class EventTimeLine:

    PERSON_CREATION = 1
    PERSON_STATUS_CHANGE = 2

    def __init__(self):
        """Initialize the inner event list"""
        self.event_list = []
        # Yes, subscribers are in a set: it prevents multiple notifications
        # if a subscriber is added several times.
        self._subscribers = set()

    def add_event(self, event_data):
        """Add an event in the event list."""
        event_data['_datetime'] = datetime.datetime.today()
        self.event_list.append(event_data)
        # Notify all the listeners
        self._notify_all(event_data)

    def _notify_all(self, event_data):
        # trivial…
        for subs in self._subscribers:
            subs.notify(event_data)
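
To see the notification loop end to end, here is a minimal, self-contained sketch. The add_subscriber method is not shown in the excerpt above, so its body here is an assumption, and the RecordingSubscriber helper is hypothetical and exists only for illustration.

```python
import datetime


class EventTimeLine:
    """Trimmed-down timeline: stores events and notifies subscribers."""

    def __init__(self):
        self.event_list = []
        self._subscribers = set()

    def add_subscriber(self, subscriber):
        # Assumed implementation: the article only shows the call site.
        self._subscribers.add(subscriber)

    def add_event(self, event_data):
        event_data['_datetime'] = datetime.datetime.today()
        self.event_list.append(event_data)
        for subscriber in self._subscribers:
            subscriber.notify(event_data)


class RecordingSubscriber:
    """Hypothetical subscriber that just remembers what it was told."""

    def __init__(self):
        self.received = []

    def notify(self, data):
        self.received.append(data)


timeline = EventTimeLine()
recorder = RecordingSubscriber()
timeline.add_subscriber(recorder)
timeline.add_subscriber(recorder)  # the second registration is absorbed by the set
timeline.add_event({'type': 1, 'personId': 1})
```

Because the subscribers live in a set, registering the same object twice still yields a single notification per event.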

And… it works! You can find the whole code in commit 5488ee5 on GitHub. Yet we can go a bit further and adopt CQRS simply by splitting the PersonRegistry service into a service for commands (still PersonRegistry) and a service for queries (PersonRegistryReader). The latter subscribes to the EventTimeLine and bears the get_person_by_id method. This is a big change since it modifies the API, but it is acceptable to me (I'm the only user BTW :p).

class PersonRegistryReader:
    """PersonRegistry aimed at read access"""

    def __init__(self):
        """Initialize the read registry"""
        self._registry = {}

    def notify(self, data):
        """Update the read model from an event sent by the EventTimeLine."""

        if 'personId' in data:
            person_id = data['personId']
            if data['type'] == EventTimeLine.PERSON_CREATION:
                self._registry[person_id] = {
                    'name': data['name'],
                    'address': data['address'],
                    'status': data['status']
                }
            elif data['type'] == EventTimeLine.PERSON_STATUS_CHANGE:
                self._registry[person_id]['status'] = data['newStatus']

    def get_person_by_id(self, demanded_id):
        """Retrieve a person from its identifier in the system."""
        stored_person = self._registry[demanded_id]
        returned_person = Person(
            stored_person['status'],
            Address(
                stored_person['address']['street'],
                stored_person['address']['city']
            ),
            Name(
                stored_person['name']['firstname'],
                stored_person['name']['lastname']
                )
        )
        return returned_person

    def __hash__(self):
        return id(self)

Extracting it was easy: it is just a cut and paste from PersonRegistry. Two remarks:

  • I keep the Person value object structure that I use for commands, but I really don't have to. I could just as well return a dict or a JSON string. This is a strength of the CQRS approach: the command and query sides can even have different domain models.
  • I did not mention it, but I've been TDDing this example. I focused on keeping the tests green as much as possible. To do so, I keep the existing API alive while I'm building the new one. I plug the new one in when it's done, then clean up while the tests stay green. I want to be able to ship at any time.
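
The first remark can be made concrete with a small sketch: a read-side service that hands back a JSON string instead of rebuilding Person objects. The class name JsonPersonReader and the method get_person_json are hypothetical and not part of the repository, the sample data is invented, and the event constants are inlined (with the same values as in EventTimeLine) so that the block runs on its own.

```python
import json

# Same values as EventTimeLine.PERSON_CREATION / PERSON_STATUS_CHANGE.
PERSON_CREATION = 1
PERSON_STATUS_CHANGE = 2


class JsonPersonReader:
    """Hypothetical query-side service whose read model is JSON-friendly."""

    def __init__(self):
        self._registry = {}

    def notify(self, data):
        # Ignore events that do not concern a person.
        if 'personId' not in data:
            return
        person_id = data['personId']
        if data['type'] == PERSON_CREATION:
            self._registry[person_id] = {
                'name': data['name'],
                'address': data['address'],
                'status': data['status'],
            }
        elif data['type'] == PERSON_STATUS_CHANGE:
            self._registry[person_id]['status'] = data['newStatus']

    def get_person_json(self, demanded_id):
        """Return the stored person as a JSON string."""
        return json.dumps(self._registry[demanded_id], sort_keys=True)


# Invented sample events, fed by hand instead of through a timeline.
reader = JsonPersonReader()
reader.notify({
    'type': PERSON_CREATION,
    'personId': 1,
    'status': 1,
    'name': {'firstname': 'Ada', 'lastname': 'Lovelace'},
    'address': {'street': '1 Main Street', 'city': 'London'},
})
reader.notify({'type': PERSON_STATUS_CHANGE, 'personId': 1, 'newStatus': 2})
person = json.loads(reader.get_person_json(1))
```

The command side is untouched by this choice: it keeps emitting the same events, and only the shape of the query result changes.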

The creation and injection of all these objects is done in the web app. In the last commits, I added an event listener that produces logs from the timeline, and versioning for persons, just for fun and because it was easy.
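
A logging listener of that kind could look roughly like the sketch below. It is not the code from the repository: the names LoggingSubscriber and describe_event, the logger name and the message format are all assumptions made for illustration. It only relies on the notify(data) contract used by the other subscribers.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('timeline')  # logger name chosen arbitrarily


def describe_event(data):
    """Build a one-line, human-readable summary of an event dict."""
    return 'event type={} personId={}'.format(
        data.get('type'), data.get('personId'))


class LoggingSubscriber:
    """Hypothetical listener: logs every event it is notified about."""

    def notify(self, data):
        logger.info(describe_event(data))


# Invented sample event, passed by hand instead of through a timeline.
LoggingSubscriber().notify({'type': 2, 'personId': 1, 'newStatus': 2})
summary = describe_event({'type': 2, 'personId': 1, 'newStatus': 2})
```

Such an object would be registered like any other subscriber, with timeline.add_subscriber(LoggingSubscriber()); plain Python objects are hashable by default, so it fits in the subscriber set without extra work.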

To conclude, I took great pleasure in building this example and getting a feel for what event sourcing means. I wanted to show that it is actually a simple yet very powerful concept. I also wanted to propose a way to build apps incrementally, by making some compromises at first (see previous article), then continuously improving without breaking anything. Everything was done in iterations of 30 minutes to 1 hour. Finally, I wanted to show that programming or design concepts can be expressed using only language capabilities and data structures, without introducing a fancy NoSQL system or AMQP message queues hosted in the cloud and blah blah blah.

Of course, the example is really simple and there is room for improvement: physically separating the query and command models, externalizing the EventTimeLine, adding more controls, etc. But I'll leave it that way: the rest is just tooling, and it would make the architecture too complex for the example.

I hope you enjoyed reading these articles!

Tuesday, 17 February 2015

Experimenting with event sourcing 2 - reading

Second article about playing with event sourcing. You can have a look at the first one here.

I left you with the need to read your model. As you will see, my first implementation is straightforward: it iterates through the EventTimeLine, filters down to the events we need, and mutates an object that is convenient to read. The retrieval method lives in the PersonRegistry service:

def get_person_by_id(self, demanded_id):
    returned_person = None
    person_events = (
        p for p in self.timeline 
        if p['personId'] == demanded_id
    )
    for event in person_events:
        if event['type'] == EventTimeLine.PERSON_CREATION:
            returned_person = Person(
                event['status'],
                Address(
                    event['address']['street'],
                    event['address']['city']
                ),
                Name(
                    event['name']['firstname'],
                    event['name']['lastname']
                    )
            )
        if event['type'] == EventTimeLine.PERSON_STATUS_CHANGE:
            returned_person.status = event['newStatus']
    return returned_person

It works, but there are obvious drawbacks.

  • I have to iterate through the whole EventTimeLine every time I need a person, which takes longer and longer as my timeline grows (list → O(n) per read).
  • I may have to mutate my entity object many times, for nothing.

EventTimeLine is great for writing but not so convenient for reading. The solution is to use the events to populate an intermediate data structure with better read access. To do so, we can use the Observer pattern. That is for the next article.

The repository is updated with model code, tests and a UI written using Bottle.

Saturday, 14 February 2015

Experimenting with event sourcing

A few months ago I got interested in the CQRS and event sourcing approaches. Yesterday, I took some time to experiment a bit with event sourcing in Python.

The goal of the event sourcing approach is to drive changes in a system by saving all the events that happen in it rather than mutating its state. Here are some of the possibilities it brings:

  • you do not have to know the whole state of the entities in your system. All you need is their unique identifier to create events related to them,
  • all you have to care about at first is the persistence of your event store,
  • you can theoretically bring the system back to any state in its history, take a snapshot and represent it the way you want,
  • as you store functional events (create user, sell item, whatever), data migration is not needed. All you have to think about is the way you propagate the events through your whole information system.
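
The third point, bringing the system back to a past state, can be sketched as a fold over the events. This is a rough illustration, not code from the repository: the function state_at, the event layout (type, personId, status, newStatus, _datetime) and the sample data are assumptions chosen to match the examples later in this article.

```python
import datetime

PERSON_CREATION = 1
PERSON_STATUS_CHANGE = 2


def state_at(events, moment):
    """Rebuild {personId: status} from the events recorded up to `moment`."""
    state = {}
    for event in events:
        if event['_datetime'] > moment:
            break  # assumes events are stored in chronological order
        if event['type'] == PERSON_CREATION:
            state[event['personId']] = event['status']
        elif event['type'] == PERSON_STATUS_CHANGE:
            state[event['personId']] = event['newStatus']
    return state


# Invented sample history: a creation, then a status change three days later.
day = datetime.datetime(2015, 2, 14)
events = [
    {'type': PERSON_CREATION, 'personId': 1, 'status': 1, '_datetime': day},
    {'type': PERSON_STATUS_CHANGE, 'personId': 1, 'newStatus': 2,
     '_datetime': day + datetime.timedelta(days=3)},
]
before = state_at(events, day + datetime.timedelta(days=1))
after = state_at(events, day + datetime.timedelta(days=5))
```

Nothing is ever overwritten in the event list, so both the earlier and the later state can be rebuilt from the same data.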

So the idea is pretty cool, even if it seems disconcerting at first. Notice, though, that this is how our well-known relational database management systems work. The details of every transaction are first written to redo logs, even while the transaction is still in progress and not yet committed. This way, the RDBMS is able to rerun a set of operations after a crash, reducing the risk of losing an important part of a transaction. This is event sourcing! And it is considered the safest way to track all the transactions in the system.

Let's look at some code. The stuff I wrote on the train is about creating person entities, in other words associating them with an identifier in the system, and being able to change their situation, such as their marital status or the fact that they have moved. It could be useful for an income declaration app.

I begin by creating a service to deal with people. I call it PersonRegistry. With it, I can create a person in the system and change her marital status:

class PersonRegistry:
    """Provide services to deal with people"""

    # timeline parameter is my event store
    def __init__(self, timeline):
        """Initialize timeline event"""
        self.timeline = timeline
        self._currentId = 0

    def create(self, person):
        """Create a person in the system"""
        self._currentId += 1
        self.timeline.addEvent({
            'type': EventTimeLine.PERSON_CREATION,
            'personId': self._currentId,
            'status': person.status,
            'address': person.address.to_dict(),
            'name': person.name.to_dict()
        })
        return self._currentId

    def changeStatus(self, personId, newStatus):
        self.timeline.addEvent({
            'type': EventTimeLine.PERSON_STATUS_CHANGE,
            'personId': personId,
            'newStatus': newStatus
        })

I wanted the way events are stored to be very straightforward. They're simple dicts, easily serializable and readable.
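
As a quick illustration of that claim, the sketch below round-trips an event through the standard json module. The sample values are made up. The only wrinkle is a datetime value (such as a timestamp an event store might attach), which json cannot encode by itself, so the hypothetical helper encode_extra converts it to an ISO 8601 string.

```python
import datetime
import json


def encode_extra(value):
    """Fallback encoder for values json cannot serialize natively."""
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    raise TypeError('Cannot serialize {!r}'.format(value))


# Invented sample event with the same keys as a person creation event.
event = {
    'type': 1,
    'personId': 1,
    'status': 1,
    'address': {'street': '1 Main Street', 'city': 'London'},
    'name': {'firstname': 'Ada', 'lastname': 'Lovelace'},
    '_datetime': datetime.datetime(2015, 2, 14, 9, 30),
}

serialized = json.dumps(event, default=encode_extra, sort_keys=True)
restored = json.loads(serialized)
```

Note that the timestamp comes back as a string, so a real store would need to parse it again when loading events.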

To act as a serious Domain-Driven Design guy, I created some value objects as well:

class Person:

    SINGLE = 1
    MARRIED = 2

    def __init__(self, status, address, name):
        self.status = status
        self.address = address
        self.name = name


class Name:
    def __init__(self, firstname, lastname):
        """Blah Blah Blah"""
        self.firstname, self.lastname = firstname, lastname

    def to_dict(self):
        return {'firstname': self.firstname, 'lastname': self.lastname}


class Address:
    def __init__(self, street, city):
        self.street = street
        self.city = city

    def to_dict(self):
        return {'street': self.street, 'city': self.city}

The event store is called EventTimeLine. It lets you add events to the system with addEvent and retrieve them by iterating over the object.

import datetime


class EventTimeLine:
    """Basically, the list of all events in the system. Lets you add an event.
    The object itself is iterable if you want to browse the created events."""

    PERSON_CREATION = 1
    PERSON_STATUS_CHANGE = 2

    def __init__(self):
        """Initialize the inner event list"""
        self.event_list = []

    def addEvent(self, eventData):
        """Add an event in the event list."""
        eventData['_datetime'] = datetime.datetime.today()
        self.event_list.append(eventData)

    def __iter__(self):
        for e in self.event_list:
            yield e

I can store my events. It is a good start for the first iteration. I can ship it to the users, so they can start populating their system with people.
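
To show how the pieces of this first iteration fit together, here is a condensed, self-contained restatement of the classes above followed by a short usage run. The class bodies are trimmed copies of the article's code (docstrings removed); the sample person and the variable names in the usage part are invented for the example.

```python
import datetime


class Name:
    def __init__(self, firstname, lastname):
        self.firstname, self.lastname = firstname, lastname

    def to_dict(self):
        return {'firstname': self.firstname, 'lastname': self.lastname}


class Address:
    def __init__(self, street, city):
        self.street, self.city = street, city

    def to_dict(self):
        return {'street': self.street, 'city': self.city}


class Person:
    SINGLE = 1
    MARRIED = 2

    def __init__(self, status, address, name):
        self.status, self.address, self.name = status, address, name


class EventTimeLine:
    PERSON_CREATION = 1
    PERSON_STATUS_CHANGE = 2

    def __init__(self):
        self.event_list = []

    def addEvent(self, eventData):
        eventData['_datetime'] = datetime.datetime.today()
        self.event_list.append(eventData)

    def __iter__(self):
        return iter(self.event_list)


class PersonRegistry:
    def __init__(self, timeline):
        self.timeline = timeline
        self._currentId = 0

    def create(self, person):
        self._currentId += 1
        self.timeline.addEvent({
            'type': EventTimeLine.PERSON_CREATION,
            'personId': self._currentId,
            'status': person.status,
            'address': person.address.to_dict(),
            'name': person.name.to_dict(),
        })
        return self._currentId

    def changeStatus(self, personId, newStatus):
        self.timeline.addEvent({
            'type': EventTimeLine.PERSON_STATUS_CHANGE,
            'personId': personId,
            'newStatus': newStatus,
        })


# Usage: the sample data below is invented for the example.
timeline = EventTimeLine()
registry = PersonRegistry(timeline)
ada_id = registry.create(Person(
    Person.SINGLE,
    Address('1 Main Street', 'London'),
    Name('Ada', 'Lovelace'),
))
registry.changeStatus(ada_id, Person.MARRIED)
event_types = [event['type'] for event in timeline]
```

After the two calls, the timeline holds one creation event and one status change event, in that order, and no state has been mutated anywhere else.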

The next step is to be able to get a snapshot of the system state. It'll be my Monday iteration! For now, you can find the code on GitHub.