
Here is a scenario that happens at **Remind** every day: a teacher sends a class announcement, which goes out to every class participant. There are several ways that an announcement can be delivered, as each recipient may choose their preferred delivery mechanism: push notification, email and SMS are supported.

Statistics are collected about whether each recipient has received and/or read the message, and then presented to the teacher as an aggregated view, also known as a **message summary**.

When reading a message summary, teachers may realize a class member did not receive the latest class announcement. The teacher can then reach out to that student or parent, maybe in the classroom, to understand why they were unable to receive that message so that they don’t miss important announcements in the future.

In this blog post, we discuss how we were able to scale up message summaries using a probabilistic counting algorithm while making it faster, on both reads and writes, and more memory efficient.

Message summaries essentially contain a **counter** (integer) and a **set of user UUIDs** (set of strings) for each status a recipient can be in: *read*, *delivered* or *failed*.

**delivered and read**

If a student named Chris **receives** and **reads** that message, then both `delivered_count` and `read_count` are incremented by 1, and both `delivered_uuids` and `read_uuids` contain Chris's UUID.

**delivered but not read**

A parent named Amy, who has **received** but **not read** that message yet, would have caused `delivered_count` to be incremented by 1, and `delivered_uuids` would contain Amy's UUID.

**failed**

Lastly, if Dave did not receive that message at all, maybe because he changed his mobile device recently, then `failed_count` would be incremented by 1 and `failed_uuids` would contain Dave's UUID.
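As a rough sketch of the three cases above (the field and type names here are our illustration, not the actual production schema), the per-status data could look like:

```go
package main

import "fmt"

// StatusEntry is a hypothetical sketch of one status bucket in a message
// summary: an integer counter plus the set of recipient UUIDs in that status.
type StatusEntry struct {
	Count int
	UUIDs map[string]bool
}

// MessageSummary holds one entry per recipient status.
type MessageSummary struct {
	Delivered StatusEntry
	Read      StatusEntry
	Failed    StatusEntry
}

func newEntry() StatusEntry { return StatusEntry{UUIDs: map[string]bool{}} }

// add records a UUID in this status, incrementing the counter only once
// per unique recipient.
func (e *StatusEntry) add(uuid string) {
	if !e.UUIDs[uuid] {
		e.UUIDs[uuid] = true
		e.Count++
	}
}

func main() {
	s := MessageSummary{Delivered: newEntry(), Read: newEntry(), Failed: newEntry()}
	s.Delivered.add("chris-uuid") // Chris received and read the message
	s.Read.add("chris-uuid")
	s.Delivered.add("amy-uuid") // Amy received but has not read it yet
	s.Failed.add("dave-uuid")   // Dave never received it
	fmt.Println(s.Delivered.Count, s.Read.Count, s.Failed.Count) // 2 1 1
}
```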

When sending an announcement, it may take a few seconds to deliver it to all recipients (see *t1* in the figure below). If there are thousands of recipients, it takes even longer. During that time, successes and failures are stored in a separate caching layer (*from t2 to t4*). Only after that period are the *counters* and the *sets of user UUIDs* finally persisted into DynamoDB (*t3*).

While storing every single recipient's UUID provides us with an **accurate** message summary, it has proved to be **impractical** due to data limitations. Message summaries are stored in a DynamoDB table, which limits each row to 400KB. That's why each set of user UUIDs may be truncated when it is persisted to DynamoDB, to prevent it from growing without bound.

An accuracy problem arises while tracking message **reads** afterwards. It may take a few minutes, hours or days for someone to read a message, so the initial counters in a message summary **may change** after they are initially persisted into the database. That’s why accurate counters are challenging.

We had to revisit our design for message summaries when a new set of requirements emerged:

- to support a larger number of recipients
- to support an arbitrary list of recipients (not necessarily members of a class)

We aimed at supporting message summaries for up to 50,000 unique recipients. That's roughly 10x the largest sizes we were accustomed to seeing.

Supporting an arbitrary list of recipients exposed unnecessary coupling between our message summaries DynamoDB table and another table, which was required to track message reads. We had to come up with a solution that would remove that dependency while making sure each row in DynamoDB would not exceed its 400KB row limit.

Before we started looking for a new solution to message summaries, we looked into our existing data to understand the problem better. Soon, we realized not all messages are equal. In fact, *99%* of Remind announcements are sent to **250** recipients or less. That means the problem of computing message summaries for lots of recipients is actually restricted to 1% of all announcements. Therefore, we could tackle most of the problem with 100% accuracy: since a set of user UUIDs wouldn't exceed 250 items, it would fit comfortably into a DynamoDB row. Then, we had to design a more elaborate solution for the remaining 1%.

If one wants to track the cardinality of a large set with 100% accuracy, one needs to pay that cost by allocating a large amount of memory to it. There is no magic sauce. On the other hand, if one is willing to give away a bit of accuracy, it is possible to achieve good enough accuracy, say 99%, while using a much lower amount of memory.

For us, tracking 50,000 user UUIDs would consume ~1.5MB, well beyond what fits in a single DynamoDB row. However, it would be a pretty good deal if we could achieve 99% accuracy by using less than 400KB.
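The back-of-the-envelope math behind that ~1.5MB figure, assuming each UUID is serialized as the 32-byte string mentioned later in the post:

```go
package main

import "fmt"

// rawUUIDSetBytes returns the space needed to store n UUIDs of the given
// serialized size (32 bytes per UUID is the figure used in this post).
func rawUUIDSetBytes(n, uuidBytes int) int { return n * uuidBytes }

func main() {
	raw := rawUUIDSetBytes(50000, 32)
	// 1,600,000 bytes, roughly 1.5 MiB: about 4x DynamoDB's 400KB row limit.
	fmt.Printf("%d bytes (~%.1f MiB)\n", raw, float64(raw)/(1<<20))
}
```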

We went back to discuss with the product team the idea of providing good-enough accuracy for large message summaries. A caveat is that we would not be able to show individual user UUIDs in that case. It didn’t take long before the team agreed that the compromise was reasonable since it would enable us to scale up message summaries to much larger sizes.

After reading a bit about different cardinality estimation algorithms, we settled on the one from *A Linear-Time Probabilistic Counting Algorithm for Database Applications*, a paper published in 1990. It is simple enough that we felt confident we could implement it on top of our existing DynamoDB solution. It works quite well for small cardinalities, a category our use case falls into since we are not dealing with several million or billion items. Redis, for instance, uses linear counting in its HyperLogLog data structure when cardinalities are less than 40,000.

Here is a section from that paper that explains how linear counting works:

> Linear counting is a two-step process. In step 1, the algorithm allocates a bit map (hash table) of size *m* in main memory. All entries are initialized to “0”s. The algorithm then scans the relation and applies a hash function to each data value in the column of interest. The hash function generates a bit map address and the algorithm sets this addressed bit to “1”. In step 2, the algorithm first counts the number of empty bit map entries (equivalently, the number of “0” entries). It then estimates the column cardinality by dividing this count by the bit map size *m* (thus obtaining the fraction of empty bit map entries *V_n*) and plugging the result into the following equation: *n̂ = −m · ln(V_n)* (the symbol ^ denotes an estimator).
While I'm not sure I fully understand the math behind that paper, let me take a stab at explaining the logic using our use case as an example. Say we take a user UUID (32 bytes) and apply a hash function to it to get a corresponding bit map address, whose value is a single bit. After applying that hash function to an entire set of user UUIDs, all the corresponding addresses for those UUIDs will be set to 1. Sure, collisions can occur (two or more UUIDs yielding the same address), which is why it's an estimator in the first place.

The point is that one can control the bit map size to achieve a certain accuracy. If each UUID (32 bytes) corresponds to a 1-bit value in the bit map, then one could estimate the cardinality of a set of user UUIDs using **256 times** less space. In practice, you want your bit map to be a few times larger than the original set's cardinality, in order to avoid collisions that would skew the estimator; that is called the **load factor** in the paper. Even with a load factor of 16, one obtains **16x** savings in space by using such an estimator. That's a significant amount of savings!
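Plugging our 50,000-recipient target into that sizing logic (the load factor of 16 mirrors the example above and is illustrative, not a tuned production value):

```go
package main

import "fmt"

// bitmapBytes returns the size of a bit map that allocates loadFactor bits
// per expected item.
func bitmapBytes(items, loadFactor int) int { return items * loadFactor / 8 }

func main() {
	b := bitmapBytes(50000, 16)
	// 100,000 bytes (100KB): 16x smaller than the ~1.6MB of raw 32-byte
	// UUIDs, and comfortably under DynamoDB's 400KB row limit.
	fmt.Printf("bit map: %d bytes (%dKB)\n", b, b/1000)
}
```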

After our initial implementation was ready, we ran a few simulations to validate it was working according to the expected average error rate. We wanted to achieve something with an avg. error rate < 1%. The graph below shows the results of our initial experiments:

We implemented it as a Go package whose source code is publicly available here. The avg. error rate was less than 1% in our initial simulations, which gave us confidence to adopt it in production. Finally, we went with a hybrid approach: accurate counters were used to track messages with <= 250 recipients, whereas linear counting was used for messages with > 250 recipients.
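The hybrid dispatch boils down to a single threshold check; a sketch (names are ours, not the production code's):

```go
package main

import "fmt"

// exactThreshold mirrors the cutoff described above: summaries with 250 or
// fewer recipients keep exact counters and UUID sets, larger ones fall back
// to linear counting.
const exactThreshold = 250

func summaryStrategy(recipients int) string {
	if recipients <= exactThreshold {
		return "exact"
	}
	return "linear-counting"
}

func main() {
	fmt.Println(summaryStrategy(30))    // exact
	fmt.Println(summaryStrategy(50000)) // linear-counting
}
```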

We finally deployed our probabilistic-based approach to production. We kept both versions running so that we could compare their results against each other. We looked into the distribution of the differences between each counter. For instance, here is a graph that shows p50/p95/max of the distribution of differences in `delivered_count`.

Our goal was to minimize those values. On the other hand, we didn’t expect to reduce them to zero given that the new solution had an inherent error rate and the old implementation had a few known bugs. We were able to use those data points to kick off thorough investigations on specific cases. After looking into several of them, the probabilistic-based approach showed to be closer to the truth than the original implementation.

Performance-wise, the new implementation is also much faster than the original one. For instance, the median latency on reads dropped to **half** of its original value, while p95 dropped to a **third (!!)** of its original value.

However, there was one metric we were not so happy with: space utilization. Surprisingly, each row in the new message summaries table was orders of magnitude larger than in the previous version.

When we opened up one of the new message summary data structures to understand why they were so large, we realized they were very repetitive. Each bit map is initialized with zeros, so its default value is a long string of 0 bits taking up 8KB. In general, bit maps tend to contain long runs of 0's or 1's, which makes them very suitable for compression.

We decided to compress all bit map fields with Go's compress/gzip package before writing them to DynamoDB. After compression, each DynamoDB row became much smaller. For instance, the default value of a bit map went from **10,937** bytes to **69** bytes. We could finally declare victory!
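A sketch of that compression step on a freshly initialized (all-zero) 8KB bit map; the helper name is ours, and exact compressed sizes depend on serialization details:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipBytes compresses a byte slice with compress/gzip's default settings.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	bitmap := make([]byte, 8192) // an all-zero 8KB bit map
	compressed, err := gzipBytes(bitmap)
	if err != nil {
		panic(err)
	}
	// Long runs of identical bytes compress down to a few dozen bytes.
	fmt.Printf("raw: %d bytes, gzipped: %d bytes\n", len(bitmap), len(compressed))
}
```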

Here are the key takeaways we learned during our process to scale up message summaries:

- Know your data! Maybe it can help you to reduce your problem to a more tractable version.
- Scaling up is often about trade-offs. Bring them up with the entire team, including non-technical folks, so that requirements can be revisited.
- In math we trust. Who would have guessed a probabilistic approach would help us to achieve better accuracy?