I recently played around a little bit with the Azure EventHub managed service, which promises high-throughput event processing at relatively low cost. At first it seems relatively easy to use in a distributed manner via the EventProcessorHost class, and that is what all the online examples provided by Microsoft use as well.
My experience is that the EventProcessorHost is basically useless. Not only does it lack any provision that I have found for supplying a retry policy to make its API calls fault tolerant, it is also designed to checkpoint its progress only at relatively long intervals, meaning that you have to design your application to work properly even if events are reprocessed (which is what will happen after a catastrophic failure). Worse than that, though: once you fire up more than one processing node it simply falls all over itself, constantly rebalancing, to the point that almost no processing happens.
So if you want to use the EventHub managed service in any serious way, you need to code directly against the EventHubClient interface, which means that you have to figure out your own way of distributing its partitions over the available nodes.
This leads me to an interesting problem: how do you evenly balance a workload over a number of nodes (in the nomenclature below, the work is split into one or more partitions), any of which can suffer a catastrophic failure and stop processing at any time, without a central orchestrator?
Furthermore, I want the allocation to be sticky when the load is already evenly distributed between the nodes: the partitions of work currently allocated to a node should stay allocated to that node.
The algorithm I have come up with requires a Redis cache to handle the orchestration, and it uses only two hash keys and two subscriptions. Any key value store that provides publish and subscribe functionality should do, though.
The algorithm has five time spans that are important.
- Normal lease time. I'm using 60 seconds for this. It is the time a partition will normally be leased without being challenged.
- Maximum lease time. Must be significantly longer than the normal lease time.
- Maximum shutdown time. The maximum time a processor has to shut down after it has lost a lease on a partition.
- Minimum lease grab time. Must be less than the normal lease time.
- Current leases held delay. Should be relatively short; a second should be plenty (I generally operate in the 100 to 500 millisecond range). It is multiplied by the number of partitions the node is currently processing. It can't be too low, though, or you will run into scheduler-based jitter with partitions jumping between nodes.
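As a concrete reference, the five time spans might be captured as constants like these. The names and exact values are my own illustration; only the relationships between them come from the rules above:

```python
# Illustrative timing constants; the names and specific values are
# hypothetical. Only the ordering constraints come from the algorithm.
NORMAL_LEASE_SEC = 60.0       # normal time a partition is leased unchallenged
MAX_LEASE_SEC = 180.0         # must be significantly longer than the normal lease
MAX_SHUTDOWN_SEC = 10.0       # time a processor gets to shut down after losing a lease
MIN_LEASE_GRAB_SEC = 30.0     # must be less than the normal lease time
LEASES_HELD_DELAY_SEC = 0.25  # multiplied by the number of locally active partitions

# The constraints the algorithm states:
assert MAX_LEASE_SEC > NORMAL_LEASE_SEC
assert MIN_LEASE_GRAB_SEC < NORMAL_LEASE_SEC
assert 0.1 <= LEASES_HELD_DELAY_SEC <= 0.5  # the 100-500 ms range mentioned above
```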
Each node should also listen to two Redis subscriptions (basically notifications sent to all subscribers). Each notification carries the affected partition as its message.
- Grab lease subscription. Used to signal that the lease of a partition is being challenged.
- Allocated lease subscription. Used to signal that the lease of a partition has ended when somebody is waiting to start processing it.
There are also two hash keys in use to keep track of things. Each one uses the partition as the hash field, and the field contains the name of the node currently owning it.
- Lease allocation. Contains which node is currently actually processing which partition.
- Lease grab. Used to race and indicate which node won a challenge to take over processing of a partition.
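To make the moving parts concrete, here is a minimal in-memory stand-in for the Redis features the algorithm relies on: hash keys and pub/sub channels. The class, channel, and key names are my own; with real Redis you would use HSET/HGET/HDEL and PUBLISH/SUBSCRIBE instead.

```python
from collections import defaultdict

class MiniStore:
    """In-memory stand-in for the Redis pieces the algorithm needs:
    hash keys (HSET/HGET/HDEL) and pub/sub channels (PUBLISH/SUBSCRIBE).
    Illustration only; a real deployment would talk to an actual Redis."""

    def __init__(self):
        self._hashes = defaultdict(dict)       # key -> {field: value}
        self._subscribers = defaultdict(list)  # channel -> [callback]

    def hset(self, key, field, value):
        self._hashes[key][field] = value

    def hget(self, key, field):
        return self._hashes[key].get(field)

    def hdel(self, key, field):
        self._hashes[key].pop(field, None)

    def subscribe(self, channel, callback):
        self._subscribers[channel].append(callback)

    def publish(self, channel, message):
        for callback in self._subscribers[channel]:
            callback(message)

store = MiniStore()
received = []
store.subscribe("grab-lease", received.append)

# node-a is currently processing partition "0"; the notification message
# is simply the partition id, as described above.
store.hset("lease-allocation", "0", "node-a")
store.publish("grab-lease", "0")
```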
This is the general algorithm.
- Once per normal lease time, each node will send out a grab lease notification for each partition that:
- It does not yet own and which does not currently have a value set in the lease grab hash key.
- Has not had a lease grab signaled for more than the maximum lease time (this is required for the case where a node dies after winning the grab race but before it has finished recording its lease allocation). In this case, also clear the lease allocation and lease grab hash fields for the partition before raising the notification, since it is an indication that a node has gone offline without cleaning up.
- If the node is currently processing the partition being grabbed, it will wait for the number of partitions it is actively processing minus one, multiplied by the current leases held delay (so basically (locally active partitions - 1) * current leases held delay).
- If the node is not currently processing the partition being grabbed, it should wait for the number of locally active partitions plus one half, multiplied by the current leases held delay (or in fewer words (locally active partitions + 0.5) * current leases held delay).
- Once its delay expires, a node tries to write its own name into the lease grab hash field for the partition; the first node to do so wins the race. Generally the node with the lowest delay from the wait step above wins, which also means that the active partitions should distribute evenly among the active nodes: the more active partitions an individual node has, the longer it waits and the less likely it is to win the race to own the partition lease.
- If a node is currently processing a partition but did not win the race, it should immediately signal its processor for that partition to shut down gracefully. Once it has shut down, it should remove the lease allocation hash field for the partition and then publish the allocated lease notification, so that the winning node, which is waiting to start processing, can take over. After that is completed, this node should skip the rest of the steps.
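The wait-time rules above can be sketched as a pure function. The function name and the 250 ms default are mine; the two formulas are the ones from the steps. Note how a current holder always answers half a delay sooner than a challenger holding one fewer partition, which is what makes the allocation sticky:

```python
def grab_delay(locally_active, is_current_holder, held_delay=0.25):
    """How long a node waits after a grab-lease notification before it
    tries to write its name into the lease grab hash. Shortest wait wins."""
    if is_current_holder:
        # Holder of the challenged partition: (n - 1) * delay
        return (locally_active - 1) * held_delay
    # Everyone else: (n + 0.5) * delay
    return (locally_active + 0.5) * held_delay

# Sticky case: one partition, one holder. The holder answers immediately
# while even a completely idle newcomer still waits half a delay.
print(grab_delay(1, True))   # 0.0
print(grab_delay(0, False))  # 0.125

# Rebalance case: a node holding two partitions waits longer than an idle
# challenger, so the challenger wins and takes one partition over.
print(grab_delay(2, True))   # 0.25
```

This is also why the distribution converges: a challenger only beats a holder when it holds at least two fewer partitions, so nodes stop stealing from each other once the load is even.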
When I run this algorithm in my tests it works exactly as I want. Once a new node comes online, the workload is distributed evenly among the new and old nodes within the normal lease time. Another important test is that with only one partition, the partition does not skip among the nodes but squarely lands on one node and stays there. And finally, if I kill a node without giving it any chance to do any cleanup, the load is distributed out to the remaining nodes after roughly the maximum lease time.
This algorithm does not in any way handle the case where the load on the different partitions is not uniform. In that case you could relatively easily tweak the wait formulas above and replace the locally active partition count with whatever measurement of load or performed work you wish. It will be tricky to keep the algorithm sticky with these changes, though.
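As a sketch of that tweak, the variant below swaps the partition count for an arbitrary load figure and keeps a fixed offset in the holder's favor for stickiness. The function and its load units are entirely hypothetical, and as noted, a noisy load measurement will make partitions bounce between nodes much more easily:

```python
def weighted_grab_delay(local_load, is_current_holder, held_delay=0.25):
    """Hypothetical variant of the grab delay where local_load is any
    measurement of work (events/sec, CPU time, ...) rather than the
    number of locally active partitions. The holder gets a head start
    of one full delay so that ties still favor the current owner."""
    offset = -0.5 if is_current_holder else 0.5
    return max(0.0, (local_load + offset) * held_delay)

# A lightly loaded challenger outwaits a heavily loaded holder and
# takes the partition; with equal load, the holder keeps it.
print(weighted_grab_delay(1.0, False))  # 0.375
print(weighted_grab_delay(3.0, True))   # 0.625
```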