A Brief Discussion of Streaming and Data Processing Pipeline Technologies, clarifying a few streaming myths

Myth 1: There’s no streaming without batch (the Lambda Architecture)
Myth 2: Latency and Throughput: Choose One
Myth 3: Micro-batching means better throughput
Myth 4: Exactly once? Completely impossible.
Myth 5: Streaming only applies to “real-time”
Myth 6: So what? Streaming is too hard anyway.

Apache Beam Data Processing Pipeline

Let's see how Apache Beam has simplified real-time streaming through data processing pipelines.

Question ~ How can we apply the Map-Reduce programming model to time-sensitive data that can be infinitely big, completely unordered, and unbounded, with unknown delays (arriving early or late)?

Answer ~ Adopt Apache Beam


Let's look at a concrete example:



Pipeline p = Pipeline.create(options);
// The input for this transform is the PCollection of text lines
// generated by the TextIO.Read transform (input location elided in the original).
p.apply(TextIO.Read.from("..."))
 // Anonymous DoFn executed on each element; it tokenizes the text lines into individual words.
 .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
     @Override
     public void processElement(ProcessContext c) {
       for (String word : c.element().split("[^a-zA-Z']+")) {
         if (!word.isEmpty()) {
           // Each element in the output PCollection represents an individual
           // word in the text.
           c.output(word);
         }
       }
     }
 }));

Conceptually, Apache Beam allows users to focus on four questions ~

~ What is being computed (elements / aggregates / composites)? The types of transformations within the pipeline: computing sums, building histograms, training machine learning models, and so on.

~ Where in event time? The use of event-time windowing (fixed / sliding / sessions).

~ When in processing time? The use of watermarks (which signal when the input for a given window is believed to be complete) and triggers, which specify early results (speculative, partial results emitted before the window is complete) and late results (for cases where the watermark is only an estimate of completeness, and more input data arrives after the watermark claims the input for a given window is complete).

~ How do the refinements of results relate? Discarding (results are all independent and distinct), accumulating (later results build upon prior ones), or accumulating and retracting (both the accumulated value and a retraction of the previously triggered value(s) are emitted).
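The three refinement modes can be illustrated without Beam. The following is a minimal plain-Java sketch, not the Beam API: the class `AccumulationModes` and its method names are hypothetical, and each "pane" is assumed to be simply the sum of the inputs that arrived since the previous trigger firing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names come from the Beam SDK.
final class AccumulationModes {

    // Discarding: each firing reports only what arrived since the last firing.
    static List<Integer> discarding(List<Integer> paneDeltas) {
        return new ArrayList<>(paneDeltas);
    }

    // Accumulating: each firing reports the running total so far.
    static List<Integer> accumulating(List<Integer> paneDeltas) {
        List<Integer> out = new ArrayList<>();
        int total = 0;
        for (int delta : paneDeltas) {
            total += delta;
            out.add(total);
        }
        return out;
    }

    // Accumulating and retracting: each firing reports the new running total
    // together with a retraction of the previously reported total.
    static List<String> accumulatingAndRetracting(List<Integer> paneDeltas) {
        List<String> out = new ArrayList<>();
        Integer previous = null;
        int total = 0;
        for (int delta : paneDeltas) {
            total += delta;
            out.add(previous == null ? "+" + total : "+" + total + " retract " + previous);
            previous = total;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> panes = Arrays.asList(3, 5, 4); // sums of three successive panes
        System.out.println(discarding(panes));
        System.out.println(accumulating(panes));
        System.out.println(accumulatingAndRetracting(panes));
    }
}
```

For panes summing to 3, 5 and 4, discarding yields 3, 5, 4, accumulating yields 3, 8, 12, and the retracting variant additionally withdraws 3 and then 8, so a downstream consumer that sums everything it receives still ends at 12.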

Following diagram nicely depicts the What / Where / When / How




Concepts ~

In Dataflow, the lifetime of a window is bound by the progression of the watermark: after the watermark proceeds past some user-specified horizon beyond the end of a window, a final result for that window is produced and its state is garbage collected. If inputs become delayed for some reason and the watermark is able to capture that fact, the Dataflow pipeline will continue to provide correct answers.
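The relationship between the watermark, the end of a window and the user-specified horizon can be sketched in a few lines. This is an illustration in plain Java under simplifying assumptions (a single window, timestamps in milliseconds); `WindowLifetime` and its members are hypothetical names, not Dataflow or Beam classes.

```java
// Plain-Java illustration only; not part of any SDK.
final class WindowLifetime {
    private final long windowEndMillis;        // end of the window in event time
    private final long allowedLatenessMillis;  // user-specified horizon beyond the end

    WindowLifetime(long windowEndMillis, long allowedLatenessMillis) {
        this.windowEndMillis = windowEndMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    // Once the watermark has passed end + horizon, the final result is produced
    // and the window's state can be garbage collected.
    boolean isExpired(long watermarkMillis) {
        return watermarkMillis > windowEndMillis + allowedLatenessMillis;
    }

    // Data for this window counts as "late" when the watermark has already passed
    // the window end but the window has not yet expired; it can still refine the result.
    boolean isLateButAccepted(long watermarkMillis) {
        return watermarkMillis > windowEndMillis && !isExpired(watermarkMillis);
    }

    public static void main(String[] args) {
        WindowLifetime w = new WindowLifetime(60_000L, 10_000L);
        System.out.println(w.isLateButAccepted(65_000L));
        System.out.println(w.isExpired(70_001L));
    }
}
```

With a window ending at 60 s and a 10 s horizon, an element seen while the watermark is at 65 s is late but still accepted; once the watermark passes 70 s the window is closed for good.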

Another important concept is the session ~ a sequence of events terminated by a gap of inactivity larger than some timeout.
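As a rough illustration of that definition, the sketch below groups event timestamps into sessions using a gap timeout. It is plain Java rather than Beam's session windowing, and `Sessions.sessionize` is a hypothetical helper introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; not the Beam Sessions window function.
final class Sessions {

    // Splits event timestamps into sessions: a new session starts whenever
    // the gap to the previous event is larger than gapMillis.
    static List<List<Long>> sessionize(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // events may arrive unordered
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] events = {1000L, 2000L, 10000L, 2500L, 11000L};
        System.out.println(sessionize(events, 3000L));
    }
}
```

With a 3-second gap, the five events above fall into two sessions: one covering 1000 to 2500 and one covering 10000 to 11000.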

Beam’s most important data structure is the PCollection ~ a potentially very large, immutable bag of elements, each encoded as a byte string

PCollection<String> raw = ...;  // the source read is elided in the original
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()));
PCollection<KV<String, Integer>> scores = input.apply(Sum.integersPerKey());

Another important Data Structure is PCollectionTuple

// Create a PCollectionTuple with three PCollections:
PCollectionTuple pcs =
    PCollectionTuple.of(tag1, pc1)
                    .and(tag2, pc2)
                    .and(tag3, pc3);
// Create an empty PCollectionTuple:
Pipeline p = ...;
PCollectionTuple pcs2 = PCollectionTuple.empty(p);
// Get PCollections out of a PCollectionTuple, using the same tags
// that were used to put them in:
PCollection<Integer> pcX = pcs.get(tag2);
PCollection<String> pcY = pcs.get(tag1);
PCollection<Iterable<String>> pcZ = pcs.get(tag3);

Beam automatically distributes the processing code (DoFns) of ‘ParDo Transforms’ to multiple workers in parallel.

ParDo processing ~ closely analogous to the Mapper class of the ‘Map/Shuffle/Reduce’ paradigm!

Apache Beam Key Features:

— Dataflow service guarantees that every element in your input PCollection is processed by a DoFn instance exactly once.
— Dataflow service does not guarantee how many times a DoFn will be invoked.
— Dataflow service does not guarantee exactly how the distributed elements are grouped—that is, it does not guarantee which (if any) elements are processed together.
— The Dataflow service does not guarantee the exact number of DoFn instances that will be created over the course of a pipeline.
— The Dataflow service is fault-tolerant, and may retry your code multiple times in the case of worker issues.

— The Dataflow service may create backup copies of the code, and can have issues with manual side effects (such as if your code relies upon or creates temporary files with non-unique names).
— The Dataflow service serializes element processing per DoFn instance. Your code does not need to be strictly thread-safe; however, any state shared between multiple DoFn instances must be thread-safe.

— In Dataflow, the periodic updates are driven by triggers
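Because the service may retry or duplicate invocations, side effects such as temporary files need unique names. The following is a minimal sketch of that idea in plain Java, assuming the work happens inside a DoFn-like method; `RetrySafeTempFiles`, `writeScratch` and the `dofn-scratch-` prefix are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-Java illustration only; not part of the Dataflow SDK.
final class RetrySafeTempFiles {

    // Writes the payload to a freshly created, uniquely named temp file, so a
    // retried or duplicated invocation never collides with an earlier attempt.
    static Path writeScratch(String payload) {
        try {
            Path file = Files.createTempFile("dofn-scratch-", ".tmp");
            Files.write(file, payload.getBytes(StandardCharsets.UTF_8));
            file.toFile().deleteOnExit(); // best-effort cleanup for this sketch
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two attempts for the same element end up in two different files.
        System.out.println(writeScratch("hello"));
        System.out.println(writeScratch("hello"));
    }
}
```

A fixed file name would let a retry overwrite or clash with the output of an earlier attempt; `Files.createTempFile` sidesteps this by generating a new name on every call.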

Dataflow provides Autoscaling and Dynamic Work Rebalancing


Spark Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on top of Spark SQL


$ bin/run-example sql.streaming.JavaStructuredKafkaWordCount host1:port1,host2:port2 subscribe topic1,topic2

SparkSession spark = SparkSession
      .builder()
      .appName("JavaStructuredKafkaWordCount")
      .getOrCreate();

// Create a Dataset representing the stream of input lines from Kafka
    Dataset<String> lines = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING());

// Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).groupBy("value").count();

// Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    query.awaitTermination();

Example of Windowed Count
// Split the lines into words, retaining timestamps
// (here `lines` must carry a string column and a timestamp column)
    Dataset<Row> words = lines
      .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
      .flatMap(
        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
          @Override
          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
            for (String word : t._1.split(" ")) {
              result.add(new Tuple2<>(word, t._2));
            }
            return result.iterator();
          }
        },
        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
      ).toDF("word", "timestamp");

// Group the data by window and word and compute the count of each group
    Dataset<Row> windowedCounts = words.groupBy(
      functions.window(words.col("timestamp"), windowDuration, slideDuration),
      words.col("word")
    ).count();
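Grouping by `window(timestamp, windowDuration, slideDuration)` assigns each row to every sliding window that contains its timestamp. The sketch below works through that assignment in plain Java, without Spark. It assumes windows are aligned to the epoch with no start offset; that alignment, and the names `SlidingWindows` and `windowStartsFor`, are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; it does not call Spark.
final class SlidingWindows {

    // Returns the start times (latest first) of all windows [start, start + windowMs)
    // that contain ts, where window starts are multiples of slideMs.
    static List<Long> windowStartsFor(long ts, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs); // latest start <= ts
        for (long start = lastStart; start > ts - windowMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 10-second window sliding every 5 seconds.
        System.out.println(windowStartsFor(12_000L, 10_000L, 5_000L));
        // When the slide equals the window size, the windows do not overlap.
        System.out.println(windowStartsFor(12_000L, 5_000L, 5_000L));
    }
}
```

A timestamp of 12 s falls into the windows starting at 10 s and at 5 s, so the same word is counted once in each. With a slide equal to the window size, every row lands in exactly one window.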


While we are discussing Spark Streaming, it is worth mentioning ~ one can build a comprehensive ETL pipeline using Apache NiFi (the Swiss Army knife of dataflow). Its capabilities include:

Guaranteed Delivery:
Data Buffering w/Back Pressure and Pressure Release:
Prioritizing Queue:
Flow-based QoS (low latency, high throughput, loss tolerance)

A new product suite ~ Kylo ~ will soon be open sourced to enable comprehensive ‘Big Data Ingestion and Analysis’ using NiFi and Spark

Kafka Streams

Kafka is a central component of many streaming systems, so it is encouraging to see Kafka Streams.

Since developers already use Kafka as the de facto distributed messaging queue, the Streams DSL comes in very handy.

 Kafka Streams with the DSL (KStreamBuilder): typed page-view demo

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

KStreamBuilder builder = new KStreamBuilder();

// TODO: the following can be removed with a serialization factory

Map<String, Object> serdeProps = new HashMap<>();
final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>();

serdeProps.put("JsonPOJOClass", PageView.class);
pageViewSerializer.configure(serdeProps, false);
final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();

serdeProps.put("JsonPOJOClass", PageView.class);
pageViewDeserializer.configure(serdeProps, false);

final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);

final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();

serdeProps.put("JsonPOJOClass", UserProfile.class);
userProfileSerializer.configure(serdeProps, false);

final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();

serdeProps.put("JsonPOJOClass", UserProfile.class);
userProfileDeserializer.configure(serdeProps, false);

final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer);

final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();

serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
wPageViewByRegionSerializer.configure(serdeProps, false);

final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>();

serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
wPageViewByRegionDeserializer.configure(serdeProps, false);

final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer);

final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();

serdeProps.put("JsonPOJOClass", RegionCount.class);
regionCountSerializer.configure(serdeProps, false);

final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", RegionCount.class);
regionCountDeserializer.configure(serdeProps, false);

final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);

KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");

KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input");

KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
  // Enrich each page view with the user's region; unknown users map to "UNKNOWN".
  .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
    @Override
    public PageViewByRegion apply(PageView view, UserProfile profile) {
      PageViewByRegion viewByRegion = new PageViewByRegion();
      viewByRegion.user = view.user;
      viewByRegion.page = view.page;
      if (profile != null) {
        viewByRegion.region = profile.region;
      } else {
        viewByRegion.region = "UNKNOWN";
      }
      return viewByRegion;
    }
  })
  // Re-key the stream by region.
  .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
    @Override
    public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
      return new KeyValue<>(viewRegion.region, viewRegion);
    }
  })
  // Count views per region over a 7-day window that advances every second.
  .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
  // countByKey yields a KTable; convert it back to a stream before mapping.
  .toStream()
  .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
    @Override
    public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
      WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
      wViewByRegion.windowStart = key.window().start();
      wViewByRegion.region = key.key();

      RegionCount rCount = new RegionCount();
      rCount.region = key.key();
      rCount.count = value;
      return new KeyValue<>(wViewByRegion, rCount);
    }
  });

// write to the result topic
regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Usually the stream application would be running forever;
// in this example we just let it run for some time and stop, since the input data is finite.
Thread.sleep(5000L);

streams.close();
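The core logic of the page-view demo (left-join each view against the user table, fall back to "UNKNOWN", then count per region) can be sketched without Kafka. This is a simplified plain-Java illustration that ignores windowing and serialization; `RegionCountSketch` and `countByRegion` are hypothetical names, and the user table is assumed to be a plain map from user to region.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it does not use the Kafka Streams API.
final class RegionCountSketch {

    // Counts page views per region. Users missing from the profile table are
    // counted under "UNKNOWN", mirroring the leftJoin in the demo.
    static Map<String, Long> countByRegion(List<String> viewUsers, Map<String, String> userRegion) {
        Map<String, Long> counts = new TreeMap<>();
        for (String user : viewUsers) {
            String region = userRegion.getOrDefault(user, "UNKNOWN");
            counts.merge(region, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("alice", "asia");
        users.put("bob", "europe");
        List<String> views = Arrays.asList("alice", "bob", "alice", "carol");
        System.out.println(countByRegion(views, users));
    }
}
```

Here "carol" has no profile, so her view is counted under "UNKNOWN" rather than dropped, which is the difference between a left join and an inner join.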

Kafka Streams without the DSL (low-level Processor API), followed by the DSL equivalents

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")

        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")

        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")

        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS")
        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

KStreamBuilder builder = new KStreamBuilder();
KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");

// Transform a stream
KStream<String, Object> mapped = source1.mapValues(record -> record.get("category"));

// Windowed count per key
KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
  () -> 0L,  // initial value
  (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
  TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
  Serdes.Long() // serde for aggregated value
);

KStream<String, String> joined = source1.leftJoin(source2,
  (record1, record2) -> record1.get("user") + "-" + record2.get("region"));

// Write the stream back to Kafka
joined.to("topic4");

// filter, map, mapValues, etc, are stateless transformation operations and can be applied to both KStream and KTable

Reactive Kafka

Reactor Kafka API enables messages to be published to Kafka and consumed from Kafka using functional APIs with non-blocking back-pressure and very low overheads.

MapR Streams

  • MapR Streams offers full independence of producer and consumer, with no broker overhead.
  • Persistence: producers and consumers work in complete isolation, with strong reliability and consistency (it leverages the core persistence library of MapR-FS and MapR-DB).
  • Without such persistence, messages would disappear unless the producer and consumer coordinate to take delivery as soon as the data appears.
  • MapR Streams includes a new file system object type known as a stream that has no parallel in Kafka.
  • Streams are first-class objects in the MapR file system, alongside files, directories, links, and NoSQL tables.
  • A Kafka cluster consists of a number of server processes called brokers that collectively manage message topics, while a MapR cluster has no equivalent of a broker.
  • Topics and partitions are stored in the stream objects on a MapR cluster.
  • MapR continuously syncs all metadata (message offsets, consumer cursors) along with the actual data, reportedly without a performance penalty, whereas keeping this data in sync in Kafka is harder.
  • Each MapR stream can contain billions of topics and partitions, and each MapR cluster can have billions of streams. In comparison, it is not considered good practice to have more than about a thousand partitions on any single Kafka broker.
  • MapR streams can be replicated to different clusters across intermittent network connections. The replication pattern can contain cycles without causing problems, and streams can be updated in multiple locations at once. Message offsets are preserved in all such replicated copies. This fast global replication is topology-agnostic and provides a fail-over mechanism.
  • The distribution of topic partitions and portions of partitions across a MapR cluster is completely automated, with no administrative actions required. This is different from Kafka, where it is assumed that administrators will manually reposition partition replicas in many situations.
  • The streams in a MapR cluster inherit all of the security, permissions, and disaster-recovery capabilities of the basic MapR platform.
  • Enormously high rates of messages/second.
    MapR Streams + Flink reportedly offer 10x better performance than Kafka + Flink (by bypassing Kafka network saturation issues).
  • Prefer the Flink data generator to minimize conversions between source -> message streams -> Flink.
  • Streams + Flink can utilize off-heap memory, and can be bundled inside a network appliance / on-prem box for very fast real-time streaming analytics.


Flink Key Features:

  • guaranteed fail-safe checkpointing via asynchronous distributed snapshots, without interrupting the data flow, so that upon failure one can roll back to a previous state
  • allows the programmer to define a ‘stream pattern template’ so that the CEP processor can discover those patterns in incoming streams automatically
  • Flink allows querying its ‘shared distributed key/value store’, maintained inside its stream processor (without requiring a Lambda Architecture)
  • coming soon: ‘streaming SQL’ and ‘auto scaling’
  • Flink can chain data-flow steps into optimized tasks, whereas a Storm topology schedules every spout/bolt separately, which can lead to higher CPU usage
  • if the business workflow is already captured as a topology, it can be submitted to Flink, which will then optimize the data-transformation steps and allocate a minimal number of processes
  • reportedly uses only 0.5% of the resources of legacy systems (a 200x improvement with zero tuning, via intelligent byte-code engineering)
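The checkpoint-and-rollback idea from the first bullet can be sketched as follows. This is a deliberately simplified, single-threaded illustration in plain Java: it copies state synchronously, whereas Flink's snapshots are asynchronous and distributed. `CheckpointedCounter` and its methods are hypothetical names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; it does not use Flink.
final class CheckpointedCounter {
    private Map<String, Long> counts = new HashMap<>();
    private long position = 0;                      // number of inputs consumed so far

    private Map<String, Long> snapshotCounts = new HashMap<>();
    private long snapshotPosition = 0;

    void process(String word) {
        counts.merge(word, 1L, Long::sum);
        position++;
    }

    // Records the operator state together with the input position it corresponds to.
    void checkpoint() {
        snapshotCounts = new HashMap<>(counts);
        snapshotPosition = position;
    }

    // Rolls back to the last checkpoint and returns the input position to replay from.
    long restore() {
        counts = new HashMap<>(snapshotCounts);
        position = snapshotPosition;
        return position;
    }

    long count(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        CheckpointedCounter counter = new CheckpointedCounter();
        counter.process("a");
        counter.process("b");
        counter.checkpoint();
        counter.process("a");              // lost on failure, replayed after restore
        long resumeFrom = counter.restore();
        System.out.println(resumeFrom + " " + counter.count("a"));
    }
}
```

Storing the input position alongside the state is what makes the rollback consistent: after `restore()`, replaying the input from the returned position reproduces the updates that were lost, without counting anything twice.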