In this post I want to talk a little about concurrency in the context of CQRS and event sourcing. Specifically, when a single aggregate is concurrently accessed by two commands (or command handlers, really).
Let’s assume there are two commands, DepositMoney and WithdrawMoney. Both commands are handled by the BankAccount aggregate.
Let’s also assume that those two commands are dispatched at exactly the same time. Without locking, the result of that operation is impossible to determine. Most likely, one of the command handlers will fail to apply an event.
Depending on the framework, language and threading model used, one of the commands will “win” and its handler will apply an event with sequence number x – an event which is then persisted. The other command handler will also try to apply an event, with the same sequence number, this will fail to persist. The result is a nasty exception.
Ideally, we’d like to detect such situations a little earlier in the process, or prevent them altogether.
Pessimistic locking will prevent any concurrent access to the aggregate. An aggregate can only be accessed by one thread, call or process at a time.
This locking strategy is the safest, but also the slowest. Each thread that wants to access an aggregate has to wait for the lock to be released by the previous thread.
Be aware of timeouts and deadlocks!
The optimistic locking strategy tries to detect concurrent access to an aggregate, and will throw an exception if it does so.
This is achieved by storing the version of the aggregate (describing the state of the aggregate when it was last loaded) with the command that’s being dispatched. In the example above, DepositMoney wins. When that command was dispatched, the aggregate was at version 1. After handling the command and applying the MoneyDeposited event, the aggregate is at version 2. Then WithdrawMoney is handled, but the version associated with the command does not match the current version of the aggregate, leading to a ConcurrencyException.
Locking is slightly more difficult when dealing with clustered or replicated services. Typically, such services are behind a load balancer, and each replica handles a fair share of the requests to the service.
When it comes to locks, normally they are maintained per (virtual) machine or thread pool. This means that, in a clustered setup, concurrent access to aggregates is still a possibility,
There are a few ways to deal with that:
- A distributed lock manager, implemented using something like ZooKeeper, ETCD or Redis. Potentially complicated & expensive.
- Make sure that all commands for a specific aggregate are handled within the same (virtual) machine or thread pool. This can be achieved by a consistent hashing algorithm (based on the aggregate id, for example), to route commands to the correct replica. Axon Framework has native support for a distributed command bus (using JGroups).
Concurrent access to or modification of aggregates doesn’t always pose a problem. Consider the aggregate Company, with the commands ChangeName and ChangeAddress and corresponding events NameChanged and AddressChanged. ChangeName and ChangeAddress deal with different (non-overlapping) parts of the aggregate state and can be safely merged if they are dispatched together. After all, the order in which NameChanged and AddressChanged are applied does not influence the final aggregate state.
However, two ChangeAddress commands that are simultaneously dispatched (because two users are submitting updates for a single company) do conflict. In this case you will want to display some sort of error message to the user that submitted the conflicting update, inviting them to try again.
I hope this post explains some of the things that you may encounter when dealing with concurrency & event sourcing. Let me know what you think!