Thursday, October 1, 2015

Building a Twitter Application to Collect Tweets on Hadoop

This blog post walks through building a BigInsights application that collects tweets from Twitter and stores them on Hadoop.

Step 1: Register the application in Twitter and generate the OAuth Keys

a) Log in to https://apps.twitter.com/app/new and create an application





b) Open the "Keys and Access Tokens" Tab and get the Consumer Key and Consumer Secret



c) On the same page, click the "Create my access token" button to generate the access token






We will use the Consumer Key / Consumer Secret and Access Token / Access Token Secret in our BigInsights application to fetch tweets from Twitter.
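Before wiring these keys into the application, it is worth confirming that they work. The following is a minimal standalone Twitter4J sketch; the class name CredentialCheck and the key values are placeholders for illustration only:

package com.test.reader;

import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;

public class CredentialCheck {

    public static void main(String[] args) throws Exception {
        // Placeholder keys - replace with the values generated in Step 1
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey("YOUR_CONSUMER_KEY")
                .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
                .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
                .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET");

        Twitter twitter = new TwitterFactory(cb.build()).getInstance();

        // verifyCredentials() throws an exception if the keys are not valid
        User user = twitter.verifyCredentials();
        System.out.println("Authenticated as @" + user.getScreenName());
    }
}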

Step 2: Building a BigInsights Application to get the Feeds from Twitter.

a) Create a BigInsights Project - TwitterSearchApp.

Refer to http://big-analytics.blogspot.com.au/2015/07/building-web-crawler-in-ibm-biginsights.html for instructions on installing the Text Analytics plugin in Eclipse.




b) I am using the Twitter4J API to connect to Twitter. Download twitter4j-core-*.jar from http://twitter4j.org

Place the twitter4j-core-*.jar under /TwitterSearchApp/BIApp/workflow/lib/ (for example, /TwitterSearchApp/BIApp/workflow/lib/twitter4j-core-4.0.4.jar)
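For orientation, the files touched in this post sit roughly in the following layout (the exact structure generated by the BigInsights tooling may differ slightly, and TweetReader.java is added in the next step):

TwitterSearchApp/
├── src/
│   └── com/test/reader/
│       └── TweetReader.java
└── BIApp/
    ├── application/
    │   └── application.xml
    └── workflow/
        ├── config-default.xml
        ├── workflow.xml
        └── lib/
            └── twitter4j-core-4.0.4.jar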




c) Create a Java file TweetReader.java under the src folder, in the package com.test.reader.





Content of TweetReader.java



package com.test.reader;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import twitter4j.JSONObject;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;

public class TweetReader {

    Twitter twitter;

    /**
     * Get the Twitter Instance
     */
    public Twitter getConnection(String consumerKey, String consumerSecret,
            String accessToken, String accessTokenSecret, String httpProxyHost,
            int httpProxyPort) throws Exception {
        if (twitter == null) {
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
                    .setOAuthConsumerSecret(consumerSecret)
                    .setOAuthAccessToken(accessToken)
                    .setOAuthAccessTokenSecret(accessTokenSecret)
                    .setHttpProxyHost(httpProxyHost)
                    .setHttpProxyPort(httpProxyPort);
            TwitterFactory tf = new TwitterFactory(cb.build());
            twitter = tf.getInstance();
        }
        return twitter;
    }

    // Get the Tweets based on the search Term
    private List<Status> getTweets(String keyword) throws Exception {
        List<Status> tweets = new ArrayList<Status>();
        List<Status> temp = null;
        QueryResult result = null;
        Query query = new Query(keyword);
        query.setLang("en");

        // Page through the search results; the counter limits this to at most
        // three search requests per keyword
        int i = 2;
        do {
            result = this.twitter.search(query);
            temp = result.getTweets();
            tweets.addAll(temp);
            i++;

        } while ((query = result.nextQuery()) != null && i < 5);
        return tweets;
    }

    // Convert the Tweet Object to JSON Array
    public String getTweets(String[] keywordArray) throws Exception {
        StringBuffer buff = new StringBuffer();
        buff.append("[");
        int start = 0;
        for (String keyword : keywordArray) {

            // Pause briefly between keyword searches
            Thread.sleep(1L);
            List<Status> actTemp = getTweets(keyword);

            // Remove the duplicate tweets
            Set<Status> temp = new HashSet<Status>();
            for (Status tweet : actTemp) {
                temp.add(tweet);
            }

            for (Status tweet : temp) {
                String text = tweet.getText();

                if (start != 0) {
                    buff.append(",\n");
                }
                ++start;

                JSONObject obj = new JSONObject();
                obj.put("CreatedAt", tweet.getCreatedAt().toString());
                obj.put("User", tweet.getUser().getScreenName());
                obj.put("Text", text);
                obj.put("RetweetCount", tweet.getRetweetCount());
                obj.put("FriendsCount", tweet.getUser().getFriendsCount());
                obj.put("FollowersCount", tweet.getUser().getFollowersCount());
                obj.put("Id", tweet.getId());
                // Guard against users with zero friends to avoid a divide-by-zero
                int friendsCount = tweet.getUser().getFriendsCount();
                obj.put("InfluencerScore", friendsCount == 0 ? 0
                        : tweet.getUser().getFollowersCount() / friendsCount);
                String rec = obj.toString();

                buff.append(rec);
            }

        }
        buff.append("]");

        return buff.toString();
    }

    // Write the output to HDFS
    private void writeOutput(Configuration conf, String hdfsPath, String content)
            throws Exception {
        FileSystem fs = FileSystem.get(conf);

        FSDataOutputStream fos = fs.create(new Path(hdfsPath));
        fos.write(content.getBytes("UTF-8"));
        fos.close();
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Keywords arrive from the UI as a single newline-separated string
        String[] keywordArray = args[0].split("\n");
        String hdfsPath = args[1];
        String consumerKey = args[2];
        String consumerSecret = args[3];
        String accessToken = args[4];
        String accessTokenSecret = args[5];

        // The proxy settings are optional; fall back to a direct connection
        // (Twitter4J ignores the proxy settings when the host is null)
        String proxyIP = args.length > 6 ? args[6] : null;
        int proxyPort = (args.length > 7 && args[7].matches("\\d+"))
                ? Integer.parseInt(args[7]) : -1;

        TweetReader reader = new TweetReader();
        reader.getConnection(consumerKey, consumerSecret, accessToken,
                accessTokenSecret, proxyIP, proxyPort);
        String content = reader.getTweets(keywordArray);
        reader.writeOutput(conf, hdfsPath, content);
    }

}




d) Edit the /TwitterSearchApp/BIApp/application/application.xml


<application-template xmlns="http://biginsights.ibm.com/application" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <name>TwitterSearch</name>
    <properties>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="Search" name="keywordArray" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="Output Directory" name="hdfsPath" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="consumerKey" name="consumerKey" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="consumerSecret" name="consumerSecret" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="accessToken" name="accessToken" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="true" label="accessTokenSecret" name="accessTokenSecret" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="false" label="proxyIP" name="proxyIP" paramtype="STRING" uitype="textfield"/>
        <property isInputPath="false" isOutputPath="false" isRequired="false" label="proxyPort" name="proxyPort" paramtype="STRING" uitype="textfield"/>
    </properties>
    <assets>
        <asset id="TwitterSearch" type="WORKFLOW"/>
    </assets>
    <imagePath>n-mark-icon.jpg</imagePath>
</application-template>





e) Update the file /TwitterSearchApp/BIApp/workflow/config-default.xml with the details below

<configuration>
    <property>
        <name>keywordArray</name>
        <value>null</value>
    </property>
    <property>
        <name>hdfsPath</name>
        <value>null</value>
    </property>
    <property>
        <name>consumerKey</name>
        <value>null</value>
    </property>
    <property>
        <name>consumerSecret</name>
        <value>null</value>
    </property>
    <property>
        <name>accessToken</name>
        <value>null</value>
    </property>
    <property>
        <name>accessTokenSecret</name>
        <value>null</value>
    </property>
    <property>
        <name>proxyIP</name>
        <value>null</value>
    </property>
    <property>
        <name>proxyPort</name>
        <value>null</value>
    </property>
</configuration>




f) Update the /TwitterSearchApp/BIApp/workflow/workflow.xml

<workflow-app name="wfapp" xmlns="uri:oozie:workflow:0.2">
    <start to="tweetSearch"/>
    <!-- add actions here -->
    <action name='tweetSearch'>
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>com.test.reader.TweetReader</main-class>
            <arg>${keywordArray}</arg>
            <arg>${hdfsPath}</arg>
            <arg>${consumerKey}</arg>
            <arg>${consumerSecret}</arg>
            <arg>${accessToken}</arg>
            <arg>${accessTokenSecret}</arg>
            <arg>${proxyIP}</arg>
            <arg>${proxyPort}</arg>
        </java>
        <ok to="end"/>
        <error to="kill"/>
    </action>
   
    <kill name="kill">
        <message>error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
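The workflow above is a standard Oozie workflow, so it can also be smoke-tested outside the BigInsights console with the Oozie command line. A minimal job.properties for such a test might look like the following (every value here is a placeholder to adapt to your cluster):

nameNode=hdfs://namenode.example.com:9000
jobTracker=jobtracker.example.com:9001
oozie.wf.application.path=${nameNode}/user/biadmin/apps/TwitterSearch

keywordArray=hadoop
hdfsPath=/user/biadmin/tweets/tweets.json
consumerKey=YOUR_CONSUMER_KEY
consumerSecret=YOUR_CONSUMER_SECRET
accessToken=YOUR_ACCESS_TOKEN
accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
proxyIP=proxy.example.com
proxyPort=8080

The workflow can then be submitted with: oozie job -oozie http://<oozie-host>:11000/oozie -config job.properties -run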





g) Publish the application to the BigInsights cluster. Refer to http://big-analytics.blogspot.com.au/2015/07/building-web-crawler-in-ibm-biginsights.html for the steps to publish the application.


Step 3: Running the Twitter Application on the BigInsights Cluster

a) Deploy the published Application in the Cluster



The Search parameter defines the keywords to search for on Twitter; multiple keywords can be entered one per line, since the application splits this value on newlines.

Provide the Consumer Key / Consumer Secret and Access Token / Access Token Secret generated in Step 1.

Provide the proxyIP and proxyPort values if your cluster connects to the Internet through a proxy.




Click Run

After the job completes, open the newly generated file in BigSheets and set the reader to JSON Array to see the output.
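Each record in the output file follows the structure built in TweetReader.getTweets(); an entry looks roughly like this (the values shown are illustrative, not real tweets):

[
  {"CreatedAt":"Thu Oct 01 10:15:30 UTC 2015",
   "User":"some_user",
   "Text":"Sample tweet text about hadoop",
   "RetweetCount":3,
   "FriendsCount":150,
   "FollowersCount":600,
   "Id":123456789012345678,
   "InfluencerScore":4},
  ...
]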





In the example above, we used the Twitter REST API to get the tweets. The API enforces rate limits per user and per application; refer to https://dev.twitter.com/rest/public/rate-limiting for details. If you need to avoid these limits, consider a commercial provider such as Gnip or DataSift.
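If you hit rate-limit errors while testing, Twitter4J can report how many search calls remain in the current window. Here is a minimal sketch (the RateLimitCheck class is only illustrative), assuming a Twitter instance created the same way as in TweetReader.getConnection():

package com.test.reader;

import java.util.Map;

import twitter4j.RateLimitStatus;
import twitter4j.Twitter;

public class RateLimitCheck {

    // Print the remaining quota for the search endpoint used by TweetReader
    public static void printSearchQuota(Twitter twitter) throws Exception {
        Map<String, RateLimitStatus> limits = twitter.getRateLimitStatus("search");
        RateLimitStatus status = limits.get("/search/tweets");
        if (status != null) {
            System.out.println("Remaining search calls: " + status.getRemaining()
                    + " of " + status.getLimit()
                    + ", window resets in " + status.getSecondsUntilReset() + "s");
        }
    }
}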

