# Kafka + Flink: A practical, how-to guide

Posted on Sep 2nd, 2015 by and

A very common use case for Apache Flink™ is stream data movement and analytics. More often than not, the data streams are ingested from Apache Kafka, a system that provides durability and pub/sub functionality for data streams. Typical installations of Flink and Kafka start with event streams being pushed to Kafka, which are then consumed by Flink jobs. These jobs range from simple transformations for data import/export to more complex applications that aggregate data in windows or implement CEP (complex event processing) functionality. The results of these jobs may be fed back to Kafka for consumption by other services, or written out to HDFS, to other systems like Elasticsearch, or to user-facing web frontends.

In such pipelines, Kafka provides data durability, and Flink provides consistent data movement and computation. data Artisans and the Flink community have put a lot of work into integrating Flink with Kafka in a way that (1) guarantees exactly-once delivery of events, (2) does not create problems due to backpressure, (3) has high throughput, and (4) is easy to use for application developers.
In this blog post, we provide a hands-on guide for developing your first Flink application using the Kafka consumer and producers bundled with Flink.

### A 5-minute Introduction to Kafka

To understand how Flink interacts with Kafka, let us first introduce the main concepts behind Kafka. Apache Kafka is a distributed, high-throughput message queuing system designed for making streaming data available to multiple data consumers. Kafka makes the streaming data durable by persisting incoming messages on disk using a log data structure. This allows various downstream consumers to read the stream at different positions and different speeds, and also to read messages from the past, i.e., “replaying history”.

The most important abstraction in Kafka is the topic. A topic is a handle to a logical stream of data, consisting of many partitions. Partitions are subsets of the data served by the topic that reside in different physical nodes. Services that put data into a topic are called producers. For example, Kafka comes bundled with a “console producer” which puts the strings from standard input into a topic.

The opposite of a producer, i.e., a service that reads data from a topic, is called a consumer. The individual partitions of a topic are managed by a Kafka broker, a service that is installed on the node that contains the partition and allows consumers and producers to access the data of a topic. When a partition is replicated (for durability), several brokers might be managing the same partition. In that case, one of these brokers is designated as the “leader”, and the rest are “followers”.

Kafka assigns each message within a partition a unique id, the so-called “message offset”, which represents a unique, increasing logical timestamp within a partition. This offset allows consumers to request messages from a certain offset onwards, essentially consuming data from a given past logical time.
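To make the offset idea concrete, here is a minimal sketch of a single partition modeled as an in-memory, append-only list. The class `PartitionLogSketch` and its methods are hypothetical names made up for this illustration. They are not part of Kafka's API, and a real broker persists the log to disk.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition: an append-only log addressed by offset (not Kafka's API). */
final class PartitionLogSketch {
    private final List<String> messages = new ArrayList<>();

    /** Appends a message and returns the offset assigned to it. */
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    /** Returns all messages from the given offset onwards, i.e., replays history. */
    List<String> readFrom(long offset) {
        return new ArrayList<>(messages.subList((int) offset, messages.size()));
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        log.append("a");
        log.append("b");
        log.append("c");
        // A consumer that asks for offset 1 sees everything written from "b" onwards.
        System.out.println(log.readFrom(1));
    }
}
```

Because reading never removes anything from the log, two consumers can call `readFrom` with different offsets and proceed at their own speed.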

To enable this model for multiple consumers, there is a concept of “consumer groups”. For each consumer group, messages are guaranteed to be consumed at least once.

In the picture above, a producer (a cluster of web servers) is pushing messages into one topic with four partitions. Two brokers are managing two partitions each. The example also contains three consumers divided into two consumer groups. Both consumer groups will see all messages written into the topic even though they both consume overlapping subsets (partitions) of the topic.
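The consumer group behavior described above can be sketched with one read position per group. This is a simplified, single-partition model under our own assumptions. `ConsumerGroupSketch`, `publish`, and `poll` are hypothetical names, not Kafka classes or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: every consumer group tracks its own position in the same log. */
final class ConsumerGroupSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> positionByGroup = new HashMap<>();

    void publish(String message) {
        log.add(message);
    }

    /** Returns the messages this group has not seen yet and advances its position. */
    List<String> poll(String groupId) {
        int from = positionByGroup.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        positionByGroup.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.publish("a");
        topic.publish("b");
        System.out.println("group1: " + topic.poll("group1"));
        topic.publish("c");
        System.out.println("group1: " + topic.poll("group1"));
        // group2 starts from the beginning and still sees every message.
        System.out.println("group2: " + topic.poll("group2"));
    }
}
```

Each group advances independently, so a slow group does not hold back a fast one.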

### Hands-on: Use Kafka topics with Flink

Let us now see how we can use Kafka and Flink together in practice. The code for the examples in this blog post is available here, and a screencast is available below.

#### Preparation: Get Kafka and start it locally

The easiest way to get started with Flink and Kafka is in a local, standalone installation. We later cover issues for moving this into a bare metal or YARN cluster. First, download, install and start a Kafka broker locally. For a more detailed description of these steps, check out the quick start section in the Kafka documentation.

# get kafka
wget http://mirror.softaculous.com/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
# unpack
tar xf kafka_2.10-0.8.2.1.tgz
cd kafka_2.10-0.8.2.1
# start zookeeper server
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
# start broker
./bin/kafka-server-start.sh ./config/server.properties 
# create topic “test”
./bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
# consume from the topic using the console consumer
./bin/kafka-console-consumer.sh --topic test --zookeeper localhost:2181
# produce something into the topic (write something and hit enter)
./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

Now, we have a broker and a Zookeeper server running locally, and we have verified that reading and writing work properly.

The next step is to subscribe to the topic using Flink’s consumer. This will allow you to transform and analyze any data from a Kafka stream with Flink.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.9.1</version>
</dependency>

First, we look at how to consume data from Kafka using Flink. We will read strings from a topic, do a simple modification, and print them to the standard output. We will use the console producer that is bundled with Kafka. The end result is a program that writes to standard output the content of the standard input.

Here is how you can create a Flink DataStream out of a Kafka topic. Note that both the DataStream and topics are distributed, and Flink maps topic partitions to DataStream partitions (here, we are reading the required Kafka parameters from the command line):

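// Obtain the streaming execution environment
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Parse command-line arguments such as --topic and --bootstrap.servers
ParameterTool parameterTool = ParameterTool.fromArgs(args);

// Create a DataStream from the Kafka topic (FlinkKafkaConsumer082 targets Kafka 0.8.2)
DataStream<String> messageStream = env
    .addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new SimpleStringSchema(),
        parameterTool.getProperties()));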

Once a DataStream is created, you can transform it as you like. For example, let us prepend a fixed prefix to every message, and print the result to stdout:

messageStream
    .rebalance()
    .map(s -> "Kafka and Flink says: " + s)
    .print();

The call to rebalance() causes data to be re-partitioned so that all machines receive messages (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances). The full code can be found here.
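To illustrate what such a re-partitioning does, here is a small round-robin sketch. It is a plain-Java model with hypothetical names (`RebalanceSketch`, `roundRobin`), not Flink's implementation. It only shows how records arriving from a single Kafka partition can be spread evenly over several parallel instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of round-robin re-partitioning across parallel instances. */
final class RebalanceSketch {
    /** Distributes the records in turn over `parallelism` downstream instances. */
    static List<List<String>> roundRobin(List<String> records, int parallelism) {
        List<List<String>> targets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            targets.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            targets.get(i % parallelism).add(records.get(i));
        }
        return targets;
    }

    public static void main(String[] args) {
        // One Kafka partition feeding two parallel instances.
        List<String> fromOnePartition = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(roundRobin(fromOnePartition, 2));
    }
}
```

Note that after such a step, records that were adjacent in the Kafka partition may be processed by different instances, so the original order is no longer preserved across the whole stream.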

The command-line arguments to pass to this Flink program in order to read from the Kafka topic “test” that we have created before are the following:

--topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup

Since we are reading from the console producer, and printing to the standard output, the program will simply print the strings you write in the console. These strings should appear almost instantly.

Let us now look at how you can write into a Kafka topic using Flink. As before, we will create a StreamExecutionEnvironment, and a Flink DataStream using a simple String generator.

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> messageStream = env
    .addSource(new SimpleStringGenerator());

Then we will put this DataStream into a Kafka topic. As before, we read the relevant Kafka parameters as command line arguments:

ParameterTool parameterTool = ParameterTool.fromArgs(args);

// Write every element of the stream into the Kafka topic
messageStream.addSink(new KafkaSink<>(
    parameterTool.getRequired("bootstrap.servers"),
    parameterTool.getRequired("topic"),
    new SimpleStringSchema()));

The command-line arguments to pass to the program in order to write the strings to the Kafka topic “test” we created above are the following:

--topic test --bootstrap.servers localhost:9092

Since we are writing to the topic that the console consumer is reading from, the strings generated by the program will appear in the console consumer's standard output.

Check out a screencast of running the examples of this section:

#### Running the example in a cluster

Of course, the Flink code we saw also works in a cluster. To run this code in a cluster, first install Kafka, either by using your favorite distribution or by following Kafka’s documentation for setting up multiple brokers. Make sure that you use the Flink consumer that corresponds to your Kafka version (currently 0.8.1 and 0.8.2 are available).

When creating a new topic in a cluster, it is advised to set an appropriate number of partitions so that all parallel instances of Flink receive data. To achieve that, the number of partitions has to be at least equal to the number of Flink instances.

#### How are Kafka partitions assigned to Flink workers?

When creating a Kafka topic, the user has to specify the number of partitions. Partitions are assigned to Flink’s parallel task instances.

When there are more Kafka partitions than Flink tasks, Flink consumer instances will subscribe to multiple partitions at the same time:

Flink maintains assignment of partitions to Flink instances also after failures, keeping the partitioning deterministic, which is essential for exactly-once delivery.
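A deterministic assignment can be as simple as a modulo rule. The sketch below is an illustration under our own assumptions, and the modulo rule and the names `PartitionAssignmentSketch` and `partitionsFor` are hypothetical. It is not necessarily the scheme the Flink consumer uses internally, but it shows why the same inputs always produce the same assignment, also after a restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a deterministic partition-to-instance assignment (assumed modulo rule). */
final class PartitionAssignmentSketch {
    /** Lists the Kafka partitions read by one parallel consumer instance. */
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Four partitions, two instances: each instance reads two partitions.
        System.out.println(partitionsFor(0, 2, 4));
        System.out.println(partitionsFor(1, 2, 4));
        // Two partitions, three instances: the third instance receives nothing.
        System.out.println(partitionsFor(2, 3, 2));
    }
}
```

The last case is the reason for choosing at least as many partitions as parallel Flink instances: with fewer partitions, some instances stay idle unless the stream is re-partitioned afterwards.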

#### How does Flink retain the order of messages in Kafka partitions?

Since Kafka partitions are ordered, it is useful for some applications to retain this order both within and across Flink jobs.

Inside a Flink job, all record-at-a-time transformations (e.g., map, flatMap, filter, etc.) retain the order of their input. Partitioning and grouping transformations change the order since they re-partition the stream.

When writing to Kafka from Flink, a custom partitioner can be used to specify exactly which partition an event should end up in. When no partitioner is used, Flink will use a direct mapping from parallel Flink instances to Kafka partitions. This means that multiple Flink jobs that use Kafka as an intermediate pipe retain the order within partitions:
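The two strategies can be sketched as two small functions. Both are hypothetical illustrations (`PartitionerSketch`, `byKey`, and `direct` are names we made up, and this is not the partitioner interface shipped with Flink): one picks the partition from a key, the other maps a parallel instance straight to a partition.

```java
/** Toy model of two ways to choose the target Kafka partition for a record. */
final class PartitionerSketch {
    /** Custom strategy: records with the same key always land in the same partition. */
    static int byKey(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Direct mapping: each parallel instance writes to one fixed partition. */
    static int direct(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("user-1 goes to partition " + byKey("user-1", 4));
        System.out.println("instance 1 writes to partition " + direct(1, 4));
    }
}
```

With the direct mapping, everything one instance emits stays in one partition, so the order that instance produced is the order a downstream job reads.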

#### How does Flink guarantee exactly-once processing with Kafka?

Flink’s Kafka consumer integrates deeply with Flink’s checkpointing mechanism to make sure that records read from Kafka update Flink state exactly once. Flink’s Kafka consumer participates in Flink’s checkpointing mechanism as a stateful operator whose state is Kafka offsets.

Flink periodically checkpoints user state using an adaptation of the Chandy-Lamport algorithm for distributed snapshots. Checkpointing is triggered by barriers, which start from the sources and travel through the topology together with the data, separating data records that belong to different checkpoints. Part of the checkpoint metadata are the offsets for each partition that the Kafka consumer has read so far. Offsets are committed only when a checkpoint has been completed. This means that the offsets that are part of the checkpoint are consistent with the state backups generated by the same checkpoint, which results in exactly-once state update guarantees when restarting from failures.

On completion of a checkpoint, the Kafka consumer also commits the offsets to Zookeeper. From the offset in Zookeeper, users can still use tools such as Kafka’s Offset Checker.
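The key point is that the read offset and the user state are snapshotted together and restored together. The following sketch models this with a simple record counter as the user state. It is a single-threaded toy under our own assumptions, with hypothetical names (`CheckpointSketch`, `process`, `checkpoint`, `restore`), and it leaves out barriers and distributed coordination entirely.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model: the Kafka offset and the user state are checkpointed as one unit. */
final class CheckpointSketch {
    private long offset = 0;             // next offset to read from the partition
    private long count = 0;              // user state: number of records processed
    private long checkpointedOffset = 0;
    private long checkpointedCount = 0;

    /** Processes up to n records, advancing the offset and the state together. */
    void process(List<String> log, int n) {
        for (int i = 0; i < n && offset < log.size(); i++) {
            offset++;
            count++;
        }
    }

    void checkpoint() {
        checkpointedOffset = offset;
        checkpointedCount = count;
    }

    /** Simulates recovery after a failure: roll back to the last completed checkpoint. */
    void restore() {
        offset = checkpointedOffset;
        count = checkpointedCount;
    }

    long count() { return count; }

    long offset() { return offset; }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d", "e");
        CheckpointSketch source = new CheckpointSketch();
        source.process(log, 2);
        source.checkpoint();
        source.process(log, 2);   // these two records are lost in the "failure" below
        source.restore();
        source.process(log, 10);  // re-reads from offset 2 up to the end of the log
        System.out.println("records counted: " + source.count());
    }
}
```

Records read after the last checkpoint are read again after recovery, but since the counter was rolled back as well, every record is reflected in the state exactly once.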

The following presentation shows a step-by-step example of the checkpointing with the Kafka consumer:

Streaming systems like Flink need to be able to slow down upstream operators (for example the Kafka consumer) if downstream operators (like sinks) are not able to process all incoming data at the same speed. This is called backpressure handling (you can read more about Flink’s backpressure handling here).

Flink’s Kafka consumer handles backpressure naturally: as soon as later operators are unable to keep up with the incoming Kafka messages, Flink will slow down the consumption of messages from Kafka, leading to fewer requests to the broker.

Since brokers persist all messages to disk, they are also able to serve messages from the past. As soon as the operators speed up again, Flink will consume the built-up messages at full speed. This behavior makes Kafka a perfect fit as a buffer between streaming sources and Flink, as it provides a durable buffer for events at load spikes.
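The effect can be imitated with a bounded buffer between a source and a slow sink. The sketch below is a deterministic, single-threaded simulation with hypothetical names (`BackpressureSketch`, `simulate`), not Flink's network stack. The source only advances its offset while the buffer accepts records, so unread messages simply remain in the log.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

/** Toy model: a full buffer stops the source from pulling more records from the log. */
final class BackpressureSketch {
    /** Returns how many records the source pulled from the log in each step. */
    static int[] simulate(int logSize, int bufferCapacity, int sinkRatePerStep, int steps) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        int nextOffset = 0;
        int[] pulled = new int[steps];
        for (int step = 0; step < steps; step++) {
            // Source: read from the log only while the buffer accepts records.
            while (nextOffset < logSize && buffer.offer(nextOffset)) {
                nextOffset++;
                pulled[step]++;
            }
            // Sink: drain at most sinkRatePerStep records per step.
            for (int i = 0; i < sinkRatePerStep && buffer.poll() != null; i++) {
                // nothing to do, the record is simply consumed
            }
        }
        return pulled;
    }

    public static void main(String[] args) {
        // Buffer of 4, sink handles 1 record per step: the source is throttled to 1 per step.
        System.out.println(Arrays.toString(simulate(10, 4, 1, 3)));
    }
}
```

After the buffer fills up in the first step, the source pulls only as many records per step as the sink manages to drain.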

### Some next steps

Check out the source code for the examples in the blog post here. Read here on how people have successfully put Kafka and Flink to work in the real world. If you are interested in trying out Flink and Kafka yourself, visit the Flink documentation for more details.

1. alex
Sep 8th, 2015, 1:24 pm

How can flink guarantee exactly once given that the producers for kafka have a retry logic which may enqueue a msg multiple times in case of failure in the same topic.

This basically means at least once.

I did read the Chandy-Lamport algo as well as your adaptation and while you can guarantee that processing ‘inside’ flink, you can’t really make the same claim if you integrate with a system that effectively doesn’t have idempotent ‘put’s .. that is messages cannot be duplicated at the source to begin with.

Did i miss something?

• Kostas Tzoumas
Sep 9th, 2015, 1:50 pm

You are exactly right. Flink guarantees exactly-once inside Flink (i.e., exactly-once state updates as mentioned). It is possible to design Flink sources and sinks that do guarantee that by taking part in the checkpointing mechanism, stay tuned for an upcoming blog post on that!

• Ankit
Jan 11th, 2016, 4:39 pm

Hi Kostas,

Thanks for clarifying this. But I am still curious about how to use checkpointing to guarantee exactly-once for sources.

Is there a blog post explaining this already out? If not, could you please share some thoughts on how can we achieve this for sources.

Thanks!
Ankit

2. Andra Lungu
Oct 26th, 2015, 2:35 pm

This works like a charm, Robert! Thanks for sharing 😀

3. Lukas
Nov 27th, 2015, 1:51 pm

Hi
I’m trying to do the howto but I get an error at the moment:
Have you seen this error before?
Kind regards
Lukas

• Lukas
Nov 27th, 2015, 2:52 pm

Using kafka_2.10-0.8.2.1 as in the blog did resolve the issue.

4. Viswanath
Feb 3rd, 2016, 6:53 pm

Thanks for the tutorial. When I tried to run the kafka consumer in my IDE, I ended up having the following error.

I added the following maven dependency but still doesn’t get picked up to get past the NoClassDefFoundError.

provided

• Robert Metzger
Feb 3rd, 2016, 7:06 pm

Hi, which IDE are you using? Are all the dependencies using the same Flink version?

5. Chester Chen
Feb 23rd, 2016, 10:11 pm

1) Import the maven profile to IDEA,
2) from command line, run mvn clean package
3) from IDEA run to run ReadFromKafka, but got

Looks at the DeserializationSchema seems it belongs to flink-streaming-connectors.jar but it’s not listed as the dependency in pom.xml file.

Also, the flink-streaming-connectors.jar seems to be not having 0.9.1 version. Not sure what to include

• Robert Metzger
Feb 24th, 2016, 2:53 pm

Hi Chester,

You have to use the same Flink version for all dependencies.
The DeserializationSchema is contained in the “flink-streaming-java.jar” package, which is shipped in the default distribution of Flink.

Can you maybe post the full stack trace?

6. Tong
Aug 21st, 2016, 2:24 pm

Some glitches in the sourcecode, like: “DataStream&lt;String&gt;”, which is in fact “DataStream<String>”. Great blogpost thanks.

7. Paul
Aug 25th, 2016, 10:47 am

Hi,
I hit this exception and changing flink-connector-kafka version to 0.10.1 solved it, following the answer here: http://stackoverflow.com/questions/35374482/flinkkafka-gethostnameport