Kafka Streams was introduced in Kafka 0.10.x as a way of programmatically manipulating the data in Kafka. William Hamilton from Funding Circle introduced the concepts in a lightning talk during ClojureX. As William and I discussed, make Java interop your friend.

I’ve based my example on James Walton’s Kafka Stream example, which you can find on GitHub.

The Quick and Dirty Basic Stream Demo

First add the dependency to your project.

[org.apache.kafka/kafka-streams "0.10.0.1"]

Configuration

First, some configuration. The properties we’re going to use give the application a name, the Kafka broker to work with, and the key/value serde classes to use for each message (in this example they are both strings). With those properties we then create a StreamsConfig instance.

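;; Namespace and Java imports used by the snippets in this post.
(ns kstream-test.core
 (:gen-class)
 (:import [org.apache.kafka.common.serialization Serdes]
          [org.apache.kafka.streams KafkaStreams StreamsConfig]
          [org.apache.kafka.streams.kstream KStreamBuilder ValueMapper]))

(def props
 {StreamsConfig/APPLICATION_ID_CONFIG, "my-stream-processing-application"
  StreamsConfig/BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"
  StreamsConfig/KEY_SERDE_CLASS_CONFIG, (.getName (.getClass (Serdes/String)))
  StreamsConfig/VALUE_SERDE_CLASS_CONFIG, (.getName (.getClass (Serdes/String)))})

(def config
 (StreamsConfig. props))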
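
If you would rather see the plain string keys behind those constants, the sketch below builds the same settings as a java.util.Properties object. It is only a sketch: the key names and the Serdes$StringSerde class name are what I understand the constants and (Serdes/String) to resolve to in 0.10.x, so check them against the Javadoc for your version. map->properties and plain-props are names made up for illustration.

```clojure
;; Hypothetical helper: copy a Clojure map into a java.util.Properties.
(defn map->properties
  [m]
  (let [p (java.util.Properties.)]
    (doseq [[k v] m]
      (.setProperty p (str k) (str v)))
    p))

;; Assumption: these literal keys match the StreamsConfig constants in 0.10.x.
(def plain-props
  (map->properties
    {"application.id"    "my-stream-processing-application"
     "bootstrap.servers" "localhost:9092"
     "key.serde"         "org.apache.kafka.common.serialization.Serdes$StringSerde"
     "value.serde"       "org.apache.kafka.common.serialization.Serdes$StringSerde"}))

(.getProperty plain-props "bootstrap.servers")
;; => "localhost:9092"
```

This needs nothing on the classpath beyond Clojure itself, so it is a cheap way to check the settings at the REPL before Kafka gets involved.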

Creating the Builder

The builder is defined first. We’ll attach the topic when the stream is created and the config when the KafkaStreams instance is constructed.

(def builder
 (KStreamBuilder.))

Defining the Topic

Just a Java String array of topic names. The builder’s stream method takes an array, so Kafka Streams can read from more than one topic.

(def input-topic
 (into-array String ["topic-input"]))
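
To read from more than one topic, the array simply gets more entries. A minimal sketch, where the second topic name is hypothetical and input-topics is a name made up for this example:

```clojure
;; "topic-input-2" is a made-up topic name used only for illustration.
(def input-topics
  (into-array String ["topic-input" "topic-input-2"]))

(count input-topics)
;; => 2

(vec input-topics)
;; => ["topic-input" "topic-input-2"]
```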

Working with the Stream

Calling .stream on the builder returns a KStream, which represents the stream of records arriving on the topic. Working with it is a case of passing a function to one of its methods to do some work on each record. In this case we’re mapping the values (.mapValues): the mapper counts the length of the value of the key/value pair (v) and converts that count to a string. The last thing to do is print the results to System.out.

(->
 (.stream builder input-topic)
 (.mapValues (reify ValueMapper (apply [_ v] ((comp str count) v))))
 (.print))
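
The function wrapped by the ValueMapper is plain Clojure, so it can be tried at the REPL without Kafka running. A small sketch, with value->length as a name made up for this example:

```clojure
;; comp applies right to left: count runs first, then str.
(def value->length
  (comp str count))

(map value->length ["asdas" "asd" "dfgd"])
;; => ("5" "3" "4")
```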

It’s worth looking at the actual Java API for the Kafka KStream class. There are lots of methods to manipulate the data passing through; the result might be sent to another Kafka topic or just written out to a file. Take the time to study the options and you’ll save yourself time in the long run.
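
As one example of those options, here is a rough sketch of sending the mapped values to another topic with .to instead of printing them. It assumes the kafka-streams 0.10.0.1 dependency is on the classpath, that the output topic already exists, and that the default string serdes from the config apply. add-length-topology! and the topic names passed to it are made up for illustration.

```clojure
(import '[org.apache.kafka.streams.kstream KStreamBuilder ValueMapper])

;; Hypothetical helper: reads in-topic, maps each value to its length
;; (as a string), and writes the result to out-topic.
(defn add-length-topology!
  [^KStreamBuilder builder in-topic out-topic]
  (-> (.stream builder (into-array String [in-topic]))
      (.mapValues (reify ValueMapper
                    (apply [_ v] (str (count v)))))
      (.to out-topic))
  builder)

;; Usage, with a made-up output topic name:
;; (add-length-topology! (KStreamBuilder.) "topic-input" "topic-output")
```

Nothing is sent until the builder is handed to a KafkaStreams instance and started; the function only describes the topology.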

Setting It All Off

The final parts of the puzzle.

(def streams
 (KafkaStreams. builder config))

(defn -main [& args]
 (prn "starting")
 (.start streams)
 (Thread/sleep (* 60000 10))
 (prn "stopping")
 ;; Close the stream so its threads shut down cleanly.
 (.close streams))

The main function starts the service, keeps it alive for ten minutes, then closes the stream.
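
A fixed sleep is fine for a demo. For anything longer lived, one option is a JVM shutdown hook, so that stopping the process with Ctrl-C also closes the stream. The sketch below is plain Clojure, and on-shutdown! is a name made up for this example:

```clojure
;; Hypothetical helper: registers f, a zero-argument function, to run
;; when the JVM shuts down. Returns the hook Thread so it can be removed.
(defn on-shutdown!
  [f]
  (let [hook (Thread. ^Runnable f)]
    (.addShutdownHook (Runtime/getRuntime) hook)
    hook))

;; Usage with the streams var from this post:
;; (on-shutdown! #(.close streams))
```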

Packaging it all up

I’m using Leiningen, so it’s a simple case of creating an uberjar.

$ lein uberjar
Compiling kstream-test.core
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
Created /Users/jasonbell/work/dataissexy/kstream-test/target/uberjar+uberjar/kstream-test-0.1.0-SNAPSHOT.jar
Created /Users/jasonbell/work/dataissexy/kstream-test/target/uberjar/kafka_streams.jar
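
For reference, a project.clj along these lines would produce jar names like the ones above. It is a sketch based on the standard Leiningen app template rather than the exact file used here: the Clojure version is an assumption, and the :uberjar-name is inferred from the output.

```clojure
(defproject kstream-test "0.1.0-SNAPSHOT"
  ;; Assumption: any recent Clojure release should do.
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.apache.kafka/kafka-streams "0.10.0.1"]]
  ;; The namespace that holds -main and (:gen-class).
  :main ^:skip-aot kstream-test.core
  :target-path "target/%s"
  ;; Inferred from the "kafka_streams.jar" line in the output above.
  :uberjar-name "kafka_streams.jar"
  :profiles {:uberjar {:aot :all}})
```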

Testing the Service

Kafka 0.10 is installed straight out of the box in /usr/local, and I’m running everything as the root user (it’s just a local machine). The commands below use relative config/ paths, so run them from the Kafka installation directory.

Start Zookeeper

$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties

Create the Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-input
Created topic "topic-input".

Start a Producer and Add Content

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-input
This is some data
Thisis some more data
This is more data than the last time
asdas
asd
dfgd

Start the Uberjar’d Stream Service

$ java -jar target/uberjar/kafka_streams.jar
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
"starting"
null , 5
null , 3
null , 3
null , 3
null , 3
null , 6
null , 3
null , 3
null , 6
null , 0
null , 3
null , 4
null , 5
"stopping"

Concluding

A really quick walkthrough, but it gets the concepts across. Ultimately neither approach is better than the other. Part of me wants to stick with Onyx: the configuration works well and the graph workflow is easier to map and change. Kafka Streams is important though, and certainly worth a look if you are using Kafka 0.10.x. If you are still on 0.8 or 0.9 then Onyx, in my opinion, is still the best option.