Dynamic Structures:

Example: User Preferences

  • Assume every user has a different set of preferences and each preference has a dynamic set of values
    • it is easy to set up a MEAN app and keep pumping dynamic JSON into Mongo or Couchbase (see the sketch after this list)
    • UserPref : { UserName : [ Pref1 : { k1:v1, k2:v2, … } , Pref2 : { k3:v3, k4:v4, … } ] }
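
A minimal sketch of pumping such dynamic JSON into MongoDB with pymongo (the database, collection and preference fields are illustrative assumptions):

# A minimal sketch, assuming a local MongoDB and the Python "pymongo" driver;
# database, collection and field names are illustrative assumptions.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
users = client["appdb"]["users"]

# each user document can carry an arbitrary, schema-less set of preferences
users.insert_one({
    "username": "alice",
    "prefers": ["science-fiction", "jazz"],
    "preferences": {
        "notifications": {"email": True, "sms": False},
        "ui": {"theme": "dark", "fontSize": 14},
    },
})

# later, a new preference type can be added without any schema migration
users.update_one({"username": "alice"},
                 {"$set": {"preferences.privacy": {"shareLocation": False}}})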

Example: Find what users like most – https://docs.mongodb.org/manual/tutorial/aggregation-with-user-preference-data/

  • MongoDB's aggregation pipeline (shown below) makes it really simple to query such documents in near real time
  • db.users.aggregate(
      [
        { $unwind : "$prefers" },
        { $group : { _id : "$prefers" , number : { $sum : 1 } } },
        { $sort : { number : -1 } },
        { $limit : 5 }
      ]
    )

Key-Value Store:

Example: Twitter Data

  • Assume Tweets-by-User and Users-by-Tweet are stored in large Cassandra / HBase stores
  • Now say every tweet carries a list of dynamic features, which can easily be stored against a user-id_tweet-hash key or a timeuuid
  • CREATE TABLE tweets (tweetid uuid PRIMARY KEY, username text, body text)
  • CREATE TABLE userline (username text, tweetid timeuuid, body text, PRIMARY KEY (username, tweetid))
    • Cassandra supports billions of columns per row … so keep adding more features and duplicate correlated data (which can wait to become Eventually Consistent)
    • Sentiments : { positivity , negativity , score } , Ranks : { retweets , popularity , penalty , sensitivity , threatLevel }
  • Depending on the data, one can use a simple Redis key-value store (data structures: List, Set, Sorted Set) or a distributed sorted map (HBase / Cassandra) – see the sketch after this list. Ref : http://www.slideshare.net/jericevans/cassandra-by-example-data-modelling-with-cql3
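
A minimal sketch of the Redis option with redis-py (the key layout, timestamps and feature fields are illustrative assumptions, not part of the referenced model):

# A minimal sketch, assuming a local Redis and the "redis" Python package;
# key names, timestamps and feature fields are illustrative assumptions.
import json
import redis

r = redis.Redis(host="localhost", port=6379)

user = "alice"
tweet_id = "d9428888-122b-11e1-b85c-61cd3cbb3210"

# store the dynamic tweet features as a JSON blob against user-id_tweet-id
r.set(f"tweet:{user}_{tweet_id}", json.dumps({
    "sentiments": {"positivity": 0.8, "negativity": 0.1, "score": 0.7},
    "ranks": {"retweets": 42, "popularity": 0.9},
}))

# keep the user's timeline as a sorted set ordered by timestamp
r.zadd(f"userline:{user}", {tweet_id: 1438387200})

# latest 10 tweets for the user
latest = r.zrevrange(f"userline:{user}", 0, 9)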

Columnar Structures:

Example: In-Memory Columnar Structure (IMCS) for Stock Events

Top 10 IBM quotes with maximal close price for the first quarter of 2014:

select (Quote_project(ibm.*, cs_top_max_pos(Close, 10))).* from Quote_get('IBM', '01-Jan-2014', '31-Mar-2014') ibm;

Advantages of the columnar model:

  • Reduced size of fetched data: only the columns involved in the query are accessed.
  • Vector operations: applying an operator to a whole set of values minimizes interpretation cost.
  • SIMD instructions of modern processors can be used to accelerate execution of vector operations.
  • Data compression: even a simple compression algorithm like RLE not only reduces the space used but also minimizes the number of operations performed (see the sketch below).

Reference : https://www.pgcon.org/2014/schedule/attachments/322_IMCS.pdf
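
A toy sketch of the last two points in plain numpy (not IMCS itself): one operation over a whole column, and run-length encoding a column of repeated values:

# A toy sketch with numpy (not IMCS itself): column-at-a-time operations and RLE.
import numpy as np

# a "Close" column stored contiguously, as a columnar engine would keep it
close = np.array([182.9, 182.9, 182.9, 185.1, 185.1, 179.6, 179.6, 179.6])

# vector operation: one comparison over the whole column instead of a per-row loop
above_180 = close > 180.0          # array of booleans, evaluated in bulk

# run-length encode the column into (value, run_length) pairs
change_points = np.flatnonzero(np.diff(close)) + 1
starts = np.concatenate(([0], change_points))
lengths = np.diff(np.concatenate((starts, [len(close)])))
rle = list(zip(close[starts], lengths))
# [(182.9, 3), (185.1, 2), (179.6, 3)] -- fewer values to store and to scan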

Streaming Unstructured Texts:

Example: Logs : https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html

Elasticsearch is one of the most widely used engines for log search

curl -XPUT 'xyz.com/_template/logTemplate' -d '{
 "template" : "mylogs*",
 "order" : 21,
 "settings" : {
  "index.analysis.analyzer.my_own_lowercase.type" : "custom",
  "index.analysis.analyzer.my_own_lowercase.tokenizer" : "keyword",
  "index.analysis.analyzer.my_own_lowercase.filter.0" : "lowercase",
 },
 "mappings" : {
  "message" : {
   "properties" : {
    "message" : { "type" : "string" },
    "tags" : { "type" : "string", "analyzer" : "my_own_lowercase" },
    "nick" : { "type" : "string", "analyzer" : "my_own_lowercase" }
   }
  }
 }
}'
  • Streaming logs are unstructured data which can easily be turned into Spark Streams from, say, Kafka events (see the sketch after this list)
  • Spark SQL makes it really easy to query the resulting RDDs   ( select severity, count(*) from LSTRM group by severity )
    • mash up the current data with existing historical data and compute stats
    • store the stats in fast lookup stores ( Redis / Mongo )
  • Directly store the structured / processed results into Elasticsearch
    • Advantages of storing the structure in Elasticsearch :
      • KPI analytics using Kibana
      • Real-time queries and notifications using Percolator and Alerts
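
A minimal sketch of the Kafka → Spark Streaming step (assuming Spark 1.x with the spark-streaming-kafka package, a Kafka topic named logs, and log lines starting with a severity token; all of these are illustrative assumptions):

# A minimal sketch, assuming Spark 1.x with the spark-streaming-kafka package,
# a Kafka topic "logs", and log lines of the form "<severity> <message>".
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="LogStream")
ssc = StreamingContext(sc, batchDuration=10)

stream = KafkaUtils.createDirectStream(
    ssc, ["logs"], {"metadata.broker.list": "kafka:9092"})

# count log lines per severity in every micro-batch
severity_counts = (stream.map(lambda kv: kv[1].split(" ", 1)[0])
                         .map(lambda sev: (sev, 1))
                         .reduceByKey(lambda a, b: a + b))

severity_counts.pprint()   # replace with a foreachRDD that writes the stats to Redis / Elasticsearch

ssc.start()
ssc.awaitTermination()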

Timeseries:

Example: Metrics : https://influxdb.com/docs/v0.9/query_language/data_exploration.html

  • Get the last hour of data from every series whose name starts with stats. (a regex example):
    select * from /^stats\./i where time > now() - 1h;
  • select * from events
    where (email =~ /.*gmail.*/ or email =~ /.*yahoo.*/) and state = 'ny';
  • -- 95th percentile of response times in 30 second intervals
    select percentile(value, 95) from response_times group by time(30s);
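
A minimal sketch of issuing such queries from Python with the influxdb client package (host, credentials and database name are illustrative assumptions):

# A minimal sketch, assuming InfluxDB 0.9+ and the "influxdb" Python client;
# host, credentials and database name are illustrative assumptions.
from influxdb import InfluxDBClient

client = InfluxDBClient(host="localhost", port=8086,
                        username="root", password="root", database="metrics")

# run the same InfluxQL shown above and iterate over the returned points
result = client.query("select percentile(value, 95) from response_times group by time(30s)")
for point in result.get_points():
    print(point)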

Streaming Structured Data:

Example: Streaming Events

  • Events consumed per User
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val trainingDataTable = sql("""
  SELECT e.action, u.age, u.latitude, u.longitude
  FROM Users u
  JOIN Events e ON u.userId = e.userId""")

// one LabeledPoint per row: label = action, features = (age, latitude, longitude)
val trainingData = trainingDataTable map { row =>
  val features = Vectors.dense(row.getDouble(1), row.getDouble(2), row.getDouble(3))
  LabeledPoint(row.getDouble(0), features)
}

val model = new LogisticRegressionWithSGD().run(trainingData)

Note how easy it is to construct a training RDD using Spark SQL and then build a model from it.
Reference : http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model-39976454/56-val_model_new_LogisticRegressionWithSGD_runtrainingData

Network Structures:

Example: Social Connections

  • build various graph structures based on different kinds of weights or influencers (friend affinity)
  • when connectivity is very dynamic (social circles, product qualities, movie preferences, stock valuations) it is usually better to model the entities as a graph structure
MATCH (scifi:Movie {name:"StarWars"})
MATCH (johan:Person {name:"Johan"})
CREATE (johan)-[:FRIEND]->(:Person:Expert {name:"Max"})-[:Likes]->(scifi)

Ref: http://neo4j.com/developer/cypher-query-language/#_create_second_degree_friends_and_expertise
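
A minimal sketch of running the same Cypher from Python with the official neo4j driver (the Bolt URL and credentials are illustrative assumptions):

# A minimal sketch, assuming a local Neo4j with Bolt enabled and the official
# "neo4j" Python driver; URL and credentials are illustrative assumptions.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

cypher = """
MATCH (scifi:Movie {name:"StarWars"})
MATCH (johan:Person {name:"Johan"})
CREATE (johan)-[:FRIEND]->(:Person:Expert {name:"Max"})-[:Likes]->(scifi)
"""

with driver.session() as session:
    session.run(cypher)

driver.close()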

Big DataGraph:

Example: User Interest Property Graph : http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html

  • Social behavior of a user is primarily determined through various attributes like gender , age , brand affinity , anniversaries , birthdays , education , media consumption .
  • A Big Data graph store (e.g. a Gremlin-queryable store, or Spark GraphX) is the preferable data store.
    • involves offline data inference and attribute-matching algorithms ( matching a registered user’s email / location / address with their social profile / tweets / fb posts )
  • Build large vertex and edge data:
    • val vertexRDD: RDD[(Long, (String, Int))] and edgeRDD: RDD[Edge[Int]]
    • val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

Ontology:

Example: Finance Strategies

<owl:Class rdf:ID="Large-Format">
  <rdfs:subClassOf rdf:resource="#Camera"/>
  <rdfs:subClassOf>
    <owl:Restriction>
      <owl:onProperty rdf:resource="#body"/>
      <owl:allValuesFrom rdf:resource="#BodyWithNonAdjustableShutterSpeed"/>
   </owl:Restriction>
  </rdfs:subClassOf>
</owl:Class>

Ref:  http://protege.stanford.edu/
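
A minimal sketch of loading and querying such an ontology from Python with rdflib (the file name is an illustrative assumption):

# A minimal sketch, assuming the full ontology above is saved as camera.owl and
# the Python "rdflib" package is installed.
import rdflib

g = rdflib.Graph()
g.parse("camera.owl", format="xml")

# list every declared subclass relationship in the ontology
q = """
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?sub ?super WHERE {
  ?sub rdfs:subClassOf ?super .
  FILTER isIRI(?super)
}
"""
for sub, sup in g.query(q):
    print(sub, "is a subclass of", sup)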

Example: Medical UMLS

Complex Map-Reduce Problems:

Example: Map-Reduce programming model for Page Rank  – https://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/

class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank  / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})
class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else
             p = p + s
      M.PageRank = p
      Emit(id m, item M)
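
A plain-Python sketch of one such iteration (no Hadoop, no in-mapper combining; the toy adjacency list is an illustrative assumption):

# A plain-Python sketch of one PageRank iteration following the mapper/reducer
# pseudocode above; the toy graph is an illustrative assumption.
from collections import defaultdict

# node -> (current PageRank, outgoing links)
graph = {
    "A": (1.0, ["B", "C"]),
    "B": (1.0, ["C"]),
    "C": (1.0, ["A"]),
}

def map_phase(graph):
    """Emit the node object itself plus its rank share for every outgoing link."""
    emitted = []
    for node, (rank, out_links) in graph.items():
        emitted.append((node, ("node", (rank, out_links))))   # pass the structure through
        share = rank / len(out_links)
        for target in out_links:
            emitted.append((target, ("share", share)))
    return emitted

def reduce_phase(emitted):
    """Sum the incoming shares per node and attach the new rank to the node object."""
    grouped = defaultdict(list)
    for key, value in emitted:
        grouped[key].append(value)
    new_graph = {}
    for node, values in grouped.items():
        incoming = sum(v for kind, v in values if kind == "share")
        _, (old_rank, out_links) = next(v for v in values if v[0] == "node")
        new_graph[node] = (incoming, out_links)
    return new_graph

graph = reduce_phase(map_phase(graph))   # one full map-reduce iteration
print(graph)   # {'A': (1.0, [...]), 'B': (0.5, [...]), 'C': (1.5, [...])}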

Federated Query:

PrestoDB – very impressive. Reference – http://blogs.impetus.com/big_data/hadoop_ecosystem/Data%20Analysis%20using%20Presto.do

Example : Get the top 2 purchases from each user by combining data from Hive & Cassandra in a single Presto query (a Python client sketch follows the query)

presto:default> select * from (
    select *, row_number() over (partition by id order by amount desc) as rnk
    from (
      select hive_user_info.id, hive_user_info.fname, hive_user_info.gender,
             hive_user_info.age, hive_user_info.salary,
             cassandra_user_purchases.item, cassandra_user_purchases.time,
             cassandra_user_purchases.place, cassandra_user_purchases.quanity,
             cassandra_user_purchases.amount
      from user_info hive_user_info
      join cassandra.stockticker.user_purchases cassandra_user_purchases
        on hive_user_info.id = cassandra_user_purchases.user_id
    )
  ) where rnk <= 2;
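
A minimal sketch of issuing a federated Presto query from Python with pyhive (coordinator host, catalog and table names are illustrative assumptions):

# A minimal sketch, assuming a Presto coordinator on localhost:8080 and the
# "pyhive" package; the fully qualified table names are illustrative assumptions.
from pyhive import presto

conn = presto.connect(host="localhost", port=8080, catalog="hive", schema="default")
cur = conn.cursor()

# a single query spanning the Hive and Cassandra catalogs
cur.execute("""
    select h.id, h.fname, c.item, c.amount
    from hive.default.user_info h
    join cassandra.stockticker.user_purchases c
      on h.id = c.user_id
    limit 10
""")
for row in cur.fetchall():
    print(row)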

Workflow of a Query:

Spark SQL allows seamless aggregation of data by joining heterogeneous data sources:

  • connect existing BI tools through JDBC
  • share data with multiple users through cached tables in the JDBC / Thrift server
  • use columnar formats (skip unnecessary columns, read only the required ones); store the data of a specific column as a contiguous block
  • use partitioning (on filter keys like /year=2015/month=08/ etc.); force a shuffle through repartition ( if we want fork-join style parallelism, then coalesce )
  • push predicates down to the storage layer (Postgres, MySQL) – see the sketch after this list
  • use DataFrames (a distributed collection of rows organized into named columns); for a well-expressed DataFrame the optimizer does the magic (map-partitions over shuffled RDDs)
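
A minimal sketch of JDBC predicate pushdown with PySpark (Spark 1.x sqlContext style, to match the snippets below); the Postgres URL, table and columns are illustrative assumptions:

# A minimal sketch of JDBC predicate pushdown; the Postgres URL, credentials,
# table and column names are illustrative assumptions.
df = (sqlContext.read.format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/sales")
      .option("dbtable", "public.revenue")
      .option("user", "test").option("password", "test")
      .load())

# the filter and the column pruning are pushed down to Postgres where possible,
# so only matching rows / required columns travel over the wire
df.where(df.year == 2015).select("dept", "amount").show()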

df = sqlContext.read.format("json").option("samplingRatio", "0.1").load("/data/marketing1.json")

df.write.format("parquet").mode("append").partitionBy("year").saveAsTable("marketing1")

// ETL  ->  use a library to read finance staging data

sqlContext.read.format("com.databricks.spark.finance").option("url", "https://marketingsite.org/rest/api/latest/search").option("user", "test").option("password", "test").option("query", "…").load().repartition(1).write.format("parquet").saveAsTable("FinanceData")

sqlCtx.table("revenue").groupBy("dept").agg("dept", avg("amount")).collect()

the cool part is that custom Python, R, Java or Scala code can be blended with DataFrame operations : here a 'city' attribute derived from a UDF is merged with the 'user_id' from the users table

zipToCity = udf(lambda zipCode: <custom python GeoLoc library>)

def add_demographics(events):
    u = sqlCtx.table("users")                                   # load the users table
    return (events.join(u, events.user_id == u.user_id)         # join events with users
                  .withColumn("city", zipToCity(events.zip)))   # derive 'city' via the UDF

// courtesy : databricks demo

and the best part is that, since all operations are lazy, a great deal of optimization can be performed.

for example, when we call add_demographics to enrich the events and then derive the training data, the users table can be scanned while building the query plan, and the data can be columnized and compressed even before the actual query executes.

events = add_demographics(sqlCtx.load("/data/events", "parquet"))

training_data = events.where(events.city == "San Francisco").select(events.timestamp).collect()

Precomputing Joined data and Preprocessing filters

// courtesy : databricks demo
