Sleeping at Scale - Delivering 10k Timers per Second per Node with Rust, Tokio, Kafka, and Scylla

Summary

Lily Mara and Hunter Laine walk through the design of a system, its performance characteristics, and how they scaled it.

Bio

Lily Mara is Engineering Manager @OneSignal, Author of "Refactoring to Rust". Hunter Laine is Software Engineer @OneSignal.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Mara: My name is Lily Mara. I'm an engineering manager at a software company here in the Bay Area called OneSignal. We are a customer messaging company. We help millions of application developers and marketers connect with billions of users. We send around 13 billion push notifications, emails, in-app messages, and SMS every single day. I started using Rust professionally in 2016. I started using it as my main daily programming language professionally in 2019, when I started at OneSignal. I'm the author of the book "Refactoring to Rust" that is available now in early access at manning.com.

Outline

What are we going to be talking about? We're going to be talking about how we built this scalable, high throughput timer system at OneSignal. We're going to be talking about the motivation behind building it, in the first place. The architecture of the system itself. How the system performed. How we scaled it up, and some future work that we are maybe thinking about doing in the system.

Building a Scalable, High Throughput Timer System (OneSignal)

Let's jump back in time a little bit. It's 2019, I've just started at OneSignal. I've just moved to sunny California from horrible, not sunny Ohio. Everybody in OneSignal, everybody in the customer messaging sphere is talking about the concept of journey builders. If you're not a content marketer, you might not know what a journey builder is, so just a quick crash course. This is a no-code system that allows marketers to build out customer messaging flows. This is a flowchart that's going to be applied at a per user level. This is very similar to what a marketer might put together themselves. Over here on the left, every user is going to start over here. They're immediately going to be dumped into a decision node. We allow our customers to store arbitrary key-value pairs at the user level. In this case, a customer might be storing what's your favorite color as one of those key-value pairs. We'll make a decision based on that. Let's say for the sake of argument that this user's favorite color was blue, we're going to send that user a push notification that says, "There's a 20% off sale on blue shirts, we know how much you love blue. Aren't you so excited about the blue shirt sale?" Then the next thing that's going to happen is we are going to wait 24 hours. We're going to do nothing for a day. This was the missing piece. We thought we understood how to build the event system, we understood how to build the node walking tree, but we didn't have a primitive in place in our network for scheduling, for storing a bunch of timers and expiring them performantly. This is what we're going to be building today. After we do that 24-hour wait, we are going to have another decision node. We're going to say, did that particular user click on that particular push notification we sent them? If they did, we will say, "Mission accomplished. Good job, journey. You did the engagement." If they didn't, we'll send them an SMS that has more or less the same message.
You might want to do this because sending an SMS has a monetary cost associated with it. Carriers will bill you for sending SMS. Twilio will bill you for sending SMS. Push notifications have basically zero marginal cost. If you want to use SMS, you might want to use it as a second-order messaging system to get in contact with your customers. That's a journey builder. That's what we're going to try to enable the construction of today.

What are the requirements on this timer system that we're going to build? We want to be able to store billions of concurrent timers because we want to be able to support the billions of user records that we already have. We want to be able to expire those timers performantly. We want to minimize data loss because, of course, we don't want to be dropping timers on the floor. We don't want people to get stuck in particular nodes of their journeys. We want to integrate well with the rest of the data systems that we have at OneSignal. We're a relatively small company. We don't have the resources to have a team for every particular data system that's out there. We don't want to adopt 50 completely different open source data systems. Let's get in the headspace a little bit. We realized very crucially that if we wanted to build a timer, we had to think like a timer. We took the project and we set it down for a year, and we didn't think about it that hard. We prioritized other initiatives.

Jumping forward once again, we're in the beginning of 2021. We have the resources to start investigating this project. We have to make a decision, are we going to build something completely from scratch ourselves, or are we going to buy an off-the-shelf system? Is there an open source timer, an open source scheduling system that we can just use? The first places we looked were generic open source queuing systems like Sidekiq and RabbitMQ. We already operate Sidekiq very heavily at OneSignal. It's the core of our delivery API, and a lot of our lower throughput scheduling jobs. These general-purpose queuing systems had a lot of features that we didn't really need and didn't want to pay to operate. They were lacking in the area that was the most important to us, and that was performance. We didn't think that these systems were going to scale up to the throughput that we were expecting this timer system to have. The published performance numbers for things like RabbitMQ just seemed orders of magnitude off from what we wanted out of this system. We knew from experience what Sidekiq was like to scale, and we didn't think that was going to be good enough. Based on the published numbers, we thought that other things weren't going to be good enough. We opted to build something for ourselves, we're going all the way.

Existing Systems (OneSignal)

Once again, let's look at these requirements. We want to be able to store tons of timers, expire them performantly, minimize data loss, and interoperate with the rest of our system. Let's talk about the rest of the systems. What is the prior art for system design at OneSignal? We have a lot of things written in Rust. We actually currently, but not at the time, had a policy around only using Rust for new systems. At the time, we were also spinning up new services in Go. We use Apache Kafka very heavily for our async messaging applications. We also use gRPC for all of our internal synchronous RPCs. Also, as a part of the larger journeys' initiative, we were going to be using Scylla, which is a C++ rewrite of Apache Cassandra.

Ins & Outs

Let's talk about the big blocks, how the data are flowing in and out of the system. Probably, sensibly, the input to the system is a timer. A timer is an expiry time, the time at which it should end, and an action, the thing to do once it has ended. What are the actions? Initially, we came up with a pretty short list: sending a message like a notification, email, SMS, or in-app message, or adding a tag to a particular user. We realized that we might come up with more actions in the future so we didn't necessarily want to constrain ourselves to a fixed list. We also realized we might come up with actions that had absolutely nothing to do with journeys, absolutely nothing to do with messaging. This timer system, this scheduler system, has a broad range of applicability, and we wanted to leave the door open for us to take advantage of that. We imposed a new requirement on ourselves, we said we wanted this system to be generic. Given that this system is going to be generic, what is an action? What does it look like? How does it work? In addition to being generic, we wanted it to be simple. That meant constraining what an action is, even though it's flexible.

We didn't want to give people a whole templating engine or scripting library, or something like that. We wanted it to be pretty straightforward, you give us these bits, and we will ship them off when the time comes. Are we going to let people send HTTP requests with JSON payloads? All of our internal services use gRPC, so probably not that. Maybe they'll be gRPC requests then. Both of these systems suffer from the same, in our mind, critical flaw, in that these are both synchronous systems. We thought it was really important that the outputs of the timer system be themselves asynchronous. Why is that? As timers start to expire, if there's multiple systems that are being interfaced with, say notification delivery and adding key-value pairs. If one of those systems is down, or is timing out requests, we don't want, A, to have to design our own queuing independence layer in the timer system, or, B, have our request queues get filled up with requests for that one failing system, to the detriment of the other well behaving systems. We wanted the output of this to be asynchronous. We opted to use Apache Kafka, which as I mentioned, is already used very heavily at OneSignal. We already have a lot of in-house knowledge and expertise on how to operate and scale Kafka workloads. It gave us a general-purpose queuing system that was high performance. A really key benefit is it meant that the timer system was isolated from the performance of the end action system. What about these inputs? What about the timers themselves? Because all the timers are being written into the same place by us, we can own the latency of those writes, so this can be a synchronous gRPC call.

Interface

The interface, broadly speaking, looks like this. External customers make a gRPC request to the timer system. They're going to give us an expiry time, and they're going to give us an action, which is a Kafka topic, Kafka partition, and some bytes to write onto the Kafka stream. Then later on, when that timer expires, we're going to write that Kafka message to the place that the customer has specified. We'll delve into the magic box of the timer system as we go through this. Hopefully, later on, presumably at some point, the team that enqueued the timer will have a Kafka Consumer that picks up the message and acts on it. The really important thing here is that the consumer picking up the message and acting on it is totally isolated from the timer system dequeuing the message and shipping it across the wire. If a Kafka Consumer is down or not performant, that really has nothing to do with the timer system. It's not impacting us at all.
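To make that interface concrete, here is a minimal sketch of what a timer could look like as a data type. The talk only says that a timer carries an expiry time plus a Kafka topic, a Kafka partition, and some bytes; the type names, field names, and the `timer_after` helper below are hypothetical, not the actual OneSignal definitions.

```rust
use std::time::{Duration, SystemTime};

/// Where and what to publish when a timer expires.
/// Field names are assumptions made for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaAction {
    pub topic: String,
    pub partition: i32,
    /// Opaque bytes; the timer system never interprets them.
    pub payload: Vec<u8>,
}

/// A timer is just an expiry time plus the action to perform afterwards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Timer {
    pub expires_at: SystemTime,
    pub action: KafkaAction,
}

impl Timer {
    /// True once the expiry time has been reached at `now`.
    pub fn is_due(&self, now: SystemTime) -> bool {
        self.expires_at <= now
    }
}

/// Build a timer that fires `delay` after `now` (hypothetical helper).
pub fn timer_after(now: SystemTime, delay: Duration, action: KafkaAction) -> Timer {
    Timer { expires_at: now + delay, action }
}
```

Because the payload is opaque bytes, the timer system stays generic: the team that creates the timer decides what the bytes mean, and only their consumer needs to understand them.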

Internals

Let's delve into the internals of this a little bit. How are we going to store timers after they come across the gRPC interface? How are we going to expire timers to Kafka after they've been stored? What are the key health metrics of the system? Let's get into it. I'd first like to talk about the timer expiry process. We know we're going to be writing timers to Kafka. We know there's a gRPC service on the other side that has some storage medium attached to it for these timers. We want to try to build something that's relatively simple, relatively straightforward for pulling these things out of storage and expiring them to Kafka. How are we going to do this? We came up with this architecture as our first pass. We're still abstracting the storage away. We have our gRPC service, we've added a couple new endpoints to it. There's a get timers endpoint that takes in a timestamp, and it's going to serve us back all the timers that expire before this timestamp. Every minute, this new scheduler box over here, that's a scheduling service written in Rust, it's going to send a gRPC request up to our service. It's going to say, give me all the timers that expire before 10 minutes into the future. If it's currently 4:48, we're going to say, give me all the timers that expire before 4:58. It's going to take all those timers, write them into a pending area in memory, and have some in-memory timers. Then it's going to expire those out to Apache Kafka as those timers expire. Once they are expired and shipped off to Kafka, we are going to send a delete request up to the gRPC service to remove that particular timer from storage so that it's not served back down to the scheduler.

We viewed this as a relatively simple solution to the problem, because we thought that this timer system was going to be pretty tricky to implement in the first place. We were a bit incredulous when we came up with this solution. We weren't sure how we were going to represent the timers in memory. We weren't sure how we were going to avoid double-enqueuing. Once we just coded up a first pass, we realized that it was actually so much simpler and so much more performant than we thought it was going to be. We basically created an arena for these pending timer IDs in memory. We had an infinite loop with a 1-second period. We pulled all the timers from the gRPC interface, looped over them, and checked to see if they were known or not. If they were not known to the instance, we would spawn an asynchronous task using tokio and use its built-in timer mechanism. When that timer expired, we would produce the Kafka event and delete the timer through the gRPC interface. Then there was a bit of additional synchronization code that was required to communicate this back to the main task so that we could remove that particular timer ID from the hash set. That was the only complicated piece here. The real service implementation is really not a whole lot more complicated than this. We were pretty impressed that it was quite so simple.
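The deduplication step in that loop can be sketched as follows. This is only the bookkeeping part, written against the standard library: the real scheduler spawns a tokio task per new timer, which is not shown here. `FetchedTimer`, `Pending`, `admit`, and `complete` are hypothetical names, and using a `u128` as the timer ID is an assumption.

```rust
use std::collections::HashSet;

/// Minimal stand-in for a timer returned by the gRPC service.
/// Only the ID matters for deduplication.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchedTimer {
    pub id: u128,
    pub expires_at_secs: u64,
}

/// Tracks the IDs of timers that already have an in-memory task.
#[derive(Default)]
pub struct Pending {
    ids: HashSet<u128>,
}

impl Pending {
    /// Returns only the timers not seen before and records them as pending.
    /// The caller would start one sleeping task per returned timer.
    pub fn admit(&mut self, fetched: Vec<FetchedTimer>) -> Vec<FetchedTimer> {
        // `HashSet::insert` returns false when the ID was already present.
        fetched.into_iter().filter(|t| self.ids.insert(t.id)).collect()
    }

    /// Called once the Kafka write and the delete request have both happened.
    /// Returns true if the ID was actually pending.
    pub fn complete(&mut self, id: u128) -> bool {
        self.ids.remove(&id)
    }

    /// Number of timers currently being watched.
    pub fn len(&self) -> usize {
        self.ids.len()
    }
}
```

Polling the same window repeatedly is safe with this structure, because a timer that is already pending is filtered out on every later pass until it has been completed.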

How does this thing actually perform? What do the numbers look like? Using the built-in default tokio async future spawning, and sleep_until functions, we tried to spawn a million timers, and measure both the latency and the memory utilization. We found that it took about 350 milliseconds to spawn a million timers. That ate about 600 megabytes of memory, which is a relatively high amount of memory for a million timers. It's about 600 bytes per timer, which seems a bit heavy. We decided that these were good enough performance numbers to go out with. We were not going to invest too heavily at this point in ultra-optimizing from here. What key performance metrics did we identify once we were ready to ship this thing? The first one was the number of pending timers in that hash set. The number of things that we are watching right now. This was really important for us when we started getting out-of-memory kills on this timer, because we had not put any backpressure into the system, so if there were too many timers expiring at say 4:00, 4:00 rolls around, you try to load those all into the scheduler. Scheduler falls down. Scheduler starts back up. It tries to load the timers again, and it keeps falling over. We used this metric to identify that it was falling over at a million, so let's tell it to not load any more than 600,000 into memory. The other one was a little bit less intuitive. It was the timestamp of the last timer that we expired to Kafka. We use this to measure the drift between the timer system and reality. If it's currently 4:00, and you just expired a timer for 3:00, that means your system is probably operating about an hour behind reality. You're going to have some customers who are maybe asking questions about why their messages are an hour late. This was the most important performance metric for the system. This is the one that we have alerting around. If things start to fall too far behind reality, we'll page an engineer and have them look into that.
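Both health metrics reduce to very small calculations. The sketch below shows one way to express them; the function names and the idea of an explicit paging threshold parameter are assumptions for illustration, and timestamps are plain seconds to keep the example self-contained.

```rust
/// Seconds the scheduler is behind reality, based on the expiry time
/// of the last timer it shipped to Kafka.
pub fn drift_secs(now_secs: u64, last_expired_secs: u64) -> u64 {
    // Saturating so that clock skew never produces an underflow panic.
    now_secs.saturating_sub(last_expired_secs)
}

/// Whether the drift is large enough to page someone.
/// The threshold is a hypothetical tuning knob.
pub fn should_page(now_secs: u64, last_expired_secs: u64, max_drift_secs: u64) -> bool {
    drift_secs(now_secs, last_expired_secs) > max_drift_secs
}

/// How many more timers may be loaded without exceeding the in-memory cap.
/// This is the backpressure check: load at most this many on the next poll.
pub fn remaining_capacity(max_pending: usize, currently_pending: usize) -> usize {
    max_pending.saturating_sub(currently_pending)
}
```

With the numbers from the talk, a scheduler at 4:00 that last expired a 3:00 timer has a drift of 3,600 seconds, and a cap of 600,000 with 450,000 timers pending leaves room for 150,000 more.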

The Storage Layer

Next, I'd like to talk about the storage layer for the system. Thinking about the requirements of this, we wanted to support a high write throughput. The goal that we had in mind was about 10,000 writes per second. From our experience operating Postgres, we knew that this was definitely possible to do with Postgres, but it was a big pain in the butt if you didn't have a lot of dedicated Postgres staff, and a lot of infrastructure dedicated to clustering in Postgres. We didn't really want to use Postgres for this. We wanted something that was going to be simple to scale, so that when we needed additional capacity, we could just throw it at the wall and have it stick. We wanted something that would be simple to maintain, so zero downtime upgrades were going to be really important to us. We knew that we were going to be making relatively simple queries that might serve back a bunch of results. What is a simple query? What kinds of queries are we going to be making? The scheduler is going to be doing queries that look like, give me all of the timers that are expiring in the next 10 minutes. That is not a very complicated query. It's not, give me all the timers that are expiring in the next 10 minutes for this particular customer, that are targeting these 10,000 users in these time zones. These are relatively straightforward queries. We didn't necessarily need all the querying, filtering power of something like Postgres or another relational database.

In the end, we picked Scylla. This was already something that we were spinning up as a part of the larger journeys project. Even though we didn't have existing in-house experience operating Scylla, we knew that we were going to be developing it as another business line item. One thing that we had to think about with adopting Scylla was that the data modeling approach for Scylla and Cassandra is very different from something like Postgres. We need to think pretty hard about how we're going to be writing these data to the tables and how we're going to be querying them afterwards. When you're doing data modeling in a relational database, it's a lot easier to just think about, what are the data and how do they relate to each other? You can, generally speaking, add any number of joins to a Postgres query, and then add any number of indices on the other side to make up for your poor data modeling. You don't really have this luxury with Scylla, or Cassandra. They use SSTables. They don't provide the ability to do joins. There really isn't much in the way of indices other than the tables themselves. Ahead of time, as we are writing the data to the database, we need to be thinking about how we're going to be querying it on the other side. The query we're going to be doing is fetch all timers about to expire.

What does about to expire mean in this sense? If we think about the basic elements that we just said were a part of a timer, it's got an expiry timestamp. It has a binary data blob. It has a Kafka topic and partition that the message is going to be written to. Most of the architecture work here was done by the Apache Cassandra team. My exposure to this ecosystem has all been through Scylla, so I'm going to attribute things to Scylla that were certainly done by people on the Apache Cassandra team. In Scylla, we have data that's distributed amongst a cluster of nodes. As you query the data in the nodes, generally speaking, each query, we want it to hit a single node. We don't want to be merging data together. We don't want to be searching across all the nodes for particular pieces of data. Generally speaking, we want to know ahead of time where a particular row is going to land in the cluster.

How do we do that? How do we distinguish where a row is going to go? There's a couple different layers of keys that exist on each row in Scylla, and we're going to use those. The primary key has two parts, which is like a relational database, we have a primary key on each row. The first part is the partitioning key, that's going to determine which node in the cluster a row is going to land on, and where on that node it's going to go. It's going to group the data into partitions that are shipped around as one unit. This is composed of one or more fields. There's also a clustering key that determines where in the partition each row is going to go. That's used for things like sort ordering. That's optional, but it also can have a variable number of fields in it. Generally speaking, the kinds of queries, the high performance read queries that we want to be doing, you need to include the partition key, an exact partition key in each read query that you're doing. You're not having a range of partition keys. You're not saying, give me everything. You need to provide, give me this partition key. The query we're performing is get all the timers about to expire. What does about to expire mean? It means we need to pre-bucket the data. We need to group our timers into buckets of timers that expire around the same time, so that we can query all those timers together.

We're going to be bucketing on 5-minute intervals. For example, a timer expiring at 4:48 p.m. and 30 seconds, we're going to bucket that down to 4:45. Everything between 4:45 and 4:50, those are going to land in the same bucket. We are still going to store the expiry time, but we're also going to have this bucket that's specifically going to be used for storage locality. We can't have the bucket alone be the primary key, because just like every other database that's out there, primary keys need to be unique in tables. If the 5-minute bucket was the sole primary key, you can only have one timer that existed per 5-minute bucket. That's not a very good data system. We're going to introduce a UUID field that's going to be on each row, and that's going to take the place of the clustering key. That's going to determine, again, where in the partition each row is going to land.
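The bucketing rule is just rounding a timestamp down to a multiple of the bucket width. Here is a minimal sketch, assuming timestamps are whole seconds; `bucket_start`, `TimerKey`, and `key_for` are hypothetical names, and representing the UUID as a `u128` is an assumption.

```rust
/// Bucket width used in the first design: 5 minutes, in seconds.
pub const BUCKET_WIDTH_SECS: u64 = 5 * 60;

/// Round an expiry time down to the start of its bucket.
/// `width_secs` must be non-zero.
pub fn bucket_start(expires_at_secs: u64, width_secs: u64) -> u64 {
    expires_at_secs - expires_at_secs % width_secs
}

/// The two parts of the primary key described above:
/// the bucket is the partitioning key, the UUID is the clustering key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimerKey {
    pub bucket: u64,
    pub id: u128,
}

/// Build the key for a timer from its expiry time and its UUID.
pub fn key_for(expires_at_secs: u64, id: u128) -> TimerKey {
    TimerKey { bucket: bucket_start(expires_at_secs, BUCKET_WIDTH_SECS), id }
}
```

Counting seconds from midnight, 4:48:30 p.m. is 60,510 and rounds down to 60,300, which is 4:45:00 p.m. Two timers in the same bucket still get distinct keys because their UUIDs differ.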

Our final table design looked like this. We had those same four fields that we talked about initially. We also introduced two new fields, the row UUID field and the bucket field, which is, again, the expiry timestamp rounded down to the nearest 5 minutes. You can see that on the primary key line down there, we have first the bucket field. That's the partitioning key. Second, we have the clustering key field, which is the UUID field. What do the queries look like that we're going to be doing on this table? We're going to be getting all the fields off of each timer row inside of each bucket, inside of this 5-minute bucket that starts at 4:45. The eagle-eyed among you might already be noticing a problem with this. If it's currently 4:48, and we get the timers that are in the bucket starting at 4:45, how are we going to do this 10-minute lookahead interval thing? How are we going to fetch the timers that start at 4:50 and 4:55? Because a 10-minute interval is necessarily going to span more than one 5-minute data bucket. Further complicating things, this system is not necessarily always real time. It might be the case that this system is somewhat behind reality, which in some cases might only be a couple of seconds. It might be the case that there are still some existing timers that are falling into buckets that already ended. If it's 4:45 and 10 seconds, and you still have an existing timer that was supposed to expire at 4:44 and 59 seconds, you still have to be able to fetch that out of Scylla. Because maybe the scheduler is going to restart, and it's not going to be able to use the one that's floating around in memory.

How are we going to pull all the buckets and get the data? We can't just query the currently active bucket. We need to find out what buckets exist, and which of the buckets that exist fall within our lookahead window. We need to query all of those for their timers. We introduced another table, a metadata table that was just going to hold every single bucket that we knew about. This is going to be partitioned just by its single field, the bucket timestamp. This was just going to give us access to query what buckets currently exist. Every time we insert data into our tables, we are going to do two different writes. We're going to store the timer itself. We're also going to do an insertion on this bucket table. Every insert in Scylla is actually an upsert. No matter how many millions of times we run this exact same query, it's just going to have one entry for each particular bucket because they all have the same primary key. What do our queries look like? We're first going to have to query every single bucket that exists in the database, literally every single one. That's going to come back into the memory of our gRPC service. We're going to say, of those buckets that exist, which ones fall into our lookahead window. At 4:48 with a 10-minute lookahead, that's going to be the three buckets from 4:45 to 4:55. We're going to query all the timers off of those. We're going to merge them in the memory of the gRPC service, and then ship them down to the scheduler.
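The bucket selection step can be sketched as a filter over the contents of the metadata table. This is an illustration under stated assumptions, not the service's actual code: `buckets_to_query` is a hypothetical name, timestamps are whole seconds, and a bucket is selected when it starts before the end of the lookahead window, so buckets in the past that still exist are picked up as well.

```rust
/// Given every bucket start known to the metadata table, pick the ones
/// that may contain timers expiring before `now + lookahead`.
/// Past buckets are included on purpose: they can still hold timers
/// that have not been expired and deleted yet.
pub fn buckets_to_query(known: &[u64], now_secs: u64, lookahead_secs: u64) -> Vec<u64> {
    let cutoff = now_secs + lookahead_secs;
    let mut selected: Vec<u64> = known.iter().copied().filter(|&b| b < cutoff).collect();
    // Oldest first, so overdue timers are handled before future ones.
    selected.sort_unstable();
    selected.dedup();
    selected
}
```

At 4:48 with a 10-minute lookahead the cutoff is 4:58, so the buckets starting at 4:45, 4:50, and 4:55 are selected and the 5:00 bucket is not. If a 4:40 bucket is still in the metadata table, it is selected too, which covers the case where the system is running behind.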

If we put this all together into one cohesive system view, on the external team side, we have a thing that creates timers. That's going to send a gRPC request across the wire to our service, which is going to store a timer in our Scylla database alongside a corresponding bucket. Then, every minute, our scheduler is going to call the get timers gRPC method, with a lookahead window. It's going to add the timers that fall into that window to its pending area in memory. When those in-memory timers expire, it's going to write them out to an Apache Kafka topic. Eventually, maybe there'll be a Kafka Consumer that picks that message up. This system, as I've described it, existed for about a year, maybe a-year-and-a-half without any major issues, modulo that out-of-memory problem. We didn't have to do any major scaling operations. It didn't have any really big problems. We mostly just didn't think about it after it was out there and running. We were pretty happy with it. Eventually, we started to think about adding more users to our journeys' product. We started to think about using this timer system to support more use cases than just journeys. We realized that we would have to do some scaling work on this because what I've described has some poor scaling tendencies.

Jumping Forward, Q1 2023

Laine: My name is Hunter Laine. I've been an engineer on Lily's team at OneSignal for about two-and-a-half years. In a past life, I was a marketing operations manager in Prague.

We're going to take another leap forward in time to Q1 of this year. We have this effective timer service up and running pretty smoothly. It's capable of storing billions of concurrent timers and expiring them in a performant manner, while minimizing data loss, and easily integrating with the rest of our systems. It's essentially a set timeout function that's available across our entire infrastructure, without any use case-specific limitations. It sounds pretty good. We thought so too, so we decided it was time to actually start doing that integrating with the rest of our systems.

The Case for Scaling Up

We send about 13 billion notifications a day, so we wanted to use the timer service to ease a significant amount of load on our task queues. This could be useful for a myriad of things, from retrying requests on failures across multiple services, to scheduling future notifications, and many other areas we were already excited about. If we were going to use the timer service in these many critically important areas, we needed to ensure that it could handle a lot more timers reliably than it currently could. We needed to scale up. The original motivation for and use of the timer service was to enable these journey builders, no-code systems generally used as a marketing tool. These systems constitute a significant number of the timers that we were storing and retrieving. However, when compared to the task of integrating with our delivery systems, they represented a relatively small scale of use. At that scale of use, we had opted for this, again, slightly more simplified architecture to avoid dealing with the more complex coordination required to make the timer service fully scalable. Specifically, when we talk about scaling issues, we will be focusing more on the scheduler portion.

Scaling the timer service vertically was no problem at all. We could and did add resources to both the gRPC service portion and the scheduler as needed. Scaling the gRPC service portion horizontally was also no trouble. We could easily add a pod or four to handle an increase in create, get, and delete requests from multiple clients. The slight hitch that we were now facing was that the scheduler was not quite so simple to scale horizontally. We'd not yet done the work to allow for multiple schedulers to run at the same time. See, each scheduler needs to ask the gRPC service for timers at some set interval. It does no one any good if each individual scheduler is asking for timers and getting all the same ones back. Then we're just duplicating all the work, instead of sharing the load. Plus, it certainly doesn't seem like a desirable feature to enqueue each message to Kafka multiple times as we see here. We needed to do a bit of a redesign to allow for multiple schedulers to share the task of scheduling and firing timers with each in charge of only a particular subset of the overall timers. How do we do that? If we wanted multiple schedulers to run in conjunction, we needed to find a way to group timers by more than just time, so that those schedulers could be responsible, each one, for requesting and retrieving only a particular group of timers.

First, we needed to make some adjustments to how our data was stored so the timers weren't stored just by the time that they were to be sent out, those 5-minute bucket intervals, but by some other grouping that different schedulers could retrieve from. We made some adjustments to our Scylla table schemas so that when a new timer is created, it is inserted into a bucket by both that time interval, and now a shard. While we were adjusting these schemas, we also actually decided to shrink that bucket interval from the 5 minutes we'd been using to 1-minute bucket intervals. This was because we were already noticing that our Scylla partitions were getting larger than we would like. We would like to keep our Scylla partitions in general relatively small, as this leads to more efficient and less resource intensive compaction. We determined which shard a timer belongs to by encoding its unique UUID to an integer within the range of the number of shards we have. We actually have our shards set to 1024, with timers pretty evenly distributed among them. Each scheduler instance is then responsible for an evenly distributed portion of those shards. We went to this granularity and this many shards, also in an effort to keep those Scylla partitions relatively small. This means we have a more efficient way to parse a far greater number of timers. This update also makes the bookkeeping of the buckets table much more efficient, and means that when querying for timers, we look much more particularly at a particular shard and time to retrieve from. We then just needed to adjust our get timers request so that when we do request those timers, we do it not just by time, as we were before, but in addition by that shard, so that we can get a particular subset of the overall timers.
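As a rough sketch of the new grouping: each timer now lands in a partition identified by a 1-minute bucket and a shard. The talk does not say how the UUID is encoded to an integer in the shard range, so the plain modulo below is an assumption, as are the names `shard_for` and `partition_for` and the use of a `u128` for the UUID.

```rust
/// Number of shards mentioned in the talk.
pub const SHARD_COUNT: u32 = 1024;

/// Bucket width after the redesign: 1 minute, in seconds.
pub const BUCKET_WIDTH_SECS: u64 = 60;

/// Map a timer's UUID (as a 128-bit integer) to a shard in `0..shard_count`.
/// Modulo is an assumed encoding; it spreads random UUIDs evenly.
/// `shard_count` must be non-zero.
pub fn shard_for(timer_id: u128, shard_count: u32) -> u32 {
    (timer_id % shard_count as u128) as u32
}

/// The (bucket, shard) pair that identifies where a timer is stored.
pub fn partition_for(expires_at_secs: u64, timer_id: u128) -> (u64, u32) {
    let bucket = expires_at_secs - expires_at_secs % BUCKET_WIDTH_SECS;
    (bucket, shard_for(timer_id, SHARD_COUNT))
}
```

Because the shard is derived only from the timer's own ID, any instance of the gRPC service computes the same shard for the same timer without coordination.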

We now have a system whereby timers are stored by both time and shard, and a way by which to retrieve those timers by both time and shard. We're there? Just one big problem. Each scheduler instance needs to have state so that it can reliably ask for the same subset of timers every time it asks. How does each scheduler know who it is? This was important because if a scheduler instance were to restart for any reason, it needed to start back up as the same scheduler so it could pull the same timers. If we have multiple instances pulling the same subset of timers, we're back at square one. If scheduler 2 were to restart for any reason, now thinking that it's scheduler 1, when we already have a scheduler 1 up and running and pulling timers for scheduler 1, we're again duplicating work and erroneously firing timers multiple times. Plus, even worse here, no one is looking after the shards assigned to scheduler 2.

In order to solve this, we deployed a new version of the scheduler as a StatefulSet in Kubernetes, which, among other things, gave us a stable unique name for each instance of the scheduler every time it started up, with each name ending in a zero-indexed value up to the number of replicas. Each scheduler could then take that value and calculate a range of shards that it's responsible for retrieving timers from. Importantly, this means that each shard of timers, and therefore each individual timer, will only ever be retrieved by one scheduler instance. We now have this system where we store timers by both time and shard, where we have retrieval requests that can get a subset of timers by both time and shard, and where schedulers have state and can therefore reliably request the same subset of timers every time they ask. We have achieved full scalability.
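As a rough sketch of that calculation: Kubernetes names StatefulSet pods `<statefulset-name>-<ordinal>`, so a scheduler can parse the trailing number and derive its shard range. The pod name `timer-scheduler-3`, the function names, and the contiguous-range split are assumptions for illustration; the talk only says that each scheduler calculates a range of shards from its index.

```rust
use std::ops::Range;

/// Number of shards; the talk states 1024.
const SHARD_COUNT: u64 = 1024;

/// Parse the zero-indexed ordinal from a StatefulSet pod name,
/// e.g. "timer-scheduler-3" (hypothetical name) gives 3.
pub fn ordinal_from_pod_name(name: &str) -> Option<u64> {
    name.rsplit('-').next()?.parse().ok()
}

/// Half-open range of shards owned by `ordinal` out of `replicas` schedulers.
/// Assumption: shards are split into contiguous, near-equal ranges.
pub fn shard_range(ordinal: u64, replicas: u64) -> Option<Range<u64>> {
    if replicas == 0 || ordinal >= replicas {
        return None;
    }
    // Integer division places the boundaries so that consecutive ordinals
    // cover 0..SHARD_COUNT with no gaps and no overlap.
    let start = ordinal * SHARD_COUNT / replicas;
    let end = (ordinal + 1) * SHARD_COUNT / replicas;
    Some(start..end)
}
```

With 4 replicas, ordinal 0 would own shards 0..256; with 5 replicas the same ordinal would own 0..204. Every range shifts when the replica count changes, so in a scheme like this all schedulers have to agree on the replica count at the same time.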

Performance Characteristics

With these changes made to the architecture of the timer service, adding new nodes to the scheduler is as simple as increasing the replica count in the config file, which makes scaling the timer service, both vertically and horizontally, possible and simple to do as we continue to use it in more parts of our system. Because we now have multiple instances of both the gRPC service portion and the scheduler, we've made the timer service much less susceptible to serious outage if a node were to go down. Previously, we only had one scheduler, so if it went down, that was it. There were no timers being retrieved, or processed, or fired, and no messages being enqueued to Kafka by the timer service until that node restarted. Now, because we have multiple instances, each in charge of only a particular subset of the overall timers, if a node goes down, it has much less impact on the overall functioning of the system. On single pod performance, each scheduler node is capable of handling about 10,000 timers per second. Frankly, that's without even pushing it, which makes horizontal scaling incredibly powerful. Each gRPC instance handles about 17,000 requests per second really without trouble.

Callouts

There are a few things to note about our timer service as it exists today. First, the timer service has an at-least-once guarantee. This is because there's a gap between the scheduler enqueuing its message to Kafka, and then turning around and requesting that that timer be deleted. If the scheduler were to restart for any reason between those two actions, or if the gRPC service has some communication error with the scheduler or with Scylla when processing that delete, the timer will fail to be deleted and will again be retrieved and fired. Because of this, the timer service does expect that all downstream consumers manage their own idempotence guarantees. Another callout about the timer service is that timers will fire close to but not exactly at the time they're scheduled. This is because schedulers still need to pull in timers at that periodic interval via that get timers request. We currently have each scheduler set to pull in the next 10 minutes of timers every 1 minute. Possibly the biggest callout about the timer service as it exists today is that once a timer has been retrieved by the scheduler, there's no method by which to cancel it. This is because the only way to delete or cancel a timer in this system, the way we have it currently, is via that delete timer request on the gRPC service, which deletes from the database. Therefore, if the timer has already been retrieved from the database and is now in the scheduler, there's currently no way to stop it from firing.
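To make the at-least-once callout concrete, here is a minimal sketch of what a downstream consumer's idempotence guard could look like. The type `IdempotentConsumer` and its methods are hypothetical, and the in-memory `HashSet` is an assumption made to keep the example short: a real consumer would need a durable record of processed timer IDs to survive its own restarts.

```rust
use std::collections::HashSet;

/// Hypothetical downstream consumer that tolerates duplicate deliveries.
pub struct IdempotentConsumer {
    /// IDs of timers that have already been handled.
    seen: HashSet<u128>,
}

impl IdempotentConsumer {
    pub fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Run `handler` only the first time a given timer ID is delivered.
    /// Returns true if the handler ran, false if the delivery was a duplicate.
    pub fn handle<F: FnOnce()>(&mut self, timer_id: u128, handler: F) -> bool {
        // `insert` returns false when the ID was already present.
        if self.seen.insert(timer_id) {
            handler();
            true
        } else {
            false
        }
    }
}
```

If the same timer is fired twice because its delete never reached Scylla, the second delivery is dropped here instead of repeating the side effect.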

Future Potential

The timer service has proven to be incredibly powerful within our own internal systems. As such, we really believe it could be useful to other organizations and individuals. There's definite potential that we will move to open source it in the near future. As we continue to grow as an organization, we intend to use the timer service as we create new services, but we also want to dedicate resources to integrating it more broadly across our existing infrastructure in order to streamline. We would also like to add and fine-tune features, such as perhaps the ability to cancel a timer at any point in its lifecycle.

What We Have

We're about four years on now, from that original conception of a timer service to enable those journey builders. We now have an incredibly robust system that's capable of storing billions of concurrent timers and expiring them in a performant manner, while minimizing data loss, easily integrating with the rest of our systems, and, importantly, scaling simply both vertically and horizontally to accommodate our future use.

Questions and Answers

Participant: How did you handle it when you added new nodes to the scheduler? If you have 4 running and each gets 250 shards, and you add a fifth one, now they're going to be 200? How did you keep them from stepping on each other's toes as you increased that number?

Mara: We would take all of the nodes down at the time that a restart occurred, and start them back up. There would maybe be 30 seconds of potential timer latency. We weren't super concerned about millisecond accuracy of the schedulers in this case.

Participant: One of the interesting places for scheduling [inaudible 00:41:14].

Mara: I was the manager on the team, at the time we were initially building out this project. The engineer that was in charge of the implementation, he was running all these complicated data structures past me, and I suggested, did you try the most naive possible approach? Did you try spawn a future and wait on a task? He was like, that can't possibly perform well enough for our needs. I said, why don't you try it and we'll see if it performs well enough? It did. We actually didn't really continue to evaluate more complicated data structures because the easiest one worked for us.
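The naive approach described here, one sleeping task per timer, can be sketched as below. In Tokio this would be a `tokio::spawn` per timer around a `tokio::time::sleep`; to keep the example dependency-free, this sketch swaps in OS threads from the standard library, which illustrates the shape but would not scale the way lightweight async tasks do. The `Timer` struct and `fire_all` function are hypothetical, and sending the ID over a channel stands in for producing the timer's message to Kafka.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical timer: an ID and how long to wait before firing.
pub struct Timer {
    pub id: u64,
    pub delay: Duration,
}

/// Spawn one sleeping worker per timer and return the IDs in firing order.
pub fn fire_all(timers: Vec<Timer>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for timer in timers {
        let tx = tx.clone();
        // One worker per timer: sleep until expiry, then "fire".
        thread::spawn(move || {
            thread::sleep(timer.delay);
            // Stand-in for enqueuing the timer's message to Kafka.
            let _ = tx.send(timer.id);
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);
    rx.into_iter().collect()
}
```

No priority queue or timer wheel is maintained by the application here; ordering falls out of each worker simply waking up when its own delay has elapsed.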

Participant: How do you handle the [inaudible 00:42:38] that replicated the time zones, or anything related here. Did you sidestep or how did you handle that?

Mara: That was totally sidestepped. These timers were all in UTC. These were all server-side events. Every timer that's in this system, is a timer that was scheduled by another team at OneSignal. If somebody cared about something being time zone sensitive, they cared about, you send a notification at 8 a.m. in the user's time zone, they would have to figure out when 8 a.m. on the user's time zone was on a UTC clock.
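The conversion a calling team would have to do can be sketched as simple offset arithmetic. This is an illustration only: it assumes a fixed UTC offset in seconds, ignores daylight saving rules, and drops the information about which calendar day the result falls on. The function name is hypothetical.

```rust
const SECS_PER_DAY: i64 = 86_400;

/// Convert a local wall-clock time (seconds after local midnight) into
/// seconds after UTC midnight, given a fixed offset from UTC in seconds
/// (for example, UTC-8 is -28_800).
/// Assumption: fixed offset, no daylight saving handling.
pub fn local_to_utc_secs_of_day(local_secs: i64, utc_offset_secs: i64) -> i64 {
    // Local time = UTC + offset, so UTC = local - offset, wrapped into one day.
    (local_secs - utc_offset_secs).rem_euclid(SECS_PER_DAY)
}
```

For a user at UTC-8, 8 a.m. local (28,800 seconds) maps to 57,600 seconds, which is 16:00 UTC; that UTC time is what would be handed to the timer service.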

Participant: You mentioned that you have a secondary table where you store all the buckets, and you request all the buckets [inaudible 00:43:34] for the next interval you want to fetch. That's essentially like a full table scan?

Mara: That is basically a full table scan. The thing that's saving us from really having a bad time on that is the fact that the number of entries on that table is much more constrained than the number of entries on the timers table. There's going to be a maximum of one entry on that table for every 5-minute interval. I don't think the timer system actually has a limit on how far out you can schedule timers. The systems that currently enqueue timers I believe do have limits on how long they will allow you to schedule out a timer. Like journeys will not allow you to, say, send a notification then wait for 30 days. That just puts a constraint on the number of things that are allowed to live in the dataset. Yes, it's potentially very expensive.

Participant: I have a few questions about the database structure itself. It spawns data in the case of Cassandra? It must be more performant than the Scylla database. At the backend, how many nodes do you have, and what the replication factor is? [inaudible 00:45:06]

Mara: We have just a single data center, I believe, for timers. We have I think six nodes with Scylla. I'm not sure if we did any of our own benchmarking of Scylla versus Cassandra. I think at the time, our CTO had quite a strong aversion to Java. I believe we actually run zero Java code in production, and adopting Scylla versus Cassandra was partially motivated by that desire. We have been quite happy with Scylla since we've adopted it.

Participant: [inaudible 00:46:06]

Mara: A timer would be returned by the gRPC service multiple times until it was expired, until that delete timer method was called for that particular timer. That, I suppose, is another inefficiency of the system. There's a bunch of data retransmission. Again, this was done in the name of the simplicity of the system. That is something that probably could be removed in the future if we wanted to optimize that a bit more.
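Since the same timer can come back on several consecutive fetches, the scheduler needs some way to avoid waiting on it twice. The talk does not describe how this is handled, so the following is only one plausible sketch: a set of in-flight timer IDs that filters each fetched batch. The type `InFlight` and its methods are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical scheduler-side record of timers already being waited on.
#[derive(Default)]
pub struct InFlight {
    ids: HashSet<u128>,
}

impl InFlight {
    /// Given a freshly fetched batch, return only the timers that are not
    /// already in flight, and remember them as in flight.
    pub fn admit(&mut self, fetched: &[u128]) -> Vec<u128> {
        fetched
            .iter()
            .copied()
            // `insert` returns false for IDs seen on an earlier fetch.
            .filter(|id| self.ids.insert(*id))
            .collect()
    }

    /// Forget a timer once it has fired and its delete has been requested.
    /// Returns true if the timer was in flight.
    pub fn complete(&mut self, id: u128) -> bool {
        self.ids.remove(&id)
    }
}
```

With a 10-minute lookahead fetched every minute, most of each batch would be repeats, so a filter like this keeps the retransmitted data from turning into duplicate work inside the scheduler.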

Participant: It's basically that there is a possibility here of some crashing bug in a particular [inaudible 00:47:20]. If one of your scheduler nodes stopped and crashed, do you have a mechanism for recovering from that, or is that not a problem?

Mara: That hasn't been a problem for us, I think because the API of a timer is so small. The variance in each timer is time and data and Kafka settings basically. We don't give you more Kafka settings than topic and partition. The attack surface is really small. We haven't had any instances of a malformed timer that was causing issues for the service. If a particular node of the timer system was just crash looping, we would basically just have to page an engineer and have them look into it. There's not auto-healing built into it.

 


Recorded at:

Apr 26, 2024
