Monday, February 20, 2017

Create and Configure Separate Queue in YARN Capacity Scheduler for running Spark Jobs in IBM BigInsights

This blog talks on - How to create and configure separate queue in YARN Capacity Scheduler Queues for running the Spark jobs.

Environment : BigInsights 4.2

1) Create a queue for Spark from Yarm Queue Manager

Here I am allocating 50% of resources to default queue and rest 50% to Spark Jobs. You can configure the queues based on your use case. You can also create hierarchial queues also.

Login to Ambari UI and go to Yarn Queue Manager

 

 The default queue is configured to use 100% resources. You need to modify the Capacity and Max Capacity to 50%.
























Save the changes by clicking the tick button as shown bellow.


  
Now, click on +Add Queue button and create a new queue for Spark Jobs.
























  
Save and refresh the queues.
















































Open the Resource Manager UI and confirm the Queues configured.
 


















 

2)Submit a Spark job to the queue

Login to the cluster and run the below commands to submit the job.

[root@cluster ~]# su hdfs

[hdfs@cluster root]$ cd /usr/iop/current/spark-client

[hdfs@cluster spark-client]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue sparkQueue  lib/spark-examples.jar 10


 
In the Yarn Resource Manager UI, you can see the job is running in the new queue

















In the logs, you can see the output from the spark job.








































 
Thus, you are able to run the Spark Jobs in different Queue.


Sunday, February 12, 2017

Running HDFS Word Count using Spark Streaming in IBM BigInsights

This blog talks on running a simple word count example to demonstrate Spark Streaming in IBM BigInsights.

Environment : IBM BigInsights 4.2

Step 1: Run the Spark Streaming word Count example for HDFS.

su hdfs

cd /usr/iop/current/spark-client

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount --master yarn-client lib/spark-examples.jar /tmp/wordcount


 The above statement will be listening to the hdfs folder ( /tmp/wordcount ). Whenever a file is loaded to hdfs folder, it will do a word count and output it.


 Step2: Open another Linux terminal and run the below command as hdfs user.

echo "Hello - Date is `date`" | hadoop fs -put - /tmp/wordcount/test1.txt



In the Linux terminal in step 1, you can see the output of the word count.















The above example will help us to validate the Spark Streaming.

Wednesday, February 8, 2017

Spam classification using Spark MLlib in IBM BigInsights

This blog talks on classifying the SMS messages into Span and Ham using the Spark MLlib.

Environment : IBM BigInsights 4.2

Step 1:  Setup the dataset

We are using the dataset from UCI Machine Learning Repository - SMS Spam Collection Data Set. 

For more details refer -
https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

Download the dataset - https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip

Unzip and upload the file (SMSSpamCollection) to HDFS (/tmp).


 Step 2: Login to Spark Shell


su hdfs
cd /usr/iop/current/spark-client
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m











Step 3: In Scala prompt, run below commands

# Read the dataset.
val inputRdd = sc.textFile("/tmp/SMSSpamCollection")

# Get the records that are Spam and Ham
val linesWithSpam = inputRdd.filter(line => line.contains("spam"))
val spam = linesWithSpam.map( x => x.split("\t")(1))

val linesWithHam = inputRdd.filter(line => line.contains("ham"))
val ham = linesWithHam.map( x => x.split("\t")(1))



















# Import the required mllib classes
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

# Convert the text to vector of 100 features based on term frequency.
var tf = new HashingTF(numFeatures = 100)
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val hamFeatures = ham.map(email => tf.transform(email.split(" ")))

# Label the Spam as 1 and ham as 0.
val positiveExamples = spamFeatures.map( features => LabeledPoint(1, features))
val negativeExamples = hamFeatures.map( features => LabeledPoint(0, features))
val training_data = positiveExamples.union(negativeExamples)
# cache the training data
training_data.cache()

# We use 60% of dataset for training and remaining for testing the model.
val Array(trainset, testset) = training_data.randomSplit(Array(0.6, 0.4))





















# We use Logistic Regression model, and make predictions with the resulting model
val lrLearner = new LogisticRegressionWithSGD()
val model = lrLearner.run(trainset)

val predictionLabel = testset.map( x => (model.predict(x.features), x.label))

val accuracy = predictionLabel.filter(r => r._1 == r._2).count.toDouble / testset.count

println("Model accuracy : " + accuracy)















Thus, we are able to create and run the model to predict the Spam or Ham.