There are several approaches to building machine learning pipelines:

1. TensorFlow Graph
2. Apache Spark ML Pipelines
3. Sparkling Water (Spark + H2O)
4. PredictionIO

In Part 1, we discuss TensorFlow and Spark ML Pipelines.

1. TensorFlow Graph

Here we run a TensorFlow application from a Jupyter notebook.

Set up Conda:

Once Conda is installed, check the current configuration:

$conda info

platform : osx-64
conda version : 3.10.0
conda-build version : 1.11.0
python version : 3.4.3.final.0
requests version : 2.9.1

Create a Conda TensorFlow environment:

$conda create -n tensorflow python=3.5
$source activate tensorflow
$conda install -c conda-forge tensorflow

Test TensorFlow:

$python

import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print(sess.run(hello))
Hello, TensorFlow!
a = tf.constant(10)
b = tf.constant(32)
print(sess.run(a + b))

Install Jupyter Notebook:

$sudo python3 -m pip install jupyterhub notebook ipykernel
$sudo python3 -m ipykernel install

// you may need to install pip3 ~ sudo easy_install pip3
$pip install --user matplotlib

// conda install -c anaconda pyqt=4.11.4
// conda install -c conda-forge matplotlib=1.5.3

Run the notebook from the tensorflow Conda shell:

(tensorflow)$ jupyter notebook

Let’s work on a simple linear regression problem: predicting blood ‘fat content’ from (weight, age) and discovering the relationship between the predictor and response variables.

Code Link ~ https://github.com/backstopmedia/tensorflowbook/blob/master/chapters/04_machine_learning_basics/linear_regression.py

import tensorflow as tf

# Model parameters: one weight per input feature (weight, age) and a bias.
W = tf.Variable(tf.zeros([2, 1]), name="weights")
b = tf.Variable(0., name="bias")

def inference(X):
    # Linear model: X * W + b, with shape [n, 1]
    return tf.matmul(X, W) + b

def loss(X, Y):
    # Transpose the [n, 1] predictions to [1, n] so they line up with the
    # length-n targets instead of broadcasting to an [n, n] matrix.
    Y_predicted = tf.transpose(inference(X))
    return tf.reduce_sum(tf.squared_difference(Y, Y_predicted))

def inputs():
    # Data from http://people.sc.fsu.edu/~jburkardt/datasets/regression/x09.txt
    # age in years and weight in kilograms with blood fat content
    weight_age = [[84, 46], [73, 20], [65, 52], [70, 30], [76, 57], [69, 25], [63, 28], [72, 36], [79, 57], [75, 44], [27, 24], [89, 31], [65, 52], [57, 23], [59, 60], [69, 48], [60, 34], [79, 51], [75, 50], [82, 34], [59, 46], [67, 23], [85, 37], [55, 40], [63, 30]]
    blood_fat_content = [354, 190, 405, 263, 451, 302, 288, 385, 402, 365, 209, 290, 346, 254, 395, 434, 220, 374, 308, 220, 311, 181, 274, 303, 244]

    return tf.to_float(weight_age), tf.to_float(blood_fat_content)

def train(total_loss):
    learning_rate = 0.0000001
    return tf.train.GradientDescentOptimizer(learning_rate).minimize(total_loss)

def evaluate(sess, X, Y):
    print(sess.run(inference([[80., 25.]]))) # ~ 303
    print(sess.run(inference([[65., 25.]]))) # ~ 256

# Launch the graph in a session, setup boilerplate
with tf.Session() as sess:

    # Newer TensorFlow 1.x releases rename this to tf.global_variables_initializer()
    tf.initialize_all_variables().run()

    X, Y = inputs()

    total_loss = loss(X, Y)
    train_op = train(total_loss)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # actual training loop
    training_steps = 1000
    for step in range(training_steps):
        sess.run([train_op])
        if step % 10 == 0:
            print("loss: ", sess.run([total_loss]))

    # predicts a continuous real number
    evaluate(sess, X, Y)
    # we will observe that the model has learnt that
    # - predicted blood fat drops as weight drops when age stays the same

    coord.request_stop()
    coord.join(threads)
    sess.close()
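
To make the training loop less of a black box, here is a minimal pure-Python sketch of what gradient descent on the sum of squared errors does for this two-feature model. It is not the TensorFlow implementation: the helper names (predict, total_loss, gradient_step, fit) are illustrative assumptions, and only the first eight data rows from the listing are used to keep it short.

```python
# Illustrative sketch only: plain-Python gradient descent for
# y ~= w[0]*weight + w[1]*age + b, minimising the sum of squared errors.

# First eight (weight, age) rows and blood-fat values from the listing above.
weight_age = [[84, 46], [73, 20], [65, 52], [70, 30],
              [76, 57], [69, 25], [63, 28], [72, 36]]
blood_fat_content = [354, 190, 405, 263, 451, 302, 288, 385]


def predict(x, w, b):
    """Linear model for a single (weight, age) example."""
    return w[0] * x[0] + w[1] * x[1] + b


def total_loss(xs, ys, w, b):
    """Sum of squared differences between targets and predictions."""
    return sum((y - predict(x, w, b)) ** 2 for x, y in zip(xs, ys))


def gradient_step(xs, ys, w, b, learning_rate):
    """One gradient-descent update of (w, b) on the summed squared error."""
    grad_w = [0.0, 0.0]
    grad_b = 0.0
    for x, y in zip(xs, ys):
        error = predict(x, w, b) - y
        grad_w[0] += 2 * error * x[0]
        grad_w[1] += 2 * error * x[1]
        grad_b += 2 * error
    new_w = [w[0] - learning_rate * grad_w[0],
             w[1] - learning_rate * grad_w[1]]
    return new_w, b - learning_rate * grad_b


def fit(xs, ys, steps=1000, learning_rate=0.0000001):
    """Start from zeros, as the TensorFlow variables do, and iterate."""
    w, b = [0.0, 0.0], 0.0
    for _ in range(steps):
        w, b = gradient_step(xs, ys, w, b, learning_rate)
    return w, b


initial_loss = total_loss(weight_age, blood_fat_content, [0.0, 0.0], 0.0)
w, b = fit(weight_age, blood_fat_content)
final_loss = total_loss(weight_age, blood_fat_content, w, b)
print("loss before:", initial_loss, "loss after:", final_loss)
print("prediction for weight=80, age=25:", predict([80.0, 25.0], w, b))
```

The very small learning rate matters here: the inputs are unscaled, so the gradients are large, and a bigger step would make the loss diverge instead of shrink.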

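Visualize the TensorFlow Graph

Open a shell from Jupyter and start TensorBoard. This assumes the graph has already been written to the my_graph directory, which the listing above does not do.
$tensorboard --logdir="my_graph"

… Starting TensorBoard b'29' on port 6006
http://10.237.113.43:6006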

Additional Notes:

Find installation location for tensorflow:

$python -c 'import os; import inspect; import tensorflow; print(os.path.dirname(inspect.getfile(tensorflow)))'

/Users/M#/anaconda/envs/tensorflow/lib/python3.4/site-packages/tensorflow

TensorFlow Integrations with Keras and Spark:

  • Keras is a high-level neural networks library, written in Python and capable of running on top of either TensorFlow or Theano.

https://github.com/fchollet/keras/blob/master/keras/applications/music_tagger_crnn.py

2. ML Pipelines from Apache Spark

a) Machine Learning Pipeline in Spark

Example: Gradient-Boosted Tree Regressor

[Figure: spark-ml-flow]

df = sqlContext.read.format('csv').option("header", 'true').load("bikeSharing/data-001/hour.csv")

print("Our dataset has %d rows." % df.count())

df=df.drop("instant").drop("dteday").drop("casual").drop("registered")

# The following call takes all columns (df.columns) and casts them 
# using Spark SQL to a numeric type (DoubleType).
from pyspark.sql.functions import col 
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
df.printSchema()

# Split the dataset randomly into 70% for training and 30% for testing.
train, test = df.randomSplit([0.7, 0.3])
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

display(train.select("hr", "cnt"))

# Define the feature processing stages of the Pipeline:
# Assemble feature columns into a feature vector.
# Identify categorical features, and index them.

from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('cnt')
# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

# define the model training stage of the Pipeline. 
# GBTRegressor takes feature vectors and labels as input and learns to predict labels of new examples.

from pyspark.ml.regression import GBTRegressor
# Takes the "features" column and learns to predict "cnt"
gbt = GBTRegressor(labelCol="cnt")

# Wrap the model training stage within a CrossValidator stage.
# CrossValidator knows how to call the GBT algorithm with different hyperparameter settings.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Define a grid of hyperparameters to test:
# - maxDepth: max depth of each decision tree in the GBT ensemble
# - maxIter: iterations, i.e., number of trees in each GBT ensemble
# In this example notebook, we keep these values small. In practice, to get the highest accuracy, you would likely want to try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = ParamGridBuilder()\
 .addGrid(gbt.maxDepth, [2, 5])\
 .addGrid(gbt.maxIter, [10, 100])\
 .build()
# We define an evaluation metric. This tells CrossValidator how well we are doing by comparing the true labels with predictions.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
# Declare the CrossValidator, which runs model tuning for us.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)

# Finally, tie the feature processing stages and the tuned model together in a Pipeline.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

# train the pipeline
pipelineModel = pipeline.fit(train)

# make predictions and evaluate result
predictions = pipelineModel.transform(test)

display(predictions.select("cnt", "prediction", *featuresCols))

rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)

display(predictions.select("hr", "prediction"))
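
As a rough mental model of the tuning stage in the listing above, the sketch below expands the same 2 x 2 hyperparameter grid with itertools.product and computes RMSE by hand. It is plain Python, not the Spark API: expand_grid, rmse, and the sample numbers are made up for illustration. The fold count of 3 is CrossValidator's default numFolds, which the listing does not override.

```python
import itertools
import math


def expand_grid(grid):
    """Return every combination of the listed hyperparameter values."""
    names = sorted(grid)
    return [dict(zip(names, values))
            for values in itertools.product(*(grid[name] for name in names))]


def rmse(labels, predictions):
    """Root-mean-square error between true labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / float(len(squared_errors)))


# Same grid values as the ParamGridBuilder call in the listing.
param_grid = expand_grid({"maxDepth": [2, 5], "maxIter": [10, 100]})
for params in param_grid:
    print(params)

# Assumption: 3 folds, with one model fit per fold per combination.
num_folds = 3
fits_during_search = len(param_grid) * num_folds
print("model fits during the search:", fits_during_search)

# Made-up labels and predictions, only to exercise rmse().
example_rmse = rmse([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
print("example RMSE:", example_rmse)
```

The fit count is why the grid is kept small in the example: every extra value in either list multiplies the number of models trained.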

Ref: https://databricks.com/blog/2016/06/01/preview-apache-spark-2-0-an-anthology-of-technical-assets.html

Ref: https://databricks.com/blog/2016/05/31/apache-spark-2-0-preview-machine-learning-model-persistence.html

b) KeystoneML

KeystoneML makes constructing even complicated machine learning pipelines easy. Here’s an example text categorization pipeline that builds bigram features and trains a Naive Bayes model on the 100,000 most common features.

val trainData = NewsGroupsDataLoader(sc, trainingDir)

val predictor = Trim.then(LowerCase())
  .then(Tokenizer())
  .then(new NGramsFeaturizer(1 to conf.nGrams))
  .then(TermFrequency(x => 1))
  .thenEstimator(CommonSparseFeatures(conf.commonFeatures))
  .fit(trainData.data)
  .thenLabelEstimator(NaiveBayesEstimator(numClasses))
  .fit(trainData.data, trainData.labels)
  .then(MaxClassifier)

Parallelization of the pipeline fitting process is handled automatically and pipeline nodes are designed to scale horizontally.
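
For readers who do not write Scala, the chain of .then calls above can be approximated in a few lines of Python. This is only an analogue of the feature-extraction half of the pipeline, not KeystoneML's API: chain, tokenize, ngrams, and binary_term_frequency are hypothetical helpers, and the sketch assumes that TermFrequency(x => 1) means "record presence rather than count".

```python
import re
from functools import reduce


def chain(*steps):
    """Compose single-argument functions left to right into one callable."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)


def tokenize(text):
    """Split on runs of non-word characters and drop empty tokens."""
    return [token for token in re.split(r"\W+", text) if token]


def ngrams(max_n):
    """Build a step that emits all n-grams for n = 1..max_n."""
    def featurize(tokens):
        return [tuple(tokens[i:i + n])
                for n in range(1, max_n + 1)
                for i in range(len(tokens) - n + 1)]
    return featurize


def binary_term_frequency(grams):
    """Map each distinct n-gram to 1 (presence, not count)."""
    return {gram: 1 for gram in grams}


# Trim -> lower-case -> tokenize -> unigrams and bigrams -> binary features.
featurizer = chain(str.strip, str.lower, tokenize, ngrams(2), binary_term_frequency)
features = featurizer("  Spark makes pipelines easy  ")
print(sorted(features))
```

Each step only needs to accept the previous step's output, which is the property that lets a pipeline library reorder, cache, or distribute the stages.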

Once the pipeline has been fit on training data, you can apply it to test data and evaluate its effectiveness.

val test = NewsGroupsDataLoader(sc, testingDir)
val predictions = predictor(test.data)
val eval = MulticlassClassifierEvaluator(predictions, test.labels, numClasses)

println(eval.summary(newsgroupsData.classes))
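
The post does not show what the evaluator's summary prints. As a hedged illustration of what a multiclass evaluation typically involves, the Python sketch below computes accuracy and confusion counts from made-up labels; confusion_counts and accuracy are hypothetical helpers, not part of KeystoneML.

```python
from collections import Counter


def confusion_counts(labels, predictions):
    """Count how often each (actual, predicted) class pair occurs."""
    return Counter(zip(labels, predictions))


def accuracy(labels, predictions):
    """Fraction of examples whose predicted class equals the true class."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))


# Made-up class ids for four test documents.
actual = [0, 1, 2, 1]
predicted = [0, 1, 1, 1]

counts = confusion_counts(actual, predicted)
score = accuracy(actual, predicted)
print("confusion counts:", dict(counts))
print("accuracy:", score)
```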

 

 
