Tuesday, May 27, 2014

Map Reduce Program: Loading data from HDFS to HBase


This blog shows how to load data from HDFS into HBase using the MapReduce API.

Prerequisite:
1) Dataset used in this example: http://files.grouplens.org/datasets/movielens/ml-100k/u.user
2) Download the u.user file and upload it to HDFS.
3) Create an HBase table - create 'User', 'usr' (a programmatic alternative is sketched below)
4) Required jars : hadoop-core-2.2.0.jar & hbase-0.96.0.jar
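
If you prefer to create the table from Java instead of the HBase shell, here is a minimal sketch using the 0.96 client API (the class name CreateUserTable is just for illustration, and it assumes hbase-site.xml is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateUserTable {

    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml (ZooKeeper quorum etc.) from the classpath
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // Equivalent of the shell command: create 'User', 'usr'
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("User"));
            table.addFamily(new HColumnDescriptor("usr"));
            admin.createTable(table);
        } finally {
            admin.close();
        }
    }
}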

This program loads the user profiles from HDFS into the HBase table (User). The file u.user stores demographic information about each user as a pipe-separated list:

user id | age | gender | occupation | zip code

Sample Data:
1|24|M|technician|85711
2|24|M|scientist|85712

In the Mapper class, we receive each line as an org.apache.hadoop.io.Text value, split it on the | delimiter, and use the Put API (org.apache.hadoop.hbase.client.Put) to write the fields to the HBase table. Note that String.split() takes a regular expression, so the pipe must be escaped as "\\|".
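
A quick standalone illustration of why the escape matters (not part of the job itself):

public class SplitDemo {

    public static void main(String[] args) {
        String line = "1|24|M|technician|85711";

        // Wrong: split() takes a regular expression, and "|" alone matches the
        // empty string, so the line is chopped into single characters
        String[] bad = line.split("|");

        // Right: escape the pipe so it is matched literally
        String[] fields = line.split("\\|");
        System.out.println(fields[3]); // prints: technician
    }
}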

Mapper Code:

package com.test.HBaseLoadFromHDFS;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseLoadFromHDFSMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    // TextInputFormat supplies the byte offset of each line as the key
    // (LongWritable) and the line itself as the value (Text)
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        // Sample line is - 1|24|M|technician|85711 (user id | age | gender |
        // occupation | zip code )
        String[] fields = line.split("\\|"); // split() takes a regex, so escape the pipe
      
        // Row key: the user id, stored as a 4-byte integer
        byte[] row = Bytes.toBytes(Integer.parseInt(fields[0]));

        // One Put per input line; every column goes under the 'usr' family
        Put put = new Put(row);
        put.add(Bytes.toBytes("usr"), Bytes.toBytes("age"),
                Bytes.toBytes(Integer.parseInt(fields[1])));
        put.add(Bytes.toBytes("usr"), Bytes.toBytes("gender"),
                Bytes.toBytes(fields[2]));
        put.add(Bytes.toBytes("usr"), Bytes.toBytes("occupation"),
                Bytes.toBytes(fields[3]));
        put.add(Bytes.toBytes("usr"), Bytes.toBytes("zipCode"),
                Bytes.toBytes(Integer.parseInt(fields[4])));

        context.write(new ImmutableBytesWritable(row), put);

    }

}
We do not need a Reducer for this use case. The HDFS path of the u.user file and the HBase table name are specified in the main class.

Main Code:

package com.test.HBaseLoadFromHDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Loads the user data from HDFS into the HBase 'User' table
 *
 * @author Nisanth Simon
 *
 */
public class HBaseLoadFromHDFSMain {

    public static void main(String arg[]) throws Exception {

        // HDFS path where u.user was uploaded
        String hdfsInputPath = "/home/usr1/u.user";

        // HBaseConfiguration.create() loads hbase-site.xml (ZooKeeper quorum etc.)
        // on top of the normal Hadoop configuration
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "Loading the User Profile");
        job.setJarByClass(HBaseLoadFromHDFSMain.class);

        FileInputFormat.setInputPaths(job, new Path(hdfsInputPath));

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(HBaseLoadFromHDFSMapper.class);

        // Sets TableOutputFormat and the output key/value classes so the
        // mapper's Puts are written directly into the 'User' table
        TableMapReduceUtil.initTableReducerJob("User", null, job);

        // Map-only job: no reducer is needed
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
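
Once the job finishes, you can sanity-check the load by reading a row back. Below is a minimal sketch using the 0.96 client API (the class name VerifyUserRow is just for illustration); it assumes the same 'User' table and 'usr' family used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyUserRow {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "User");
        try {
            // Row key is the user id stored as a 4-byte int, matching the mapper
            Get get = new Get(Bytes.toBytes(1));
            Result result = table.get(get);
            int age = Bytes.toInt(result.getValue(Bytes.toBytes("usr"), Bytes.toBytes("age")));
            String occupation = Bytes.toString(result.getValue(Bytes.toBytes("usr"), Bytes.toBytes("occupation")));
            System.out.println("user 1 -> age=" + age + ", occupation=" + occupation);
        } finally {
            table.close();
        }
    }
}
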
In my next blog, I will cover how to copy data from one HBase table to another.

