My solutions to Gossip Glomers, a series of distributed systems challenges brought to you by Fly.io and Kyle Kingsbury (the amazing author of Jepsen).
My unique ID is the concatenation of the current nanosecond timestamp and a cryptographically secure random integer (using math/rand wasn't passing the test).
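A rough sketch of that generation, using crypto/rand for the random part (the exact concatenation format shown here is just illustrative):

```go
// Sketch: concatenate the current nanosecond timestamp with a
// cryptographically secure random integer. crypto/rand is used instead
// of math/rand, which wasn't passing the test.
func genID() (string, error) {
	n, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) // crypto/rand
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%d%d", time.Now().UnixNano(), n), nil
}
```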
In this solution, I use something other than the suggested topology. As we don't have any constraints, every node broadcasts to all the nodes using Send. It could be way more efficient, but it works. In #3d, the solution will be smarter.
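A rough sketch of that handler (assuming the Maelstrom Go client, a package-level *maelstrom.Node called n, and a seen set guarded by a mutex; those names are mine):

```go
// Sketch: on "broadcast", record the value and fan it out to every other
// node with a fire-and-forget Send (no ack, no retry).
n.Handle("broadcast", func(msg maelstrom.Message) error {
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}
	value := int(body["message"].(float64)) // JSON numbers decode as float64

	mu.Lock()
	_, known := seen[value]
	seen[value] = struct{}{}
	mu.Unlock()

	if !known { // only fan out new values, otherwise nodes would loop forever
		for _, dst := range n.NodeIDs() {
			if dst == n.ID() || dst == msg.Src {
				continue
			}
			if err := n.Send(dst, map[string]any{"type": "broadcast", "message": value}); err != nil {
				return err
			}
		}
	}
	return n.Reply(msg, map[string]any{"type": "broadcast_ok"})
})
```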
Same code as #3b. It turns out that a dumb solution that broadcasts to all the nodes also handles network partitions 😅. Same as before: in #3d, the solution will contain a proper retry mechanism.
Things are starting to get challenging. We now have constraints on the number of messages exchanged (goodbye, good old dumb solution that broadcasts to all the nodes) and on latency.
Regarding latency, the maximum should be 600ms. As exchanging a message takes 100ms, we need a topology where the distance between any two nodes never exceeds 5.
The topology that I used is a flat tree: one root node, and all the other nodes are its children; hence, a tree with only two levels.
In this topology, a child node broadcasts a message only to the root node, and the root node broadcasts to all the children. Hence, the maximum distance between any two nodes is 2 (e.g., 24 -> 0 -> 3).
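The neighbor list can be derived directly from the node IDs, something along these lines (picking the lexicographically smallest ID as root is my arbitrary choice):

```go
// Sketch: the first node ID acts as the root; every other node is a leaf.
// Leaves only talk to the root, the root talks to every leaf.
func neighbors(n *maelstrom.Node) []string {
	ids := append([]string(nil), n.NodeIDs()...)
	sort.Strings(ids) // deterministic on every node, e.g. n0 ends up first
	root := ids[0]

	if n.ID() == root {
		return ids[1:] // the root broadcasts to all the children
	}
	return []string{root} // a child only broadcasts to the root
}
```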
In the end, the solution achieves the following results:
- Messages-per-operation: 23.38 (it's the result returned by maelstrom, I guess the average value is below 24 because it only counts the messages-per-operation for the other operations, such as read 🤷)
- Median latency: 181ms
- Maximum latency: 207ms
I also switched from Send to SyncRPC to detect network partitions and implement a proper retry mechanism.
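The retry itself is a simple loop around SyncRPC; roughly (the 1s timeout and 500ms backoff are arbitrary values, and broadcastWithRetry is my own helper name):

```go
// Sketch: keep retrying a broadcast until the destination acknowledges it.
// SyncRPC blocks until a reply arrives or the context expires, which is
// how an unreachable node (network partition) gets detected.
func broadcastWithRetry(n *maelstrom.Node, dst string, body map[string]any) {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := n.SyncRPC(ctx, dst, body)
		cancel()
		if err == nil {
			return
		}
		time.Sleep(500 * time.Millisecond) // arbitrary backoff before retrying
	}
}
```

Each call runs in its own goroutine so a partitioned node doesn't block the others.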
The main downside of this topology is that the broadcast load isn't evenly spread among the nodes: the root node becomes a hot spot. If this node becomes inaccessible, none of the nodes will receive any broadcast message until it's fixed.
Now we need to bring the messages-per-operation below 20 (below the number of nodes). I used a batch operation where each node now broadcasts messages of this shape:
```json
{
  "type": "broadcast",
  "messages": [1, 8, 72, 25]
}
```
So, instead of sending a single message value, we can now broadcast multiple messages at once. I do it via a goroutine scheduled every 500ms, which checks what has to be broadcasted and then sends the messages.
- Messages-per-operation: 1.92
- Median latency: 764ms
- Maximum latency: 1087ms
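The batching loop looks roughly like this (pending, mu, neighbors, and broadcastWithRetry are the names I used in the previous sketches; the 500ms ticker matches the period described above):

```go
// Sketch: every 500ms, flush the values accumulated since the last tick
// and broadcast them as a single batched message to each neighbor.
go func() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		mu.Lock()
		if len(pending) == 0 {
			mu.Unlock()
			continue
		}
		batch := append([]int(nil), pending...)
		pending = pending[:0]
		mu.Unlock()

		body := map[string]any{"type": "broadcast", "messages": batch}
		for _, dst := range neighbors(n) {
			go broadcastWithRetry(n, dst, body)
		}
	}
}()
```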
It's also interesting to play with the frequency: increasing the interval decreases the messages-per-operation but increases the latencies. No solution is perfect; everything is a question of tradeoffs and balance.
The solution also handles network partitions.
We need to work with a sequentially-consistent store. The main impact is that, when doing a read, the only client guaranteed to get the latest value is the one that performed the write.
In this solution, each node writes to its own bucket. Then, the solution relies on some form of coordination among the nodes before answering a read request (sketched after the list):
- Iterate over all the nodes of the cluster:
  - If the node is the current node, we perform a read in the store directly using ReadInt
  - Otherwise, it contacts the other node via a SyncRPC call to ask it to return the latest value (and this node will perform the read in the store using ReadInt as well)
- We return the sum of all the values
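As a sketch (assuming a seq-kv handle created with maelstrom.NewSeqKV, one bucket per node keyed by the node ID, and a hypothetical read_own message type that each node answers with its own value):

```go
// Sketch: sum the per-node buckets. The local bucket is read directly from
// the sequentially-consistent KV; every other node is asked for its value.
func globalRead(n *maelstrom.Node, kv *maelstrom.KV) (int, error) {
	sum := 0
	for _, id := range n.NodeIDs() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		if id == n.ID() {
			v, err := kv.ReadInt(ctx, n.ID()) // this node's own bucket
			cancel()
			if err != nil {
				return 0, err // or fall back to a cached value (tradeoff discussed below)
			}
			sum += v
			continue
		}
		reply, err := n.SyncRPC(ctx, id, map[string]any{"type": "read_own"})
		cancel()
		if err != nil {
			return 0, err
		}
		var body struct {
			Value int `json:"value"`
		}
		if err := json.Unmarshal(reply.Body, &body); err != nil {
			return 0, err
		}
		sum += body.Value
	}
	return sum, nil
}
```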
In the meantime, and even though it wasn't mandatory to pass all the tests (including the network partitions test), I introduced a form of caching: if a node can't contact the store or another node, it returns the latest known value (availability > consistency). But again, it's just a question of tradeoffs; if we removed the cache and returned an error whenever a node or the store is unreachable, we would favor consistency over availability.
Not much to say here. We store everything in memory with a map per key. As the logs are ordered, we can find the proper offset when replying to a request using a binary search.
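Something along these lines (the entry type is mine; sort.Search does the binary search over the offsets):

```go
// A single log entry: an offset and the message stored at that offset.
type entry struct {
	offset int
	msg    int
}

// Sketch: entries are kept sorted by offset, so a binary search finds the
// first entry whose offset is >= the offset requested by a poll.
func pollFrom(entries []entry, offset int) []entry {
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].offset >= offset
	})
	return entries[i:]
}
```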
In this first distributed implementation, I decided to use three bucket types (by bucket, I mean entries in the store):
- One to store the latest offset for a given key
- One to store the latest committed offset for a given key
- And the last bucket to store the messages
For the latter, I chose to store a single entry per message. The main advantage is that I don't have to rely on CompareAndSwap to store messages; I only use it to store the latest offset (during the tests, it triggers a CAS retry about 40 times). Yet, the main downside is that poll requires n calls to the store: we start from the provided offset and iterate until we reach a KeyDoesNotExist error.
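Roughly (the per-offset key format is my own, and this assumes a missing key comes back as a maelstrom RPCError carrying the KeyDoesNotExist code):

```go
// Sketch: read one entry per offset, starting from the requested offset,
// until the store reports that the key does not exist.
func poll(kv *maelstrom.KV, key string, from int) ([][2]int, error) {
	var out [][2]int
	for offset := from; ; offset++ {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		msg, err := kv.ReadInt(ctx, fmt.Sprintf("msg_%s_%d", key, offset))
		cancel()
		if err != nil {
			var rpcErr *maelstrom.RPCError
			if errors.As(err, &rpcErr) && rpcErr.Code == maelstrom.KeyDoesNotExist {
				return out, nil // no more messages for this key
			}
			return nil, err
		}
		out = append(out, [2]int{offset, msg})
	}
}
```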
This solution leads to the following results:
- Messages-per-operation: 12.03
- Availability: 0.9991992
- Throughput peak: ~300hz
Here, we are asked to improve the overall latency. My solution is to switch from one entry per message to one entry per key (hence, one entry contains multiple messages). To do that while still passing the tests, I route the messages for a given key to the same node using a hashing mechanism, which allows me to get rid of expensive CAS fail-and-retry operations. All the writes are done using Write, with a shared mutex to protect against concurrent writes that would otherwise lead to consistency issues.
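The routing itself can be as simple as hashing the key onto the sorted list of node IDs (FNV here is just my pick):

```go
// Sketch: hash the key and map it onto one of the node IDs, so every send
// for a given key is written by the same node (no CAS contention).
func ownerOf(n *maelstrom.Node, key string) string {
	ids := append([]string(nil), n.NodeIDs()...)
	sort.Strings(ids) // same deterministic order on every node

	h := fnv.New32a()
	h.Write([]byte(key))
	return ids[int(h.Sum32())%len(ids)]
}
```

If ownerOf returns another node ID, the send request is forwarded synchronously to that node; otherwise the node appends locally under the mutex and writes with Write.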
Metrics-wise:
- Messages-per-operation: 6.88
- Availability: 0.99511856
- Throughput peak: ~350hz
So there's a small drop in availability, which can probably be explained by the fact that a send request needs to be forwarded synchronously to another node when the hashing doesn't match the node ID. Yet, this version decreases the messages-per-operation and increases the throughput.
One remark, though: in Kafka, this routing to the same node isn't achieved at the topic level. Imagine that a 3-node Kafka cluster has only one topic; we wouldn't want a single node to become a hot spot. Hence, Kafka introduces the concept of a partition, basically a sub-split of a topic. If I wanted to improve my solution, I should probably do the same.
Also, another downside is that I now have only one entry for all the messages that belong to a specific key. If we receive too many messages, at some point it will become an issue. To tackle that, and if I recall correctly, Kafka introduces the concept of segments, basically splitting the partitions into chunks (with a rotation based either on time or on size). Again, if I wanted to improve my solution, I should probably do that as well.
This part is pretty straightforward; as it's a single node, we can store everything in memory.
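For reference, the txn handler boils down to replaying the workload's operations (triples like ["r", 1, null] or ["w", 1, 6]) against a mutex-protected map; store and mu are my names:

```go
// Sketch: apply each ["r", k, null] / ["w", k, v] operation against an
// in-memory map and echo the txn back with the reads filled in.
n.Handle("txn", func(msg maelstrom.Message) error {
	var body struct {
		Txn [][3]any `json:"txn"`
	}
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}

	mu.Lock()
	for i, op := range body.Txn {
		k := int(op[1].(float64)) // JSON numbers decode as float64
		switch op[0].(string) {
		case "r":
			if v, ok := store[k]; ok {
				body.Txn[i][2] = v
			}
		case "w":
			store[k] = int(op[2].(float64))
		}
	}
	mu.Unlock()

	return n.Reply(msg, map[string]any{"type": "txn_ok", "txn": body.Txn})
})
```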
Now, we have to implement a distributed store that ensures read-uncommitted consistency. From my notes (and a lot was coming from the Jepsen website), this consistency level prohibits dirty writes: a transaction overwriting a value previously written by another transaction that is still in-flight and hasn't committed yet. The main problem with dirty writes is that they can violate integrity constraints.
In my solution, I implemented a coordination mechanism between the nodes to synchronize their state upon a new request. As we have to handle network partitions, the solution relies on SyncRPC and a custom retry mechanism.
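In spirit, the synchronization looks like this (replicate and the "replicate" message type are my names; the timeout and backoff values are arbitrary):

```go
// Sketch: after applying a txn locally, push its writes to every peer.
// SyncRPC detects unreachable peers; each write set is retried in the
// background so partitioned nodes converge once the network heals.
func replicate(n *maelstrom.Node, writes map[int]int) {
	body := map[string]any{"type": "replicate", "writes": writes}
	for _, dst := range n.NodeIDs() {
		if dst == n.ID() {
			continue
		}
		go func(dst string) {
			for {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
				_, err := n.SyncRPC(ctx, dst, body)
				cancel()
				if err == nil {
					return
				}
				time.Sleep(500 * time.Millisecond) // arbitrary retry backoff
			}
		}(dst)
	}
}
```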
Now, our store has to ensure read-committed consistency. Again, from my notes, this model prevents dirty reads: transactions aren't allowed to observe writes from transactions that don't commit.
Unfortunately (and I mean it), my previous solution for #6b already passes. Unfortunately, because it means the end of the challenge 😔.
Thanks to Fly.io and Kyle Kingsbury, it was truly a fantastic experience.