Drivetribe’s Modern Take On CQRS With Apache Flink®
This is a guest post from Aris Koliopoulos, a senior software engineer at London-based Drivetribe.
Drivetribe is the world’s digital hub for motoring. The platform was created by former Top Gear presenters Jeremy Clarkson, Richard Hammond, and James May, and the company has raised funding from the likes of 21st Century Fox, Atomico, and Breyer Capital. That’s quite some star power behind the product, and we expected that the co-founders’ fan base would drive a surge of traffic to the site as soon as it launched in November 2016. As the team tasked with building a product–from scratch–that could handle high user volume from the start and scale efficiently, we had to make key early decisions about Drivetribe’s architecture.
In this post, we’ll walk you through how and why we built drivetribe.com using Apache Flink® and other technologies, and we’ll also discuss how our approach enables a better experience for the end users.
1. An Introduction to Drivetribe
2. Architecture Overview
3. Apache Flink in the Drivetribe Stack
4. The Developer Experience with Flink
An Introduction to DrivetribeFirst, a little bit about Drivetribe. After creating an account, users join ‘Tribes’, which are topic-specific groups hosted either by one of the three co-founders or by other bloggers, experts, and motoring enthusiasts: Tribe subject matter covers everything from vintage, to off-road, to trips around the world. Inside a Tribe, users can post their own content and also comment, repost, or “bump” (our terminology for a “like”) content posted by other users. A user’s home feed highlights their most active tribes and also shows new posts from their tribes that we think they’ll be interested in. It’s up to us to personalize a user’s content to keep them engaged with the site, a familiar challenge in the world of feed-based digital media. And when launching a product like ours from scratch (with minimal training data on hand), flexibility in content ranking models is a must-have. We’ll come back to this later in the post.
Architecture OverviewWe at Drivetribe are a data-driven company, and we like to capture and count everything. When first thinking about our architecture, we decided that a traditional approach, where state transitions are not logged and a fleet of stateless servers mutates a gigantic database, would be error-prone and completely inflexible given our requirements.
We’re a team of Scala developers, and we greatly appreciate immutability. Additionally, having the ability to experiment with different algorithms and iterate quickly at scale was of paramount importance. This led to our first decision: to use Event-Sourcing and Command-Query-Responsibility-Segregation (CQRS).
The presence of an event log as a source of truth provides great flexibility in producing materialised views. Different teams can work on the same data in parallel. Algorithms can be easily replaced and applied to the whole history of events retroactively. The same process can be applied to bugs. Every downstream consumer can be launched, upgraded, and removed without any service disruption.
CQRS provides the ability to separate the read path from the write path and therefore to build, optimise, and scale them independently.
This process has a steep learning curve. Multiple components need to be provisioned, deployed, maintained, scaled, and troubleshot. Subtle errors can jeopardise the correctness of the distributed program, and consistency guarantees at the system-level cannot be trivially established.
The Golden Hammer is a well-known engineering anti-pattern, and our approach is not ideal for every distributed system. However, it has the potential to work well for large publishing platforms, thus we decided to take the risk and go for it.
To begin building such an architecture, we needed to make a few important decisions. Apache Kafka™ is known to be a scalable, fault-tolerant, and reliable distributed log. Choosing it as the backbone of our architecture was a no-brainer.
The system’s entry point consists of a fleet of stateless restful servers. They consume materialised views from Elasticsearch and Redis and produce data in the form of Kafka messages. This is fairly straightforward; professional software developers of all levels usually have some experience building and scaling stateless APIs.
We knew from the beginning that the most challenging part of this architecture would be the downstream distributed Kafka consumers. When evaluating alternatives, a few options came to mind: Apache Storm, Apache Samza, Apache Spark Streaming, and Apache Flink. We evaluated systems based on a few criteria:
- Provable scalability and robustness
- High performance
- Rich and extensible APIs (in Scala or worst case Java)
- Ability to manage state and battle-tested integrations with the rest of the stack (i.e. Kafka, Redis, and Elasticsearch to start)
After an initial evaluation and experimentation phase, Flink won the race. It was the only contender that was built with stateful streams of data in mind, and it ticked all the other boxes, too.
Apache Flink in the Drivetribe StackEvery byte of information generated in the platform passes through Kafka to Flink. User actions are represented by Kafka messages, and Flink is responsible for consuming those messages and updating our internal data model.
We run a number of Flink jobs, ranging from simple persisters to complicated stateful reducers. Persisters are used in trivial cases, where persisting a data point to the serving layer is enough.
Persisting a user profile, a comment, or a “bump” are such cases, and Flink’s low latency allows us to obfuscate the fact that consistency at the API-level is only eventual.
However, Flink’s true power lies in two key features: stateful computations and the ability to define topologies. At Drivetribe, we count and aggregate statistics about every entity. Some of them are visible (i.e., bump counts, comment counts, impressions counts, bumps-per-day). For example, the aggregations on each tribe’s leaderboard are handled by Flink: Many other stats are reduced internally to feed our ranking models. Newsfeed content is algorithmically generated, and we serve our ranking models through Flink. In addition, we recently launched a new feature that produces recommendations via streaming collaborative filtering on Flink: We compute statistics about every entity in the platform. This by default requires large amounts of state. Flink’s stateful stream abstraction makes defining windows and folding over them a breeze. We just need to define an appropriate datatype for an aggregate (for example, counters can be modelled as Monoids), and then Flink takes care of hash partitioning the key space and performing the distributed computations. This enables us to compute metadata about the content, such as click-through rates, “hotness”, co-occurrences etc and to use this metadata to feed internal ranking and prediction models.
Of course, code can be buggy, algorithms can be improved, and features can be enhanced. The fact that the source of truth is a log containing all the events that ever occurred enables us to change our implementation, deploy a new set of jobs on Flink, and process the event streams using the new algorithm. In a few hours, the views that the platform provides can be completely updated. For a young start-up such as ours, the ability to change our mind and iterate quickly is extremely valuable.
Replaying Kafka presents a number of challenges. Every state transition since the beginning of time becomes visible, and though fascinating to observe, it is unacceptable for a live system. To mitigate this, we employed a Black/White architecture (also known as blue/green).
Conceptually, there are two systems in the Drivetribe stack, Black and White. Each has its own Flink cluster, Elasticsearch cluster, Redis cluster, and restful servers. They both share Kafka. The following diagram depicts the architecture described above. This approach enables us to rebuild the state in Flink and the downstream materialised views in the serving layer in parallel to the live system. Once this process is done, a simple load balancer switch redirects the traffic to the new system with no downtime. This approach enables the deployment of major changes without the need for scheduled downtime or painful database migrations.
A second major challenge is replaying Kafka without violating causal consistency, and thus being able to produce deterministic states. Events usually have some sort of causal relationship between them; a user needs to be created before joining tribes and a post needs to exist before receiving “bumps”, impressions, or comments.
Kafka can only guarantee ordering within a partition of a topic. Assuming keyed messages, all the messages that correspond to a key will be delivered in order (thus, there is no chance an “update” event comes after a “delete” event on the same key). However, Flink jobs often consume multiple topics. Additionally, Flink allows keying the streams by any field. When processing messages as they arrive, assuming similar latency across all paths, it is unlikely that causality between messages will be violated. This is not true while replaying Kafka, where hundreds of thousands of messages are consumed every second. Any stream that consumes more than one topic, or uses a different key to partition the messages than the one in Kafka, needs to take this into account.
A third challenge comes from the presence of duplicates. Though Flink can guarantee “exactly-once” delivery with regards to its internal state, this is extremely difficult when taking the entire pipeline into account: A client will send the same request twice, or a Kafka producer will send the message twice, meaning that duplicates already exist in Kafka. A sink will deliver the processed messages to an external system twice. A naive approach that would increment a simple counter for every new message would probably compute incorrect results. To mitigate this, every message has a unique id, and every request is idempotent. We use data structures with set semantics to count, and we only perform idempotent updates to external systems.
Another challenge is what we call “The Clarkson Effect”. Jeremy Clarkson is extremely popular–multiple orders of magnitude more than the average user. The same applies to his tribe and his posts. When trying to partition the key space so that streaming computations can be distributed to a number of nodes, we naturally end up with a skewing problem. This becomes problematic when we need compute the total number of impressions his tribe has received, for example. If every impression hits RocksDB, the process naturally becomes slow and back pressure builds up. To mitigate this issue, we took advantage of Flink’s low-level API and developed an in-memory pre-aggregating buffer which computes partial aggregates and disseminates the results downstream using a processing-time trigger. This approach proved to yield up to an order of magnitude more throughput in aggressive pressure scenarios.
Finally, and until multi-message commits in Kafka become a reality, the platform cannot support transactions out of the box. However, transactional semantics are required in various aspects of the product. For example, when users are deleted, every tribe/post/comment/etc associated to them needs to be deleted as well. This consists of a set of messages that need to be committed as a unit across multiple topics. Partial propagation may lead to an inconsistent state, where the user was deleted but his posts still appear in the platform.
The restful servers that generally publish to Kafka are dispensable. The auto-scaling group in which they belong may add or remove servers depending on the load. Though we try to drain connections on deployments, this is not always successful. It was clear that we needed to manage transactions in some way.
Though Kafka does not support transactions, Redis does. The messages are committed to Redis as a group, and then a custom Redis source in Flink fetches the messages and pushes them back to Kafka. Because the semantics of the Kafka sink are in general “at-least-once”, it is really important that the messages are idempotent. If the producing process fails to commit the messages to Redis the transaction as a whole will fail. If the consuming process crashes, Flink will restart it and re-process the messages. If we think of Redis as the transaction log, this is basically considered to be a Saga transaction with roll-forward semantics.
The Developer Experience with FlinkFlink’s high level APIs make defining stateful streaming computations as easy as manipulating simple Scala collections. Flink’s low level APIs allow experienced users to extend and optimize for every streaming use case.
Although getting started is easy, the road to a stable, scalable, efficient system can be paved with a steep learning curve. A few tips for running Flink in production we learned the hard way:
- Run as many jobs as possible. Ideally, one per graph. No matter how well-tested the code is, there will still be failures in production, and it is really important to offer a degradable level of service. Additionally, each graph has different requirements and it is important to be able to manage, scale and troubleshoot them independently.
- Pay attention to per-key state size. Stream performance degrades when the state per key increases over a few hundreds of kilobytes. Additionally, pushing large messages to downstream operators will eventually saturate the network.
- Pay attention to stateful operators when using savepoints. Operator names need to be unique and all class references needs to remain unchanged. Otherwise, deserialization will fail and the state will need to be built from scratch.
- Pay attention to checkpoint management. Checkpoints are not incremental, and it is easy to run out of disk space if there is no background process cleaning up.
- Devote time to building a proper deployment pipeline. Continuous integration is not as easy as stopping and starting a stateless server. It will take time to perfect.
- Connected streams are heavy. If the purpose is to just enrich the data, consider denormalising upstream.
- Message keys and payloads should be constructed so that you can take advantage of Kafka’s compacted topics. This will allow to save storage space, decrease processing times and improve ordering guarantees. An update event should contain the full updated payload (e.g. the updated user profile), as opposed to a diff.
- Gather and analyze metrics, or else it is difficult to debug production issues.