How Netflix Ensures Highly-Reliable Online Stateful Systems

Key Takeaways

  • Reliability means spending money to reduce the probability of failure, the blast radius, and recovery time to zero.
  • Building reliable services at scale has to be done across the clients, servers, and at the APIs.
  • Reliable servers are redundant, workload-optimized, and heavily cached. They offer quick data recovery and the ability to leverage multiple replicated copies across cloud availability zones.
  • Reliable clients make constant incremental progress and use signals from the server to learn how to retry or hedge requests to meet the service level objectives (SLOs).
  • Reliable APIs rely on the concepts of idempotency and fixed-size units of work.

This is a summary of my talk at QCon SF in October 2023. Over the years, I’ve worked across multiple different stateful systems and storage engines. In this article, I’d like to discuss making stateful systems reliable. But first, I want to define what "reliable" even means.

Reliability

If you ask people in the database industry, they might say that reliability means having a lot of nines - if you have a lot of nines, you’re highly reliable. In my experience, most database users don’t care too much about how many nines your system has. To show why, let’s consider three hypothetical stateful services.

Service A always fails a little bit; it never recovers. Service B occasionally fails cataclysmically. It recovers quickly but still experiences a near 100% outage during that period. Finally, service C rarely fails, but when it does fail, it fails for a long time.

The following graphic shows that each service has an equal number of nines. However, how you solve these three failure modes is drastically different! The first requires request hedging or retries, the second needs load shedding or backpressure, and the third needs faster detection and failover.
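To make this concrete, here is a small sketch with made-up numbers. The error rates and durations below are illustrative assumptions, not Netflix measurements. It shows how three very different failure shapes can produce the same availability over a 30-day month.

```python
# Illustrative numbers only: three failure shapes over a 30-day month.
MONTH_MINUTES = 30 * 24 * 60  # 43,200 minutes


def availability(error_fraction: float, minutes_affected: float) -> float:
    """Fraction of requests that succeed, assuming uniform traffic."""
    failed_minutes = error_fraction * minutes_affected
    return 1 - failed_minutes / MONTH_MINUTES


# Service A: always fails a little (0.1% of requests, all month long).
service_a = availability(0.001, MONTH_MINUTES)
# Service B: twelve short outages of 3.6 minutes each at ~100% errors.
service_b = availability(1.0, 12 * 3.6)
# Service C: one rare but long incident, 50% errors for 86.4 minutes.
service_c = availability(0.5, 86.4)

for name, value in [("A", service_a), ("B", service_b), ("C", service_c)]:
    print(f"Service {name}: {value:.3%}")
```

All three services land on the same number of nines, even though each one calls for a different fix.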

Instead of asking, how many nines do I have, ask the following:

  • How often do my systems fail?
  • When they fail, how large is the blast radius?
  • How long does it take us to recover from an outage?

Then, you’ll spend money on engineering time or computers to make these go to zero. Database users just want systems that don’t fail, have minimal impact on failure, and recover quickly - that’s what reliable means. Different failure modes in your stateful services require different solutions. Throughout the rest of this article, we’ll explore many techniques that we can use to try to make these properties go in the direction we want.

Netflix Online Stateful Scale

We use these techniques at Netflix to reach very high scales of online stateful services. We have near caches, which live on service hosts and handle billions of requests per second with sub-100-microsecond latency. We have remote caches based on Memcached that handle tens of millions of requests per second with 100-microsecond latency targets, storing petabytes of data. Finally, we have stateful databases with Apache Cassandra running in 4-region full-active mode, providing in-region read-your-write consistency with single-digit millisecond latency.

We replicate our user state to four Amazon regions in a full-active topology.

Building Reliable Stateful Services

Building reliable stateful services at scale combines three different techniques:

  • Building reliable stateful servers
  • Pairing them with reliable stateful clients
  • Designing the stateful API to use all those reliability techniques

Reliable Stateful Servers

Single Tenant

The most fundamental stateful server technique at Netflix is that our applications are single-tenant; they don’t share caches or data stores. The data stores are spun up and down on demand for each use case. Before, when we had multi-tenant data stores that served multiple customers, we had rare failures that had a significant blast impact. When we moved to this technique, we now had more frequent failures, but they had a more isolated blast radius. For Netflix’s business, this tradeoff made sense as large blast-radius failures damage our brand, while minor partial failures, which we can hide, do not.

Capacity Planned

To minimize the likelihood of failure, we want to ensure that each single-tenant data store is appropriately provisioned. We always start with mathematically rigorous capacity planning and an understanding of hardware. To do this, we have to fit generative distributions to observed data so we can understand the likelihood of tail events such as latency outliers. Below are two EC2 instances where we’ve measured how quickly their disks can respond to real workloads and fit summary beta distributions to describe them.

Note that there is a little latency blip on the second drive’s p99, which could mean that your stateful service might have a higher frequency of SLO-busting events due to less reliable hardware.

With an understanding of our software and the underlying hardware, we program workload capacity models, which consider several parameters about the workload to model the possible CPU, memory, network, and disk requirements. Given these workload details, the model then outputs the specifications for a cluster of computers that we call the least regretful choice. More information about how this capacity modeling works can be found here: AWS re:Invent talk from 2022.

These automated capacity models give us a central leverage point to shift our spending toward critical clusters (Tier 0) and those requiring expensive properties such as strong consistency while saving money on low-criticality clusters and cases that don’t require strong consistency and durability.

Highly-replicated

We replicate our clusters to 12 Amazon availability zones spread across four regions because we want to ensure that all of our microservices have local zone access to their data. Network communication is an opportunity for failure, so we try to keep as much within a zone as we can, and if we have to cross zones, we try to keep it in the region. But, sometimes, we do have to go across regions. This replication technique allows us to have highly reliable write and read operations because we can use quorums to accept writes in any region. By having three copies in every region, we can provide a very high level of reliability while maintaining strong consistency.

Spend on Stateful, Save on Stateless

Sometimes, we have to evacuate traffic from a degraded region. This illustrates the overall capacity usage of the running Netflix system. Most of our money is spent on our stateless services, typically provisioned to handle around one-fourth of global traffic.

When us-east-1 fails, we have to evacuate traffic from that region. At Netflix, our resiliency team has built an amazing capability to evacuate an Amazon region in single-digit minutes; however, this is a problem for the data stores because they have to be ready to absorb 33% more traffic instantaneously. That’s even a problem for the stateless services because they can’t autoscale that fast. You have to reserve headroom (buffer) for slow scaling services and pre-inject it for fast scaling services. With this architecture, each surviving region goes from serving ¼ of global traffic to ⅓, so you must reserve (⅓) / (¼) - 1 = 33% headroom for failover.

The alternative is to do a more traditional sharded approach for our databases, where we run two copies of state instead of four. For example, imagine we had two replication groups: one in "America" between us-west-2 and us-east-2, and one for "Europe" between us-east-1 and eu-west-1. In this traditional approach, we would have to reserve a lot more headroom for traffic during failover, because the surviving region in a group goes from ¼ of global traffic to ½: (½) / (¼) - 1 = 100% more.
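The headroom arithmetic for both topologies can be sketched as a small function. The function name and parameters are illustrative, and the model assumes traffic is spread evenly across regions and that a failed region's traffic is shared equally by the other members of its replication group.

```python
from fractions import Fraction


def failover_headroom(total_regions: int, copies_per_group: int) -> Fraction:
    """Extra capacity a surviving region needs, as a fraction of normal load."""
    normal_share = Fraction(1, total_regions)
    # The failed region's traffic is split across the rest of its group.
    failover_share = normal_share + normal_share / (copies_per_group - 1)
    return failover_share / normal_share - 1


# Full-active: all four regions hold the state, so three regions share the load.
full_active = failover_headroom(total_regions=4, copies_per_group=4)
# Sharded: two groups of two regions, so one partner absorbs everything.
sharded = failover_headroom(total_regions=4, copies_per_group=2)

print(f"full-active headroom: {float(full_active):.0%}")
print(f"sharded headroom:     {float(sharded):.0%}")
```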

Regions are impacted constantly, both for hardware and software reasons, so having this fast evacuation capability allows Netflix to recover extremely quickly during a failure. However, this is only cost-effective if you can spread the load between all other regions, which having a full-active data topology allows.

Stateful Agility

Sometimes, bad things happen even with all this capacity planning and sharding. When they do, you want to be able to deploy software and hardware quickly to mitigate the problem. We have carved mutation seams into our stateful images to allow us to react more rapidly without affecting the system's availability. Specifically, we’ve factored stateful instances into three components.

We have:

  1. The components in yellow, such as agents or sidecars, which we change daily but which do not affect the availability of the data store
  2. The stateful process and OS Kernel, which change monthly and require downtime for the primary process
  3. The state itself, which must move whenever hardware changes, and we aim to be able to move it quarterly

We couple the stateful process and the OS Kernel together because if we have to upgrade, for example, the Linux OS, we will have to bring down the data store process. Every time the primary process is down, we risk failing quorums, so we want to do this as fast as possible.

The state is the most problematic part of a stateful service and is coupled to the hardware you run on. You don’t want to touch it too often. If you have to move the state around, you will risk low reliability, so if we have to, we want to move the state as fast as possible. At Netflix, to minimize the likelihood of failure, we use snapshot restoration. This is where we suck down backups from S3 as fast as possible onto new hot instances. Then, we just share the deltas as we switch over. State movement has a major reliability impact because it takes time, and you run the risk of quorum loss while deltas transfer.

Monitor Your Component Performance

As hardware failure is so risky, when we launch new instances, we first ensure they can handle the load we will put on them. We review a list of checks, including that the network functions and the disks perform as expected, before starting the real workload - we call these pre-flight checks. Then, we continuously monitor errors and latency for both hardware and software and preemptively eject hardware that is starting to fail from the fleet before it becomes a source of failure.

You can do this with your software as well! For example, at Netflix, we use jvmquake on our stateful services written in Java because it detects GC death spirals early and prevents concurrent-mode-failure-related gray failures via a token bucket algorithm:
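A minimal sketch of the token bucket idea follows, written in Python for brevity. It is loosely modeled on the concept described here and is not jvmquake's actual code; the class name, the threshold, and the runtime weight are assumptions. GC pause time adds "debt" to the bucket, time spent running application code pays it down, and the process is flagged once the debt crosses a threshold.

```python
class GcDebtBucket:
    """Tracks GC pause time as debt that normal running time pays down."""

    def __init__(self, threshold_s: float = 30.0, runtime_weight: float = 1.0):
        self.threshold_s = threshold_s        # assumed value, tune per service
        self.runtime_weight = runtime_weight  # how fast running time repays debt
        self.debt_s = 0.0

    def on_running(self, seconds: float) -> None:
        # Useful work drains the bucket, but never below empty.
        self.debt_s = max(0.0, self.debt_s - seconds * self.runtime_weight)

    def on_gc_pause(self, seconds: float) -> bool:
        # Returns True when the process looks stuck in a GC death spiral.
        self.debt_s += seconds
        return self.debt_s > self.threshold_s


def first_trip(bucket: GcDebtBucket, cycles: int, run_s: float, pause_s: float):
    """Simulate run/pause cycles; return the cycle index that trips, or None."""
    for cycle in range(cycles):
        bucket.on_running(run_s)
        if bucket.on_gc_pause(pause_s):
            return cycle
    return None


healthy = first_trip(GcDebtBucket(threshold_s=10.0), 1000, run_s=1.0, pause_s=0.05)
spiral = first_trip(GcDebtBucket(threshold_s=10.0), 1000, run_s=0.1, pause_s=2.0)
```

A healthy process with short pauses never trips the bucket, while a process that spends most of its time in GC trips it within a few cycles, long before a human would notice the gray failure.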

Continuous monitoring is vital for reliability because when you have 2000-plus clusters over a year, bad stuff happens to both your hardware and your software. If you are proactive about monitoring your components, you can detect failures quickly, remediate them, and recover before the failure propagates to the customer of your stateful service.

Deploy Smartly

Software needs to be deployed frequently, but with stateful systems, we must be careful how quickly we do maintenance. For example, let’s look at a simulation of system failure on an Apache Cassandra fleet. On the x-axis of this graph, we have how often we perform maintenance on the Cassandra fleet. On the y-axis, we have how many outages we expect to cause through our maintenance. Each curve represents a possible way of doing maintenance, either on-instance state replication, network drive re-attachments, or optimally imaging software in place without any state movement. As you can see, the more often you do maintenance, and the longer that maintenance takes, the bigger a risk you are running:

At Netflix, we’ve taken this journey starting at the red (copying data between instances for every software change) and then, over time, moving to the green, where we move zero bytes of state to upgrade software. This allows us to recover faster from software issues while not taking higher risks of failure. Ultimately, we must use that in-place imaging technique to get to something like weekly software changes.

Cache in Front of Services

At Netflix, we treat caches like materialized view engines, caching complex business logic that runs on data rather than caching the underlying data. Most operations to services hit that cached view rather than the service. Whenever the underlying data of that service changes, the service re-calculates the cache value and fills that cache with the new view.

A cache in front of the service protects the service, which is, at least for us, the component that fails first. The cache is cheap relative to the service; stateless apps running actual business logic are quite expensive to operate. This technique can help us improve reliability by decreasing the amount of load that the services and data stores have to handle. It does shift load to caches, but those are easier to make reliable.

The logical conclusion of this technique is to use total near caching, where we service all data requests out of local system copies of the needed data. Netflix’s video metadata system works this way:

In this architecture, all operations go against the local cache populated by eventually consistent pub-sub, and this is extremely reliable because there are no database calls in the hot path of service requests.

Caches can reduce reliability if you don’t mitigate cold cache problems, but the good news is that warming caches before traffic hits them is straightforward. Once you have good cache warming, Memcached can serve orders of magnitude more traffic than any Java service can.

Reliable Stateful Clients

We’ve seen several techniques for making reliable stateful servers, but these aren’t very useful if the stateful clients are doing the wrong things and causing failures.

Servers Signal to Clients

Historically, there was a big debate at Netflix about tuning timeouts, and for stateful services, we realized that you can’t really set one correct timeout. Instead, you need to know who the client is, what shard of what data store they’re talking to, and what data they’re trying to access. Then, from that context, the server can deduce reasonable service level objectives and signal them to clients.

We use server signaling to convey context to clients, which improves reliability. For example, we use the same signaling to inform the client to compress and chunk data for maximum reliability. We’ve used LZ4 compression from our stateful clients to compress large payloads before they’re sent, which, in practice, can reduce bytes sent by a factor of 2 to 3. Compressing data improves reliability because every packet is an opportunity for Linux to trigger a 200ms re-transmit - a clear SLO buster; fewer bytes are more reliable than more bytes. Similarly, chunking large payloads into smaller ones is more reliable because the client can hedge and retry each small piece of work.

An example of dynamic signaling is with service level objectives, or how fast we think the server can perform different operations:

The KeyValue Service on the left and the TimeSeries Service on the right communicate with the client: "This is what I think your target service level objective should be, and this is what the maximum should be." You can think of the target latency as latency that we will use hedges and retries to try to hit, and max is a more traditional: "After this, just timeout and go away." At the same time timeouts are communicated, the server transmits the current concurrency limits. The server says, "Client, you’re allowed to hedge 50 concurrent requests against me. You’re allowed to send 1000 normal requests." Hedging refers to the technique where clients send the request multiple times, and whichever server answers first, the client takes their response.
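The hedging behavior described above can be sketched with asyncio. This is an illustration under stated assumptions, not Netflix's client: `hedged_call`, `fake_replica`, and the timing values are hypothetical, and a real client would also respect the server-provided concurrency limits.

```python
import asyncio


async def hedged_call(primary, backup, hedge_after_s, max_timeout_s):
    """primary and backup are zero-argument coroutine functions (replica calls)."""

    async def race():
        tasks = [asyncio.ensure_future(primary())]
        try:
            # Give the first replica until the hedge point to answer alone.
            done, _ = await asyncio.wait(tasks, timeout=hedge_after_s)
            if not done:
                # Target latency is at risk: send the same request elsewhere.
                tasks.append(asyncio.ensure_future(backup()))
                done, _ = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
            return next(iter(done)).result()
        finally:
            for task in tasks:
                task.cancel()  # no-op for tasks that already finished

    # The max latency is the traditional "give up" timeout.
    return await asyncio.wait_for(race(), timeout=max_timeout_s)


async def fake_replica(name, latency_s):
    """Stand-in for a network call to a replica."""
    await asyncio.sleep(latency_s)
    return name


async def demo():
    # server1 is stuck (for example, in a GC pause); server2 answers the hedge.
    return await hedged_call(
        lambda: fake_replica("server1", 1.0),
        lambda: fake_replica("server2", 0.01),
        hedge_after_s=0.05,
        max_timeout_s=0.5,
    )
```

Running `asyncio.run(demo())` returns the answer from whichever replica responds first, and the slower request is cancelled.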

Service Level Objectives per Namespace and Type

We can also tune these SLOs based on the particular namespace they’re trying to access, the client, and their observed average latency. Consider the following:

For example, suppose we have client A, and they’re making GET requests against namespace 1. If they observe a 1-millisecond average latency, then to hit the SLO of 10 milliseconds, we will have to send a hedge at around 9 milliseconds. The same client might be doing PUT requests averaging 1.5 milliseconds. If we want to hit the same 10-millisecond SLO, we have to hedge slightly earlier. On the other hand, SCAN requests are really slow. They have an SLO of around 100 milliseconds, so we don’t want to hedge until about 77 milliseconds in. Finally, client B might continuously be getting latency above the SLO, in which case hedging isn’t going to help.
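The hedge timing in this example follows a simple rule: wait for the SLO target minus the observed average latency. A minimal sketch, where the function name is hypothetical and the 23-millisecond SCAN average is an assumption inferred from the 77-millisecond hedge point:

```python
from typing import Optional


def hedge_delay_ms(slo_target_ms: float, observed_avg_ms: float) -> Optional[float]:
    """How long to wait before hedging, or None if hedging cannot help."""
    budget = slo_target_ms - observed_avg_ms
    # If the average already exceeds the SLO, a hedge would miss it too.
    return budget if budget > 0 else None


get_delay = hedge_delay_ms(slo_target_ms=10, observed_avg_ms=1)      # client A, GET
put_delay = hedge_delay_ms(slo_target_ms=10, observed_avg_ms=1.5)    # client A, PUT
scan_delay = hedge_delay_ms(slo_target_ms=100, observed_avg_ms=23)   # assumed average
client_b = hedge_delay_ms(slo_target_ms=10, observed_avg_ms=12)      # above SLO
```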

Concurrency Limit Hedging and GC-Tolerant Timeouts

We also have to be really careful when we’re hedging requests. We have to use concurrency limiting to prevent too much load going to our backend services.

In this case, client1 sends two requests to server1, which immediately goes into a garbage collection pause. At that point, client1 waits for the SLO minus the average and then hedges request A against server2. This puts a hedge limiter of 1 in place, which doesn’t allow client1 to hedge its other outstanding request against server2. Once we get the response from server2 to request A, the hedge limiter lifts, and a subsequent PUT from client1 to server1 can speculate.

All three of these requests would have hit that latency-busting 24-millisecond GC pause. Because of this hedging system, we could rescue two of the three requests.

We can use similar techniques to make GC-tolerant timeouts. To implement a naive timeout, one might set a single asynchronous future at 500 milliseconds and throw an exception when it fires. In our stateful clients, we use GC-tolerant timers, where we chain four async futures 125 milliseconds apart. This creates a virtual clock that’s resilient to client1 itself pausing: a pause on the client can’t burn through the whole timeout at once, so it doesn’t raise a spurious timeout that distracts investigators, and you can fix the actual cause.
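The chained-timer idea can be sketched with asyncio. This is an illustration rather than the Netflix client, and the function names are hypothetical. Each tick is armed only after the previous one has fired, so a long pause in the client process can expire at most the tick that is currently armed rather than the entire 500-millisecond budget.

```python
import asyncio


async def gc_tolerant_timeout(awaitable, total_s=0.5, ticks=4):
    """Wait for awaitable using `ticks` chained timers of total_s / ticks each."""
    task = asyncio.ensure_future(awaitable)
    try:
        for _ in range(ticks):
            # A new timer starts only once the client is running again,
            # so time lost to a client-side pause is capped at one tick.
            done, _ = await asyncio.wait([task], timeout=total_s / ticks)
            if done:
                return task.result()
        raise asyncio.TimeoutError(f"no response after {ticks} ticks")
    finally:
        task.cancel()  # no-op if the task already completed


async def reply_after(delay_s, value):
    """Stand-in for a server response arriving after delay_s seconds."""
    await asyncio.sleep(delay_s)
    return value
```

For example, `asyncio.run(gc_tolerant_timeout(reply_after(0.01, "ok")))` returns the reply, while a reply that never arrives within the four ticks raises `asyncio.TimeoutError`.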

Retries and Load Balancing

Sometimes, things break, and you’ll experience load shedding. When that happens, we allow limited retries against stateful services. We use a slightly modified capped exponential backoff algorithm. The only real modification worth pointing out here is that the first retry is relative to the SLO target. Having those SLOs allows us to retry more intelligently against our backends.
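A minimal sketch of capped exponential backoff where the first retry delay is derived from the SLO target. The function name, the retry count, the cap, and the optional full-jitter step are assumptions; the article does not specify them.

```python
import random
from typing import List, Optional


def retry_delays_ms(
    slo_target_ms: float,
    max_retries: int = 3,
    cap_ms: float = 1000.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delays before each retry: SLO target, then doubling, never above cap_ms."""
    delays = []
    for attempt in range(max_retries):
        delay = min(cap_ms, slo_target_ms * 2 ** attempt)
        if rng is not None:
            # Optional jitter spreads retries out so clients do not synchronize.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


# A 10 ms SLO target with a 50 ms cap: the first retry waits one SLO target.
delays = retry_delays_ms(slo_target_ms=10, max_retries=4, cap_ms=50)
```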

It would have been better if we hadn’t sent any requests to that slow server in the first place. That’s where load balancing (or unbalancing) comes into play. The typical load balancing algorithms, like random or round robin, don’t work very well at avoiding slow servers. At Netflix, we use an improvement on choice-of-2 that we call weighted-choice-of-n.

With weighted-choice-of-n, we exploit prior knowledge about networks in the cloud. We know that because we have a replica of data in every zone, we will get a faster response if we send the request to the same zone. All we have to do is weight requests toward our local zone replica. We want this to degrade naturally, in case we only have two copies or if the client is in an overloaded zone. Instead of just having a strict routing rule, we take concurrency into account as well. Instead of picking the two with the least concurrency, we weigh the concurrency by the following factors: not being in the same availability zone, not having a replica, or being in an unhealthy state. This technique reduces latency by up to 40% and improves reliability by keeping traffic in the same zone!
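A sketch of how weighted-choice-of-n could work, based on the description above. The penalty values, class, and function names are hypothetical; the article does not give the real weights. The idea is to sample n candidates and pick the one with the lowest concurrency after multiplying by penalties for being in another zone, lacking a replica, or being unhealthy.

```python
import random
from dataclasses import dataclass

# Hypothetical penalty weights; the real values are not given in the article.
CROSS_ZONE_PENALTY = 4.0
NO_REPLICA_PENALTY = 8.0
UNHEALTHY_PENALTY = 16.0


@dataclass
class Server:
    name: str
    zone: str
    in_flight: int          # current concurrent requests from this client
    has_replica: bool = True
    healthy: bool = True


def weighted_load(server: Server, client_zone: str) -> float:
    weight = 1.0
    if server.zone != client_zone:
        weight *= CROSS_ZONE_PENALTY
    if not server.has_replica:
        weight *= NO_REPLICA_PENALTY
    if not server.healthy:
        weight *= UNHEALTHY_PENALTY
    # +1 so that idle servers are still distinguished by their penalties.
    return (server.in_flight + 1) * weight


def weighted_choice_of_n(servers, client_zone, n=2, rng=random):
    candidates = rng.sample(servers, min(n, len(servers)))
    return min(candidates, key=lambda s: weighted_load(s, client_zone))
```

Because concurrency is part of the score, an overloaded local replica eventually loses to a lightly loaded remote one, which is the natural degradation described above.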

A Real-World Example

Let’s combine all these techniques and look at a real-world KeyValueService issue. At time t1 at 7:02 a.m., an upstream client of this KeyValueService suffers a retry storm, and their traffic instantly doubles. Within 30 seconds, our server concurrency limiters shunt the load off overloaded servers, trying to protect the backend from the sudden load spike. Meanwhile, the client hedges and exponential retries mitigate the impact. This is such a small impact that it didn’t violate any of our SLOs.

The only problem that we have is the high latency impact. We went from 40% utilization to 80% utilization in 10 seconds, so latency degraded. Finally, our server autoscaling system restores our latency SLO by injecting capacity to remedy the latency impact. This whole process (from the client load doubling to our server limiters tripping, to our client hedges and exponential retries mitigating, to the server autoscaling and finally restoring the SLO) takes about five minutes with no human involvement.

Putting it all together: We run with enough buffer to prevent significant outages; when there is an impact, the systems automatically mitigate it, and finally, we recover quickly without human intervention.

Reliable Stateful APIs

The APIs also have to support all these techniques that I outlined. You can’t retry requests if they have side effects!

Idempotency Tokens

Assume you must retry; how do you make your mutable API safe? The answer is idempotency tokens. In concept, this is pretty simple. You generate a token and send it to your endpoint. Then, if something bad happens, you can just retry using the same idempotency token. You have to make sure that the backing storage engines implement this idempotency.

At Netflix, an idempotency token is generally a tuple of a timestamp and some form of token. We can use client monotonic timestamps (in this case, a microsecond timestamp with a little bit of randomness mixed into the last character to reduce the chance of duplicates) together with a random nonce. If we have a more consistent store, we might need to generate a monotonic timestamp that is global within the same region. Finally, the most consistent option is to take a global transaction ID.
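A minimal sketch of the client-generated variant: a monotonic microsecond timestamp paired with a random nonce, plus a toy store that deduplicates on the token. Both classes are hypothetical illustrations, not the Netflix implementation, and a real storage engine would persist and expire the tokens it has seen.

```python
import secrets
import time


class TokenGenerator:
    """Produces (timestamp_us, nonce) tokens that never go backwards."""

    def __init__(self):
        self._last_us = 0

    def next(self):
        now_us = time.time_ns() // 1_000
        # Guard against clock steps so tokens from this client stay monotonic.
        self._last_us = max(now_us, self._last_us + 1)
        return (self._last_us, secrets.token_hex(8))


class IdempotentStore:
    """Toy backend: an append is applied at most once per token."""

    def __init__(self):
        self.data = {}
        self.applied = set()

    def append(self, key, value, token) -> bool:
        if token in self.applied:
            return False  # retry of a mutation that already landed
        self.applied.add(token)
        self.data.setdefault(key, []).append(value)
        return True


generator = TokenGenerator()
store = IdempotentStore()
token = generator.next()
store.append("playback-events", "play", token)
store.append("playback-events", "play", token)  # safe retry, no duplicate
```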

On the x-axis here, we have the level of how strongly consistent we are, ranging from not at all consistent all the way up to global linearizability. On the y-axis, we have the reliability of that technique. You’ll notice that the regional and global isolated tokens are less reliable, because they require communication across the network. At Netflix, we try as much as possible to use client monotonic clock tokens for high reliability. If you can make it less consistent, make it less consistent.

KeyValue Abstraction

Let’s dive into some examples of real-world APIs using these concepts. We will start with the KeyValue Abstraction, through which Netflix offers developers a stable HashMap as a service that exposes a pretty simple CRUD interface.

Let’s look at PutItems. We can immediately see the idempotency token pop right out, generated via one of those methods we discussed earlier. We also see that in our list of items, there’s a chunk number, and that facilitates the clients splitting up large payloads into multiple small ones, followed by a commit.
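The chunk-then-commit flow can be sketched as follows. The function and class names are hypothetical and do not reflect the real KeyValue API; the point is that each chunk is a small, independently retryable unit of work, and nothing becomes visible until the commit.

```python
def chunk_payload(payload: bytes, chunk_size: int):
    """Split a payload into (chunk_number, bytes) pairs of at most chunk_size."""
    offsets = range(0, len(payload), chunk_size)
    return [(n, payload[o:o + chunk_size]) for n, o in enumerate(offsets)]


class StagedWrites:
    """Toy server side: chunks are staged per token and applied on commit."""

    def __init__(self):
        self.staged = {}       # token -> {chunk_number: bytes}
        self.committed = {}    # key -> bytes
        self.done = set()      # tokens whose commit already succeeded

    def put_chunk(self, token, chunk_number, data):
        # Re-sending a chunk overwrites identical bytes, so retries are safe.
        self.staged.setdefault(token, {})[chunk_number] = data

    def commit(self, token, key, chunk_count):
        if token in self.done:
            return  # retried commit, already applied
        chunks = self.staged.get(token, {})
        if len(chunks) != chunk_count:
            raise ValueError("cannot commit: some chunks are missing")
        self.committed[key] = b"".join(chunks[n] for n in range(chunk_count))
        self.done.add(token)
        del self.staged[token]


server = StagedWrites()
token = (1700000000000000, "example-nonce")  # see idempotency tokens above
pieces = chunk_payload(b"abcdefghij", chunk_size=4)
for number, data in pieces:
    server.put_chunk(token, number, data)
server.put_chunk(token, 1, pieces[1][1])     # a retried chunk is harmless
server.commit(token, "my-key", len(pieces))
```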

On the read side, we see the same technique of breaking down work. A GetItems response does not guarantee that you will receive X items, but it attempts to return a fixed size of data (work) at a time. This is because we want to be able to retry each component. We don’t use streaming with gRPC because of those resiliency techniques I talked about where you can retry individual pieces of work, hedge, speculate, and retry - they don’t work in a streaming API. Instead, we’ve shifted almost all of our stateful APIs to paginated APIs that return fixed amounts of data.

As most storage engines implement pagination based on the number of rows, and we want to implement pagination in terms of size, we have this mismatch where the server might make many round trips to the data store to accumulate a page. It might bust the SLO, but because we know what the SLO is, we can just stop paginating. We don’t guarantee to return a fixed amount of work, so instead, we just present progress to the client and then let the client request the next page if needed.
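A sketch of size-based pagination with an SLO budget, assuming an in-memory list of rows stands in for the data store. The function name, parameters, and integer page token are hypothetical. The server stops accumulating when the page is full or the time budget is spent, always returns at least one row so the client makes progress, and hands back a token for the next page.

```python
import time


def get_page(rows, start, max_page_bytes, slo_budget_s, clock=time.monotonic):
    """Return (page, next_token); next_token is None when the scan is complete."""
    began = clock()
    page, page_bytes, cursor = [], 0, start
    while cursor < len(rows):
        row = rows[cursor]
        over_size = page_bytes + len(row) > max_page_bytes
        over_time = clock() - began > slo_budget_s
        # Stop early, but never return an empty page while rows remain.
        if page and (over_size or over_time):
            break
        page.append(row)
        page_bytes += len(row)
        cursor += 1
    next_token = cursor if cursor < len(rows) else None
    return page, next_token


rows = [b"x" * 40 for _ in range(5)]
first_page, token = get_page(rows, start=0, max_page_bytes=100, slo_budget_s=0.01)
```

The client keeps calling with the returned token until it is `None`, and each call is a bounded piece of work that can be retried or hedged on its own.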

SCAN is just like GET, except that there’s no primary key. When you initiate a scan, the KeyValueService dynamically computes how many concurrent cursors it releases to that client - more if the system has a low load or less if many clients are scanning.

TimeSeries Abstraction

TimeSeries Abstraction stores all of our trace information and playback logs that tell us somebody hit play. This service handles tens of millions of events per second, fully durably with retention for multiple weeks or, in some cases, years.

The idempotency token in this service is built into the TimeSeries contract. If you provide an event with the same event time and unique event ID, then the backend storage deduplicates that operation. On the read side, we see the same technique of breaking down a potentially huge response into multiple pages that can be consumed progressively.

Because of the massive scale here, developers asked us for an additional level of reliability and were willing to trade off significant amounts of consistency - we offer three modes to clients:

The first mode, fire and forget, is loved by our users because it can handle 20 million writes per second with nearly 100% uptime. These users didn’t care if we dropped one service trace out of billions. The next two modes are incrementally more consistent and less reliable. For example, TimeSeries can let you know when the data is enqueued into our in-memory queue on the TimeSeries server. Alternatively, we can delay the acknowledgment until it is fully durable in storage. Different use cases needed different trade-offs.

Conclusion

Building reliable stateful services at scale isn’t a matter of building reliability into the servers, the clients, or the APIs in isolation. By combining smart and meaningful choices for each of these three components, we can build massively scalable, SLO-compliant stateful services at Netflix, namely by:

  • Building reliable stateful servers
  • Pairing them with reliable stateful clients
  • Designing the API to use all those reliability techniques
