
Comparison of Apache Stream Processing Frameworks: Part 1

Posted by Petr Zapletal on Tue, Feb 2, 2016

 

A couple of months ago we were discussing the reasons behind the increasing demand for distributed stream processing. I also stated that there were a number of frameworks available to address it. Now it’s time to have a look at them and discuss their similarities and differences and, in my opinion, their recommended use cases.

As you probably suspect, distributed stream processing is the continuous processing, aggregation and analysis of unbounded data. It’s a general computation model like MapReduce, but we expect latencies in milliseconds or seconds. These systems are usually modeled as Directed Acyclic Graphs (DAGs).

A DAG is a graphical representation of a chain of tasks, and we use it to describe the topology of a streaming job. I’ll borrow a little terminology from Akka Streams. As you can see in the picture below, data flows through a chain of processors from sources to sinks, which together represent the streaming task. And speaking of Akka Streams, I think it’s very important to emphasize the word distributed: even local solutions can create and run a DAG, but we’re going to focus only on solutions running on multiple machines.

Screen_Shot_2015-12-28_at_16.36.48.png

Points of Interest

When choosing between different systems there are a couple of points we should take care of. Let’s start with the Runtime and Programming Model. The programming model provided by a platform determines a lot of its features, and it should be sufficient to handle all possible use cases for the application. This is a really crucial topic and I’ll come back to it very soon.

The Functional Primitives exposed by a processing platform should provide rich functionality at the level of individual messages, like map or filter, which are pretty easy to implement even if you want to scale a lot. But the platform should also provide functionality across messages, like aggregations, and operations across streams, like joins, which are much harder to scale.

State Management - Most applications have stateful processing logic that requires maintaining state. The platform should allow us to maintain, access and update the state information.

For Message Delivery Guarantees, we have a couple of classes: at most once, at least once and exactly once. At most once delivery means that for each message handed to the mechanism, that message is delivered zero or one times, so messages may be lost. At least once delivery means that for each message handed to the mechanism, potentially multiple attempts are made at delivering it, such that at least one succeeds. Messages may be duplicated but not lost. And finally, exactly once delivery means that for each message handed to the mechanism, exactly one delivery is made to the recipient; the message can neither be lost nor duplicated. So this is another important thing to consider.

Failures can and will happen at various levels, for example network partitions, disk failures or nodes going down. The platform should be able to recover from all such failures and resume from its last successful state without harming the result.
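To make the three classes a bit more tangible, here is a minimal simulation in plain Java. It is not tied to any of the frameworks discussed here: the DeliveryDemo class, the deterministic lossy link inside attempt, and the id-based deduplication are all assumptions made for illustration. Note that the "exactly once" variant is really at least once delivery combined with a receiver that ignores ids it has already seen, which is one common way to approximate it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DeliveryDemo {

    /** Receiver side: records every message id it is handed, optionally dropping known ids. */
    static final class Receiver {
        final List<Integer> processed = new ArrayList<>();
        private final Set<Integer> seen = new HashSet<>();
        private final boolean deduplicate;

        Receiver(boolean deduplicate) {
            this.deduplicate = deduplicate;
        }

        void accept(int id) {
            // Set.add returns false when the id was already present.
            boolean firstTime = seen.add(id);
            if (firstTime || !deduplicate) {
                processed.add(id);
            }
        }
    }

    /**
     * One delivery attempt over a hypothetical, deterministic lossy link.
     * Returns true only if the sender gets an acknowledgement back.
     */
    static boolean attempt(int id, int attemptNo, Receiver receiver) {
        boolean firstAttempt = attemptNo == 1;
        if (firstAttempt && id % 3 == 0) {
            return false;               // the message itself is lost
        }
        receiver.accept(id);
        if (firstAttempt && id % 3 == 1) {
            return false;               // the message arrived, but the ack is lost
        }
        return true;
    }

    /** Fire and forget: one attempt per message, no retries. */
    static List<Integer> atMostOnce(int n) {
        Receiver receiver = new Receiver(false);
        for (int id = 0; id < n; id++) {
            attempt(id, 1, receiver);
        }
        return receiver.processed;
    }

    /** Retry until acknowledged; the receiver processes whatever it gets. */
    static List<Integer> atLeastOnce(int n) {
        return sendWithRetries(n, new Receiver(false));
    }

    /** Retry until acknowledged; the receiver drops ids it has already processed. */
    static List<Integer> exactlyOnce(int n) {
        return sendWithRetries(n, new Receiver(true));
    }

    private static List<Integer> sendWithRetries(int n, Receiver receiver) {
        for (int id = 0; id < n; id++) {
            int attemptNo = 1;
            while (!attempt(id, attemptNo, receiver)) {
                attemptNo++;
            }
        }
        return receiver.processed;
    }

    public static void main(String[] args) {
        System.out.println("at most once:  " + atMostOnce(6));   // [1, 2, 4, 5]
        System.out.println("at least once: " + atLeastOnce(6));  // [0, 1, 1, 2, 3, 4, 4, 5]
        System.out.println("exactly once:  " + exactlyOnce(6));  // [0, 1, 2, 3, 4, 5]
    }
}
```

With six messages, the first variant loses messages 0 and 3, the second duplicates messages 1 and 4, and the third processes every message once.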

And then, we have more performance related requirements like Latency, Throughput and Scalability which are extremely important in streaming applications.

We should also take care of Maturity and Adoption Level. This information could give us a clue about potential support, available libraries or even Stack Overflow answers, and it may help us a lot when choosing the correct platform.

And also, last but not least, the Ease of Development and Ease of Operability. It is great when we have a super fancy system which covers all our use cases, but if we cannot write a program for it or we cannot deploy it, we are done anyway.

Runtime and Programming Model

The Runtime and Programming Model is probably the most important trait of a system because it defines its expressiveness, possible operations and future limitations. Therefore it defines the system’s capabilities and its use cases.

There are two distinct approaches to implementing a streaming system. The first one is called native streaming. It means all incoming records, or events if you want, are processed as they arrive, one by one.


Screen_Shot_2015-12-28_at_16.39.08.png

The second approach is called micro-batching. Short batches are created from incoming records and go through the system. These batches are created according to a pre-defined time constant, typically every couple of seconds.

Screen_Shot_2015-12-28_at_16.40.36.png


Both approaches have inherent advantages and disadvantages. Let’s start with native streaming. The great advantage of native streaming is its expressiveness. Because it takes the stream as it is, it is not limited by any unnatural abstraction over it. Also, as the records are processed immediately upon arrival, the achievable latencies of these systems are always better than those of their micro-batching counterparts. Apart from that, stateful operations are much easier to implement, as you’ll see later in this post. On the other hand, native streaming systems usually have lower throughput, and fault tolerance is much more expensive as it has to take care of (i.e. persist and replay) every single record. Load balancing is also an issue. For example, let’s say we have data partitioned by a key and we want to process it. If the processing of some key in a partition is more resource intensive for any reason, this partition quickly becomes the job’s bottleneck.
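The load balancing problem is easy to reproduce on a small scale. The sketch below assumes a simple hash partitioner (the KeySkewDemo class and its partitionFor method are hypothetical and not taken from any of the frameworks) and counts how many records land in each partition when one key is much more frequent than the others.

```java
import java.util.Arrays;
import java.util.List;

final class KeySkewDemo {

    /** Maps a key to one of numPartitions partitions (hypothetical hash partitioner). */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Counts how many records each partition receives. */
    static int[] load(List<String> keys, int numPartitions) {
        int[] perPartition = new int[numPartitions];
        for (String key : keys) {
            perPartition[partitionFor(key, numPartitions)]++;
        }
        return perPartition;
    }

    public static void main(String[] args) {
        // "a" is a hot key: it appears far more often than the others.
        List<String> keys = Arrays.asList("a", "a", "a", "a", "a", "a", "b", "c", "d");
        System.out.println(Arrays.toString(load(keys, 4)));   // [1, 6, 1, 1]
    }
}
```

All records for the hot key end up in the same partition, so adding more partitions does not help; the worker owning that partition stays the bottleneck.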

Secondly, micro-batching. Splitting a stream into micro-batches inevitably reduces the system’s expressiveness. Some operations, especially state management or joins and splits, are much harder to implement as the system has to manipulate the whole batch. Moreover, the batch interval connects two things which should never be connected: an infrastructure property and business logic. On the other hand, fault tolerance and load balancing are much simpler, as the system just sends every batch to a worker node and, if something goes wrong, it just uses a different one. Lastly, it is worth remarking that we can build a micro-batching system atop native streaming quite easily.

Programming models can be classified as Compositional and Declarative. The compositional approach provides basic building blocks like sources or operators, and they must be tied together in order to create the expected topology. New components can usually be defined by implementing some kind of interface. In contrast, operators in a declarative API are defined as higher-order functions. This allows us to write functional code with abstract types and all its fancy stuff, and the system creates and optimizes the topology itself. Declarative APIs also usually provide more advanced operations like windowing or state management out of the box. We are going to have a look at some code samples very soon.
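The last remark can be sketched in a few lines of plain Java. The MicroBatcher class below is a hypothetical operator, not part of any framework: it receives records one by one, as a native streaming system would deliver them, and hands them downstream in batches aligned to a fixed interval. It assumes that timestamps are non-negative and arrive in non-decreasing order, and it uses the timestamp carried by each record instead of a wall clock so that the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MicroBatcher<T> {
    private final long intervalMillis;
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();
    private long currentIndex = -1;   // index of the batch interval currently being filled

    MicroBatcher(long intervalMillis, Consumer<List<T>> downstream) {
        this.intervalMillis = intervalMillis;
        this.downstream = downstream;
    }

    /** Called once per record, the way a record-at-a-time system would call an operator. */
    void onRecord(long timestampMillis, T record) {
        long index = timestampMillis / intervalMillis;
        if (index != currentIndex) {
            flush();                  // the record belongs to a new interval
            currentIndex = index;
        }
        current.add(record);
    }

    /** Emits the batch collected so far, if there is one. */
    void flush() {
        if (!current.isEmpty()) {
            downstream.accept(current);
            current = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        MicroBatcher<String> batcher =
                new MicroBatcher<>(1000, batch -> System.out.println("batch: " + batch));
        batcher.onRecord(100, "a");
        batcher.onRecord(900, "b");
        batcher.onRecord(1200, "c");
        batcher.onRecord(3100, "d");
        batcher.flush();              // emit the last, still open batch
    }
}
```

With a one second interval the four records are emitted as the batches [a, b], [c] and [d]. Changing the interval changes the batch boundaries, which is exactly the coupling between an infrastructure setting and the results mentioned above.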
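The difference between the two styles can be shown without any streaming framework at all. The following sketch counts words in both ways using only the Java standard library. The ApiStyles class, the Operator interface and the Split and Count components are made up for this illustration, and java.util.stream stands in for a declarative API only in spirit, since it is neither distributed nor unbounded.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class ApiStyles {

    // Compositional style: one class per component, wired together by hand.

    interface Operator<I, O> {
        void process(I input, Consumer<O> output);
    }

    static final class Split implements Operator<String, String> {
        @Override
        public void process(String sentence, Consumer<String> output) {
            for (String word : sentence.split(" ")) {
                output.accept(word);
            }
        }
    }

    static final class Count implements Consumer<String> {
        final Map<String, Integer> counts = new TreeMap<>();

        @Override
        public void accept(String word) {
            counts.merge(word, 1, Integer::sum);
        }
    }

    static Map<String, Integer> compositional(List<String> sentences) {
        Split split = new Split();
        Count count = new Count();
        for (String sentence : sentences) {
            split.process(sentence, count);   // explicit wiring: split -> count
        }
        return count.counts;
    }

    // Declarative style: higher-order functions, no explicit wiring.

    static Map<String, Integer> declarative(List<String> sentences) {
        return sentences.stream()
                .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("to be or", "not to be");
        System.out.println(compositional(sentences));   // {be=2, not=1, or=1, to=2}
        System.out.println(declarative(sentences));     // {be=2, not=1, or=1, to=2}
    }
}
```

Both methods return the same counts. In the first one we decide ourselves which component feeds which, while in the second one we only describe the transformation and leave the execution plan to the library.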

Apache Streaming Landscape

There are a number of diverse frameworks available and it is literally impossible to cover all of them. So I’m forced to limit the selection somehow, and I want to go for popular streaming solutions from the Apache landscape which also provide a Scala API. Therefore we’re going to focus on Apache Storm and its sibling Trident, and on the streaming module of the very popular Spark. We’re also going to talk about Samza, the streaming system behind LinkedIn, and finally we’re going to discuss a promising Apache project, Flink. I believe this is a great selection because, even though all of them are streaming systems, they approach various challenges very differently. Unfortunately I won’t talk about proprietary systems like Google MillWheel or Amazon Kinesis, and we’re also going to miss interesting but still not widely adopted systems like Intel GearPump or Apache Apex. That may be for next time.

Screen_Shot_2015-12-28_at_16.43.04.png
Apache Storm was originally created by Nathan Marz and his team at BackType in 2010. Later BackType was acquired by Twitter, which open-sourced Storm, and it became an Apache top-level project in 2014. Without any doubt, Storm was a pioneer in large-scale stream processing and became the de facto industry standard. Storm is a native streaming system and provides a low-level API. Storm also uses Thrift for topology definition and implements the Storm multi-language protocol. This basically allows us to implement our solutions in a large number of languages, which is pretty unique, and Scala is of course one of them.

Trident is a higher-level micro-batching system built atop Storm. It simplifies the topology building process and also adds higher-level operations like windowing, aggregations or state management, which are not natively supported in Storm. In addition, Trident provides exactly once delivery, in contrast to Storm’s at least once guarantee. Trident has Java, Clojure and Scala APIs.

As we all know, Spark is a very popular batch processing framework these days, with a couple of built-in libraries like Spark SQL or MLlib and of course Spark Streaming. Spark’s runtime is built for batch processing and therefore Spark Streaming, as it was added a little bit later, does micro-batching. The stream of input data is ingested by receivers which create micro-batches, and these micro-batches are processed in a similar way to other Spark jobs. Spark Streaming provides a high-level declarative API in Scala, Java and Python.

Samza was originally developed at LinkedIn as a proprietary streaming solution and, together with Kafka, which is another great LinkedIn contribution to our community, it became a key part of their infrastructure. As you’re going to see a little bit later, Samza builds heavily on Kafka’s log-based philosophy and the two integrate very well. Samza provides a compositional API and of course Scala is supported.

And last but not least, Flink. Flink is a pretty old project, it has its origins in 2008, but right now it is getting quite a lot of attention. Flink is a native streaming system and provides a high-level API. Flink also provides an API for batch processing, like Spark, but there is a fundamental distinction between the two: Flink handles batch as a special case of streaming. Everything is a stream, and this is definitely a better abstraction, because this is what the world really looks like.

That was a quick introduction to the systems and, as you can see in the table below, they do have pretty different traits.

Screen_Shot_2015-12-28_at_16.44.49.png

Counting Words

Wordcount is something like the hello world of stream processing, and it nicely shows the main differences between our frameworks. Let’s start with Storm, and please note that the example has been simplified significantly.

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout("spout", new RandomSentenceSpout(), 5);
 builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

 ...

 Map<String, Integer> counts = new HashMap<String, Integer>();

 public void execute(Tuple tuple, BasicOutputCollector collector) {
   String word = tuple.getString(0);
   Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
   counts.put(word, count);
   collector.emit(new Values(word, count));
 }

First, let’s have a look at its topology definition. As you can see at line 2, we have to define a spout or, if you want, a source. Then there is a bolt, a processing component, which splits the text into words. Then I have defined another bolt for the actual word count calculation at line 4. Also have a look at the magic numbers 5, 8 and 12. These are the parallelism hints, and they define how many independent threads around the cluster will be used for the execution of each component. As you can see, it is all very manual and low-level. Now, let’s have a look (lines 8 - 15) at how the actual WordCount bolt is implemented. As Storm does not have built-in support for managed state, I have defined a local state, which is far from ideal but good enough as a sample. Apart from that it is not very interesting, so let’s move on and have a look at Trident.

As I mentioned before, Trident is Storm’s micro-batching extension and, apart from many other goodies, it provides state management, which is pretty useful when implementing wordcount.
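One detail of the topology deserves a closer look: the count bolt is connected with fieldsGrouping on the word field, while the split bolt uses shuffleGrouping. With a local HashMap per task, this choice decides whether the counts are correct. The plain Java sketch below illustrates the idea outside of Storm. The GroupingDemo class is hypothetical, routing by hash code is only an assumption in the spirit of a fields grouping, and round robin stands in for the random distribution of a shuffle grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupingDemo {

    /** One local counter map per task, similar to the per-bolt HashMap in the example above. */
    private static List<Map<String, Integer>> newTasks(int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        return tasks;
    }

    /** Fields-style grouping: the target task is derived from the word itself. */
    static List<Map<String, Integer>> fieldsGrouping(List<String> words, int numTasks) {
        List<Map<String, Integer>> tasks = newTasks(numTasks);
        for (String word : words) {
            int task = Math.floorMod(word.hashCode(), numTasks);
            tasks.get(task).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    /** Shuffle-style grouping, approximated here by round robin. */
    static List<Map<String, Integer>> shuffleGrouping(List<String> words, int numTasks) {
        List<Map<String, Integer>> tasks = newTasks(numTasks);
        int next = 0;
        for (String word : words) {
            tasks.get(next).merge(word, 1, Integer::sum);
            next = (next + 1) % numTasks;
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "a", "b", "a", "b");
        System.out.println("fields:  " + fieldsGrouping(words, 2));
        System.out.println("shuffle: " + shuffleGrouping(words, 2));
    }
}
```

With the fields-style routing every occurrence of "a" reaches the same task, so its local count of 3 is the true total. With the shuffle-style routing the occurrences of "a" are split between both tasks (1 and 2), and no single task knows the correct count.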

public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = ...

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(),
            new Count(), new Fields("count"));

    ...

}

As you can see, I could use higher-level operations like each (line 6) and groupBy (line 7), so it’s a little bit better. I was also able to use Trident’s managed state for storing the counts at line 8.


Now it is time for a pretty declarative API provided by Apache Spark. Also keep in mind that, in contrast to the previous examples, which were significantly simplified, this is nearly all the code you need to run this simple streaming wordcount.

val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))

val text = ...

val counts = text.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.print()

ssc.start()
ssc.awaitTermination()

Every Spark Streaming job requires a StreamingContext, which is basically the entry point to the streaming functionality. StreamingContext takes a configuration which is, as you can see at line 1, very limited in our case, but more importantly it defines the batch interval (line 2), which is set to 1 second. Now you can see the whole word count computation (lines 6 - 8). Quite a difference, isn’t it? That’s the reason why Spark is sometimes called Distributed Scala. As you can see, it’s quite standard functional code, and Spark takes care of the topology definition and its distributed execution. And now, at line 12, the last part of every Spark Streaming job: starting the computation. Just keep in mind that once started, a job cannot be modified.

Now let’s have a look at Apache Samza, another representative of the compositional API.

class WordCountTask extends StreamTask {

  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
    coordinator: TaskCoordinator): Unit = {

    val text = envelope.getMessage.asInstanceOf[String]

    val counts = text.split(" ").foldLeft(Map.empty[String, Int]) {
      (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
    }

    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts))

  }
}

The topology is defined in Samza’s properties file but, for the sake of clarity, you won’t find it here. What matters for us is that the task has defined input and output channels and that the communication goes through Kafka topics. In our case the whole topology is the WordCountTask, which does all the work. In Samza, components are defined by implementing particular interfaces; in this case it’s a StreamTask, and I’ve just overridden the process method at line 3. Its parameter list contains everything needed for connecting with the rest of the system. The computation itself, at lines 8 - 10, is just simple Scala.

Now let’s have a look at Flink. As you can see, the API is pretty similar to Spark Streaming, but notice we are not setting any batch interval.

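// Streaming API: the batch ExecutionEnvironment would run this as a bounded DataSet job
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val counts = text.flatMap ( _.split(" ") )
  .map ( (_, 1) )
  .keyBy(0)
  .sum(1)

counts.print()

env.execute("wordcount")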

The computation itself is pretty straightforward. As you can see, there are just a couple of functional calls and Flink takes care of the distributed computation.

Conclusion

That’s all for today, folks. We went through the necessary theory and also introduced popular streaming frameworks from the Apache landscape. Next time, we’re going to dig a little bit deeper and go through the rest of the points of interest. I hope you find it interesting, so stay tuned. Also, if you have any questions, don’t hesitate to contact me as I’m always happy to discuss the topic.
