Concurrent commands in event sourcing

In this post I want to talk a little about concurrency in the context of CQRS and event sourcing. Specifically, when a single aggregate is concurrently accessed by two commands (or command handlers, really).

Concurrent commands

Let’s assume there are two commands, DepositMoney and WithdrawMoney. Both commands are handled by the BankAccount aggregate.

Let’s also assume that those two commands are dispatched at exactly the same time. Without locking, the result of that operation is impossible to determine. Most likely, one of the command handlers will fail to apply an event.

Depending on the framework, language and threading model used, one of the commands will “win”: its handler applies an event with sequence number x, which is then persisted. The other command handler will also try to apply an event with the same sequence number, but that event will fail to persist. The result is a nasty exception.

Ideally, we’d like to detect such situations a little earlier in the process, or prevent them altogether.

Pessimistic locking

Pessimistic locking will prevent any concurrent access to the aggregate. An aggregate can only be accessed by one thread, call or process at a time.

This locking strategy is the safest, but also the slowest. Each thread that wants to access an aggregate has to wait for the lock to be released by the previous thread.
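
To make this concrete, here is a minimal sketch of per-aggregate locking, not tied to any particular framework; the registry class, the withLock method and the five-second timeout are assumptions of the sketch:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical lock registry: one lock per aggregate id.
    public class AggregateLockRegistry {

        private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

        // Runs the given action while holding this aggregate's lock, so only one
        // command handler can touch the aggregate at a time.
        public void withLock(String aggregateId, Runnable action) throws InterruptedException {
            ReentrantLock lock = locks.computeIfAbsent(aggregateId, id -> new ReentrantLock());
            // A timeout prevents threads from waiting forever.
            if (!lock.tryLock(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for lock on aggregate " + aggregateId);
            }
            try {
                action.run();
            } finally {
                lock.unlock();
            }
        }
    }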

Be aware of timeouts and deadlocks!

Optimistic locking

The optimistic locking strategy tries to detect concurrent access to an aggregate, and throws an exception when it detects a conflict.

This is achieved by storing the version of the aggregate (describing the state of the aggregate when it was last loaded) with the command that’s being dispatched. In the example above, DepositMoney wins. When that command was dispatched, the aggregate was at version 1. After handling the command and applying the MoneyDeposited event, the aggregate is at version 2. Then WithdrawMoney is handled, but the version associated with the command does not match the current version of the aggregate, leading to a ConcurrencyException.
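
A minimal sketch of that version check, using a hypothetical in-memory event store (Event and ConcurrencyException are stand-ins for whatever your framework provides):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal in-memory event store with an optimistic concurrency check.
    public class OptimisticEventStore {

        public interface Event { }

        public static class ConcurrencyException extends RuntimeException {
            public ConcurrencyException(String message) { super(message); }
        }

        private final Map<String, List<Event>> streams = new ConcurrentHashMap<>();

        // Appends events only if the stream is still at the version the command
        // was dispatched against; otherwise another command got there first.
        public synchronized void append(String aggregateId, long expectedVersion, List<Event> newEvents) {
            List<Event> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
            long currentVersion = stream.size();
            if (currentVersion != expectedVersion) {
                throw new ConcurrencyException("Aggregate " + aggregateId + " is at version "
                        + currentVersion + ", but the command expected version " + expectedVersion);
            }
            stream.addAll(newEvents);
        }
    }

In the example above, WithdrawMoney would arrive with expected version 1 while the stream is already at version 2, triggering the ConcurrencyException.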

Multiple machines

Locking is slightly more difficult when dealing with clustered or replicated services. Typically, such services are behind a load balancer, and each replica handles a fair share of the requests to the service.

When it comes to locks, normally they are maintained per (virtual) machine or thread pool. This means that, in a clustered setup, concurrent access to aggregates is still a possibility.

There are a few ways to deal with that:

  • A distributed lock manager, implemented using something like ZooKeeper, etcd or Redis. Potentially complicated & expensive.
  • Make sure that all commands for a specific aggregate are handled within the same (virtual) machine or thread pool. This can be achieved with a consistent hashing algorithm (based on the aggregate id, for example) that routes commands to the correct replica; a simplified sketch follows this list. Axon Framework has native support for a distributed command bus (using JGroups).
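
To illustrate the second option, here is a deliberately simplified routing sketch; it uses a naive modulo where a real implementation (such as Axon’s) would use a proper consistent hash ring, so that members can join or leave the cluster with minimal redistribution:

    import java.util.List;

    // Simplified sketch: commands for the same aggregate id always land on the
    // same member, so their events are applied within a single JVM.
    public class CommandRouter {

        private final List<String> members; // e.g. the host names of the replicas

        public CommandRouter(List<String> members) {
            this.members = members;
        }

        public String route(String aggregateId) {
            // floorMod keeps the bucket non-negative for negative hash codes.
            int bucket = Math.floorMod(aggregateId.hashCode(), members.size());
            return members.get(bucket);
        }
    }

Commands for a given aggregate now always arrive at the same replica, so the single-machine locking strategies described earlier apply again.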


Conflict resolution

Concurrent access to or modification of aggregates doesn’t always pose a problem. Consider the aggregate Company, with the commands ChangeName and ChangeAddress and corresponding events NameChanged and AddressChanged. ChangeName and ChangeAddress deal with different (non-overlapping) parts of the aggregate state and can be safely merged if they are dispatched together. After all, the order in which NameChanged and AddressChanged are applied does not influence the final aggregate state.

However, two ChangeAddress commands that are simultaneously dispatched (because two users are submitting updates for a single company) do conflict. In this case you will want to display some sort of error message to the user that submitted the conflicting update, inviting them to try again.
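
A sketch of such a conflict check, assuming we can inspect the events that were applied after the version the command was dispatched against (the class and collections are hypothetical; the event type names mirror the Company example above):

    import java.util.List;
    import java.util.Set;

    // Instead of rejecting every version mismatch, only reject when the unseen
    // events overlap with the state the command changes.
    public class AddressConflictChecker {

        // Event types that touch the same state as a ChangeAddress command.
        private static final Set<String> CONFLICTING_TYPES = Set.of("AddressChanged");

        // unseenEventTypes: the types of events applied after the version the
        // command was dispatched against.
        public void check(List<String> unseenEventTypes) {
            if (unseenEventTypes.stream().anyMatch(CONFLICTING_TYPES::contains)) {
                throw new IllegalStateException(
                        "The address was changed concurrently; please review and try again.");
            }
            // NameChanged and other non-overlapping events fall through: safe to merge.
        }
    }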

In closing

I hope this post explains some of the things that you may encounter when dealing with concurrency & event sourcing. Let me know what you think!

Event versioning (or why I sometimes modify the event store)

Requirements and applications evolve and change, leading to refactoring. For example, user registration now requires a first and last name; what was once an error may no longer be considered one; and so on. In an event sourced application, that poses a few problems. In this post I’ll discuss a few strategies and share my views on event versioning.

On immutability and legacy events

Consider legacy events that, based on new requirements, are:

  • No longer relevant;
  • Misnamed;
  • Missing information;
  • Too coarse-grained (high level) or too fine-grained (low level).

Some people argue that event stores in general, and events in particular, are immutable: modifications to existing events should not happen, and any corrections must be made by new events. After all, events are a record of history, and history does not change.

This has some implications for the cases listed above. Regardless of new versions of events, to prevent errors and inconsistent state, old events and their handlers need to be maintained indefinitely. Potentially, this also holds for event listeners that depend on the old events.

Needless to say, depending on the rate of change of the application and its events, this can dramatically increase technical debt and complexity.

Archiving events

Consider a domain where aggregates have a limited “lifespan”. For example, a site with classifieds that are automatically removed after a certain period of time. Once these aggregates have become inactive (i.e., after applying a Deleted event), they will not process new commands or apply events.

Following that train of thought, all existing events for those aggregates can be safely moved to a secondary (archival) event store. This satisfies immutability (no events are modified, and history is preserved in the archive), while ensuring that those events no longer take up memory, index space, etc., reducing the working set of the primary store.

An alternative approach would be to move those events to inexpensive, slow (but replicated) storage. That way, the events will remain available, but with significantly increased latency.
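
A minimal sketch of that archiving step, assuming a relational event store; the events and events_archive tables and the aggregate_id column are hypothetical:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    // Archives the events of a deleted aggregate in a single transaction.
    public class EventArchiver {

        public void archive(Connection connection, String aggregateId) throws SQLException {
            connection.setAutoCommit(false);
            try (PreparedStatement copy = connection.prepareStatement(
                         "INSERT INTO events_archive SELECT * FROM events WHERE aggregate_id = ?");
                 PreparedStatement delete = connection.prepareStatement(
                         "DELETE FROM events WHERE aggregate_id = ?")) {
                copy.setString(1, aggregateId);
                copy.executeUpdate();
                delete.setString(1, aggregateId);
                delete.executeUpdate();
                connection.commit();
            } catch (SQLException e) {
                connection.rollback();
                throw e;
            }
        }
    }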

Event upcasting

Upcasting satisfies the requirement for an immutable event store. An upcaster is called when reading (old) events from a stream or store, before these events are pushed to event listeners, projections, etc. The original events are not touched, but the upcaster can map our new understanding of the world onto those events.

In general, I consider an upcaster to be some method f that takes an event of revision x and turns it into zero or more events of revision x + 1. In other words, Event_V2 = f(Event_V1). It’s not something that can be fully automated; instead, we will have to write a little code.

Let’s assume that, during user registration, an event was applied containing the date and time of registration. After some iterations, it’s decided that the time component is no longer of interest, just the date component.
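
A minimal sketch of such an upcaster, with hypothetical event classes and no ties to any particular framework:

    import java.time.LocalDate;
    import java.time.ZonedDateTime;
    import java.util.List;

    // Hypothetical event revisions: V1 stores the full timestamp, V2 only the date.
    record UserRegistered_V1(String userId, ZonedDateTime registeredAt) { }
    record UserRegistered_V2(String userId, LocalDate registeredOn) { }

    // The upcaster is the method f from above: it takes a revision x event and
    // returns zero or more revision x + 1 events.
    class UserRegisteredUpcaster {
        List<UserRegistered_V2> upcast(UserRegistered_V1 event) {
            return List.of(new UserRegistered_V2(event.userId(), event.registeredAt().toLocalDate()));
        }
    }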

In this case, an event listener interested in UserRegistered events only has to handle the latest version of the event (UserRegistered_V2); it will never see UserRegistered_V1. This reduces the amount of code that has to be supported.

Of course, upcasters do have a performance impact, depending on their number and the complexity of the work they do. For example, to construct a new version of an event, an upcaster might need to pull in data from other events or data sources.

Additionally, be aware that existing projections or subscribers are not automatically updated; they will only see the upcasted events during a replay.

Framework support

Here’s an overview of how upcasting can be implemented in some of the popular event sourcing frameworks:

  • Axon Framework: has native support for Event Upcasters, which can also (de-)multiplex events (changing their granularity, i.e. splitting or merging them). Events that are no longer relevant can be converted to an empty list.
  • Akka Persistence: by writing a custom Event Adapter, upcasting can be implemented in the fromJournal method; (de-)multiplexing is possible.
  • Broadway: no (native) support for upcasting, there is however a pull request to add that to the library.
  • Prooph: upcasting can be achieved by writing a custom MessageFactory. (De-)multiplexing does not seem possible.
  • Event Store (not really an event sourcing framework, but rather a journal): there seems to be no built-in support for event versioning or upcasting, other than copying all events from a stream to a new stream and updating each event as it is copied. Potentially, you could do some transformation when reading events (in a query), but that code would have to be repeated for every query that deals with those events.

Let me know if there are frameworks missing from this list!

Rewriting events

This is where it might become a little controversial 😉

I’m a practical person. My personal view is that there are cases where directly modifying or deleting events is a perfectly valid and acceptable scenario. Such modifications can be done by directly accessing the database (for trivial cases), or programmatically, as sketched after the steps below:

  1. Load the events that need to be refactored.
  2. De-serialize the event payload.
  3. Modify the event payload.
  4. Serialize the event payload.
  5. Write the modified event back to the event store.
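
A sketch of those five steps; the EventStore and Serializer interfaces are hypothetical stand-ins for whatever your stack provides, and the lastName modification is just an example:

    import java.util.List;
    import java.util.Map;

    // Hypothetical minimal interfaces for the sketch.
    interface Serializer {
        Map<String, Object> deserialize(byte[] payload);
        byte[] serialize(Map<String, Object> payload);
    }

    interface EventStore {
        List<StoredEvent> load(String aggregateId);              // step 1
        void overwrite(StoredEvent original, byte[] newPayload); // step 5
    }

    record StoredEvent(String aggregateId, long sequenceNumber, byte[] payload) { }

    class EventRewriter {

        private final EventStore store;
        private final Serializer serializer;

        EventRewriter(EventStore store, Serializer serializer) {
            this.store = store;
            this.serializer = serializer;
        }

        void rewrite(String aggregateId) {
            for (StoredEvent stored : store.load(aggregateId)) {
                Map<String, Object> payload = serializer.deserialize(stored.payload()); // step 2
                payload.put("lastName", "");                                            // step 3: add a missing field
                store.overwrite(stored, serializer.serialize(payload));                 // steps 4 and 5
            }
        }
    }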

As with all powerful tools, there are some inherent issues and dangers:

  • Running code may still depend on the old structure of the events being modified. This can be mitigated by performing the refactoring on application startup (but that means downtime), or by saving the events as a new collection (and then switching to that collection at some point).
  • Removing or renaming the wrong events.
  • Breaking serialization.
  • As with upcasting, existing projections or read models are not automatically updated.

So use with caution, but do consider it a part of your toolkit.

I’m keen to hear your thoughts!

Distributed command handling with Axon, JGroups and Docker

In a recent project I used Axon Framework together with JGroups, to create a clustered, or distributed command bus.

In that project we had some concurrency issues. One of those issues was that two events were applied to a single aggregate with an identical sequence number. The JGroupsConnector in Axon uses a consistent hashing algorithm to route commands. This ensures that commands with the same routing key will be sent to the same member, regardless of which member sent the message. Within the project we used the aggregate identifier as the routing key, ensuring that commands for a single aggregate are processed in a single JVM, thus preventing duplicate sequence numbers.

To demonstrate such a setup, I’ve created a simple demo application, based on the latest version of Axon Framework (3.0-M3). Using Docker Compose, the application is launched twice (in two containers). The containers should then form a JGroups cluster, and handle a number of commands. Go check it out on GitHub!

The hierarchy of Continuous Deployment

In this post I’m presenting a twist on Maslow’s hierarchy of needs, the hierarchy of Continuous Deployment. This version is based on the steps that are required to successfully implement Continuous Deployment (a step up from Continuous Delivery).

Maslow’s hierarchy of needs (also known as Maslow’s Pyramid) is a well-known theory in psychology, describing human needs in a particular ranking, with the most basic of needs at the bottom. It has since been updated slightly to deal with 21st century needs.

The hierarchy of Continuous Deployment

The needs are ranked in order of increasing scope, difficulty and abstraction.

Code

Trust. It all starts with trust. A development team must trust its code, and trust it well enough to continuously improve and adapt it. TDD, refactoring, continuous integration, a solid dose of code coverage (did I hear anyone say 100%?) and tons of quality metrics lead to a healthy, mature, predictable, and above all, trustworthy code base. Also, pair programming.

Functionality

Story done, feature delivered, high fives all around. Was it the right feature? Wait, what did we ship? Involve the three amigos and come up with scenarios, then test your implementation against those scenarios. BDD, DDD, UBL.

Architecture

A stable, fast build pipeline (preferably, as code) producing consistent artifacts. Respect the build. Verify that service A and service B are still on a first-name basis and talk the same language. Check that the code does not kill the database using some obscure SQL dialect which is no longer supported. Throw 10x the load at the system. Then 100x.

Infrastructure

Time spent firefighting a flaky system is time not spent delivering value. Ensure easily replaced, stable servers with plenty of horsepower. Load balancers left and right. Self-healing, autoscaling clouds for the win.

Continuous Deployment

Automate all the things. Push to master. No human intervention, no manual gates, no downtime during deployments. Fast iterations, happy business. More high fives. Or beer. Or high fives with beer.

Agree? Disagree? Let me know what you think!

In future blog posts I’ll explain some of the things required to climb all the way to the top of this hierarchy.

Phing development update

A few years ago I pledged to publish regular updates on Phing’s development. I’m a little ashamed that I haven’t done that for nearly three years. Needless to say, a lot has happened, so it’s about time to post an update!

Recent releases

The latest version, 2.14.0, was released in March. Ever since the move of the code to GitHub, a steady flow of pull requests and contributions has resulted in a lot of new functionality, improved documentation and numerous squashed bugs.

Here’s a selection of interesting features that have recently been added:

  • Support for PHP 7 & HHVM.
  • A flurry of new tasks: stopwatch, switch, throw, retry and many more.
  • Better support for composer and numerous dependencies (such as PHPUnit).
  • Additional selectors and conditions.
  • Remember the old HTML user guide? That has been replaced by (significantly improved!) DocBook 5 documentation.
  • Additional tests & test coverage.

Of course there’s much more and all the details can be found in the changelog.

Coming up

The next minor version, 2.15.0, should be released in a few weeks; the tickets assigned to that version can be found here.

Beyond Phing itself, there are a few planned changes to the website and the build process:

  • Migrating the ticket tracker (Trac) to GitHub issues.
  • Shipping a “full” .phar file with each release (containing recent versions of the most popular dependencies).
  • Testing the generated .phar file in the Travis CI builds.
  • Adding a documentation search.
  • Refreshing the website.

Phing 3.0

A serious refactor of Phing has been on my own wishlist for quite some time. Some work towards that goal has been committed to the 3.0 branch, and I try to keep this branch more or less synchronized with master (albeit with some delay). The branch is entirely functional and requires PHP 5.6 or above.

Some of the work that’s still in progress or on the roadmap: integrating SPL, changing the way booleans are handled, a proper task autoloader, cleaning up the code, improving the test harness, etc. If you have suggestions, then please let me know! Time available to spend on a pull request is even better 🙂