Sunday, March 5, 2017

Configuring and Running Apache Kafka in IBM BigInsights

This post describes how to configure and run Apache Kafka in IBM BigInsights.

Apache Kafka is an open source messaging system built on a publish-subscribe model.

I assume you are already familiar with terms such as Producer, Subscriber, Kafka Broker, Topic, and Partition. Here, I focus on creating multiple brokers in BigInsights, creating a topic, publishing messages from the command line, and reading them back from the broker with a consumer.

Environment: BigInsights 4.2

 Step 1: Creating Kafka Brokers from Ambari

By default, Ambari has one Kafka Broker configured. Depending on your use case, you may need to create multiple brokers.

Log in to the Ambari UI, click Hosts, and add the Kafka Broker component to each node where you want to run a broker.

You can now see multiple brokers running in the Kafka UI.

Step 2: Create a Topic

Log in to one of the nodes where a broker is running, then create a topic.

cd /usr/iop/

su kafka -c "./ --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic CustomerOrder"

You can get the details of the topic using the describe command below.

su kafka -c "./ --describe --zookeeper localhost:2181 --topic CustomerOrder"

Step 3: Start the Producer

In the --broker-list argument, pass all the brokers that are running, as a comma-separated list of host:port pairs.

su kafka -c "./ --broker-list, --topic CustomerOrder"
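The --broker-list value is a comma-separated list of host:port pairs. As a minimal sketch, the helper below assembles that string from a port and a set of host names. The function name build_broker_list, the host names, and port 6667 are assumptions for illustration only; use the listener host and port shown for your brokers in Ambari.

```shell
# Join host names into the host:port,host:port form expected by --broker-list.
# Usage: build_broker_list PORT HOST [HOST...]
build_broker_list() {
    port="$1"
    shift
    list=""
    for host in "$@"; do
        # Add a comma only between entries, never before the first one.
        list="${list:+$list,}$host:$port"
    done
    printf '%s\n' "$list"
}

# Hypothetical hosts and an assumed port; replace both with your own values.
build_broker_list 6667 broker1.example.com broker2.example.com
```

The printed string can be pasted after --broker-list in the producer command.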

When you run the above command, it waits for user input. You can enter a sample message:

{"ID":99, "CUSTOMID":234,"ADDRESS":"12,5-7,westmead", "ORDERID":99, "ITEM":"iphone6", "COST":980}
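If you want to send several test messages, typing the JSON by hand gets tedious. The sketch below prints one order per line in the same shape as the sample message; make_order is a hypothetical helper name, and it does no JSON escaping, so it assumes the address and item values contain no double quotes or backslashes.

```shell
# Print one order as a single-line JSON object in the same layout as the sample message.
# Usage: make_order ID CUSTOMID ADDRESS ORDERID ITEM COST
# Note: values are inserted as-is, without JSON escaping.
make_order() {
    printf '{"ID":%d, "CUSTOMID":%d,"ADDRESS":"%s", "ORDERID":%d, "ITEM":"%s", "COST":%d}\n' \
        "$1" "$2" "$3" "$4" "$5" "$6"
}

# Reproduces the sample message from this post.
make_order 99 234 "12,5-7,westmead" 99 iphone6 980
```

Because the console producer reads messages line by line from standard input, output like this can be typed in or piped to it.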

Step 4: Start the Consumer

Open another Linux terminal and start the consumer. It displays all the messages sent by the producer.

su kafka -c "./ --zookeeper localhost:2181 --from-beginning --topic CustomerOrder"


Thus, we are able to configure Kafka and run a sample pub-sub flow.

Thursday, March 2, 2017

Configuring and Running Apache Phoenix in IBM BigInsights

This post describes how to configure and run Apache Phoenix in IBM BigInsights.

Apache Phoenix is an open source project that provides SQL on HBase.

Environment: BigInsights 4.2

Step 1: Configure Phoenix from Ambari

Log in to the Ambari UI, go to the HBase configuration, and enable Phoenix.

Save the changes and restart HBase.

Step 2: Validate Phoenix

Log in to a Linux terminal as the hbase user and run the command below. It creates the example tables and runs some select queries; the output appears in the console.

cd /usr/iop/current/phoenix-client/bin

./ localhost:2181:/hbase-unsecure ../doc/examples/WEB_STAT.sql ../doc/examples/WEB_STAT.csv ../doc/examples/WEB_STAT_QUERIES.sql

Step 3: Run queries using Phoenix

This section focuses on running some basic queries in Phoenix.

Open a terminal and run the commands below.

cd /usr/iop/current/phoenix-client/bin


Create the table, then insert some rows and run a select on the table.


upsert into CUSTOMER_ORDER values (1,234,'11,5-7,westmead',99,'iphone7',1200);
upsert into CUSTOMER_ORDER values (2,288,'12,5-7,westmead',99,'iphone6',1000);
upsert into CUSTOMER_ORDER values (3,299,'13,5-7,westmead',99,'iphone5',600);

select * from CUSTOMER_ORDER;

For other SQL query syntax, refer to the Apache Phoenix documentation.

Step 4: Bulk load data into the table

Here, we bulk load data into the table created above.

Upload the data to HDFS

[root@test bin]#
[root@test bin]# hadoop fs -cat /tmp/importData.csv
[root@test bin]#
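The contents of importData.csv are not shown here. As a rough sketch, a file matching the six values used in the upsert statements could be generated as follows. The rows are made-up sample data, csv_row is a hypothetical helper, and the double quotes around the text columns assume the loader's default CSV quoting, which is needed here because the address itself contains commas.

```shell
# Print one CSV row for CUSTOMER_ORDER, quoting the two text columns.
# Usage: csv_row ID CUSTOMID ADDRESS ORDERID ITEM COST
csv_row() {
    printf '%s,%s,"%s",%s,"%s",%s\n' "$1" "$2" "$3" "$4" "$5" "$6"
}

# Hypothetical rows, continuing the IDs used in the upsert statements.
csv_row 4 301 "14,5-7,westmead" 99 iphone7 1200
csv_row 5 302 "15,5-7,westmead" 99 iphone6 1000
```

Redirect the output to a local file and copy it to HDFS with hadoop fs -put before running the import.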

Run the import command from the terminal

sudo -u hbase hadoop jar ../phoenix-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table CUSTOMER_ORDER --input /tmp/importData.csv

Thus, we are able to configure Phoenix and run some basic queries on it.

Monday, February 20, 2017

Create and Configure Separate Queue in YARN Capacity Scheduler for running Spark Jobs in IBM BigInsights

This post explains how to create and configure a separate queue in the YARN Capacity Scheduler for running Spark jobs.

Environment : BigInsights 4.2

1) Create a queue for Spark from the YARN Queue Manager

Here I allocate 50% of the resources to the default queue and the remaining 50% to Spark jobs. You can configure the queues based on your use case. You can also create hierarchical queues.

Log in to the Ambari UI and go to the YARN Queue Manager.


The default queue is configured to use 100% of the resources. Change its Capacity and Max Capacity to 50%.

Save the changes by clicking the tick button.

Now, click the +Add Queue button and create a new queue for Spark jobs.

Save and refresh the queues.
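In the Capacity Scheduler, the capacities of the queues at one level are expected to add up to 100%, which is why the default queue is lowered to 50% before the new queue is given the other 50%. A small sketch of that check, using a hypothetical helper name and whole-number percentages:

```shell
# Succeed only when the given sibling queue capacities (whole percentages) total 100.
# Usage: capacities_sum_to_100 CAPACITY [CAPACITY...]
capacities_sum_to_100() {
    total=0
    for c in "$@"; do
        total=$((total + c))
    done
    [ "$total" -eq 100 ]
}

# default = 50, Spark queue = 50, as configured in this post.
if capacities_sum_to_100 50 50; then
    echo "capacities total 100%"
else
    echo "capacities do not total 100%"
fi
```

The same check applies at each level if you create hierarchical queues.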

Open the Resource Manager UI and confirm that the queues are configured.


2) Submit a Spark job to the queue

Log in to the cluster and run the commands below to submit the job.

[root@cluster ~]# su hdfs

[hdfs@cluster root]$ cd /usr/iop/current/spark-client

[hdfs@cluster spark-client]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue sparkQueue  lib/spark-examples.jar 10
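Before submitting, it helps to check that the request fits within the 50% share given to the queue. The sketch below adds up the memory asked for on the command line; requested_memory_gb is a hypothetical helper, and the figure is a lower bound because YARN adds some per-container overhead on top of these values.

```shell
# Sum the memory (in GB) requested by a submission: one driver plus N executors.
# Usage: requested_memory_gb DRIVER_GB NUM_EXECUTORS EXECUTOR_GB
requested_memory_gb() {
    echo $(( $1 + $2 * $3 ))
}

# Values from the spark-submit command: 4g driver, 3 executors of 2g each.
requested_memory_gb 4 3 2
```

In yarn-cluster mode the driver runs in its own container on the cluster, so this job asks for four containers in total.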

In the YARN Resource Manager UI, you can see the job running in the new queue.

In the logs, you can see the output from the Spark job.

Thus, you are able to run Spark jobs in a separate queue.

Sunday, February 12, 2017

Running HDFS Word Count using Spark Streaming in IBM BigInsights

This post walks through a simple word count example to demonstrate Spark Streaming in IBM BigInsights.

Environment : IBM BigInsights 4.2

Step 1: Run the Spark Streaming word count example for HDFS.

su hdfs

cd /usr/iop/current/spark-client

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount --master yarn-client lib/spark-examples.jar /tmp/wordcount

The above command watches the HDFS folder (/tmp/wordcount). Whenever a new file is loaded into that folder, it runs a word count on it and prints the result.

Step 2: Open another Linux terminal and run the command below as the hdfs user.

echo "Hello - Date is `date`" | hadoop fs -put - /tmp/wordcount/test1.txt
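To know roughly what to expect from the streaming job, you can count the words of a test line locally first. This is only a sketch that mimics the idea (split on spaces, count each word, print word and count pairs); count_words is a hypothetical helper and the exact output layout of the Spark example may differ.

```shell
# Count words read from stdin, splitting on spaces, and print sorted (word,count) pairs.
count_words() {
    tr ' ' '\n' |
        awk 'NF { n[$0]++ } END { for (w in n) printf "(%s,%d)\n", w, n[w] }' |
        sort
}

# A made-up test line; "is" and "today" each appear twice.
printf 'Hello - Date is today and today is fine\n' | count_words
```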

In the Linux terminal in step 1, you can see the output of the word count.

The above example helps us validate that Spark Streaming is working.

Wednesday, February 8, 2017

Spam classification using Spark MLlib in IBM BigInsights

This post shows how to classify SMS messages as spam or ham using Spark MLlib.

Environment : IBM BigInsights 4.2

Step 1:  Setup the dataset

We are using the dataset from UCI Machine Learning Repository - SMS Spam Collection Data Set. 

For more details refer -

Download the dataset -

Unzip and upload the file (SMSSpamCollection) to HDFS (/tmp).
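Each line of SMSSpamCollection holds a label (spam or ham), a tab, and the message text. Before uploading, a quick local count per label is a useful sanity check. The sketch below uses a hypothetical helper, count_labels, and three made-up lines in the same layout rather than the real file.

```shell
# Count records per label in tab-separated "label<TAB>message" input read from stdin.
count_labels() {
    awk -F '\t' '{ n[$1]++ } END { for (l in n) printf "%s %d\n", l, n[l] }' | sort
}

# Made-up sample lines in the dataset's layout.
printf 'ham\tSee you at lunch\nspam\tWin a free prize now\nham\tCall me later\n' | count_labels
```

To run it against the real file, redirect the unzipped file into count_labels instead of the printf sample.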

Step 2: Log in to the Spark shell

su hdfs
cd /usr/iop/current/spark-client
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

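Step 3: At the Scala prompt, run the commands below

// Read the dataset.
val inputRdd = sc.textFile("/tmp/SMSSpamCollection")

// Get the spam and ham records, keeping only the message text (the second tab-separated field).
// The label is the first field, so match on the line prefix rather than anywhere in the line.
val linesWithSpam = inputRdd.filter(line => line.startsWith("spam\t"))
val spam = linesWithSpam.map(x => x.split("\t")(1))

val linesWithHam = inputRdd.filter(line => line.startsWith("ham\t"))
val ham = linesWithHam.map(x => x.split("\t")(1))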

// Import the required MLlib classes.
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Convert the text to a vector of 100 features based on term frequency.
val tf = new HashingTF(numFeatures = 100)
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val hamFeatures = ham.map(email => tf.transform(email.split(" ")))

// Label spam as 1 and ham as 0.
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
val negativeExamples = hamFeatures.map(features => LabeledPoint(0, features))
val training_data = positiveExamples.union(negativeExamples)
// Cache the training data.
training_data.cache()

// We use 60% of the dataset for training and the remaining 40% for testing the model.
val Array(trainset, testset) = training_data.randomSplit(Array(0.6, 0.4))

// We train a logistic regression model on the training set and make predictions on the test set.
val lrLearner = new LogisticRegressionWithSGD()
val model = lrLearner.run(trainset)

val predictionLabel = testset.map(x => (model.predict(x.features), x.label))

val accuracy = predictionLabel.filter(r => r._1 == r._2).count.toDouble / testset.count

println("Model accuracy : " + accuracy)
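The accuracy printed above is simply the share of test records whose predicted label equals the actual label. As a tiny illustration outside Spark, the sketch below computes the same ratio from made-up "predicted actual" pairs; accuracy is a hypothetical helper name.

```shell
# Read "predicted actual" pairs from stdin and print the fraction that match.
accuracy() {
    awk '{ total++; if ($1 == $2) correct++ }
         END { if (total > 0) printf "%.2f\n", correct / total }'
}

# Four made-up pairs, three of which match.
printf '1 1\n0 0\n1 0\n0 0\n' | accuracy
```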

Thus, we are able to create and run a model that predicts whether a message is spam or ham.