Uber’s Analytics Pipeline
At Uber, we use Apache Kafka as a message bus for connecting different parts of the ecosystem. We collect system and application logs as well as event data from the rider and driver apps. Then we make this data available to a variety of downstream consumers via Kafka.
Data in Kafka feeds both real-time and batch pipelines. The real-time pipelines power activities like computing business metrics, debugging, alerting, and dashboarding, while the batch pipelines handle more exploratory work, such as ETL into Apache Hadoop and HP Vertica.
In this article, we describe uReplicator, Uber’s open source solution for replicating Apache Kafka data in a robust and reliable manner. This system extends the original design of Kafka’s MirrorMaker to focus on extremely high reliability, a zero-data-loss guarantee, and ease of operation. Running in production since November 2015, uReplicator is a key piece of Uber’s multi–data center infrastructure.
What’s a MirrorMaker, and Why Do We Need One?
Given the large-scale use of Kafka within Uber, we end up using multiple clusters in different data centers. For a variety of use cases, we need to look at the global view of this data. For instance, in order to compute business metrics related to trips, we need to gather information from all data centers and analyze it in one place. To achieve this, we have historically used the open source MirrorMaker tool shipped with the Kafka package to replicate data across data centers, as shown below.
MirrorMaker (as part of Kafka 0.8.2) itself is quite simple. It uses a high-level Kafka consumer to fetch the data from the source cluster, and then it feeds that data into a Kafka producer to dump it into the destination cluster.
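To make that consume-then-produce path concrete, here is a minimal sketch of the same idea. It uses the newer Java consumer and producer clients for readability rather than the 0.8.2 high-level consumer that MirrorMaker actually used at the time, and the broker addresses, group id, and topic name are placeholders rather than real Uber configuration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MiniMirror {
    public static void main(String[] args) {
        // Consumer pointed at the source cluster (placeholder address).
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-kafka:9092");
        consumerProps.put("group.id", "mini-mirror");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Producer pointed at the destination (aggregate) cluster.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "aggregate-kafka:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("trip-events"));
            while (true) {
                // Fetch from the source cluster, then re-publish to the destination cluster.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    producer.send(new ProducerRecord<>(record.topic(), record.key(), record.value()));
                }
            }
        }
    }
}
```

The real MirrorMaker adds a topic whitelist, multiple consumer streams, and offset management on top of this loop, but the core data path is the same.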
Kafka’s MirrorMaker Limitations at Uber
Although our original MirrorMaker setup was sufficient at first, we soon ran into scalability issues. As the number of topics and the data rate (bytes/second) grew, we began seeing delayed delivery or outright loss of data arriving in the aggregate cluster, which caused production issues and reduced data quality. Some of the major problems with the existing MirrorMaker tool (as of 0.8.2) for Uber’s particular use cases are listed below:
- Expensive rebalancing. As mentioned before, each MirrorMaker worker uses a high-level consumer. These consumers periodically go through a rebalancing process, negotiating among themselves (via Apache Zookeeper) to decide which worker owns which topic-partition. This can take a long time; we’ve observed about 5–10 minutes of inactivity in certain situations, which violates our end-to-end latency guarantee. In addition, the consumers can give up after 32 rebalancing attempts and remain stuck forever. Unfortunately, we saw this happen firsthand a few times. After every rebalance attempt, we saw a similar traffic pattern:
After the inactivity during the rebalance, MirrorMaker had a massive backlog of data to catch up on. This resulted in a traffic spike on the destination cluster and, subsequently, on all downstream consumers, leading to production outages and increased end-to-end latency.
- Difficulty adding topics. At Uber, we must specify a whitelist of topics within our mirroring tool to control how much data flows across the WAN link. With Kafka MirrorMaker, this whitelist was completely static, and we needed to restart the MirrorMaker cluster to add new topics. Restart is expensive, since it forces the high-level consumers to rebalance. This became an operational nightmare!
- Possible data loss. The old MirrorMaker had a problem with automatic offset commits (it appears to be fixed in the latest release) that could result in data loss. The high-level consumer automatically committed the offsets for fetched messages. If a failure occurred before MirrorMaker could verify that it actually wrote those messages to the destination cluster, they were lost. A sketch of the safer commit-after-produce pattern follows this list.
- Metadata sync issues. We also ran into an operational issue with the way config was updated. To add or delete topics from the whitelist, we listed all the final topic names in a config file, which was read during MirrorMaker initialization. Sometimes the config failed to update on one of the nodes. This brought down the entire cluster, since the various MirrorMaker workers did not agree on the list of topics to replicate.
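To illustrate the data-loss window described above, here is a minimal sketch of the commit-after-produce pattern. It reuses the consumer and producer setup from the earlier sketch (again with the newer Java clients), with the consumer configured with enable.auto.commit=false so that offsets are only committed once the destination cluster has acknowledged the copied messages.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CommitAfterProduce {
    // Assumes a consumer and producer configured as in the earlier sketch, except
    // that the consumer sets enable.auto.commit=false.
    static void mirrorWithoutLoss(KafkaConsumer<byte[], byte[]> consumer,
                                  KafkaProducer<byte[], byte[]> producer) {
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                producer.send(new ProducerRecord<>(record.topic(), record.key(), record.value()));
            }
            // Wait until the destination cluster has acknowledged every send...
            producer.flush();
            // ...and only then commit offsets on the source side. If the worker dies
            // before this point, the batch is re-read and re-sent (duplicates, not loss).
            consumer.commitSync();
        }
    }
}
```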
Why We Developed uReplicator
We considered the following alternatives for solving the aforementioned problems:
A. Split into multiple MirrorMaker clusters. Most of the problems listed above resulted from the high-level consumer rebalance process. One way to reduce its impact is to restrict the number of topic-partitions replicated by one MirrorMaker cluster. Thus, we would end up with several MirrorMaker clusters, each replicating a subset of the topics to be aggregated.
Pros:
– Adding new topics is easy. Just create a new cluster.
– MirrorMaker cluster restart happens quickly.
Cons:
– It’s another operational nightmare: we have to deploy and maintain multiple clusters.
B. Use Apache Samza for replication. Since the problem is with the high-level consumer (as of 0.8.2), one solution is to use the Kafka SimpleConsumer and add the missing pieces: leader election and partition assignment. Apache Samza, a stream processing framework, already statically assigns partitions to workers. We could then simply use a Samza job to replicate and aggregate data to the destination (see the sketch after the list below).
Pros:
– It’s highly stable and reliable.
– It’s easy to maintain. We can replicate a lot of topics using one job.
– Job restart has minimal impact on replication traffic.
Cons:
– It’s still very static. We need to restart the job to add and/or delete topics.
– We need to restart the job to add more workers (as of Samza 0.9).
– Topic expansion needs to be explicitly handled.
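To make option B concrete, here is a rough sketch of what such a Samza replication job could look like, using Samza’s low-level StreamTask API. The destination system name is a placeholder that would be defined in the job configuration, and the input topics would still be listed statically there, which is exactly the restart limitation noted above.

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Each task instance owns a fixed set of source partitions, so there is no
// high-level-consumer-style rebalancing while the job is running.
public class MirrorTask implements StreamTask {
    // "aggregate-kafka" is a placeholder system name defined in the job config.
    private static final String DESTINATION_SYSTEM = "aggregate-kafka";

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Re-publish each message to the same topic name on the destination cluster.
        String topic = envelope.getSystemStreamPartition().getStream();
        collector.send(new OutgoingMessageEnvelope(
                new SystemStream(DESTINATION_SYSTEM, topic),
                envelope.getKey(),
                envelope.getMessage()));
    }
}
```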
C. Use an Apache Helix-based Kafka consumer. Ultimately, we decided to use a Helix-based Kafka consumer. In this case, Apache Helix assigns partitions to workers, and each worker uses the SimpleConsumer to replicate data (a minimal sketch follows the list below).
Pros:
– Adding and deleting topics is very easy.
– Adding and removing nodes in the MirrorMaker cluster is very easy.
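To give a flavor of option C, the sketch below shows how a worker might register with Helix as a participant and react to OnlineOffline state transitions, treating each Helix partition as a topic-partition to replicate. The cluster name, instance id, Zookeeper address, and the startReplicating/stopReplicating helpers are placeholders, the factory method signature varies slightly across Helix versions, and this is not the actual uReplicator code.

```java
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

public class ReplicatorWorker {

    // ONLINE means "start copying this topic-partition"; OFFLINE means "stop".
    @StateModelInfo(initialState = "OFFLINE", states = {"ONLINE", "OFFLINE"})
    public static class ReplicationStateModel extends StateModel {
        private final String topicPartition;

        ReplicationStateModel(String topicPartition) {
            this.topicPartition = topicPartition;
        }

        @Transition(from = "OFFLINE", to = "ONLINE")
        public void onBecomeOnlineFromOffline(Message msg, NotificationContext ctx) {
            startReplicating(topicPartition); // hypothetical helper: start a SimpleConsumer fetcher
        }

        @Transition(from = "ONLINE", to = "OFFLINE")
        public void onBecomeOfflineFromOnline(Message msg, NotificationContext ctx) {
            stopReplicating(topicPartition); // hypothetical helper: tear the fetcher down
        }

        private void startReplicating(String tp) { /* placeholder */ }
        private void stopReplicating(String tp) { /* placeholder */ }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder cluster name, instance id, and Zookeeper address.
        HelixManager manager = HelixManagerFactory.getZKHelixManager(
                "uReplicator-demo", "worker-0", InstanceType.PARTICIPANT, "zk:2181");

        manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
                new StateModelFactory<ReplicationStateModel>() {
                    @Override
                    public ReplicationStateModel createNewStateModel(String resource, String partition) {
                        return new ReplicationStateModel(partition);
                    }
                });

        manager.connect(); // From here on, Helix assigns topic-partitions to this worker.
        Thread.currentThread().join(); // Keep the participant alive.
    }
}
```

Because Helix computes and persists the partition assignment centrally, adding a topic or a worker only moves the affected partitions instead of forcing a full-cluster rebalance.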