Data Model Reference

The following are some examples of general data model patterns.

Once the business requirements are translated into concrete use cases, the actual data models and query strategies can be specified in detail.

 

Network Structures

Use cases: model dynamic connectivity.

DB: Neo4j – works well for medium-sized datasets.

Big data graph stores: Titan (very useful for graph traversal), Spark GraphX (useful for graph processing, e.g. PageRank).

 

Neo4j Model

Example: Social Connections

  • Build various graph structures based on different kinds of weights or influencers (e.g. friend affinity).
  • When connectivity is very dynamic (social circles, product qualities, movie preferences, stock valuations), it is better to model the entities as a graph structure.

MATCH (scifi:Movie {name:"StarWars"}) MATCH (johan:Person {name:"Johan"}) CREATE (johan)-[:FRIEND]->(:Person:Expert {name:"Max"})-[:Likes]->(scifi)

Ref: http://neo4j.com/developer/cypher-query-language/#_create_second_degree_friends_and_expertise

 

Spark GraphX:

It is often desirable to be able to move between table and graph views of the same physical data and to leverage the properties of each view to easily and efficiently express computation.

Refs : http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html

val graph = Graph(vertices, edges, "").subgraph(vpred = ….. ).cache  // read Wikipedia articles

val prGraph = graph.staticPageRank(5).cache

// Graph.staticPageRank returns a graph whose vertex attributes are the PageRank values of each page

Find the titles of the ten most important articles:

val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
  (v, title, rank) => (rank.getOrElse(0.0), title)
}

titleAndPrGraph.vertices.top(10) {
  Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t => println(t._2._2 + ": " + t._2._1))

Gremlin :

http://tinkerpop.incubator.apache.org/docs/3.1.0-incubating/#_mutating_the_graph , http://tinkerpop.incubator.apache.org/docs/3.1.0-incubating/#_connecting_via_rest

http://www.datastax.com/wp-content/uploads/2012/08/C2012-Titan-MatthiasBroecheler.pdf

It offers both OLTP and OLAP (breadth-first search) functionality.

Example of a simple analytics query:

// Who is Hercules' grandfather?
g.V.has('name','hercules').out('father').out('father').name

Reference : http://sql2gremlin.com/  , http://sql2gremlin.com/graph/

SQL

SELECT Products.ProductName
FROM Products
INNER JOIN Categories ON Categories.CategoryID = Products.CategoryID
WHERE Categories.CategoryName = 'Beverages'

Gremlin

gremlin> g.V().has("name","Beverages").in("inCategory").values("name")

JSON Models

Use cases: metadata, dynamic models.

DB: MongoDB, Couchbase – can be leveraged as a fast in-memory cache; suitable for medium to large (GB) datasets.

User Profile

{ profile : { name: … , address: … , gender: … },

  preferences : [ chocobar , chips, … ],

  balance : { current: 20, topup: 1 },

  feedback : {
    vending_m1 : { rating: satisfied, comments: … },
    vending_m2 : { rating: bad, comments: … }
  },

  …
}   // document#1
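A minimal pymongo sketch of storing and querying such a profile document (connection details, database and collection names are assumptions, and the field values are illustrative):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")   # assumed local instance
db = client["vending"]                              # hypothetical database name

profile = {
    "profile": {"name": "Bill", "address": "12 Main St", "gender": "M"},
    "preferences": ["chocobar", "chips"],
    "balance": {"current": 20, "topup": 1},
    "feedback": {
        "vending_m1": {"rating": "satisfied", "comments": "fast"},
        "vending_m2": {"rating": "bad", "comments": "out of stock"},
    },
}

db.users.insert_one(profile)                          # store document #1
doc = db.users.find_one({"preferences": "chocobar"})  # array fields match element values directly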

Let's look at another example:

  • User preferences: assume every user has a different set of preferences, and each preference has a dynamic set of values.

Example: find what users like most – https://docs.mongodb.org/manual/tutorial/aggregation-with-user-preference-data/

  • MongoDB's aggregation pipeline (shown below) makes it simple to query such indexed documents in real time.

db.users.aggregate([
  { $unwind : "$prefers" },
  { $group : { _id : "$prefers", number : { $sum : 1 } } },
  { $sort : { number : -1 } },
  { $limit : 5 }
])

 

 

For a large number of user profiles, a column-family store can be used.

—  RowKey : 1001  [Name : Bill], [Email : abc@def], [City : New York], [Bday : 7 May, 1980]

—  RowKey : 1002  [Name : Joy], [Email : mnp@def]
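A minimal sketch with the DataStax Python driver of the same sparse rows (keyspace, table and replication settings are assumptions):

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS profiles
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""")
session.execute("""
    CREATE TABLE IF NOT EXISTS profiles.user_profile (
        user_id text PRIMARY KEY,
        name    text,
        email   text,
        city    text,
        bday    text)""")

# rows need not populate every column; absent columns take no space on disk
session.execute("INSERT INTO profiles.user_profile (user_id, name, email, city, bday) "
                "VALUES (%s, %s, %s, %s, %s)",
                ("1001", "Bill", "abc@def", "New York", "7 May, 1980"))
session.execute("INSERT INTO profiles.user_profile (user_id, name, email) VALUES (%s, %s, %s)",
                ("1002", "Joy", "mnp@def"))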

 

Design by Query Patterns

DB: Cassandra – for big datasets.

Machine usage data

Row-Key: VM01 => [user1:25], [user2:30] …

 

Row-Key: Machine-1 => CF-1 (Daily): [Avg Txn Time], [Max Txn Time], [Min Txn Time] | CF-2 (Weekly): [Avg Txn Time], [Max Txn Time], [Min Txn Time]

Row-Key: Machine-2 => CF-1 (Daily): [Avg Txn Time], [Max Txn Time], [Min Txn Time] | CF-2 (Weekly): [Avg Txn Time], [Max Txn Time], [Min Txn Time]

 

 
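One possible CQL layout for the machine-usage stats above, issued via the Python driver and keyed for the "stats per machine per period" query (keyspace, table and column names are illustrative):

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("vending")   # assumed existing keyspace
session.execute("""
    CREATE TABLE IF NOT EXISTS machine_stats (
        machine_id   text,
        period       text,     -- 'daily' or 'weekly', mirroring CF-1 / CF-2 above
        avg_txn_time double,
        max_txn_time double,
        min_txn_time double,
        PRIMARY KEY (machine_id, period))""")

# the read path mirrors the query pattern: one partition per machine
rows = session.execute("SELECT * FROM machine_stats WHERE machine_id = %s AND period = %s",
                       ("Machine-1", "daily"))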

User time spent with machines

 

Row-Key: User-1 => CF-1 (Daily): [Avg Time], [Max Time], [Min Time] | CF-2 (Weekly): [Avg Time], [Max Time], [Min Time]

Row-Key: User-2 => CF-1 (Daily): [Avg Time], [Max Time], [Min Time] | CF-2 (Weekly): [Avg Time], [Max Time], [Min Time]

 

 

 

Combine correlated data for a specific machine provider

 

Row-Key: Machine-Provider-1 => CF-1 (SFO-Inventory): [Machine1_Inv], [Machine2_Inv] | CF-2 (SFO-Revenue): [Machine1_Rev], [Machine2_Rev]

Row-Key: Machine-Provider-2 => CF-1 (Portland-Inventory): [Machine1_Inv], [Machine2_Inv] | CF-2 (Portland-Revenue): [Machine1_Rev], [Machine2_Rev]

 

 

 

Aggregation Results

 

Use case: for large datasets use Cassandra; for small datasets use Redis / MongoDB.

 

Best-selling products 

 [product2 =>  #sales]

Sales per Provider :

[vending_machine_provider =>  [product1 : #sales] , [product2: #sales] ]
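A small redis-py sketch of keeping these aggregates hot for quick lookup (key names and the provider id are assumptions):

import redis

r = redis.Redis(host="localhost", port=6379)

# best-selling products: a sorted set scored by number of sales
r.zincrby("best_sellers", 1, "product2")                  # redis-py 3.x signature: (name, amount, value)
top10 = r.zrevrange("best_sellers", 0, 9, withscores=True)

# sales per provider: one hash per vending-machine provider
r.hincrby("sales:provider:provider1", "product1", 1)
per_product = r.hgetall("sales:provider:provider1")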

 

Event Logs

 

DB: Elasticsearch (e.g. store 6 months of data) – suitable for medium to large (GB) datasets.

Example: transaction log, session activity log, application log.

 

Transactions

[ {userId: "", timestamp: "", transaction_amount: "", status: ""},
  {userId: "", timestamp: "", transaction_amount: "", status: ""},
  {userId: "", timestamp: "", transaction_amount: "", status: ""} ]

 

Elasticsearch is widely used for log search.

Example: Logs : https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html

curl -XPUT 'xyz.com/_template/logTemplate' -d '{
  "template" : "mylogs*",
  "order" : 21,
  "settings" : {
    "index.analysis.analyzer.my_own_lowercase.type" : "custom",
    "index.analysis.analyzer.my_own_lowercase.tokenizer" : "keyword",
    "index.analysis.analyzer.my_own_lowercase.filter.0" : "lowercase"
  },
  "mappings" : {
    "message" : {
      "properties" : {
        "message" : { "type" : "string" },
        "tags" : { "type" : "string", "analyzer" : "my_own_lowercase" },
        "nick" : { "type" : "string", "analyzer" : "my_own_lowercase" }
      }
    }
  }
}'

Advantages of storing structured data in Elasticsearch:

  • KPI analytics using Kibana
  • Real-time query / policy execution / notifications using Percolator / Alerts / Watcher

For huge amounts of unstructured logs, Cassandra can be used:

[RowKey : <session_id / mobile_phone_no> => Timestamp1 : Event_Log1, Timestamp2 : Event_Log2]

** Note **  each row in a column family can have its own dynamic set of columns
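A minimal CQL sketch of that wide-row layout via the Python driver (keyspace, table and column names are assumptions); each session's events are clustered by timestamp inside a single partition:

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("logs")          # assumed keyspace
session.execute("""
    CREATE TABLE IF NOT EXISTS event_log (
        session_id text,
        event_time timestamp,
        event      text,
        PRIMARY KEY (session_id, event_time)
    ) WITH CLUSTERING ORDER BY (event_time DESC)""")

# all events for one session / phone number live in one partition, newest first
rows = session.execute("SELECT event_time, event FROM event_log WHERE session_id = %s LIMIT 100",
                       ("session-42",))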

 

Geo JSON Queries

Elasticsearch:

https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-shape-query.html

Histograms:

Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-histogram-aggregation.html

Term Clusters

Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html
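For illustration, a combined date-histogram plus terms aggregation issued from Python (index and field names are assumptions; the body syntax matches the older Elasticsearch versions referenced here):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

resp = es.search(index="mylogs-*", body={
    "size": 0,
    "aggs": {
        "per_day": {
            "date_histogram": {"field": "timestamp", "interval": "day"},
            "aggs": {
                "top_tags": {"terms": {"field": "tags", "size": 10}}   # term clusters per bucket
            }
        }
    }
})
for bucket in resp["aggregations"]["per_day"]["buckets"]:
    print(bucket["key_as_string"], bucket["doc_count"])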

 

Time Series of Metrics

DB: InfluxDB, as it offers built-in statistical and time-windowing functions; Elasticsearch – very powerful histogram and multi-level aggregation functions.

Ref: https://influxdb.com/docs/v0.9/query_language/data_exploration.htm

  • Get the last hour of data from a set of series; here a regex selects the series:

select * from /^stats\./i where time > now() - 1h;

  • select * from events where (email =~ /.*gmail.*/ or email =~ /.*yahoo.*/) and state = 'ny';
  • 95th percentile of response times in 30-second intervals:

select percentile(value, 95) from response_times group by time(30s);

 

 

Querying Heterogeneous Data Sources

Federated Query:

PrestoDB – very impressive; reference – http://blogs.impetus.com/big_data/hadoop_ecosystem/Data%20Analysis%20using%20Presto.do

Example: get the top 2 purchases from each user by combining data from Hive and Cassandra in a single Presto query.

presto:default> select * from (
  select *, row_number() over (partition by id order by amount desc) as rnk
  from (
    select hive_user_info.id, hive_user_info.fname, hive_user_info.gender,
           hive_user_info.age, hive_user_info.salary,
           cassandra_user_purchases.item, cassandra_user_purchases.time,
           cassandra_user_purchases.place, cassandra_user_purchases.quanity,
           cassandra_user_purchases.amount
    from user_info hive_user_info
    join cassandra.stockticker.user_purchases cassandra_user_purchases
      on hive_user_info.id = cassandra_user_purchases.user_id
  )
) where rnk <= 2;

 

Spark SQL 

 

Spark SQL allows seamless aggregation of data by joining heterogeneous data sources:

> connect existing BI tools through JDBC

> share data with multiple users through cached tables over JDBC

> use columnar formats (skip unnecessary columns, read only the required ones); data is stored as a contiguous block of values per column

> use partitioning (on filter keys like /year=2015/month=08/ etc.); force a shuffle through repartition (when we want fork-join-style parallelism, followed by coalesce)

> push predicates down to the storage layer (PostgreSQL, MySQL)

> use DataFrames (a distributed collection of rows organized into named columns); for a well-expressed DataFrame the optimizer does the magic (e.g. map-partitions over shuffled RDDs)

df = sqlContext.read.format("json").option("samplingRatio", "0.1").load("/data/marketing1.json")

df.write.format("parquet").mode("append").partitionBy("year").saveAsTable("marketing1")

// ETL --> use a library to read finance staging data

sqlContext.read.format("com.databricks.spark.finance").option("url", "https://marketingsite.org/rest/api/latest/search").option("user", "test").option("password", "test").option("query", "…").load().repartition(1).write.format("parquet").saveAsTable("FinanceData")

sqlCtx.table("revenue").groupBy("dept").agg("dept", avg("amount")).collect()

The cool thing is that custom Python, R, Java, or Scala code can be blended with DataFrame operations: here an attribute 'city' from a UDF is merged with 'user_id' from the users table.

zipToCity = udf(lambda zipCode: <custom python GeoLoc library>)

def add_demographics(events):
    u = sqlCtx.table("users")
    return events.join(u, events.user_id == u.user_id).withColumn("city", zipToCity(u.zip))

# courtesy: databricks demo

And the best part: since all operations are lazy, a great deal of optimization can be performed.

For example, when we call add_demographics to build the events data and then derive the training data, the users table can be scanned while executing the query plan, and the data can be columnized and compressed even before the actual query executes.

events = add_demographics(sqlCtx.load("/data/events", "parquet"))

training_data = events.where(events.city == "San Francisco").select(events.timestamp).collect()

Precomputing Joined data and Preprocessing filters

// courtesy : databricks demo

Query Workflow

  • Streaming Events consumed per User

val trainingDataTable = sql("""SELECT e.action, u.age, u.latitude, u.longitude FROM Users u JOIN Events e ON u.userId = e.userId""")

 

val trainingData = trainingDataTable map { row =>
  val features = Array[Double](row(1), row(2), row(3))
  LabeledPoint(row(0), features)
}

val model = new LogisticRegressionWithSGD().run(trainingData)

 

** See how easy it is to construct a Spark RDD Model using Spark SQL and transform the model subsequently.

Reference : http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model-39976454/56-val_model_new_LogisticRegressionWithSGD_runtrainingData

 

Advanced Analytics Results

Statistical and machine-learning computations can be performed either offline or in real time, and the results can be stored in Redis / MongoDB for quick lookup.

  1. Infer the cause of lower usage of a machine in a certain area:

— regularly calculate the volatility (standard deviation) of related metrics and find anomalies by detecting outliers (say, values falling outside the lower Bollinger band, i.e. mean - 2*sd); a small sketch follows this list

— correlate the outliers (say, certain machines) with other related business metrics

  2. Collaborative filtering to find which machines are working best for users:

— metrics: user feedback / ratings

— generate internal recommendations, inventory-replenishment events, etc.

  3. Generate clusters of related items
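A minimal, library-free sketch of the outlier rule from item 1 above (window size and the 2*sd multiplier are illustrative):

import statistics

def lower_band_outliers(values, window=20, k=2.0):
    """Return indices whose value falls below the trailing window's lower Bollinger band (mean - k*sd)."""
    outliers = []
    for i in range(window, len(values)):
        win = values[i - window:i]
        mean = statistics.mean(win)
        sd = statistics.pstdev(win)
        if values[i] < mean - k * sd:
            outliers.append(i)
    return outliers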

 

 

Ontology

Use case: a technology-independent model to capture relationships between business entities.

<owl:Class rdf:ID="Large-Format">
  <rdfs:subClassOf rdf:resource="#Camera"/>
  <rdfs:subClassOf>
    <owl:Restriction>
      <owl:onProperty rdf:resource="#body"/>
      <owl:allValuesFrom rdf:resource="#BodyWithNonAdjustableShutterSpeed"/>
    </owl:Restriction>
  </rdfs:subClassOf>
</owl:Class>

Ref:  http://protege.stanford.edu/

Specific Topics

Cassandra Data Modelling

General Cassandra Concepts:

  1. Keyspace – What method should I use to turn my application key into a row key?
    1. The partitioner determines how your application keys get turned into row keys.
  2. Row Key – Which node is my value on?
    1. The ReplicationFactor determines how many nodes get a copy of a particular key.
  3. Column Family – Which file on the node is my value in?
    1. Writes on a ColumnFamily are stored in memory (memtable) and written out to disk (commit log) in parallel.
  4. Column Name – Which piece of the file on the node contains my value?
    1. The memtable is periodically written out to disk in column order for efficient look-up.
  5. Column Value – My value!
  6. ConsistencyLevel determines how many nodes must successfully record a write.
  7. Timestamp is used to select the latest write.
  8. As the commit log is a serial, write-only structure, it can be put on a separate disk for faster writes.
  9. Gossip is used to continuously propagate node status and information to other nodes.
    1. ReadRepair propagates the most recent column value to the nodes from which the client is reading data.

General Cassandra Modelling References:

Cassandra Modelling Suggestions:

  • key’s cardinality should be high enough to not cause hot spots on any of the drives.
  • It’s better to have N requests retrieving single rows, rather than single requests retrieving N rows (target as small a hash value as possible)
    • Limit the primary key to exactly what you’ll be searching with.
    • If you plan on searching the data with a similar, but different criteria, then make it a separate table.
  • Cassandra writes are cheap.
    • Don’t try to cache up data to send to the database for storage all at once.
    • If you have a portion of the data, write it out.  Don’t wait for the entire recored to be received from the application.
  • If you need to store the same piece of data in M different tables, then write it out M times.
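A sketch of the last suggestion, writing the same purchase into two query-specific tables (keyspace, table and column names are hypothetical):

import datetime
from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("vending")       # assumed keyspace where both tables already exist

by_user = session.prepare(
    "INSERT INTO purchases_by_user (user_id, ts, machine_id, amount) VALUES (?, ?, ?, ?)")
by_machine = session.prepare(
    "INSERT INTO purchases_by_machine (machine_id, ts, user_id, amount) VALUES (?, ?, ?, ?)")

def record_purchase(user_id, machine_id, amount):
    ts = datetime.datetime.utcnow()
    # same fact written twice, each table keyed for the query that will read it
    session.execute(by_user, (user_id, ts, machine_id, amount))
    session.execute(by_machine, (machine_id, ts, user_id, amount))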

Time Series Cassandra Modelling:

Examples :
1) OpenNMS – a common 'distributed map' relationship between 'resource', 'time', 'metric' and 'value',
so we can maintain multiple types of metrics at a given timestamp.
https://www.youtube.com/watch?v=ovMo5pIMj8M

 

For reference, look into https://github.com/OpenNMS/newts and how the OpenNMS time-series model is queried.

The entire tree structure is merged, sorted and stored into a single row > column family > column > value.

2) Time Series Stream Processing with Spark and Cassandra

– Training Course: https://academy.datastax.com/courses/getting-started-apache-spark

– https://www.youtube.com/watch?v=fBWLzB0FMX4


3) Time Series Library Spark-ts

http://sryza.github.io/spark-timeseries/0.3.0/index.html

http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/

Functionality

Time Series Manipulation

  • Aligning
  • Lagging
  • Slicing by date-time
  • Missing value imputation
  • Conversion between different time series data layouts

 

Time Series Math and Stats

  • Exponentially weighted moving average (EWMA) models
  • Autoregressive integrated moving average (ARIMA) models
  • Generalized autoregressive conditional heteroskedastic (GARCH) models
  • Missing data imputation
  • Augmented Dickey-Fuller test
  • Durbin-Watson test
  • Breusch-Godfrey test
  • Breusch-Pagan test
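As a plain-Python illustration of two of the functions above (using pandas and statsmodels on synthetic data, not the spark-ts API):

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller

idx = pd.date_range("2015-01-01", periods=200, freq="H")
metric = pd.Series(np.random.randn(200).cumsum(), index=idx)   # synthetic random-walk metric

ewma = metric.ewm(span=12).mean()           # exponentially weighted moving average
adf_stat, p_value = adfuller(metric)[:2]    # Augmented Dickey-Fuller stationarity test
print(ewma.tail(3))
print("ADF statistic = %.3f, p-value = %.3f" % (adf_stat, p_value))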
Advanced Notes

 

 

 
