How to Get Stronger Consistency Out of a Datastore

Welcome to our series of blog posts about things Sentry does that perhaps we shouldn’t do. Don’t get us wrong — we don’t regret our decisions. We’re sharing our notes in case you also choose the path less traveled. In this post, we stretch technologies to their limits to see real-time data while handling traffic spikes. (Some basic familiarity with Apache Kafka and consistency models in distributed systems will help you get the most out of this post.)

Sentry’s primary job is to ingest user errors. When our clients’ code breaks, Sentry receives traffic spikes. At the same time, error tracking is beneficial to users when it is near real-time.

There are two requirements here that may be competing with each other:

  • The event ingestion system has to be responsive, fast, and scalable under all types of load.
  • Error data must be accessible to Sentry’s users in near real-time.

To absorb high and spiky traffic, Sentry receives an event from a client and runs a series of processing phases asynchronously. As a consequence, these phases have not necessarily finished by the time the HTTP response is sent to the client.

Two of these phases are saving the event to Sentry's main storage (ClickHouse) and running a post-process task that sends notifications, calls plugins, and so on.

Figure: How events flow from Sentry through Kafka to Snuba.

To save the event to ClickHouse, we insert the event itself into a Kafka topic. Several consumers read from this topic and efficiently write into ClickHouse by batching insert operations.

After the event is saved, we trigger the post-process task we talked about above, which needs to read up-to-date events (like historical data and the event we just wrote) from ClickHouse to work properly.

Wait. “After the event has been saved?” How can we be so sure the event was actually persisted into ClickHouse before we try to read it? Well, we can’t exactly be sure. In the end, the event ingestion task just put the event in a Kafka topic.

To guarantee the event has already been persisted before we try to read it, we need a storage system that provides a sequential consistency model (assuming the event ingestion task and the post-process task run in separate processes), or a read-your-writes consistency model if the two operations happen in the same process. (More about consistency models.)

A storage system where writes happen asynchronously and reads are not synchronized with the write process doesn’t provide any of those guarantees. So, we needed to mitigate the issue with a different architecture.

We can break the problem down into two parts:

  • The event may not have reached ClickHouse at all when the post-process task tries to read. As a solution, the post-process task needs to wait for the event to travel through Kafka to ClickHouse before doing any read.
  • Even if we wait for the write to ClickHouse to happen before running the post-process task, we depend on ClickHouse's consistency guarantees. ClickHouse is distributed and replicated, and it does not offer read-after-write consistency by default. We need to ensure we read from a replica that has already received the event we want to read.

Triggering post-processing after we write to ClickHouse

Solving this problem is equivalent to waiting for an event to be consumed and committed by one Kafka consumer from one topic before running the post-processing logic.

There are several possible solutions to this problem:

  • Using any distributed lock framework (explained here) to make the post-process task wait. While this would work, it would require us to add yet another distributed system framework in our architecture, thus increasing complexity.
  • Let Snuba trigger the post-process task after writing to ClickHouse. This solution would have a significant architecture implication in that Snuba would depend on Sentry (instead of just the other way around). Adding this additional dependency is not desirable.
  • Using the Kafka __consumer_offsets topic to make the post-process task wait. This strategy is not that different from the solution we built. The main issues were practical. For example, the library we use to access Kafka does not provide an abstraction to decode messages on that topic.
  • Passing the full event body to the post-process task code that needs it instead of reloading data from the database. We actually do that. Unfortunately, this is not enough, as the post-process task needs to run aggregate queries that demand up-to-date storage.

SynchronizedConsumer

None of these options was sufficient on its own, so we built the Synchronized Consumer: a system that allows a Kafka consumer to pause itself and wait for another consumer (in a separate consumer group) to commit an offset before consuming that same message.

As seen in the image above, consuming the event from the event topic triggers the post-process task. We want the post-process task component to process the event only when the Snuba Event Consumer has stored the event into ClickHouse so that the post-process task can read the event.

To achieve this, we needed to provide Snuba a way to advertise when an event has been stored into ClickHouse. The advertisement is communicated through an additional Kafka topic (the commit log topic). The Snuba Event Consumer writes on the commit topic after committing an offset.

The snippet below is a payload example on the commit log topic. The key of the message identifies the event topic by providing topic, partition, and group. The payload itself is just the offset being committed.

key: events:0:snuba-consumers #topic:partition:group
payload: 70

When we start the Synchronized Consumer, we need to reload the state (the last committed offset) for all partitions from the commit log topic. To make this process quick, the commit log topic is a compacted topic, and the consumer starts reading it from the earliest available offset.
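The key format and the reload step described above can be sketched as follows. This is a minimal illustration, not Snuba's actual code: the names `CommitKey`, `decode_commit`, and `load_committed_offsets` are hypothetical, and the messages are plain `(key, payload)` byte pairs rather than objects from a real Kafka client.

```python
from typing import Dict, Iterable, NamedTuple, Tuple


class CommitKey(NamedTuple):
    """Hypothetical parsed form of a commit log key (topic:partition:group)."""
    topic: str
    partition: int
    group: str


def decode_commit(key: bytes, payload: bytes) -> Tuple[CommitKey, int]:
    """Decode one commit log message of the form shown in the post."""
    # Split at most twice: Kafka topic names cannot contain ":", but this
    # sketch does not assume the same about the group name.
    topic, partition, group = key.decode("utf-8").split(":", 2)
    return CommitKey(topic, int(partition), group), int(payload.decode("utf-8"))


def load_committed_offsets(
    messages: Iterable[Tuple[bytes, bytes]], group: str
) -> Dict[Tuple[str, int], int]:
    """Replay the commit log from the beginning and keep the last offset per partition."""
    offsets: Dict[Tuple[str, int], int] = {}
    for key, payload in messages:
        commit_key, offset = decode_commit(key, payload)
        if commit_key.group != group:
            continue  # commit from a consumer group we are not waiting on
        # Later messages supersede earlier ones, which is also what log
        # compaction eventually leaves behind for each key.
        offsets[(commit_key.topic, commit_key.partition)] = offset
    return offsets
```

Because the topic is compacted, the replay only has to walk roughly one message per key instead of the full commit history.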

Our Batching Kafka Consumer commits Kafka offsets only after the corresponding messages have been stored in ClickHouse, so a committed offset is exactly the signal we need.
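The ordering that matters here (write the batch, then commit, then advertise the commit) might look like the sketch below. It is an assumption-laden illustration rather than the real Batching Kafka Consumer: `BatchingWriter` and its three callbacks are hypothetical stand-ins for the ClickHouse write, the Kafka offset commit, and the commit log producer. It also assumes the usual Kafka convention that a committed offset is the offset of the next message to read.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[int, int, bytes]  # (partition, offset, value)


class BatchingWriter:
    """Hypothetical batching consumer core: store first, commit afterwards."""

    def __init__(
        self,
        max_batch_size: int,
        write_batch: Callable[[List[bytes]], None],   # e.g. one bulk insert
        commit: Callable[[int, int], None],           # commit (partition, offset)
        publish_commit: Callable[[int, int], None],   # write to the commit log
    ) -> None:
        self._max_batch_size = max_batch_size
        self._write_batch = write_batch
        self._commit = commit
        self._publish_commit = publish_commit
        self._batch: List[Message] = []

    def submit(self, message: Message) -> None:
        self._batch.append(message)
        if len(self._batch) >= self._max_batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._batch:
            return
        # 1. Store the whole batch. If this raises, nothing below runs, so no
        #    offset is committed for data that was never written.
        self._write_batch([value for _, _, value in self._batch])
        # 2. Commit the next offset to read for every partition in the batch,
        #    then advertise that commit on the commit log topic.
        next_offsets: Dict[int, int] = {}
        for partition, offset, _ in self._batch:
            next_offsets[partition] = max(offset + 1, next_offsets.get(partition, 0))
        for partition, next_offset in next_offsets.items():
            self._commit(partition, next_offset)
            self._publish_commit(partition, next_offset)
        self._batch = []
```

Batching keeps the number of inserts low, at the cost of delaying the commit (and therefore post-processing) until the batch is flushed.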

Now we have a Kafka topic that tells us, for every partition, the offset the Snuba Consumer has committed. That is the data we need to run post-processing for an event only after the consumer has processed its offset.

The Synchronized Consumer handles the coordination. This consumer reads both the event topic and the commit log topic (on two separate threads). It runs a state machine internally that keeps track of the highest offset from the commit log and consumes from the event topic only up to the watermark found on the commit log topic.

When the SynchronizedConsumer catches up to the latest offset in the commit log for a partition, it stops consuming events from that partition until more events are committed.
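The watermark logic described above can be sketched without any threads or Kafka client. The class and function names here (`SynchronizedState`, `consumable`) are hypothetical, and the sketch assumes the Kafka convention that a committed offset N means every message with an offset below N has been handled by the other consumer group.

```python
from typing import Dict, List


class SynchronizedState:
    """Tracks, per partition, the highest offset seen on the commit log."""

    def __init__(self) -> None:
        self._watermarks: Dict[int, int] = {}

    def on_remote_commit(self, partition: int, offset: int) -> None:
        # Never move the watermark backwards if commits arrive out of order.
        self._watermarks[partition] = max(offset, self._watermarks.get(partition, 0))

    def watermark(self, partition: int) -> int:
        return self._watermarks.get(partition, 0)

    def may_consume(self, partition: int, offset: int) -> bool:
        # Assumption: committed offset N means offsets < N are already stored.
        return offset < self.watermark(partition)


def consumable(state: SynchronizedState, partition: int, offsets: List[int]) -> List[int]:
    """Return the leading run of offsets (given in ascending order) that may be processed now."""
    ready: List[int] = []
    for offset in offsets:
        if not state.may_consume(partition, offset):
            break  # pause this partition until the remote group commits more
        ready.append(offset)
    return ready
```

In a real consumer, the point where the loop breaks corresponds to pausing the partition, and a new commit log message is what resumes it.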

This solution works, but, as with any solution to this problem, there is a compromise in availability. If the storage part of the flow is delayed, reads are put on hold, and so is the post-process task.

Ensuring ClickHouse replicated our event on the replica

ClickHouse is a distributed database with multi-master, eventually-consistent asynchronous replication. That sentence was a mouthful, so let’s break down the relevance to our current problem:

  • Distributed database → In this case, “distributed database” means we partition our tables on multiple nodes. When ClickHouse receives a batch of rows to write, by default, it partitions them and sends them to the right partition asynchronously. When the writer receives the response, the batch may not have been written on all partitions.
  • Replication is asynchronous and eventually-consistent → Reads can happen on any replica with the default load balancing schema. When we read, we can run into stale data because the replica we read from may not have received the most up to date writes.
  • Replication is multi-master → Writes can happen on any node and may not be applied in the same order on all replicas. Eventually, the merging process ensures that data is consistent on all replicas. ClickHouse also returns (by default) after writing on one node and one replica. So, it is not enough to lose the replica we wrote on to lose data.

Even if we wait for the Kafka commit before starting the post-processing (as described above), we can still read from a ClickHouse replica that is not up to date, so the solution above isn't enough on its own.

Fortunately, ClickHouse is more flexible than that and allows us to have a stronger consistency guarantee per query without compromising the performance of the queries that do not need strong consistency.

The initial idea was to couple the insert_quorum and select_sequential_consistency settings, which would guarantee a given number of replicas have received the update before returning. Coupling the settings also guarantees we cannot query a replica that is not up to date.

The main issue was that select_sequential_consistency does not guarantee the load balancer picks an up-to-date replica. Instead, the query fails if the chosen replica has not caught up with every write in progress, which doesn't work for us.

So, we split the problem in half to explore different solutions:

How do we ensure we read from an up-to-date replica?

When we need this guarantee (not all queries do), we use the in_order load balancing scheme. This scheme drives the load balancer to pick healthy replicas in the order they are defined in the configuration. So, as long as the first replica is up and running, it's picked by the load balancer. We can do that both for reads and for writes, essentially reading from and writing to the same replica as long as it is healthy. Because our writes went through that replica, it is up to date with respect to them.
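The selection rule as described here can be illustrated with a tiny model. This is a simplification for intuition only and not ClickHouse's implementation; `Replica` and `pick_in_order` are hypothetical names, and "healthy" is reduced to a boolean flag.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Replica:
    name: str
    healthy: bool = True


def pick_in_order(replicas: List[Replica]) -> Optional[Replica]:
    """Return the first healthy replica in configuration order, if any."""
    for replica in replicas:
        if replica.healthy:
            return replica
    return None


# Hypothetical configuration order.
replicas = [Replica("replica-1"), Replica("replica-2"), Replica("replica-3")]

# Reads and writes both land on replica-1 while it is healthy.
write_target = pick_in_order(replicas)
read_target = pick_in_order(replicas)

# If replica-1 fails, the next replica in order is chosen instead, and that
# replica may not have received the latest writes yet.
replicas[0].healthy = False
fallback = pick_in_order(replicas)
```

The fallback case is where the guarantee weakens: nothing in this rule says the second replica has caught up.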

How do we ensure all partitions wrote the batch before reading?

ClickHouse provides us another convenient option when writing: insert_distributed_sync, which sets the partitioning to run synchronously (instead of asynchronously). The client receives the acknowledgment only after all partitions received the write, removing the risk that a reader would read from a partition that is not up-to-date.
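Putting the two settings together, a client could carry them as per-query settings. The sketch below only builds the settings and a query string; the table name `events_dist` and the helper `select_with_settings` are hypothetical, and the setting names are the ones mentioned in this post, so check them against the documentation for your ClickHouse version.

```python
from typing import Dict, Union

SettingValue = Union[int, str]

# Settings for queries that need to see the latest writes (assumed names,
# taken from the post).
CONSISTENT_READ_SETTINGS: Dict[str, SettingValue] = {
    "load_balancing": "in_order",
}

# Settings for the writer: same replica order, and wait until every
# partition has received its part of the batch.
CONSISTENT_WRITE_SETTINGS: Dict[str, SettingValue] = {
    "load_balancing": "in_order",
    "insert_distributed_sync": 1,
}


def select_with_settings(query: str, settings: Dict[str, SettingValue]) -> str:
    """Append a SETTINGS clause to a SELECT query."""
    if not settings:
        return query
    rendered = ", ".join(
        # Quote string values, leave numeric values bare.
        f"{name} = {value!r}" if isinstance(value, str) else f"{name} = {value}"
        for name, value in settings.items()
    )
    return f"{query} SETTINGS {rendered}"


# Only the queries that need consistency pay for it.
consistent_query = select_with_settings(
    "SELECT count() FROM events_dist", CONSISTENT_READ_SETTINGS
)
relaxed_query = select_with_settings("SELECT count() FROM events_dist", {})
```

Many client libraries also accept such a dictionary directly as per-query settings, which avoids string manipulation altogether.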

Sequential consistency

These two solutions produce a consistency model that superficially resembles sequential consistency. By reading and writing through the same node, all writes appear to have taken place in a total order; once a client reads an event, it cannot read an older state.

Did we just break the CAP theorem? No.

Did we pick consistency and sacrifice availability? No, we are still leaning on the side of availability. This is the consistency model the system provides as long as all replicas are running (which means the system is not guaranteeing this consistency model). If we write on a replica and it dies before we read, there is no guarantee the load balancer goes to a replica that already received the write. We cannot say we formally guarantee sequential consistency or read your writes consistency, even though this is the result we achieve often enough.

Is this a performance issue? Sort of. Reading and writing on the same replica obviously binds the performance of the storage to that replica, but it has been good enough so far.

Not all reads need to be consistent, so it is up to the client to decide when to use in_order load balancing for a query. This solution provides good-enough consistency without compromising performance too much.


So far, we talked about having stronger consistency in most cases from a database that is eventually consistent. There is another area where we tried to do something ClickHouse is not designed to do: updating records after they’ve been written to the database. ClickHouse works best with immutable data, and, while Sentry is immutable under most circumstances, there are a few operations that do require the ability to update records.

How did we solve this? Our next post in this series shares how we’re making the most of mutability!
