1. Simple Spark Application for fast Enterprise Data Analysis

Application Development Steps


Setup Datasets

Data Source :

Let's assume we have massive itemized Product Sales data available from Retail Stores.

We also have Product entity data (Product, Country, Price, SalesManager).

Data Delivery Mode :

a) Continuous Streams fed to Spark RDD

b) Stage the data in MySQL / HDFS and periodically sync up with Spark RDD

Create Spark RDD :

  1. Let's assume we develop a WebApp using Jersey WS.

<servlet-class>com.sun.jersey.spi.spring.container.servlet.SpringServlet</servlet-class>
<init-param>
 <param-name>javax.ws.rs.Application</param-name>
 <param-value>com.spark.analytics.RestRegistry</param-value>
</init-param> .....
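
The com.spark.analytics.RestRegistry referenced above is the JAX-RS Application class named in web.xml. A minimal sketch of what it might look like (the DatasetResource class it registers is an assumption, not shown in the original):

package com.spark.analytics;

import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.core.Application;

// Minimal JAX-RS Application sketch registering the REST resources.
public class RestRegistry extends Application {

    @Override
    public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<Class<?>>();
        classes.add(DatasetResource.class); // hypothetical resource exposing the dataset API below
        return classes;
    }
}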

2. Next we expose the API:

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createDataset(DatasetRequest request) { ....... }
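
The DatasetRequest payload consumed by this endpoint is not shown in the original; a hypothetical sketch of the fields such a request might carry (all field names below are assumptions):

import java.util.List;

// Hypothetical request payload; field names are assumptions, not the original API.
public class DatasetRequest {
    private String datasetName;           // logical name, later used as the RDD cache key
    private String sourceType;            // e.g. "mysql", "hdfs", "es"
    private String sourceLocation;        // JDBC URL or HDFS path
    private List<ColumnMetadata> columns; // column name/type pairs for columnar RDDs
    // getters and setters omitted

    // Hypothetical column descriptor used when building columnar RDDs.
    public static class ColumnMetadata {
        private String name;
        private String type;              // e.g. "string", "date", "numeric"
    }
}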

3. We can create either a simple (row-structured) RDD or a Columnar RDD:

  • For Columnar Structures: extract the Column Info from the DatasetRequest
  • For Columnar Structures: using the Column Metadata, create the Tuple holders
    • StringColumnRDD, DateColumnRDD, NumericColumnRDD etc.
  • Connect to the real datasource (MySQL / Hadoop / ES)
  • For a Columnar Structure, store the cell value for each row in each ColumnRDD
  • For a Row Structure, store the batch datasets: SparkJavaDriver.getContext().parallelize(batchData) -> unifiedRDD.union(intermediateRDD) (see the sketch after this list)
  • If data is imported periodically from a Source, persist the combined RDD with unifiedRDD.saveAsTextFile(hdfs_path)
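
A minimal sketch of the row-structured ingestion path described in the list above, assuming SparkJavaDriver exposes the application's shared JavaSparkContext and fetchBatches() stands in for whatever reads batches from MySQL / Hadoop / ES (both names are placeholders):

import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Row-structured ingestion sketch: parallelize each batch, union into one RDD, persist to HDFS.
JavaSparkContext jsc = SparkJavaDriver.getContext();    // assumed holder for the shared context

JavaRDD<String> unifiedRDD = jsc.emptyRDD();
for (List<String> batchData : fetchBatches()) {         // fetchBatches() is a hypothetical source reader
    JavaRDD<String> intermediateRDD = jsc.parallelize(batchData);
    unifiedRDD = unifiedRDD.union(intermediateRDD);
}

// When the import runs periodically, persist the combined RDD for later sessions.
unifiedRDD.saveAsTextFile(hdfs_path);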

Create Spark Session :

1. Create the Spark session context

2. Store all the dataset RDDs in the session

JavaSparkContext jsc = SparkDriver.getSparkContext();

JavaRDD<String> datasetRdd = jsc.textFile(hdfs_file_path).cache();
SessionCache.getInstance().addToRDDPool(cacheKey, datasetRdd);
SessionCache.getInstance().incrementRefCount(cacheKey);
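
SessionCache is the application's own class and is not shown in the original; a minimal sketch of what it might look like (the implementation below is an assumption, not the original code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.api.java.JavaRDD;

// Assumed singleton cache keyed by session/dataset; not the original implementation.
public class SessionCache {

    private static final SessionCache INSTANCE = new SessionCache();

    private final ConcurrentHashMap<String, JavaRDD<?>> rddPool = new ConcurrentHashMap<String, JavaRDD<?>>();
    private final ConcurrentHashMap<String, AtomicInteger> refCounts = new ConcurrentHashMap<String, AtomicInteger>();

    public static SessionCache getInstance() {
        return INSTANCE;
    }

    public void addToRDDPool(String cacheKey, JavaRDD<?> rdd) {
        rddPool.put(cacheKey, rdd);
        refCounts.putIfAbsent(cacheKey, new AtomicInteger(0));
    }

    public JavaRDD<?> getFromRDDPool(String cacheKey) {
        return rddPool.get(cacheKey);
    }

    public int incrementRefCount(String cacheKey) {
        refCounts.putIfAbsent(cacheKey, new AtomicInteger(0));
        return refCounts.get(cacheKey).incrementAndGet();
    }

    // Unpersist and evict once no session references the dataset any more.
    public void release(String cacheKey) {
        AtomicInteger count = refCounts.get(cacheKey);
        if (count != null && count.decrementAndGet() <= 0) {
            JavaRDD<?> rdd = rddPool.remove(cacheKey);
            if (rdd != null) {
                rdd.unpersist();
            }
        }
    }
}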

Query the Session :

1. For the current Session Id, find the cached Datasets

2. Apply Aggregate functions on the columns

// PairFunction is an interface, so the mapper implements it (not extends).
public static class Mapper implements PairFunction<String, String, Double> {

  private final SessionQueryModel sessionQueryModel;

  public Mapper(SessionQueryModel sessionQueryModel) {
    this.sessionQueryModel = sessionQueryModel;
  }

  @Override
  public Tuple2<String, Double> call(String line) throws Exception {
    // for every column in the sessionQueryModel, extract the value at that column's index
    // (custom parsing logic goes here; strKey is the grouping key, strValue the measure)
    return new Tuple2<String, Double>(strKey, Double.parseDouble(strValue));
  }
}

Mapper mapFunction = new Mapper(sessionQueryModel);
JavaPairRDD<String, Double> selectedData = ((JavaRDD<String>) sessionRdd).mapToPair(mapFunction);

ReduceByKeyCalculator reducer = new ReduceByKeyCalculator(sessionQueryModel.getBizCol());

finalAggrRDD = selectedData.reduceByKey(reducer, reduce_by_partitions);
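
ReduceByKeyCalculator is not shown either; since it is passed to reduceByKey, it must be a Function2 over the Double values. A minimal sketch, assuming a plain sum as the aggregate:

import org.apache.spark.api.java.function.Function2;

// Assumed reducer: sums the measure for each key; the real class may support other aggregates.
public static class ReduceByKeyCalculator implements Function2<Double, Double, Double> {

  private final String bizColumn;

  public ReduceByKeyCalculator(String bizColumn) {
    this.bizColumn = bizColumn; // business column being aggregated, from sessionQueryModel.getBizCol()
  }

  @Override
  public Double call(Double left, Double right) throws Exception {
    return left + right;
  }
}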

 

2.  Spark with Cassandra

https://github.com/datastax/spark-cassandra-connector 

Lightning-fast cluster computing with Spark and Cassandra

This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.

sc.cassandraTable("test", "cars").select("id", "model").where("color = ?", "black").toArray.foreach(println)
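
The one-liner above is Scala; from Java the connector exposes similar operations through its japi helpers. A rough sketch (exact signatures vary by connector version, and sc is assumed to be a JavaSparkContext):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// Select two columns from test.cars, filter on color server-side, and print each row.
javaFunctions(sc)
    .cassandraTable("test", "cars")
    .select("id", "model")
    .where("color = ?", "black")
    .collect()
    .forEach(System.out::println);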

3. Spark with ElasticSearch

        http://www.elasticsearch.org/blog/es-hadoop-2-0-1-and-2-1-beta1

Elasticsearch for Apache Hadoop 2.0 added support for Apache Spark, through its Map/Reduce functionality. Beta1 goes way beyond that, providing a dedicated native Spark RDD (or Resilient Distributed Dataset) for Elasticsearch, for both Java and Scala. Thus, one can easily execute searches in Elasticsearch and transparently feed the results back to Spark for transformation.

Dedicated Java and Scala APIs

JavaSparkContext jsc = ...
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists", "?me*");

JavaRDD<Map<String, Object>> filtered = esRDD.filter(m -> m.values().stream().anyMatch(v -> v.toString().contains("mega")));
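
Writing back to Elasticsearch is symmetric via JavaEsSpark.saveToEs; a small sketch (the target index/type "radio/filtered-artists" is an assumption):

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// Index the filtered documents into a (hypothetical) target index/type.
JavaEsSpark.saveToEs(filtered, "radio/filtered-artists");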

4. Spark with H2O

https://github.com/h2oai/sparkling-water

In a single invocation and process, users get the best of Spark (its elegant APIs, RDDs and multi-tenant context) together with H2O's speed, columnar compression and fully featured Machine Learning and Deep Learning algorithms.

References :  strata slides, http://lintool.github.io/SparkTutorial/

5. Spark SQL

Spark SQL allows seamless aggregation of data by joining heterogeneous data sources. It lets you:

  • connect existing BI tools through JDBC
  • share data with multiple users through cached tables exposed over JDBC
  • use columnar formats (skip unnecessary columns and read only the required ones), storing the values of each column together as contiguous blocks
  • use partitioning (on filter keys like /year=2015/month=08/ etc.), and force a shuffle through repartition when fork-join style parallelism is wanted, followed by coalesce
  • push predicates down to the storage layer (Postgres, MySQL); see the sketch after this list
  • use DataFrames (a distributed collection of rows organized into named columns); with a well-expressed DataFrame over map-partitioned / shuffled RDDs, the optimizer does the magic
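
As an illustration of predicate pushdown from the Java API (a sketch only; the Postgres URL and table name are assumptions, and sqlContext is assumed to be an org.apache.spark.sql.SQLContext):

import org.apache.spark.sql.DataFrame;

// Read a Postgres table through the JDBC data source; the filter below can be pushed
// down to the database instead of being evaluated inside Spark.
DataFrame revenue = sqlContext.read()
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/sales")   // assumed connection URL
    .option("dbtable", "revenue")
    .load();

DataFrame thisYear = revenue.filter(revenue.col("year").equalTo(2015));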

df = sqlContext.read.format("json").option("samplingRatio", "0.1").load("/data/marketing1.json")

df.write.format("parquet").mode("append").partitionBy("year").saveAsTable("marketing1")

# ETL -> use a library to read finance staging data

sqlContext.read.format("com.databricks.spark.finance").option("url", "https://marketingsite.org/rest/api/latest/search").option("user", "test").option("password", "test").option("query", "...").load().repartition(1).write.format("parquet").saveAsTable("FinanceData")

sqlCtx.table("revenue").groupBy("dept").agg(avg("amount")).collect()

The cool part is that custom Python, R, Java or Scala code can be blended with DataFrame operations: here a 'city' attribute computed by a UDF is merged with the user id from the users table.

zipToCity = udf(lambda zipCode:  <custom python GeoLoc library>)

def add_demographics(events):
    u = sqlCtx.table("users")
    return events.join(u, events.user_id == u.user_id).withColumn("city", zipToCity(events.zip))

# courtesy: Databricks demo

And the best part: since all operations are lazy, a great deal of optimization can be performed.

For example, when we call add_demographics to build the events data and then derive the training data, the users table scan becomes part of a single optimized query plan, and the data can be columnized and compressed before the actual query executes.

events = add_demographics(sqlCtx.load("/data/events", "parquet"))

training_data = events.where(events.city == "San Francisco").select(events.timestamp).collect()

Precomputing Joined data and Preprocessing filters

# courtesy: Databricks demo

Reference : https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html , https://forums.databricks.com/

Demo code : https://github.com/databricks/spark-pr-dashboard

Online Classes : https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x  ,  https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x

6. DDF

Distributed DataFrame simplifies Analytics on Disparate Data Sources  via a Uniform API Across Engines

http://ddf.io/quickstart.html
