Note: first start HBase from the command line (start-hbase.sh).
Don't just copy and paste; it's best to type the code out by hand so it sinks in.
Level 1: HBase MapReduce Quick Start
package com.processdata;
import java.io.IOException;
import java.util.List;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;
import com.util.HBaseUtil;
import com.vdurmont.emoji.EmojiParser;
/**
 * Word frequency count
 */
public class WorldCountMapReduce extends Configured implements Tool {

    private static class MyMapper extends TableMapper<Text, IntWritable> {
        // Column family and qualifier that hold the comment text in the source table
        private static byte[] family = "comment_info".getBytes();
        private static byte[] column = "content".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context) {
            try {
                // Read the comment content of the current row
                byte[] value = result.getValue(family, column);
                String content = new String(value, "utf-8");
                // Split on spaces and emit <word, 1> for every token
                String[] split = content.split(" ");
                for (String str : split) {
                    Text text = new Text(str);
                    IntWritable v = new IntWritable(1);
                    context.write(text, v);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        // Column family and qualifier that receive the word counts in the target table
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            // Add up all the 1s emitted by the mappers for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Use the word as the row key and write the total into word_info:count
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            try {
                context.write(null, put);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        // Configure the Job
        /********** Begin *********/
        // Create the Configuration object
        Configuration conf = HBaseConfiguration.create(getConf());
        String tablename = args[0];   // source table
        String targetTable = args[1]; // target table
        // Get the Job object
        Job job = Job.getInstance(conf);
        // Create the Scan that feeds the mapper
        Scan scan = new Scan();
        // Wire the mapper and reducer to HBase via the utility class
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}
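The class above implements Tool but the exercise does not show a driver. A minimal launch sketch using ToolRunner, assuming "comment_table" and "word_count" as the source and target table names (both are placeholders, not taken from the exercise):

package com.processdata;

import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver, not part of the exercise template.
public class WorldCountDriver {
    public static void main(String[] args) throws Exception {
        // args: source HBase table, target HBase table
        // "comment_table" and "word_count" are placeholder names.
        int exitCode = ToolRunner.run(new WorldCountMapReduce(),
                new String[] { "comment_table", "word_count" });
        System.exit(exitCode);
    }
}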
Level 2: Using MapReduce with HBase
package com.processdata;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
/**
 * Word frequency count
 */
public class WorldCountMapReduce2 extends Configured implements Tool {

    private static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object object, Text value, Context context) throws IOException, InterruptedException {
            /********** Begin *********/
            // As the task requires, split each input line on spaces
            String[] split = value.toString().split(" ");
            // Emit <word, 1> for every token
            for (String str : split) {
                Text text = new Text(str.getBytes());
                IntWritable v = new IntWritable(1);
                context.write(text, v);
            }
            /********** End *********/
        }
    }
    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        // Column family and qualifier that receive the word counts in the target table
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            int sum = 0; // running total for this word
            // Add up every 1 emitted by the mappers for this key
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Use the word as the row key and put the total into word_info:count
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            // Write the Put to the target HBase table
            context.write(null, put);
            /********** End *********/
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        // Configure the Job
        /********** Begin *********/
        // Create the configuration
        Configuration conf = HBaseConfiguration.create(getConf());
        String file = args[0];        // input file (HDFS path)
        String targetTable = args[1]; // output table
        Job job = Job.getInstance(conf);
        // Map output key type
        job.setMapOutputKeyClass(Text.class);
        // Map output value type
        job.setMapOutputValueClass(IntWritable.class);
        // Class that carries the MapReduce job
        job.setJarByClass(WorldCountMapReduce2.class);
        // Input path for the file-based mapper
        FileInputFormat.addInputPath(job, new Path(file));
        // Set the Mapper class
        job.setMapperClass(MyMapper.class);
        // Wire the reducer to the target HBase table, then run the job
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}
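Both reducers write into a column family named word_info, so the target table must already exist with that family before either job runs. A minimal sketch of pre-creating it with the Admin API, assuming the target table is called "word_count" (a placeholder; use whatever table the platform expects). On HBase 2.x you would typically prefer TableDescriptorBuilder over the older HTableDescriptor shown here:

package com.processdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helper, not part of the exercise template.
public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("word_count"); // placeholder table name
            if (!admin.tableExists(name)) {
                // The reducers write into the word_info column family
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("word_info"));
                admin.createTable(desc);
            }
        }
    }
}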