注意,我们需要编写3个Java文件
第一个文件:DBHelper.java(数据库工具类)
package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
/********** begin **********/
// mysql驱动
private static final String driver = "com.mysql.jdbc.Driver";
// mysql连接地址
private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
// mysql的用户名
private static final String username = "root";
// mysql的密码
private static final String password = "123123";
// 连接对象
private static Connection conn = null;
// 设置静态变量,用于加载数据库驱动
static {
try {
// 加载数据库驱动
Class.forName(driver);
} catch (Exception e) {
// 抛出异常
e.printStackTrace();
}
}
// 创建连接类,连接数据库
public static Connection getConn() {
// 如果对象不存在,开始创建
if (conn == null) {
try {
// 创建连接对象
conn = DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
// 如果无法创建数据库连接对象,则抛出数据库异常
e.printStackTrace();
}
// 创建成功后返回这个对象
return conn;
}
// 如果对象存在,直接返回连接对象
return conn;
}
public static void main(String[] args) {
// 通过数据库连接工具类进行连接
Connection conn = DBHelper.getConn();
}
/********** end **********/
}
第二文件:JsonMap.java(MapReduce的操作类)
package com;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class JsonMap extends Mapper<LongWritable, Text, NullWritable, Put> {
/********** begin **********/
// 用户存放城市编码和城市名字的数据
Map<String, String> map = new HashMap<String, String>();
Put put;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 通过数据库工具类获取对象
Connection conn = DBHelper.getConn();
try {
// 查询数据
Statement s = conn.createStatement();
String sql = "select * from province";
ResultSet r = s.executeQuery(sql);
// 循环结果
while (r.next()) {
// 获取城市编码
String cityCode = r.getString(1);
// 获取城市的名字
String cityName = r.getString(2);
// 将获取到的数据放入map集合中
// 前面是键,后面是值
map.put(cityCode, cityName);
}
} catch (SQLException e) {
// 如果数据异常时,抛出该异常
e.printStackTrace();
}
}
// 编写map类
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取数据
String line = value.toString();
// 开始解析Json数据
JSONObject j = JSONObject.parseObject(line);
// 因为要解析的数据有14条,用一个数组存放这个14条数据
String[] data = new String[14];
data[0] = j.getString("id");
data[1] = j.getString("company_name");
data[2] = j.getString("eduLevel_name");
data[3] = j.getString("emplType");
data[4] = j.getString("jobName");
// 对工资进行解析
String salary = j.getString("salary");
if (salary.contains("K-")) {
Double a = Double.valueOf(salary.substring(0, salary.indexOf("K")));
Double b = Double.valueOf(salary.substring(salary.indexOf("-") + 1, salary.lastIndexOf("K")));
data[5] = (a + b) / 2 + "";
} else {
data[5] = "0";
}
data[6] = j.getString("createDate");
data[7] = j.getString("endDate");
data[8] = j.getString("city_code");
data[9] = j.getString("companySize");
data[10] = j.getString("welfare");
data[11] = j.getString("responsibility");
data[12] = j.getString("place");
data[13] = j.getString("workingExp");
// 检查这些数据是否为空
for (String s : data) {
if (s.equals("") || s == null) {
// 如果为空直接返回,不执行下面的操作
return;
}
}
String info = "info";
// 将所有数据添加进info中
put = new Put(data[0].getBytes());
put.addColumn(info.getBytes(), "company_name".getBytes(), data[1].getBytes());
put.addColumn(info.getBytes(), "eduLevel_name".getBytes(), data[2].getBytes());
put.addColumn(info.getBytes(), "emplType".getBytes(), data[3].getBytes());
put.addColumn(info.getBytes(), "jobName".getBytes(), data[4].getBytes());
put.addColumn(info.getBytes(), "salary".getBytes(), data[5].getBytes());
put.addColumn(info.getBytes(), "createDate".getBytes(), data[6].getBytes());
put.addColumn(info.getBytes(), "endDate".getBytes(), data[7].getBytes());
put.addColumn(info.getBytes(), "city_name".getBytes(), data[8].getBytes());
put.addColumn(info.getBytes(), "companySize".getBytes(), data[9].getBytes());
put.addColumn(info.getBytes(), "welfare".getBytes(), data[10].getBytes());
put.addColumn(info.getBytes(), "responsibility".getBytes(), data[11].getBytes());
put.addColumn(info.getBytes(), "place".getBytes(), data[12].getBytes());
put.addColumn(info.getBytes(), "workingExp".getBytes(), data[13].getBytes());
// 将它写入到HDFS中
context.write(NullWritable.get(), put);
}
/********** end **********/
}
第三个文件:JsonTest.class(测试操作类)
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
public class JsonTest {
public static void main(String[] args) throws Exception {
Configuration config = HBaseConfiguration.create();
// 设置zookeeper的配置
config.set("hbase.zookeeper.quorum", "127.0.0.1");
Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin();
TableName tableName = TableName.valueOf("job");
boolean isExists = admin.tableExists(tableName);
if (!isExists) {
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();// 构建列族对象
tableDescriptor.setColumnFamily(family); // 设置列族
admin.createTable(tableDescriptor.build()); // 创建表
} else {
admin.disableTable(tableName);
admin.deleteTable(tableName);
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build();// 构建列族对象
tableDescriptor.setColumnFamily(family); // 设置列族
admin.createTable(tableDescriptor.build()); // 创建表
}
/********** begin **********/
// 设置需要执行任务
Job job = Job.getInstance(config);
// 设置map的执行任务
job.setJarByClass(JsonTest.class);
job.setMapperClass(JsonMap.class);
job.setMapOutputKeyClass(NullWritable.class);
// 因为该任务没有用到reduce,所以需要设置其值为0
job.setNumReduceTasks(0);
// 设置数据的所在位置
FileInputFormat.setInputPaths(job, new Path("/root/data/data.json"));
// 开始执行任务
TableMapReduceUtil.initTableReducerJob("job", null, job);
TableMapReduceUtil.addDependencyJars(job);
// 提交任务
boolean b = job.waitForCompletion(true);
// 退出任务
System.exit(b ? 0 : 1);
/********** end **********/
}
}
上面编写完代码后,我们需要到命令行去启动Hbase
启动hbase(start-hbase.sh
)完成后,我们就可以开始测评啦!
3 条评论
龙争虎斗彼岸花 罗总给你一个家
更新的好快 好厉害啊
哈哈,那必须的哈(๑•̀ㅁ•́ฅ)