Let's assume we are receiving a huge volume of streaming events from connected cars.

First, the Spark application needs to subscribe to the Kafka topic:

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "carposition";

Next, specify the structure of the data:

StructType carPositionStruct = new StructType()
        .add("carId", "string")
        .add("streetName", "string")
        .add("pathType", "string")
        .add("pathCovered", "double")
        .add("timestamp", "string")
        .add("timeInTransit", "int");
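For reference, a hypothetical message value matching this schema could look like the following (all field values are made up):

```json
{
  "carId": "KA-01-1234",
  "streetName": "El Camino Real",
  "pathType": "highway",
  "pathCovered": 12.5,
  "timestamp": "2017-06-01T10:15:30",
  "timeInTransit": 42
}
```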

Now define the stream:

Dataset<CarPositionData> carPositionStreams = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option(subscribeType, topics)
        .option("startingOffsets", "earliest")
        .load()
        // cast the binary value to a string and parse it as JSON
        .withColumn("message", from_json(column("value").cast("string"), carPositionStruct))
        .select("message.*") // unnest the JSON
        .as(Encoders.bean(CarPositionData.class));
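The `Encoders.bean(CarPositionData.class)` call above presupposes a JavaBean whose properties mirror the schema; the class itself is never shown, so here is a minimal sketch (field names taken from `carPositionStruct`, everything else assumed):

```java
import java.io.Serializable;

// Hypothetical JavaBean backing Encoders.bean(CarPositionData.class).
// Spark's bean encoder requires a no-arg constructor plus getters/setters.
public class CarPositionData implements Serializable {
    private String carId;
    private String streetName;
    private String pathType;
    private double pathCovered;
    private String timestamp;
    private int timeInTransit;

    public CarPositionData() {}

    public String getCarId() { return carId; }
    public void setCarId(String carId) { this.carId = carId; }
    public String getStreetName() { return streetName; }
    public void setStreetName(String streetName) { this.streetName = streetName; }
    public String getPathType() { return pathType; }
    public void setPathType(String pathType) { this.pathType = pathType; }
    public double getPathCovered() { return pathCovered; }
    public void setPathCovered(double pathCovered) { this.pathCovered = pathCovered; }
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    public int getTimeInTransit() { return timeInTransit; }
    public void setTimeInTransit(int timeInTransit) { this.timeInTransit = timeInTransit; }
}
```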

Register the stream as a temporary view:

carPositionStreams.createOrReplaceTempView("carPositionStreams");

Pre-aggregate the data:

String sql = "select streetName as pathName, count(carId) as car_count from carPositionStreams group by streetName";
Dataset<Row> carBasicAgg = spark.sql(sql);

(The schema defines streetName rather than pathName, hence the alias.)
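To make the intent of the pre-aggregation concrete, here is the same count-per-path grouping over a handful of hypothetical in-memory rows using plain Java streams (illustrative only, not part of the Spark job):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CarCountDemo {
    public static void main(String[] args) {
        // Hypothetical (pathName, carId) pairs standing in for streamed rows
        List<String[]> rows = List.of(
                new String[]{"Highway-101", "car1"},
                new String[]{"Highway-101", "car2"},
                new String[]{"ElCamino", "car3"});

        // Same aggregation as the SQL: count(carId) grouped by path name
        Map<String, Long> carCount = rows.stream()
                .collect(Collectors.groupingBy(r -> r[0], Collectors.counting()));

        System.out.println(carCount.get("Highway-101")); // 2
    }
}
```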

Register a query with the streaming session:

StreamingQuery streamingQry = carBasicAgg.writeStream()
        .format("memory")
        .trigger(ProcessingTime.create("10 seconds"))
        .queryName("carAggSummary")
        .outputMode("complete")
        //.option("checkpointLocation", "/tmp/summary/checkpoints/")
        .start();

The memory sink exposes the results as the in-memory table carAggSummary, which can be read back with spark.table("carAggSummary").

Store the query result into a table:

if (streamingQry != null && streamingQry.runId() != null && streamingQry.status().isDataAvailable()) {
    String query = "select pathName, sum(car_count) as total_count from carAggSummary group by pathName";
    Dataset<Row> carCountTotal = spark.sql(query);

    System.out.println("|||========== CarCountSummary ================|||");
    carCountTotal.show();
    System.out.println("|||=======================================|||");

    carCountTotal.createOrReplaceTempView("carAggSummary");

    // We can also save the result in Parquet format for further analysis
    Dataset<Row> carAggSummaryData = spark.table("carAggSummary");
    carAggSummaryData.write().parquet("/data/summary/carPosition/" + new Date().getTime() + "/");
}

Spark Dependencies

spark-core_2.11, spark-streaming_2.11, spark-sql_2.11, spark-sql-kafka-0-10_2.11, scala-library 2.11.0, scala-reflect 2.11.0

(spark-sql-kafka-0-10_2.11 provides the Kafka source for Structured Streaming; the legacy spark-streaming-kafka-0-8 artifact targets the DStream API and Scala 2.10, and is not needed here.)

Data Analytics using Spark & Cassandra

SparkConf conf = new SparkConf(true)
        .setMaster("local")
        .setAppName("CarAggr")
        .set("spark.executor.memory", "2g")
        .set("spark.cassandra.connection.host", "localhost");

JavaSparkContext sparkContext = new JavaSparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(sparkContext);

JavaRDD<CassandraRow> filteredRDD = functions.cassandraTable("car_data", "car_stats_by_city")
        // month and day must be selected so the filters below can read them
        .select("avgdistancetravelled", "month", "day")
        .where("cityid=?", "SJ001")
        .where("year=?", 2017)
        .filter(row -> {
            int month = row.getInt("month");
            return month >= 1 && month <= 12;
        })
        .filter(row -> {
            int day = row.getInt("day");
            return day >= 1 && day <= 31;
        });

JavaRDD<CassandraRow> cachedRdd = filteredRDD.cache();

// gather sum and count in a single pass over the stream
LongSummaryStatistics stats = cachedRdd.collect().parallelStream()
        .mapToLong(row -> row.getLong("avgdistancetravelled"))
        .summaryStatistics();

long avgVal = (long) stats.getAverage();
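As an aside, a java.util.stream pipeline can only be consumed once, so calling sum() and then count() on the same LongStream throws an IllegalStateException. summaryStatistics() gathers sum, count, min, and max in a single traversal, which makes deriving the average safe. A small self-contained sketch with made-up distance values:

```java
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;

public class AvgDemo {
    public static void main(String[] args) {
        // Hypothetical distance values standing in for the Cassandra rows
        LongSummaryStatistics stats = LongStream.of(10, 20, 30)
                .summaryStatistics(); // one pass: sum, count, min, max

        long avgVal = stats.getSum() / stats.getCount();
        System.out.println(avgVal); // 20
    }
}
```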

Spark-Cassandra Dependency

cassandra-driver-core 3.1.4, spark-cassandra-connector_2.11 2.0.2, spark-sql_2.11 2.1.1, spark-core_2.11 2.1.1, scala-library 2.11.8

It's worth noting that for Cassandra version >= 3.10, one can effectively use GROUP BY.

Let's assume we need to measure some basic stats (sum, avg, min, max) within a certain time interval, and advanced stats for certain metrics (bytesTransferred, trafficLatency, applicationCount, bandwidthConsumed).
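Before reaching for Spark, the per-interval basic stats can be sketched with the JDK's own summary statistics; the sample values below are hypothetical bytesTransferred readings for one interval:

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

public class MetricStatsDemo {
    public static void main(String[] args) {
        // Hypothetical bytesTransferred samples for one time interval
        DoubleSummaryStatistics s = DoubleStream.of(512.0, 1024.0, 2048.0)
                .summaryStatistics();

        System.out.println(s.getSum()); // 3584.0
        System.out.println(s.getMin()); // 512.0
        System.out.println(s.getMax()); // 2048.0
        System.out.println(s.getAverage());
    }
}
```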