注意,我们首页需要在命令行中启动Hbase(start-hbase.sh)。

不要直接复制,最好手敲一遍哦!可以加深一下印象哈。

第一关: HBase的MapReduce快速入门

package com.processdata;

import java.io.IOException;
import java.util.List;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;
import com.util.HBaseUtil;
import com.vdurmont.emoji.EmojiParser;

/**
 * 词频统计
 *
 */
public class WorldCountMapReduce extends Configured implements Tool {

    private static class MyMapper extends TableMapper<Text, IntWritable> {
        private static byte[] family = "comment_info".getBytes();
        private static byte[] column = "content".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context) {
            try {
                byte[] value = result.getValue(family, column);
                String content = new String(value, "utf-8");
                String[] split = content.split(" ");
                for (String str : split) {
                    Text text = new Text(str);
                    IntWritable v = new IntWritable(1);
                    context.write(text, v);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            try {
                context.write(null, put);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }

    @Override
    public int run(String[] args) throws Exception {
        // 配置Job
        /********** Begin *********/
  
        // 创建Conf对象
        Configuration conf = HBaseConfiguration.create(getConf());
  
        String tablename = args[0]; // 表名
        String targetTable = args[1]; // 目标表
  
        // 获取到Job对象
        Job job = Job.getInstance(conf);

        // 创建Scan对象
        Scan scan = new Scan();
  
        // 通过Hbase工具类提交数据
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
  
        // 开始提交数据
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}

第二关:HBase的MapReduce使用

package com.processdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

/**
 * 词频统计
 *
 */
public class WorldCountMapReduce2 extends Configured implements Tool {

    private static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object object, Text value, Context context) throws IOException, InterruptedException {
            /********** Begin *********/
            // 根据题意,我们需要根据空格对指定数据进行拆分
            String[] split = value.toString().split(" ");
            // 循环数组,对值进行分类
            for (String str : split) {
                Text text = new Text(str.getBytes());
                IntWritable v = new IntWritable(1);
                context.write(text, v);
            }
            /********** End *********/
        }
    }

    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            /********** Begin *********/
            int sum = 0; // 用于统计

            // 循环Map中分类的值,进行相加
            for (IntWritable value : values) {
                sum += value.get();
            }

            // 将key和value进行聚和
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));

            // 通过文件方式将其输出
            context.write(null, put);
            /********** End *********/
        }

    }

    @Override
    public int run(String[] args) throws Exception {
        // 配置Job
        /********** Begin *********/

        // 配置
        Configuration conf = HBaseConfiguration.create(getConf());
        String file = args[0]; // 输入文件
        String targetTable = args[1]; // 输出表
        Job job = Job.getInstance(conf);
        // Map的Key的输入类型
        job.setMapOutputKeyClass(Text.class);
        // Map的Value的输入类型
        job.setMapOutputValueClass(IntWritable.class);
        // 需要执行的MapReduce类
        job.setJarByClass(WorldCountMapReduce2.class);
        // 文件输入格式
        FileInputFormat.addInputPath(job, new Path(file));
        // 设置Mapper类
        job.setMapperClass(MyMapper.class);
        // 开始执行任务
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}

最后修改:2021 年 07 月 01 日
如果觉得我的文章对你有用,请随意赞赏