Sunday, March 5, 2017

Configuring and Running Apache Kafka in IBM BigInsights

This blog describes configuring and running Kafka in IBM BigInsights.

Apache Kafka is an open-source distributed messaging system based on a publish-subscribe model.

I assume you are familiar with terminology like Producer, Consumer, Kafka Broker, Topic, and Partition. Here, I will focus on creating multiple brokers in BigInsights, creating a topic, publishing messages from the command line, and consuming them from the broker.
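To make the publish-subscribe model concrete, here is a toy in-memory sketch in Python (not Kafka code): a topic is an append-only log of messages, and each consumer reads from its own offset.

```python
from collections import defaultdict

class MiniBroker:
    """Toy illustration of Kafka's model: topics hold ordered message
    logs, and each consumer tracks its own read offset."""
    def __init__(self):
        self.topics = defaultdict(list)   # topic -> append-only log
        self.offsets = defaultdict(int)   # (topic, consumer) -> next offset

    def publish(self, topic, message):
        self.topics[topic].append(message)

    def consume(self, topic, consumer):
        log = self.topics[topic]
        start = self.offsets[(topic, consumer)]
        self.offsets[(topic, consumer)] = len(log)
        return log[start:]

broker = MiniBroker()
broker.publish("CustomerOrder", '{"ORDERID": 99}')
broker.publish("CustomerOrder", '{"ORDERID": 100}')
print(broker.consume("CustomerOrder", "consumerA"))  # both messages
print(broker.consume("CustomerOrder", "consumerA"))  # nothing new
```

Real Kafka adds partitioning, replication, and durable storage on top of this idea, but the producer/consumer offset mechanics are the same.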

Environment: BigInsights 4.2

 Step 1: Creating Kafka Brokers from Ambari

By default, Ambari configures one Kafka broker. Based on your use case, you may need to create multiple brokers.

Log in to the Ambari UI --> Click on Hosts and add the Kafka Broker component to each node where you need a broker.

You can then see multiple brokers running in the Kafka service UI.

Step 2: Create a Topic

Log in to one of the nodes where a broker is running, then create a topic.

cd /usr/iop/current/kafka-broker/bin

su kafka -c "./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic CustomerOrder"

You can get the details of the topic using the below describe command.

su kafka -c "./kafka-topics.sh --describe --zookeeper localhost:2181 --topic CustomerOrder"

Step 3: Start the Producer

In the --broker-list argument, pass all the running brokers as host:port pairs (the Ambari default Kafka port is 6667).

su kafka -c "./kafka-console-producer.sh --broker-list broker1:6667,broker2:6667 --topic CustomerOrder"

When you run the above command, it waits for user input. You can type a sample message:

{"ID":99, "CUSTOMID":234,"ADDRESS":"12,5-7,westmead", "ORDERID":99, "ITEM":"iphone6", "COST":980}
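Messages like this are plain JSON text to Kafka; a consumer typically parses them into structured fields. A quick Python illustration using the sample order above:

```python
import json

# The sample order message from above; json.loads turns it into a dict
# so a consumer can work with typed fields instead of raw text.
raw = '{"ID":99, "CUSTOMID":234,"ADDRESS":"12,5-7,westmead", "ORDERID":99, "ITEM":"iphone6", "COST":980}'
order = json.loads(raw)
print(order["ITEM"], order["COST"])  # iphone6 980
```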

Step 4: Start the Consumer

Open another Linux terminal and start the consumer. It will display all the messages sent to the topic.

su kafka -c "./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic CustomerOrder"


Thus, we are able to configure and run a sample pub-sub system using Kafka.

Thursday, March 2, 2017

Configuring and Running Apache Phoenix in IBM BigInsights

This blog describes configuring and running Apache Phoenix in IBM BigInsights.

Apache Phoenix is an open-source SQL layer on top of HBase.

Environment: BigInsights 4.2

Step 1: Configure Phoenix from Ambari

Log in to the Ambari UI, go to the HBase configuration, and enable Phoenix.

Save the changes and restart HBase.

Step 2: Validating Phoenix

Log in to a Linux terminal as the hbase user and run the command below. It creates sample tables, loads data, and runs some select queries; you can see the output in the console.

cd /usr/iop/current/phoenix-client/bin

./psql.py localhost:2181:/hbase-unsecure ../doc/examples/WEB_STAT.sql ../doc/examples/WEB_STAT.csv ../doc/examples/WEB_STAT_QUERIES.sql

Step 3: Running Queries using Phoenix

This section covers running some basic queries on Phoenix.

Open the terminal and launch the Phoenix SQL shell.

cd /usr/iop/current/phoenix-client/bin

./sqlline.py localhost:2181:/hbase-unsecure

Create the table, insert some rows, and select from the table.

create table CUSTOMER_ORDER (ID bigint not null primary key, CUSTOMID integer, ADDRESS varchar, ORDERID integer, ITEM varchar, COST integer);
upsert into CUSTOMER_ORDER values (1,234,'11,5-7,westmead',99,'iphone7',1200);
upsert into CUSTOMER_ORDER values (2,288,'12,5-7,westmead',99,'iphone6',1000);
upsert into CUSTOMER_ORDER values (3,299,'13,5-7,westmead',99,'iphone5',600);

select * from CUSTOMER_ORDER;

Refer to the Apache Phoenix documentation for other SQL syntax.

Step 4: Bulk Loading Data into the Table

Here, we are doing a bulk load to the above table.

Upload the data to HDFS

[root@test bin]#
[root@test bin]# hadoop fs -cat /tmp/importData.csv
[root@test bin]#
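The contents of importData.csv are not shown above; as an illustration only, here is a small Python sketch that produces rows in the six-column CUSTOMER_ORDER layout (ID, CUSTOMID, ADDRESS, ORDERID, ITEM, COST) with made-up values:

```python
import csv, io

# Hypothetical rows matching the six CUSTOMER_ORDER columns; the real
# importData.csv contents are not shown in this post.
rows = [
    [4, 301, "14,5-7,westmead", 99, "iphone4", 400],
    [5, 302, "15,5-7,westmead", 99, "iphone5c", 500],
]
buf = io.StringIO()
csv.writer(buf).writerows(rows)  # quotes the comma-containing ADDRESS field
print(buf.getvalue())
```

Note that the ADDRESS values contain commas, so a CSV writer must quote them for the bulk load tool to parse the file correctly.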

Run the import command from the terminal

sudo -u hbase hadoop jar ../phoenix-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table CUSTOMER_ORDER --input /tmp/importData.csv

Thus, we are able to configure Phoenix and perform some basic queries on it.

Monday, February 20, 2017

Create and Configure Separate Queue in YARN Capacity Scheduler for running Spark Jobs in IBM BigInsights

This blog shows how to create and configure a separate YARN Capacity Scheduler queue for running Spark jobs.

Environment : BigInsights 4.2

1) Create a queue for Spark from the YARN Queue Manager

Here I am allocating 50% of the resources to the default queue and the remaining 50% to Spark jobs. You can configure the queues based on your use case, and you can also create hierarchical queues.
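Behind the scenes, the Queue Manager edits the Capacity Scheduler configuration (capacity-scheduler.xml). A 50/50 split like the one above corresponds roughly to the following properties, assuming the new queue is named sparkQueue as in the spark-submit command later in this post:

```properties
yarn.scheduler.capacity.root.queues=default,sparkQueue
yarn.scheduler.capacity.root.default.capacity=50
yarn.scheduler.capacity.root.default.maximum-capacity=50
yarn.scheduler.capacity.root.sparkQueue.capacity=50
yarn.scheduler.capacity.root.sparkQueue.maximum-capacity=50
```

The capacities of the child queues under root must add up to 100, which is why the default queue has to be reduced before the new queue can be added.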

Log in to the Ambari UI and go to the YARN Queue Manager.


The default queue is configured to use 100% of the resources. Modify its Capacity and Max Capacity to 50%.

Save the changes by clicking the tick button as shown below.

Now, click on +Add Queue button and create a new queue for Spark Jobs.

Save and refresh the queues.

Open the Resource Manager UI and confirm the Queues configured.


2) Submit a Spark job to the queue

Login to the cluster and run the below commands to submit the job.

[root@cluster ~]# su hdfs

[hdfs@cluster root]$ cd /usr/iop/current/spark-client

[hdfs@cluster spark-client]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue sparkQueue  lib/spark-examples.jar 10

In the YARN Resource Manager UI, you can see the job running in the new queue.

In the logs, you can see the output from the spark job.

Thus, you are able to run Spark jobs in a separate queue.

Sunday, February 12, 2017

Running HDFS Word Count using Spark Streaming in IBM BigInsights

This blog walks through a simple word count example to demonstrate Spark Streaming in IBM BigInsights.

Environment : IBM BigInsights 4.2

Step 1: Run the Spark Streaming word Count example for HDFS.

su hdfs

cd /usr/iop/current/spark-client

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount --master yarn-client lib/spark-examples.jar /tmp/wordcount

The above command listens to the HDFS folder (/tmp/wordcount). Whenever a new file lands in that folder, it counts the words and prints the result.
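The per-batch logic HdfsWordCount applies to each newly arrived file is just a split-and-count; a plain-Python sketch of that step:

```python
from collections import Counter

def word_count(lines):
    """Count words across a batch of text lines, as the streaming
    example does for each newly arrived file."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts

batch = ["Hello - Date is Mon", "Hello again"]
print(word_count(batch))  # Counter({'Hello': 2, ...})
```

Spark Streaming runs this kind of computation on each micro-batch of new data, distributed across the cluster.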

Step 2: Open another Linux terminal and run the below command as the hdfs user.

echo "Hello - Date is `date`" | hadoop fs -put - /tmp/wordcount/test1.txt

In the Linux terminal in step 1, you can see the output of the word count.

The above example helps us validate Spark Streaming.

Wednesday, February 8, 2017

Spam classification using Spark MLlib in IBM BigInsights

This blog talks about classifying SMS messages as Spam or Ham using Spark MLlib.

Environment : IBM BigInsights 4.2

Step 1:  Setup the dataset

We are using the SMS Spam Collection Data Set from the UCI Machine Learning Repository.

Download the dataset, then unzip and upload the file (SMSSpamCollection) to HDFS (/tmp).

Step 2: Log in to the Spark Shell

su hdfs
cd /usr/iop/current/spark-client
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

Step 3: In the Scala prompt, run the below commands

// Read the dataset
val inputRdd = sc.textFile("/tmp/SMSSpamCollection")

// Get the records that are spam and ham
val linesWithSpam = inputRdd.filter(line => line.contains("spam"))
val spam = linesWithSpam.map(x => x.split("\t")(1))

val linesWithHam = inputRdd.filter(line => line.contains("ham"))
val ham = linesWithHam.map(x => x.split("\t")(1))

// Import the required MLlib classes
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Convert the text to a vector of 100 features based on term frequency
val tf = new HashingTF(numFeatures = 100)
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val hamFeatures = ham.map(email => tf.transform(email.split(" ")))

// Label spam as 1 and ham as 0
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
val negativeExamples = hamFeatures.map(features => LabeledPoint(0, features))
val training_data = positiveExamples.union(negativeExamples)
// Cache the training data
training_data.cache()

// Use 60% of the dataset for training and the rest for testing the model
val Array(trainset, testset) = training_data.randomSplit(Array(0.6, 0.4))

// Train a Logistic Regression model, then make predictions with it
val lrLearner = new LogisticRegressionWithSGD()
val model = lrLearner.run(trainset)

val predictionLabel = testset.map(x => (model.predict(x.features), x.label))

val accuracy = predictionLabel.filter(r => r._1 == r._2).count.toDouble / testset.count

println("Model accuracy : " + accuracy)

Thus, we are able to build and run a model that predicts whether a message is spam or ham.
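To illustrate the feature-hashing step above: each word is hashed into one of 100 buckets, and the bucket counts form the feature vector. This Python sketch is a simplified stand-in for Spark's HashingTF (Spark uses its own hash function, so the actual bucket indices differ):

```python
def hashing_tf(words, num_features=100):
    """Map a list of words to a term-frequency vector via the
    hashing trick: bucket index = hash(word) mod num_features."""
    vec = [0] * num_features
    for w in words:
        vec[hash(w) % num_features] += 1
    return vec

spam_features = hashing_tf("win a free prize now".split())
ham_features = hashing_tf("see you at the meeting".split())
# Label spam as 1 and ham as 0, as in the MLlib code above
training = [(1, spam_features), (0, ham_features)]
print(sum(spam_features))  # 5 words hashed into 100 buckets
```

The hashing trick keeps the vector size fixed regardless of vocabulary size, at the cost of occasional collisions between words.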

Wednesday, October 5, 2016

Java JDBC Application to connect to IBM BigSQL or IBM DB2 in a Kerberos Cluster

This blog provides the complete code to connect to the IBM BigSQL Database in a Kerberos Cluster. The same code works for IBM DB2 also.

In the code below, we do not pass a database user/password; instead, we use Kerberos authentication to connect to the database.

1) Java - JDBC Code

public class KerberosTest {

    public static void main(String[] args) {
        if (null == args || args.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid arguments. " +
                    "Specify headnode hostname/ip " +
                    "& database port");
        }

        String serverName = args[0];
        int portNumber = Integer.parseInt(args[1]);
        String databaseName = "bigsql";

        java.util.Properties properties = new java.util.Properties();
        // 11 is the integer value for Kerberos security
        properties.put("securityMechanism", new String("11"));
        // Optionally provide the Kerberos principal, e.g.:
        // properties.put("kerberosServerPrincipal", "<principal>");

        String url = String.format("jdbc:db2://%s:%d/%s", serverName,
                portNumber, databaseName);

        java.sql.Connection con = null;
        try {
            Class.forName("com.ibm.db2.jcc.DB2Driver");
        } catch (Exception e) {
            System.out.println("Error: " +
                    "failed to load Db2 jcc driver.");
            return;
        }

        try {
            System.out.println("url: " + url);
            con = java.sql.DriverManager.
                    getConnection(url, properties);
            java.sql.Statement s2 = con.createStatement();

            try {
                s2.executeUpdate("drop table t1");
                s2.executeUpdate("drop table tbint");
                System.out.println("Drop Hadoop & DB2 " +
                        "tables successfull!!!");
            } catch (Exception e) {
                System.out.println("drop is failing");
            }

            try {
                // Create DB2 Table
                s2.executeUpdate("create table t1 (c1 int)");
                // Create BigSQL Table
                s2.executeUpdate(
                        "create hadoop table " +
                        "if not exists " +
                        "tbint " +
                        "(col1 INT, col2 INT, col3 INT)");
                System.out.println(
                        "Created Hadoop & DB2 tables successfully!!!");
            } catch (Exception e) {
                System.out.println("create is failing");
            }

            // Insert to DB2 Table
            String str = "insert into t1 values (100)";
            s2.executeUpdate(str);

            // Query the DB2 Table
            java.sql.PreparedStatement ps = con
                    .prepareStatement("select * from t1");
            java.sql.ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }

            // Insert to BIGSQL Table
            str = "insert into tbint values(1,2,3),(1,2,3),(1,2,3)";
            s2.executeUpdate(str);

            // Query the BIGSQL Table
            ps = con.prepareStatement("select * from tbint");
            rs = ps.executeQuery();
            while (rs.next()) {
                System.out.printf("%s,%s,%s%n", rs.getString(1),
                        rs.getString(2), rs.getString(3));
            }
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
To compile the above code, ensure the DB2 jars (db2jcc_license_cu.jar, db2jcc.jar and db2jcc4.jar) are in the classpath.

2) Testing the JDBC Application in Kerberos Cluster

Do a kinit and get a valid ticket.

[biadmin@btestserver ~]$
[biadmin@btestserver ~]$ date
Wed Oct  5 16:50:18 PDT 2016
[biadmin@btestserver ~]$
[biadmin@btestserver ~]$
[biadmin@btestserver ~]$ /opt/ibm/biginsights/jdk/jre/bin/kinit -k -t /opt/ibm/biginsights/conf/security/keytabs/biadmin.keytab biadmin/
New ticket is stored in cache file /home/biadmin/krb5cc_biadmin
[biadmin@btestserver ~]$
[biadmin@btestserver ~]$
[biadmin@btestserver ~]$ klist -c krb5cc_biadmin
Ticket cache: FILE:krb5cc_biadmin
Default principal: biadmin/

Valid starting     Expires            Service principal
10/05/16 16:50:22  10/06/16 16:50:22  krbtgt/IBM.COM@IBM.COM
[biadmin@btestserver ~]$
[biadmin@btestserver ~]$

Run the Java code.

[biadmin@btestserver ~]$
[biadmin@btestserver ~]$ java -jar /opt/nisan/BigSQL_jdbc_kerberos.jar "" 51000
url: jdbc:db2://
Drop Hadoop & DB2 tables successfull!!!
Created Hadoop & DB2 tables successfully!!!
[biadmin@btestserver ~]$

3) Verifying the records inserted to the table

[biadmin@btestserver ~]$
[biadmin@btestserver ~]$ su bigsql
[bigsql@btestserver biadmin]$
[bigsql@btestserver biadmin]$ db2 "connect to bigsql"

   Database Connection Information

 Database server        = DB2/LINUXX8664 10.6.2
 SQL authorization ID   = BIGSQL
 Local database alias   = BIGSQL

[bigsql@btestserver biadmin]$
[bigsql@btestserver biadmin]$ db2 "select * from biadmin.t1"

  1 record(s) selected.

[bigsql@btestserver biadmin]$
[bigsql@btestserver biadmin]$ db2 "select * from biadmin.tbint"
COL1        COL2        COL3
----------- ----------- -----------
          1           2           3
          1           2           3
          1           2           3

  3 record(s) selected.

[bigsql@btestserver biadmin]$

I will be covering the JDBC Application to connect to Apache Hive in my next blog.

Wednesday, July 13, 2016

Identifying Emotions from tweets for a Social Event using R in IBM Data Scientist Workbench

This blog describes the steps to identify emotions in tweets for an event. As an example, I have taken the event "Britain Exit" (Brexit). I am using IBM Data Scientist Workbench, an online platform where you can directly execute R statements using RStudio. I recommend Data Scientist Workbench because it provides RStudio and Apache Spark on a single platform without installing anything on your machine.

The blog covers

1) Setting up the infrastructure - IBM Data Scientist Workbench
2) Setting up a Twitter App for OAuth
3) Connecting to Twitter using R
4) Cleansing the tweets
5) Identifying the emotions
6) Plotting the graph with emotions & tweets

1) Setting up the infrastructure - IBM Data Scientist Workbench

Create an online account on the IBM Data Scientist Workbench website.

Login to Data Scientist Workbench and click on RStudio IDE



2) Setting up Twitter App for OAuth

Log in to the Twitter application management site and create an application.

Open the "Keys and Access Tokens" tab and note the Consumer Key, Consumer Secret, Access Token, and Access Token Secret.

 3) Connecting to Twitter using R

We use existing R libraries to connect to Twitter. Install the required libraries:

> install.packages(c("twitteR", "ROAuth"))


The statements below load the libraries and connect to Twitter. We then search for some popular hashtags.

> library("twitteR")
> library("ROAuth") 
> dir.create(file.path("/resources", "BRExit"), mode = "0777")
> setwd("/resources/BRExit")
> getwd()
[1] "/resources/BRExit"
> download.file(url="", destfile="cacert.pem")
trying URL ''
Content type 'unknown' length 250607 bytes (244 KB)
downloaded 244 KB

> ## Provide the proper keys & tokens obtained in step 2.
> consumer_key <- 'xxxxxxxxxxx'
> consumer_secret <- 'xxxxxxxxxxxxxxxxxxxxxxx'
> access_token <- 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
> access_secret <- 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
> twiter_handle <- setup_twitter_oauth(consumer_key,consumer_secret,access_token,access_secret)
[1] "Using direct authentication"
Use a local file ('.httr-oauth'), to cache OAuth access credentials between R sessions?

1: Yes
2: No

Selection: 1
Adding .httr-oauth to .gitignore

Now, we query Twitter for tweets matching the hashtags.

> search_term <- "#brexit,#strongerin,#yes2eu"
> search_term <- unlist(strsplit(search_term,","))
> tweets = list()
> ## You may get warning because of Twitter rate limit.
> ## If there is many hashtags, then you may need to stop it after sometime since
> ## Twitter will impose the Rate Limit and you will be getting the exception to 
> ## getting the tweets. 
> for(i in 1:length(search_term)){
     result <- searchTwitter(search_term[i], n=1500)
     tweets <- c(tweets,result)
     tweets <- unique(tweets)
  }
Warning message:
In doRppAPICall("search/tweets", n, params = params, retryOnRateLimit = retryOnRateLimit,  :
  1500 tweets were requested but the API can only return 107
> ## Collected 3107 tweets  
> length(tweets)
[1] 3107
> ## Display 5 tweets
> head(tweets)
[1] "AAPLRoom: AAPL Trading Room Newsletter is out! #trading #brexit"

[1] "DVATW: If #TheresaMay becomes PM then the hope and optimism of a fast clean #Brexit is over."

[1] "melvine: RT @RoubiniGlobal: #Brexit Circus: Which branch of the #EU line will the UK take? #MindTheGap"

Save the tweets to a file for other types of analytics on the data.

> file<-NULL
> if (file.exists("tweetsBRExit.csv")){file<- read.csv("tweetsBRExit.csv")}
> df <- do.call("rbind", lapply(tweets, as.data.frame))
> df<-rbind(df,file)
> df <- df[!duplicated(df[c("id")]),]
> write.csv(df,file="tweetsBRExit.csv",row.names=FALSE)

You can see the tweets exported to /resources/BRExit/tweetsBRExit.csv. The file has various columns that can be helpful for building other insights like influencers, geographic influence, etc.
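The rbind/duplicated step above merges the newly fetched tweets with any previously saved ones and keeps one row per tweet id. The same idea in a small Python sketch (hypothetical dict rows standing in for the data frame):

```python
def merge_dedup(new_rows, old_rows):
    """Combine newly fetched tweets with previously saved ones,
    keeping the first occurrence of each tweet id, as the R
    df[!duplicated(df[c("id")]),] step does."""
    seen, merged = set(), []
    for row in new_rows + old_rows:
        if row["id"] not in seen:
            seen.add(row["id"])
            merged.append(row)
    return merged

new = [{"id": 1, "text": "#Brexit A"}, {"id": 2, "text": "#Brexit B"}]
old = [{"id": 2, "text": "#Brexit B"}, {"id": 3, "text": "#Brexit C"}]
print([r["id"] for r in merge_dedup(new, old)])  # [1, 2, 3]
```

Putting the new rows first means a re-fetched tweet keeps its freshest copy while older saved rows fill in anything the latest search missed.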



4) Cleansing the tweets 

We do some cleansing of the tweets, like removing extra whitespace, numbers, punctuation, etc.

> library(tm)
Loading required package: NLP
> twitter_brexit_df = twListToDF(tweets)
> r_text_corpus <- Corpus(VectorSource(twitter_brexit_df$text))
> r_text_cleansing <- tm_map(r_text_corpus, stripWhitespace)
> r_text_cleansing <- tm_map(r_text_cleansing, removeNumbers)
> r_text_cleansing <- tm_map(r_text_cleansing, removePunctuation)
> r_text_cleansing <- tm_map(r_text_cleansing, content_transformer(tolower))

5) Identifying the emotions

We use the syuzhet library to identify the emotions in the tweets. Its get_nrc_sentiment function implements Saif Mohammad's NRC Emotion Lexicon.

> install.packages("syuzhet")
> library(syuzhet)
> isNull <- function(data) { if (is.null(data)) { 0 } else { data } }
> text_vec = c()
> anger = c() ; anticipation=c() ; disgust=c() ; fear=c() ; joy=c() ;
> sadness=c() ; surprise=c() ; trust=c() ; nrc_negative=c() ; nrc_positive=c();
> for(i in 1:length(r_text_cleansing)){
     text <- lapply(r_text_cleansing[i], as.character)
     text <- gsub("http\\w+", "", text)
     nrc_emotions <- get_nrc_sentiment(text)
     text_vec[i] <- text
     anger[i] <- isNull(nrc_emotions$anger)
     anticipation[i] <- isNull(nrc_emotions$anticipation)
     disgust[i] <- isNull(nrc_emotions$disgust)
     fear[i] <- isNull(nrc_emotions$fear)
     joy[i] <- isNull(nrc_emotions$joy)
     sadness[i] <- isNull(nrc_emotions$sadness)
     surprise[i] <- isNull(nrc_emotions$surprise)
     trust[i] <- isNull(nrc_emotions$trust)
     nrc_negative[i] <- isNull(nrc_emotions$negative)
     nrc_positive[i] <- isNull(nrc_emotions$positive)
  }
> nrc_df <- data.frame(text_vec,anger,anticipation,disgust,fear,joy,sadness,surprise,trust,nrc_negative,nrc_positive)
> nrc_df[1:2,2:11]
  anger anticipation disgust fear joy sadness surprise trust nrc_negative nrc_positive
1     0            0       0    0   0       0        0    0            0            0
2     0            2       0    0   3       0        2    0            0            3
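get_nrc_sentiment is essentially a lexicon lookup: each word of the tweet is matched against the NRC word lists and the per-emotion hits are summed. A toy Python version with a tiny made-up lexicon (the real NRC lexicon has thousands of entries):

```python
# Tiny made-up stand-in for the NRC Emotion Lexicon
LEXICON = {
    "hope": {"joy", "anticipation", "positive"},
    "fear": {"fear", "negative"},
    "clean": {"joy", "positive", "trust"},
}

def emotion_counts(text):
    """Sum per-emotion hits for every word found in the lexicon."""
    counts = {"anger": 0, "anticipation": 0, "disgust": 0, "fear": 0,
              "joy": 0, "sadness": 0, "surprise": 0, "trust": 0,
              "negative": 0, "positive": 0}
    for word in text.lower().split():
        for emotion in LEXICON.get(word, set()):
            counts[emotion] += 1
    return counts

print(emotion_counts("the hope of a fast clean Brexit"))
```

This also explains rows of all zeros in the output above: a tweet whose words match nothing in the lexicon scores zero on every emotion.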

6) Plotting the graph with emotions & tweets

Plot the graph in R.

> par(mar=c(5.1,5,4.1,2.1))
> barplot(
     sort(colSums(prop.table(nrc_df[, 2:9]))), 
     horiz = TRUE, 
     cex.names = 0.7,
     las = 1, 
     main = "Emotions for Britain Exit")

I tracked the tweets over the last couple of weeks and could see the emotions change as time progressed.