Getting started with Kafka using Scala Kafka Client and Akka

Posted by David Piggott on Tue, May 17, 2016



In this post I will give an introduction to Kafka, covering what it is and what you might use it for, and I'll then explain how I used it to decouple the two sides of a smart metering power data submission system. The goal is for the introduction to Kafka itself to be generally useful, while the specific approach shown will translate to some but not all Kafka uses.

The Kafka introduction is intended for readers having no prior knowledge of or experience with it (which is where I was two weeks before starting to write this guide). My aim is for it to be functional as a standalone introduction, so by the time you finish you'll have enough of an overview to know what the terminology is for the things you need to see more examples of, which things you might need to read more about, and which things are clean abstractions you shouldn't need to know more about.

We'll cover:

  • What a distributed log is
    • When you might use one
  • Key Kafka concepts:
    • Topics
      • Subscriptions
    • Keys
    • Key and Message (de)serialization
    • Partitions
    • Replication
    • Offsets
      • Offset management
      • Commit strategies and their trade-offs
    • Producers
    • Consumers
      • Consumer groups
    • Use of ZooKeeper
    • Log compaction

After that we'll look at a couple of examples of using scala-kafka-client -- a set of helper modules for operating the Kafka Java Client Driver in Scala codebases -- as well as one showing how to use it to run Kafka as part of integration tests.


What a distributed log is

Kafka is a persistent distributed log. A log is an append-only sequence of messages that allows random reads, and typically has bounded length.

A distributed log is one whose elements are spread across multiple nodes. The distribution can be set up to provide scalability and/or increased availability and durability.

In the case of Kafka, the length is bounded to seven days in time by default -- but this can be changed to a space bound, a combination, or no bound at all.

Generally messages are deleted from the other end of the log to keep its size within the configured bound (if any), and this is how Kafka is typically used, but it can also be configured to do something called log compaction, which we'll cover later.
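To make the idea concrete, here's a minimal in-memory sketch of a bounded append-only log. It is purely illustrative: the Log class and its method names are hypothetical rather than anything from Kafka's API, and it bounds the log by message count instead of time or space to keep things short. The thing to notice is that trimming the old end never renumbers the messages that remain.

```scala
object LogSketch {

  // Hypothetical bounded log: appends go on one end, trimming happens at the other.
  final class Log[A](maxLength: Int) {
    require(maxLength > 0, "the bound must allow at least one message")

    private var firstPosition = 0L            // position of the oldest retained message
    private var entries = Vector.empty[A]

    // Appends a message and returns its position in the log.
    def append(message: A): Long = {
      entries :+= message
      if (entries.length > maxLength) {       // over the bound: drop the oldest message
        entries = entries.tail
        firstPosition += 1
      }
      firstPosition + entries.length - 1
    }

    // Random read by position; None if the message was trimmed or not yet written.
    def read(position: Long): Option[A] = {
      val index = position - firstPosition
      if (index >= 0 && index < entries.length) Some(entries(index.toInt)) else None
    }
  }
}
```

With a bound of two, appending "a", "b" and "c" leaves "b" and "c" readable at positions 1 and 2, while position 0 is gone for good.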

Kafka is also flexible in terms of which consumers messages are delivered to -- more so than other messaging systems [citation needed].

To try to categorise Kafka, let's define publish--subscribe to mean an indirect message delivery mechanism that results in all of a topic's subscribers receiving a copy of any given message submitted to that topic, and let's also define a message queue to be an indirect message delivery mechanism that results in only a single recipient receiving any given message.

Given those definitions, Kafka can't actually be described as just one or the other. It can be used in either or both ways, and the relevant concept here is the idea of consumer groups. We'll cover these later too.

The significance of the persistent part is that messages are written straight to the filesystem rather than held in an in-memory buffer or cache of Kafka's own, so even a single node deployment keeps its messages if the Kafka process crashes. (By default Kafka leaves flushing to the operating system, so surviving the loss of a whole machine is a job for replication or stricter flush settings.) Despite this, it supports high read and write throughput. High write throughput is enabled by the simple linear log structure, which shows mechanical sympathy for both spinning and solid state drives; while the latter don't have significant seek latencies, there is write amplification to consider and minimise where possible. High read throughput is enabled by the combination of linearity and the disk and operating system's own buffers and caches.

When you might use one

Distributed logs can be used to solve a lot of problems that would otherwise exist when you're trying to maximise the decoupling of components in event based systems. I won't enumerate them all here, but I will note that many have to do with the additional requirements that come in when messages are being distributed significantly in time as opposed to just space; with distribution in time comes greater need for durability, and larger storage requirements. Kafka meets both of these needs.

To give just one example, let's consider the case of the smart metering power data submission system I said we'd look at.

Let's assume that the end devices -- i.e. the smart meters -- in such a system have little to no storage capacity, and so even if they do store power samples or batch them to some extent, they can only hold a small quantity. Therefore to minimise loss of usage data the submission endpoint needs to be highly available.

In the smart metering application we can increase availability by having a simple submission endpoint that accepts JSON POSTs from smart meters and after some basic validation just submits the received samples to a Kafka topic. This means the other parts of the system can be changed and redeployed without ever taking the submission endpoint offline. The processing and long term storage of submitted data could even then be done as an off-peak daily batch job if there weren't any reasons to process them in real-time.

Key Kafka concepts

To some readers the following will seem more like a syllabus than a reference, in that you'll want more detail. Others will already know everything mentioned and more. Keeping the goal of brevity in mind, I err on the side of less.


Topics

Admittedly a pretty common term in messaging, the topic is no surprise in Kafka. It identifies a class of messages. Kafka supports topic auto-creation, but this can also be switched off. Various Kafka behaviours can be controlled on a per-topic basis.


Keys

Messages in Kafka can be optionally keyed. If a message has a key, the key will be used to determine which partition to write it to. Keys are also, ahem, key to log compaction (covered later).


Partitions

Each topic has a fixed number of partitions. The partition is the unit of distribution; they're core to the way Kafka distributes the messages within a given topic across the nodes in a cluster. They're also core to the way it allows multiple consumers to read from the same topic while guaranteeing that the messages within any one partition are read in the order they were written, and to the implementation of consumer groups.

For messages that are submitted with a key, the topic partition to write the message to is determined by Kafka using the hash of the key (though custom partitioners can be configured). For keyless messages Kafka picks a random partition (though it does so with some stickiness, as described in the Kafka FAQ).
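Here's a rough sketch of what key-based partitioning boils down to. The partitionFor function is hypothetical, and java.util.Arrays.hashCode is only a stand-in for the hash the real client uses, so the partition numbers it produces won't match Kafka's. What does carry over is the shape: hash the serialized key, then reduce it modulo the partition count.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object PartitionSketch {

  // Hypothetical stand-in for a default partitioner. The real client hashes
  // the serialized key with a different function; only the structure is the same.
  def partitionFor(key: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "a topic has at least one partition")
    val hash = Arrays.hashCode(key.getBytes(UTF_8))
    // floorMod keeps the result in [0, numPartitions) even when the hash is negative.
    Math.floorMod(hash, numPartitions)
  }
}
```

Two consequences follow from this shape: messages with the same key always land in the same partition (so they keep their relative order), and changing the number of partitions changes where existing keys map to.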

For prototyping you can work with the default of one partition per topic and then tune the number later using this Confluent post as your guide.

Key and Message (de)serialization

Kafka does not impose any formatting/structure rules on your messages. As far as Kafka is concerned they're just opaque binary blobs. This means you're free to choose whatever serialization format you like. The Kafka client does provide some built-in (de)serializers that you can use as building blocks, including ones for byte arrays and strings.

In the smart metering power data submission example we're using for this post we'll serialize events as JSON strings. They're quick to get started with, they keep the example code relatively simple, and their human readability minimises the amount of tooling needed for debugging.

Beyond prototyping you'll want to think more carefully before committing to a message format. Language specific serializers may be more compact and faster than JSON, but what about when you need to introduce systems written using other languages? Several formats were created with this scenario in mind, Avro being one of them.

I'm not going to directly argue in favour of any single choice here but I will refer you to this Confluent post that makes compelling arguments in the respectively named sections for i) Pick[ing] A Single Data Format and ii) Us[ing] Avro as Your Data Format.


Brokers

A broker is just a single instance of the Kafka service. I've used the term node synonymously with it throughout this post.


Replication

Replication is done at the partition level, while the number of replicas to maintain is set at the topic level. The default.replication.factor is 1.

With this default of 1, each partition within a topic is stored on a single node. With a replication factor of N, each partition within a topic will be stored on N nodes, meaning that N - 1 nodes can fail and the partitions (and topic) will still be available (though this ignores the possibility of the remaining node being overloaded by all the traffic now hitting it).

If you don't use replication you don't need to know any more than this. If you do, you'll want to read more to learn how things behave during failures. Two good resources are this LinkedIn post and the Kafka documentation's replication section.


Producers

A producer is a client that opens and maintains a connection with a Kafka cluster and then pushes messages to topics. The configuration parameter telling it how to find the cluster is bootstrap.servers.

The Java client's KafkaProducer is relatively simple, and this is reflected in its relatively simple documentation.

If you did the extra reading about replication, I should explain that what I mean by "maintains a connection with a Kafka cluster" is that the producer maintains connections with each node within the cluster that is the leader for a topic partition that the producer is pushing messages to.


Consumers

They're the opposite of producers. A consumer pulls messages from topics. The configuration parameter telling it how to find the cluster is again bootstrap.servers.

The Java client's KafkaConsumer is not quite so simple, and this is reflected in its more comprehensive documentation. 


Offsets

If messages were deleted from the distributed log as soon as they had been read, that would be enough to provide at most once delivery. But then it wouldn't be a log.

Offsets are the controls for where in a given partition a consumer reads from. A consumer's offset for a given topic partition is the position of the next message it will read, counted in messages from the first message ever submitted to that partition.

Kafka pushes the responsibility for tracking offsets out to consumers. This is part of what is meant when Kafka is described as exposing the nature of the distributed log to its consumers, and it's by doing this that it can provide the flexibility and performance that it does.

Offset tracking and commit strategies

I just said that Kafka makes consumers responsible for tracking their offsets. Now I'm going to tell you that Kafka can be responsible for tracking offsets. Sort of.

Users of the Java consumer have three options:

  • Have Kafka automatically commit offsets to itself every auto.commit.interval.ms milliseconds.
    • This is the simplest option, and if you use it you might never interact with offsets in your code. It has the downside of providing the weakest processing guarantees; if Kafka commits an updated offset before you've actually finished processing all those messages older than the new offset, those results will be lost on a crash and won't be regenerated on restart (because your consuming code won't see the messages a second time). If Kafka commits an updated offset after you've finished processing one or more of those messages, you'd end up processing the same messages again after a crash. So you don't get at-most-once-processing but nor do you get at-least-once-processing.
  • Have Kafka commit offsets to itself when told to by your consuming code.
    • If you commit an updated offset before you begin processing a message you can get at-most-once-processing. If you commit an updated offset after you finish processing a message you can get at-least-once-processing. But since the offsets are stored separately from the results you're generating, there is no atomicity, so you can't get exactly-once-processing this way.
  • Have Kafka not commit offsets at all, with the user taking on full responsibility for storing offsets elsewhere and for telling the client where to seek to after restarts.
    • If you atomically commit offsets alongside the results of message processing to the same external store, you can get exactly-once-processing.
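The difference between committing before and after processing is easiest to see with a simulated crash. The sketch below is a toy model, not client code: CommitOrderSketch, Run and run are hypothetical names, the log is just a Vector, and a "crash" simply stops the loop halfway through handling one message. As with Kafka, the committed offset is the position of the next message to read.

```scala
object CommitOrderSketch {

  // Result of one run: the messages fully processed, and the offset that was committed.
  final case class Run(processed: Vector[String], committedOffset: Int)

  // Reads `log` from `startOffset`. If `crashAt` is set, the run stops while
  // handling that message: after the first of the two steps (commit, process)
  // but before the second.
  def run(log: Vector[String], startOffset: Int, crashAt: Option[Int], commitFirst: Boolean): Run = {
    val processed = Vector.newBuilder[String]
    var committed = startOffset
    var offset = startOffset
    var crashed = false
    while (offset < log.length && !crashed) {
      val crashing = crashAt.contains(offset)
      if (commitFirst) {
        committed = offset + 1                 // commit first...
        if (crashing) crashed = true           // ...then crash before processing
        else processed += log(offset)
      } else {
        processed += log(offset)               // process first...
        if (crashing) crashed = true           // ...then crash before committing
        else committed = offset + 1
      }
      offset += 1
    }
    Run(processed.result(), committed)
  }
}
```

Running it over Vector("a", "b", "c") with a crash at offset 1, then restarting from the committed offset, gives "a", "c" when committing first ("b" is lost: at-most-once) and "a", "b", "b", "c" when committing last ("b" is repeated: at-least-once).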

For all three options there is a configuration parameter you should be aware of: auto.offset.reset. When a consumer starts reading from a topic for the first time it doesn't have any previous offset to resume from, so auto.offset.reset exists to control where such consumers begin reading from. By default it's set to latest, which means consumers will only see messages pushed to the topic after they first read from it. It can also be set to earliest to make new consumers play through all messages in the topic, or to none to prevent there being any default -- in other words causing an exception to be thrown if consumers don't manually seek to an initial offset when they first read from a topic.
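The decision auto.offset.reset controls can be written down in a few lines. This is a sketch of the rule as described above rather than the client's actual code; startingOffset and the ResetPolicy type are hypothetical names.

```scala
object OffsetResetSketch {

  sealed trait ResetPolicy
  case object Earliest extends ResetPolicy
  case object Latest extends ResetPolicy
  case object NoReset extends ResetPolicy    // stands in for the "none" setting

  // A committed offset always wins; the policy only matters when there isn't one.
  def startingOffset(committed: Option[Long], earliest: Long, logEnd: Long, policy: ResetPolicy): Long =
    committed.getOrElse(policy match {
      case Earliest => earliest              // replay everything still in the log
      case Latest   => logEnd                // only see messages written from now on
      case NoReset  => throw new IllegalStateException("no committed offset and no reset policy")
    })
}
```

So a brand new consumer on a partition holding offsets 0 to 99 starts at 100 under latest and at 0 under earliest, while a consumer that previously committed 42 resumes at 42 regardless of the setting.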

With the first two options I said Kafka can commit offsets to itself but I didn't fully explain where it stores them. There are actually two possible locations: ZooKeeper or Kafka itself. In earlier versions ZooKeeper was the only option, but it was found not to perform so well at this job, hence the addition of the Kafka option in 0.8.2. The ZooKeeper support remains in place to support upgrading older clients; Kafka is the default in newer versions and there isn't any configuration you need to set. What's quite neat is that offsets are actually just stored in compacted topics (we'll cover log compaction really soon). The last paragraph of the Offset Management section of the 0.8.2 announcement has a little bit more on this; it's not essential reading, but it is interesting.

Consumer groups

In the description I gave of what a distributed log is, I said that Kafka can't be categorised as providing publish--subscribe XOR typical message queue semantics (where publish--subscribe means every subscriber receives a copy of a given message, while "typical message queue semantics" means each message is delivered to only one recipient).

Consumer groups are key to Kafka's flexibility here. When a consumer subscribes to a topic it specifies a consumer group to subscribe as. The existence of a consumer group is implied by using its name during subscription; no explicit creation is required.

A consumer group can also be thought of as a "logical subscriber" in publish--subscribe terminology. What this means is that every consumer group that is subscribed to a topic will receive a copy of each message published to that topic (within the bounds of the offsets each consumer within the group reads from). But within a given consumer group the behaviour is like a message queue: each incoming message is delivered to exactly one of the group's consumers. This behaviour can be used to distribute the work of processing messages across multiple consumer nodes.

Kafka implements this consumer group functionality using partitions. Depending on how you use them it may or may not be important that you understand the consequences this has for how messages are distributed within consumer groups and the ordering guarantees your consumers get. If in doubt, this is something that probably is worth knowing; the Kafka documentation's consumers section has more.
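As a rough illustration of how partitions give you both behaviours, here's a toy assignment function. It is a sketch under assumptions: assign is a hypothetical helper, and plain round-robin is an illustrative stand-in for whatever assignment the cluster and client actually negotiate. Each group is assigned every partition of the topic independently (the publish--subscribe part), and within one group each partition goes to exactly one member (the queue part).

```scala
object ConsumerGroupSketch {

  // Spreads the topic's partitions over the members of ONE consumer group,
  // round-robin. Every partition ends up with exactly one owner in the group.
  def assign(partitions: Seq[Int], members: Seq[String]): Map[String, Seq[Int]] = {
    require(members.nonEmpty, "a group needs at least one member")
    val nothingAssigned = members.map(_ -> Vector.empty[Int]).toMap
    partitions.zipWithIndex.foldLeft(nothingAssigned) { case (assignment, (partition, index)) =>
      val owner = members(index % members.size)
      assignment.updated(owner, assignment(owner) :+ partition)
    }
  }
}
```

One consequence worth noticing: a group with more members than the topic has partitions will have idle members, because a partition is never shared within a group.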

Use of ZooKeeper

There's not much that needs to be said about ZooKeeper from the perspective of writing Kafka producers and consumers, which is the perspective this post is written from and for. What you should know is that Kafka does require ZooKeeper. It's not an option. For working with Kafka from other perspectives you will no doubt need to know more, but this is not the post that can tell you those things. The TL;DR is this: Kafka uses ZooKeeper for basically ALL THE METADATA, with consumer offsets being one notable exception.

Log compaction

Log compaction is a neat feature that can be used with keyed messages, but it's not the commonly used mode of operation [citation needed]. That combined with the fact that none of the other above concepts depend on it is why I'm describing it last.

When topics running in the normal mode of operation (the mode is called 'delete') reach their configured length bound (whether that be a time and/or space bound), messages are deleted from the tail of the log.

In log compaction mode (the mode is called 'compact'), instead of deleting messages from the tail of the log when the length bound is reached, Kafka builds a new version of the log, retaining on a key-by-key basis only the most recent message that was written to the old un-compacted log.

If it helps, log compaction can be thought of -- in a pretty hand-wavey way -- as being analogous to squashing a sequence of Git commits; the end states are the same, you just lose the history.
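In code, the end result of compaction looks something like the following. This is only a model of the outcome, not of how Kafka goes about it: Message and compact are hypothetical names and the whole log is compacted in one go.

```scala
object CompactionSketch {

  final case class Message(offset: Long, key: String, value: String)

  // Keeps, for each key, only the most recently written message.
  // Survivors keep their original offsets and their relative order.
  def compact(log: Seq[Message]): Seq[Message] =
    log.groupBy(_.key)              // all messages per key
      .values
      .map(_.maxBy(_.offset))       // latest message for each key
      .toSeq
      .sortBy(_.offset)             // restore log order
}
```

Given two readings for meter m1 at offsets 0 and 2 and one for m2 at offset 1, compaction keeps the messages at offsets 1 and 2: the latest state per key survives, the history does not.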

This Kafka Wiki page has more.


scala-kafka-client

scala-kafka-client is a set of three modules built to help with using Kafka from Scala and Akka:

  • scala-kafka-client. A minimal Scala wrapper around the Java client API, providing some helpers for convenient configuration of the client and usage from Scala.

  • scala-kafka-client-akka. Provides an asynchronous and non-blocking Kafka consumer that can be convenient when developing applications with Akka. The KafkaConsumerActor has buffering capabilities to increase throughput as well as some helpers to provide easy configuration.

  • scala-kafka-client-testkit. Supports integration testing of Kafka client code by providing helpers that can start an in-process Kafka and ZooKeeper server.

Rather than duplicate the documentation in the README and Wiki, I'll give one slightly more detailed usage example for each module. Each one is taken from the smart metering power data submission system mentioned earlier. All three examples are built against the 0.7.0 release.

scala-kafka-client's KafkaProducer

The KafkaProducer is a (very) lightweight Scala wrapper around the official Java KafkaProducer. The two main improvements it provides are:

  1. Unlike the Java KafkaProducer, which just takes configuration as a Map<String, Object> parameter, construction is via a Conf object that has this helpful Conf.apply function:

    def apply[K, V](keySerializer: Serializer[K],
                    valueSerializer: Serializer[V],
                    bootstrapServers: String = "localhost:9092",
                    acks: String = "all",
                    retries: Int = 0,
                    batchSize: Int = 16384,
                    lingerMs: Int = 1,
                    bufferMemory: Int = 33554432): Conf[K, V]

    There's also the option to create Conf objects from a Typesafe config:

    def apply[K, V](config: Config, keySerializer: Serializer[K], valueSerializer: Serializer[V]): Conf[K, V]

    In this case the configuration names and values must match Kafka's ProducerConfig style. The example that follows uses this option.

  2. You get Scala Futures as the result of sends, rather than Java Futures.
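If you're wondering how a callback-style send can be turned into a Scala Future, the usual trick is a Promise. The sketch below shows that general technique only; I'm not claiming it is how the library does it internally. The Callback trait here is a hypothetical stand-in shaped like the Java producer's completion callback (a result and a possibly-null exception), and asFuture is an invented helper.

```scala
import scala.concurrent.{Future, Promise}

object FutureBridgeSketch {

  // Hypothetical callback interface: exactly one of result/error is meaningful.
  trait Callback[A] {
    def onCompletion(result: A, error: Exception): Unit
  }

  // Wraps any function that reports its outcome via a Callback into a Future.
  def asFuture[A](send: Callback[A] => Unit): Future[A] = {
    val promise = Promise[A]()
    send(new Callback[A] {
      def onCompletion(result: A, error: Exception): Unit =
        if (error != null) promise.failure(error)   // failed send fails the Future
        else promise.success(result)                // successful send completes it
    })
    promise.future
  }
}
```

The payoff is that callers can compose sends with map, flatMap and recover like any other Future, instead of blocking on a Java Future's get.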

Here's what usage of the Scala KafkaProducer looks like in our smart metering power data submission system:

import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord}
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer

class SampleSubmitter(config: Config) {

  // Built from the Typesafe config using the Conf.apply(config, keySerializer, valueSerializer) overload
  private val producer = KafkaProducer(
    KafkaProducer.Conf(
      config,
      keySerializer = new StringSerializer,
      valueSerializer = new JsonSerializer[SubmitSampleCommand]))

  private val topic = config.getString("topic")

  // Keying each sample by meter ID keeps all of a meter's samples in the same partition
  def submitSample(meterId: MeterId, submitSampleCommand: SubmitSampleCommand) = producer.send(
    KafkaProducerRecord(topic, meterId.id.toString, submitSampleCommand))

  def close() = producer.close()
}


Each instance of the Akka HTTP submission frontend instantiates one of these SampleSubmitters and forwards all accepted client POSTs (unmarshalled as SubmitSampleCommands) to it.

For reference, the config block passed in is defined as:

kafka {
  bootstrap.servers = "kafka:9092"
  topic = "samples"
}

MeterId is:

import java.util.UUID

case class MeterId(id: UUID)

object MeterId {

  def generate = MeterId(UUID.randomUUID)
}


And SubmitSampleCommand is:

import play.api.libs.functional.syntax._
import play.api.libs.json.JsPath
import play.api.libs.json.Reads._

case class SubmitSampleCommand(timestamp: Long, power: Double) {
  require(timestamp > 0)
}

object SubmitSampleCommand {

  // Reads and writes {"timestamp": ..., "power": ...}, rejecting negative timestamps when reading
  implicit val SubmitSampleCommandFormat = (
    (JsPath \ "timestamp").format[Long](min(0L)) and
      (JsPath \ "power").format[Double]
    ) (SubmitSampleCommand.apply, unlift(SubmitSampleCommand.unapply))
}


For the brevity and simplicity of this example we're just serializing our messages as JSON strings. But for production code you'll want to choose something that gives you a way to handle schema changes -- as noted in the earlier section on serialization.

That said, here's JsonSerializer:

import java.util

import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
import play.api.libs.json.{Json, Writes}

class JsonSerializer[A: Writes] extends Serializer[A] {

  private val stringSerializer = new StringSerializer

  override def configure(configs: util.Map[String, _], isKey: Boolean) =
    stringSerializer.configure(configs, isKey)

  // Render the value as a JSON string, then let the string serializer produce the bytes
  override def serialize(topic: String, data: A) =
    stringSerializer.serialize(topic, Json.stringify(Json.toJson(data)))

  override def close() =
    stringSerializer.close()
}


scala-kafka-client-akka's KafkaConsumerActor

The official Java KafkaConsumer simply provides a blocking poll method. The intended usage pattern is that the caller loops in a blocking client poll thread. Unfortunately there isn't a convenient way to use interfaces like this directly in actor systems. That's where the KafkaConsumerActor comes in.

When you create a KafkaConsumerActor you provide it with a downstream actor. The consumer actor then reads messages from Kafka and forwards them on to your downstream actor.

The consumer actor won't send your downstream actor any more messages until you reply to it with a confirmation that you received and processed those that it already sent. To ensure delivery, the consumer actor can optionally resend unconfirmed messages after a configurable timeout.

To provide a smooth flow of messages to your downstream actor, KafkaConsumerActor maintains its own buffer of messages that it has read from Kafka and is ready to send downstream. This way there is minimal delay between confirming receipt of one set of messages and receiving the next (provided Kafka itself can saturate the consumer).

KafkaConsumerActor supports all three of the offset commit choices supported by the official Java KafkaConsumer that we identified earlier.

In our example power samples are forwarded on to cluster-sharded persistent actors, and we're using the manual offset commit strategy, having Kafka store them.

Here's what usage of the KafkaConsumerActor looks like in our smart metering power data submission system:

import java.util.UUID

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, Unsubscribe}
import cakesolutions.kafka.akka.{ConsumerRecords, KafkaConsumerActor}
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringDeserializer

object SampleAcceptorActor {

  def props(config: Config, meterShardRegion: ActorRef) = Props(new SampleAcceptorActor(config, meterShardRegion))

  private val extractor = ConsumerRecords.extractor[String, SubmitSampleCommand]
}

class SampleAcceptorActor(config: Config, meterShardRegion: ActorRef) extends Actor with ActorLogging {

  import SampleAcceptorActor.extractor

  // The listing is truncated around this call. Creating the consumer via KafkaConsumerActor.props,
  // with this actor as the downstream receiver, is a reconstruction.
  private val kafkaConsumerActor = context.actorOf(
    KafkaConsumerActor.props(
      consumerConf = KafkaConsumer.Conf(
        config,
        keyDeserializer = new StringDeserializer,
        valueDeserializer = new JsonDeserializer[SubmitSampleCommand]
      ),
      actorConf = KafkaConsumerActor.Conf(config),
      downstreamActor = self
    )
  )

  override def preStart() = {
    super.preStart()
    kafkaConsumerActor ! Subscribe()
  }

  override def postStop() = {
    kafkaConsumerActor ! Unsubscribe
    super.postStop()
  }

  override def receive = {

    // extractor recovers the type parameters of ConsumerRecords, so pairs is of type Seq[(Option[String], SubmitSampleCommand)]
    case extractor(consumerRecords) =>

      consumerRecords.pairs.foreach {
        case (None, submitSampleCommand) => log.error(s"Received unkeyed submit sample command: $submitSampleCommand")
        case (Some(meterIdUuidString), submitSampleCommand) =>
          // The EnvelopedMessage arguments are missing from the listing; these two are assumed
          meterShardRegion ! EnvelopedMessage(
            MeterId(UUID.fromString(meterIdUuidString)),
            submitSampleCommand)
      }

      // By committing *after* processing we get at-least-once-processing, but that's OK here because we can identify duplicates by their timestamps
      kafkaConsumerActor ! Confirm(consumerRecords.offsets, commit = true)
  }
}



In this case the config block is defined as:

  kafka {
    bootstrap.servers = "kafka:9092"
    topics = ["samples"]
    group.id = "metering"
    auto.offset.reset = "earliest"
  }

And here's JsonDeserializer:

import java.util

import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import play.api.libs.json.{Json, Reads}

class JsonDeserializer[A: Reads] extends Deserializer[A] {

  private val stringDeserializer = new StringDeserializer

  override def configure(configs: util.Map[String, _], isKey: Boolean) =
    stringDeserializer.configure(configs, isKey)

  // Decode the bytes as a string, parse it as JSON, then convert to A (throws if the JSON doesn't fit)
  override def deserialize(topic: String, data: Array[Byte]) =
    Json.parse(stringDeserializer.deserialize(topic, data)).as[A]

  override def close() =
    stringDeserializer.close()
}


One note of caution regarding failure handling: While KafkaConsumerActor transparently retries in the event of failures communicating with Kafka (and it has highly configurable and extensible retry strategies controlling this), if a defect in KafkaConsumerActor itself causes the actor to be restarted, the newly started instance won't remember the subscription that you opened with the old one. So, you'll need to detect this via some form of supervision and resend a subscription command to the newly started instance. For brevity I've omitted such code from this example, but it's something you should include in any long-running production system using KafkaConsumerActor.

scala-kafka-client-testkit's KafkaServer

The KafkaServer is simple and very useful. It gives you an in-process Kafka server that you can run integration tests against (it also starts an in-process ZooKeeper server for Kafka to use). Create one with the nullary constructor, call startup, and that's it. Just call close when done.

Here's an example of using it:

import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.testkit.{KafkaServer, TestUtils}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import play.api.libs.json.Json

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.Random

class SampleSubmitterSpec extends WordSpec with Matchers with BeforeAndAfterAll {

  private val kafkaServer = new KafkaServer

  private val baseConfig = ConfigFactory.parseString(
    s"""
       |  bootstrap.servers = "localhost:${kafkaServer.kafkaPort}"
     """.stripMargin)

  private val topic = "samples"

  // The listing is truncated around the config strings; layering them over baseConfig is a reconstruction
  private val consumer = KafkaConsumer(KafkaConsumer.Conf(
    ConfigFactory.parseString(
      s"""
         |  topics = ["$topic"]
         |  group.id = "testing-${TestUtils.randomString(5)}"
         |  auto.offset.reset = "earliest"
       """.stripMargin).withFallback(baseConfig),
    keyDeserializer = new StringDeserializer(),
    valueDeserializer = new StringDeserializer()
  ))

  override def beforeAll() = kafkaServer.startup()

  override def afterAll() = kafkaServer.close()

  "A Sample Submitter" must {
    "forward commands to Kafka" in {

      val sampleSubmitter = new SampleSubmitter(
        ConfigFactory.parseString(
          s"""
             |  topic = "$topic"
           """.stripMargin).withFallback(baseConfig))

      val meterId = MeterId.generate
      val submitSampleCommand = SubmitSampleCommand(
        timestamp = System.currentTimeMillis,
        power = {
          val bedrooms = Random.nextInt(4) + 1
          Math.random * bedrooms * 1000
        }
      )

      // Reconstructed steps: submit the sample, then subscribe so that poll has something to read
      sampleSubmitter.submitSample(meterId, submitSampleCommand)
      consumer.subscribe(List(topic).asJava)

      val records = consumer.poll(30.seconds.toMillis)
      records.count shouldBe 1
      val record = records.asScala.toList.head
      record.key shouldBe meterId.id.toString
      record.value shouldBe Json.stringify(Json.toJson(submitSampleCommand))
    }
  }
}





You've hopefully now got a good overview of Kafka, with an understanding of the following:

  • That the distributed log is core to the features it provides and the properties it has.
  • What topics and partitions are.
  • What keyed messages are.
  • That Kafka does not care how you serialize your messages, but that you should.
  • What producers, consumers, consumer groups, subscriptions and offsets are, the trade-offs of different offset commit strategies, and when and where to learn more about how partitioning can impact the distribution of work within consumer groups.
  • What log compaction is.
  • A bit about replication -- namely that it's enabled by partitioning -- and when and where to learn more.

I also explained why the official Java client is not ideal for use from Scala and from within actor systems, and how this motivated the development of scala-kafka-client. We then saw examples showing that it makes producing to and consuming from Kafka topics really simple.

Thanks for reading, and happy hAkking!

The Repository

Here is a link to our open source project:


Topics: Scala, Akka, Streaming, Kafka
