This blog post explains how to load data from HDFS into HBase using the MapReduce API.
Prerequisite:
1) Dataset used in this example: http://files.grouplens.org/datasets/movielens/ml-100k/u.user
2) Download the u.user file and upload it to HDFS.
3) Create an HBase Table - create 'User', 'usr'
4) Required jars : hadoop-core-2.2.0.jar & hbase-0.96.0.jar
This program loads the user profiles from HDFS into the HBase table (User). The file u.user stores demographic information about each user as a pipe-separated ('|') list:
user id | age | gender | occupation | zip code
Sample Data:
1|24|M|technician|85711
2|24|M|Scientist|85712
2|24|M|Scientist|85712
In the Mapper class, we receive each line as an org.apache.hadoop.io.Text value, split it on the '|' character, and use the Put API (org.apache.hadoop.hbase.client.Put) to write it to the HBase table.
Mapper Code:
package com.test.HBaseLoadFromHDFS;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HBaseLoadFromHDFSMapper extends
Mapper<IntWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(
IntWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<IntWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// Sample line is - 1|24|M|technician|85711 (user id | age | gender |
// occupation | zip code )
String fields[] = line.split("|");
byte row[] = Bytes.toBytes(Integer.parseInt(fields[0]));
Put put = new Put(row);
put.add(Bytes.toBytes("usr"), Bytes.toBytes("age"),
Bytes.toBytes(Integer.parseInt(fields[1])));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("gender"),
Bytes.toBytes(fields[2]));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("occupation"),
Bytes.toBytes(fields[3]));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("zipCode"),
Bytes.toBytes(Integer.parseInt(fields[4])));
context.write(new ImmutableBytesWritable(row), put);
}
}
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HBaseLoadFromHDFSMapper extends
Mapper<IntWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(
IntWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<IntWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// Sample line is - 1|24|M|technician|85711 (user id | age | gender |
// occupation | zip code )
String fields[] = line.split("|");
byte row[] = Bytes.toBytes(Integer.parseInt(fields[0]));
Put put = new Put(row);
put.add(Bytes.toBytes("usr"), Bytes.toBytes("age"),
Bytes.toBytes(Integer.parseInt(fields[1])));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("gender"),
Bytes.toBytes(fields[2]));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("occupation"),
Bytes.toBytes(fields[3]));
put.add(Bytes.toBytes("usr"), Bytes.toBytes("zipCode"),
Bytes.toBytes(Integer.parseInt(fields[4])));
context.write(new ImmutableBytesWritable(row), put);
}
}
We do not need a Reducer for this use case. We specify the HDFS path of the u.user file and the HBase table name in the Main code.
Main Code:
package com.test.HBaseLoadFromHDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* Loads the data from CSV to HBase table
*
* @author Nisanth Simon
*
*/
public class HBaseLoadFromHDFSMain {
public static void main(String arg[]) throws Exception {
String hdfsInputPath = "/home/usr1/u.user";
Configuration config = new Configuration();
Job job = new Job(config, "Loading the User Profile");
job.setJarByClass(HBaseLoadFromHDFSMain.class);
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the map-only job that loads the pipe-delimited {@code u.user} file from HDFS
 * into the HBase table {@code User}.
 *
 * <p>Usage: {@code HBaseLoadFromHDFSMain [hdfsInputPath]}. When no argument is given the
 * input path defaults to {@code /home/usr1/u.user}.
 *
 * @author Nisanth Simon
 *
 */
public class HBaseLoadFromHDFSMain {

    /** HDFS location read when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/home/usr1/u.user";

    /** Name of the HBase table that receives the user rows. */
    private static final String TABLE_NAME = "User";

    /**
     * Configures and runs the load job, blocking until it finishes.
     *
     * @param arg optional; {@code arg[0]} overrides the default HDFS input path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String arg[]) throws Exception {
        String hdfsInputPath =
                (arg != null && arg.length > 0) ? arg[0] : DEFAULT_INPUT_PATH;

        Configuration config = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(config, "Loading the User Profile");
        job.setJarByClass(HBaseLoadFromHDFSMain.class);

        FileInputFormat.setInputPaths(job, new Path(hdfsInputPath));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HBaseLoadFromHDFSMapper.class);

        // initTableReducerJob installs the HBase table output format for TABLE_NAME, so no
        // separate output format is set here; a null reducer class plus zero reduce tasks
        // makes the mapper's Puts go straight to the table.
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, null, job);
        job.setNumReduceTasks(0);

        // Surface a failed job to the caller's shell instead of exiting with status 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
In my next blog post, I will cover how to copy data from one HBase table to another.