注意,我们需要编写3个Java文件

第一个文件:DBHelper.java(数据库工具类)

package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    /********** begin **********/
    // mysql驱动
    private static final String driver = "com.mysql.jdbc.Driver";
    // mysql连接地址
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    // mysql的用户名
    private static final String username = "root";
    // mysql的密码
    private static final String password = "123123";
    // 连接对象
    private static Connection conn = null;
    // 设置静态变量,用于加载数据库驱动
    static {
        try {
            // 加载数据库驱动
            Class.forName(driver);
        } catch (Exception e) {
            // 抛出异常
            e.printStackTrace();
        }
    }

    // 创建连接类,连接数据库
    public static Connection getConn() {
        // 如果对象不存在,开始创建
        if (conn == null) {
            try {
                // 创建连接对象
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                // 如果无法创建数据库连接对象,则抛出数据库异常
                e.printStackTrace();
            }
            // 创建成功后返回这个对象
            return conn;
        }
        // 如果对象存在,直接返回连接对象
        return conn;
    }

    public static void main(String[] args) {
        // 通过数据库连接工具类进行连接
        Connection conn = DBHelper.getConn();
    }
    /********** end **********/
}

第二文件:JsonMap.java(MapReduce的操作类)

package com;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
    /********** begin **********/

    // 用户存放城市编码和城市名字的数据
    Map<String, String> map = new HashMap<String, String>();
    Put put;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 通过数据库工具类获取对象
        Connection conn = DBHelper.getConn();
        try {
            // 查询数据
            Statement s = conn.createStatement();
            String sql = "select * from province";
            ResultSet r = s.executeQuery(sql);
            // 循环结果
            while (r.next()) {
                // 获取城市编码
                String cityCode = r.getString(1);
                // 获取城市的名字
                String cityName = r.getString(2);
                // 将获取到的数据放入map集合中
                // 前面是键,后面是值
                map.put(cityCode, cityName);
            }
        } catch (SQLException e) {
            // 如果数据异常时,抛出该异常
            e.printStackTrace();
        }
    }

    // 编写map类
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取数据
        String line = value.toString();
        // 开始解析Json数据
        JSONObject j = JSONObject.parseObject(line);
        // 因为要解析的数据有14条,用一个数组存放这个14条数据
        String[] data = new String[14];
        data[0] = j.getString("id");
        data[1] = j.getString("company_name");
        data[2] = j.getString("eduLevel_name");
        data[3] = j.getString("emplType");
        data[4] = j.getString("jobName");
        // 对工资进行解析
        String salary = j.getString("salary");
        if (salary.contains("K-")) {
            Double a = Double.valueOf(salary.substring(0, salary.indexOf("K")));
            Double b = Double.valueOf(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
            data[5] = (a + b) / 2 + "";
        } else {
            data[5] = "0";
        }
        data[6] = j.getString("createDate");
        data[7] = j.getString("endDate");
        data[8] = j.getString("city_code");
        data[9] = j.getString("companySize");
        data[10] = j.getString("welfare");
        data[11] = j.getString("responsibility");
        data[12] = j.getString("place");
        data[13] = j.getString("workingExp");
        // 检查这些数据是否为空
        for (String s : data) {
            if (s.equals("") || s == null) {
                // 如果为空直接返回,不执行下面的操作
                return;
            }
        }
        String info = "info";
        // 将所有数据添加进info中
        put = new Put(data[0].getBytes());
        put.addColumn(info.getBytes(), "company_name".getBytes(), data[1].getBytes());
        put.addColumn(info.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
        put.addColumn(info.getBytes(), "emplType".getBytes(), data[3].getBytes());
        put.addColumn(info.getBytes(), "jobName".getBytes(), data[4].getBytes());
        put.addColumn(info.getBytes(), "salary".getBytes(), data[5].getBytes());
        put.addColumn(info.getBytes(), "createDate".getBytes(), data[6].getBytes());
        put.addColumn(info.getBytes(), "endDate".getBytes(), data[7].getBytes());
        put.addColumn(info.getBytes(), "city_name".getBytes(), data[8].getBytes());
        put.addColumn(info.getBytes(), "companySize".getBytes(), data[9].getBytes());
        put.addColumn(info.getBytes(), "welfare".getBytes(), data[10].getBytes());
        put.addColumn(info.getBytes(), "responsibility".getBytes(), data[11].getBytes());
        put.addColumn(info.getBytes(), "place".getBytes(), data[12].getBytes());
        put.addColumn(info.getBytes(), "workingExp".getBytes(), data[13].getBytes());
        // 将它写入到HDFS中
        context.write(NullWritable.get(), put);
    }
    /********** end **********/

}

第三个文件:JsonTest.class(测试操作类)

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

public class JsonTest {

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // 设置zookeeper的配置
        config.set("hbase.zookeeper.quorum", "127.0.0.1");
        Connection connection = ConnectionFactory.createConnection(config);
        Admin admin = connection.getAdmin();
        TableName tableName = TableName.valueOf("job");

        boolean isExists = admin.tableExists(tableName);
        if (!isExists) {
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();// 构建列族对象
            tableDescriptor.setColumnFamily(family); // 设置列族
            admin.createTable(tableDescriptor.build()); // 创建表
        } else {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();// 构建列族对象
            tableDescriptor.setColumnFamily(family); // 设置列族
            admin.createTable(tableDescriptor.build()); // 创建表

        }

        /********** begin **********/
        // 设置需要执行任务
        Job job = Job.getInstance(config);
        // 设置map的执行任务
        job.setJarByClass(JsonTest.class);
        job.setMapperClass(JsonMap.class);
        job.setMapOutputKeyClass(NullWritable.class);
        // 因为该任务没有用到reduce,所以需要设置其值为0
        job.setNumReduceTasks(0);
        // 设置数据的所在位置
        FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
        // 开始执行任务
        TableMapReduceUtil.initTableReducerJob("job", null, job);
        TableMapReduceUtil.addDependencyJars(job);
        // 提交任务
        boolean b = job.waitForCompletion(true);
        // 退出任务
        System.exit(b ? 0 : 1);
        /********** end **********/
    }
}

上面编写完代码后,我们需要到命令行去启动Hbase

启动hbase(start-hbase.sh)完成后,我们就可以开始测评啦!

最后修改:2021 年 07 月 01 日
如果觉得我的文章对你有用,请随意赞赏