第一关:SQL API使用
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
 * Level 1 exercise: query player statistics through the Flink SQL API.
 *
 * <p>Reads {@code /root/score.csv} into a batch table, sums {@code score} per player,
 * and prints the three rows with the highest rounded totals.
 */
public class Step1 {

    public static void main(String[] args) throws Exception {
        // Write your code here
        /********* Begin *********/
        // Batch execution environment and the table environment built on top of it.
        ExecutionEnvironment executionEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(executionEnv);

        // Bind the comma-separated CSV columns, in order, to the listed Player properties.
        DataSource<Player> players = executionEnv
                .readCsvFile("/root/score.csv")
                .fieldDelimiter(",")
                .pojoType(Player.class,
                        "sj", "player", "cc", "sf", "time", "zg", "qd", "gm", "score");

        // Expose the data set under the table name that the SQL statement refers to.
        tableEnv.registerTable("mytable", tableEnv.fromDataSet(players));

        // Total score per player, rounded to one decimal, top three by that total.
        String topScorersSql = "select player, round(sum(score),1) as sum_score FROM mytable GROUP BY player ORDER BY sum_score desc LIMIT 3";
        Table topScorers = tableEnv.sqlQuery(topScorersSql);

        // Convert the result table back into a DataSet of Result and print it.
        tableEnv.toDataSet(topScorers, Result.class).print();
        /********* End *********/
    }

    /** Output row of the query; its public fields carry the same names as the selected columns. */
    public static class Result {
        public String player;
        public double sum_score;

        /** Renders the row as {@code Result{player='...', sum_score=...}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Result{");
            text.append("player='").append(player).append('\'');
            text.append(", sum_score=").append(sum_score);
            text.append('}');
            return text.toString();
        }
    }

    /** One CSV record of player statistics, exposed as a bean with getters and setters. */
    public static class Player {
        private String sj;     // season
        private String player; // player name
        private int cc;        // appearances
        private int sf;        // starts
        private double time;   // time
        private double zg;     // assists
        private double qd;     // steals
        private double gm;     // blocks
        private double score;  // points scored

        public String getSj() { return sj; }

        public void setSj(String value) { this.sj = value; }

        public String getPlayer() { return player; }

        public void setPlayer(String value) { this.player = value; }

        public int getCc() { return cc; }

        public void setCc(int value) { this.cc = value; }

        public int getSf() { return sf; }

        public void setSf(int value) { this.sf = value; }

        public double getTime() { return time; }

        public void setTime(double value) { this.time = value; }

        public double getZg() { return zg; }

        public void setZg(double value) { this.zg = value; }

        public double getQd() { return qd; }

        public void setQd(double value) { this.qd = value; }

        public double getGm() { return gm; }

        public void setGm(double value) { this.gm = value; }

        public double getScore() { return score; }

        public void setScore(double value) { this.score = value; }

        /** Renders every field, each value wrapped in single quotes. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Player{");
            text.append("sj='").append(sj).append('\'');
            text.append(", player='").append(player).append('\'');
            text.append(",cc='").append(cc).append('\'');
            text.append(",sf='").append(sf).append('\'');
            text.append(",time='").append(time).append('\'');
            text.append(",zg='").append(zg).append('\'');
            text.append(",qd='").append(qd).append('\'');
            text.append(",gm='").append(gm).append('\'');
            text.append(",score='").append(score).append('\'');
            text.append('}');
            return text.toString();
        }
    }
}
第二关:Table API使用
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.GroupedTable;
import org.apache.flink.table.api.java.BatchTableEnvironment;
/**
 * Level 2 exercise: the same player statistics, queried through the Flink Table API.
 *
 * <p>Reads {@code /root/score.csv}, keeps rows whose {@code score} is greater than 32,
 * groups them by {@code player}, and prints the resulting player names.
 */
public class Step2 {

    public static void main(String[] args) throws Exception {
        // Write your code here
        /********* Begin *********/
        // Batch execution environment and the table environment built on top of it.
        ExecutionEnvironment executionEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(executionEnv);

        // Bind the comma-separated CSV columns, in order, to the listed Player properties.
        DataSource<Player> players = executionEnv
                .readCsvFile("/root/score.csv")
                .fieldDelimiter(",")
                .pojoType(Player.class,
                        "sj", "player", "cc", "sf", "time", "zg", "qd", "gm", "score");

        // Register the data set so it can be looked up again by name below.
        tableEnv.registerTable("mytable", tableEnv.fromDataSet(players));

        // Table API pipeline (this level uses it instead of a SQL string):
        // scan the registered table, keep score > 32, group by player, project the player column.
        Table highScorers = tableEnv
                .scan("mytable")
                .filter("score>32")
                .groupBy("player")
                .select("player");

        // Convert the result table back into a DataSet of Result and print it.
        tableEnv.toDataSet(highScorers, Result.class).print();
        /********* End *********/
    }

    /** Output row of the query; holds only the selected {@code player} column. */
    public static class Result {
        public String player;

        /** Renders the row as {@code Result{player='...'}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Result{");
            text.append("player='").append(player).append('\'');
            text.append('}');
            return text.toString();
        }
    }

    /** One CSV record of player statistics, exposed as a bean with getters and setters. */
    public static class Player {
        private String sj;     // season
        private String player; // player name
        private int cc;        // appearances
        private int sf;        // starts
        private double time;   // time
        private double zg;     // assists
        private double qd;     // steals
        private double gm;     // blocks
        private double score;  // points scored

        public String getSj() { return sj; }

        public void setSj(String value) { this.sj = value; }

        public String getPlayer() { return player; }

        public void setPlayer(String value) { this.player = value; }

        public int getCc() { return cc; }

        public void setCc(int value) { this.cc = value; }

        public int getSf() { return sf; }

        public void setSf(int value) { this.sf = value; }

        public double getTime() { return time; }

        public void setTime(double value) { this.time = value; }

        public double getZg() { return zg; }

        public void setZg(double value) { this.zg = value; }

        public double getQd() { return qd; }

        public void setQd(double value) { this.qd = value; }

        public double getGm() { return gm; }

        public void setGm(double value) { this.gm = value; }

        public double getScore() { return score; }

        public void setScore(double value) { this.score = value; }

        /** Renders every field, each value wrapped in single quotes. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Player{");
            text.append("sj='").append(sj).append('\'');
            text.append(", player='").append(player).append('\'');
            text.append(",cc='").append(cc).append('\'');
            text.append(",sf='").append(sf).append('\'');
            text.append(",time='").append(time).append('\'');
            text.append(",zg='").append(zg).append('\'');
            text.append(",qd='").append(qd).append('\'');
            text.append(",gm='").append(gm).append('\'');
            text.append(",score='").append(score).append('\'');
            text.append('}');
            return text.toString();
        }
    }
}
第三关:用户自定义函数UDF使用
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Level 3 exercise: calling a user-defined scalar function (UDF) from Flink SQL.
 *
 * <p>Reads the tab-separated file {@code /root/data.csv}, formats each row's
 * {@code _TIMESTAMP} with {@link DataTimeFormat}, and prints the first five rows
 * ordered by the formatted value.
 */
public class Step3 {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(environment);
        // Read the data: tab-delimited, header line skipped, columns bound in order to taxi fields
        DataSource<taxi> dataSource = environment.readCsvFile("/root/data.csv").fieldDelimiter("\t").
                ignoreFirstLine().pojoType(taxi.class, "TRIP_ID", "CALL_TYPE", "ORIGIN_CALL", "TAXI_ID", "ORIGIN_STAND", "_TIMESTAMP", "POLYLINE");
        // Build the table object
        Table table = tableEnvironment.fromDataSet(dataSource);
        // Write your code here
        /********* Begin *********/
        // Register the user-defined function under the name used in the SQL below
        tableEnvironment.registerFunction("DataTimeFormat", new DataTimeFormat());
        // Register the table
        tableEnvironment.registerTable("mytable", table);
        // Query using the user-defined function
        Table table1 = tableEnvironment.sqlQuery("select TRIP_ID,CALL_TYPE,DataTimeFormat(_TIMESTAMP) as FORMATTIME from mytable order by FORMATTIME limit 5");
        // Convert the Table into a DataSet and print it
        DataSet<Result> resultDataSet = tableEnvironment.toDataSet(table1, Result.class);
        resultDataSet.print();
        /********* End *********/
    }

    /** Scalar UDF that renders a numeric timestamp string as {@code yyyy-MM-dd HH:mm:ss}. */
    public static class DataTimeFormat extends ScalarFunction {
        // Output pattern for the formatted date; no time zone is set on the formatter here.
        public SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Formats one timestamp value.
         *
         * @param _TIMESTAMP decimal text of a {@code long}; surrounding whitespace is ignored
         * @return the formatted date, or {@code null} when the input is {@code null} or blank
         * @throws NumberFormatException if the non-blank input is not a valid {@code long}
         */
        public String eval(String _TIMESTAMP) {
            // A missing value yields null instead of failing the whole job with a
            // NumberFormatException from Long.parseLong.
            if (_TIMESTAMP == null) {
                return null;
            }
            String digits = _TIMESTAMP.trim();
            if (digits.isEmpty()) {
                return null;
            }
            // java.util.Date(long) interprets its argument as milliseconds since the epoch.
            // NOTE(review): if the CSV stores Unix seconds the value needs scaling by 1000 —
            // confirm against the data file before changing.
            return simpleDateFormat.format(new Date(Long.parseLong(digits)));
        }
    }

    /** Output row of the query; its public fields carry the same names as the selected columns. */
    public static class Result {
        public String TRIP_ID;
        public String CALL_TYPE;
        public String FORMATTIME;

        /** Renders all three fields, each value wrapped in single quotes. */
        @Override
        public String toString() {
            return "Result{" +
                    "TRIP_ID='" + TRIP_ID + '\'' +
                    ", CALL_TYPE=" + '\'' + CALL_TYPE + '\'' +
                    ", FORMATTIME=" + '\'' + FORMATTIME + '\'' +
                    '}';
        }
    }

    /** One record of the input file; every column is kept as raw text. */
    public static class taxi {
        public String TRIP_ID;
        public String CALL_TYPE;
        public String ORIGIN_CALL;
        public String TAXI_ID;
        public String ORIGIN_STAND;
        public String _TIMESTAMP;
        public String POLYLINE;

        /** Renders every field, each value wrapped in single quotes. */
        @Override
        public String toString() {
            return "taxi{" +
                    "TRIP_ID='" + TRIP_ID + '\'' +
                    ", CALL_TYPE='" + CALL_TYPE + '\'' +
                    ", ORIGIN_CALL='" + ORIGIN_CALL + '\'' +
                    ", TAXI_ID='" + TAXI_ID + '\'' +
                    ", ORIGIN_STAND='" + ORIGIN_STAND + '\'' +
                    ", TIMESTAMP='" + _TIMESTAMP + '\'' +
                    ", POLYLINE='" + POLYLINE + '\'' +
                    '}';
        }
    }
}
第四关:用户自定义函数 UDTF 的使用
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
/**
 * Level 4 exercise: calling a user-defined table function (UDTF) from Flink SQL.
 *
 * <p>Reads {@code /root/a.csv}, splits each student's {@code bir} value into two
 * parts with {@link SplitFun}, and prints the name together with both parts.
 */
public class FlinkTest {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment (batch)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        // Read the csv file: first three columns, comma-delimited, header line skipped
        DataSource<Student> studentDataSource = env.readCsvFile("/root/a.csv").includeFields(true, true, true).fieldDelimiter(",").ignoreFirstLine().pojoType(Student.class, "name", "bir", "score");
        // Build the table object
        Table table = batchTableEnvironment.fromDataSet(studentDataSource);
        // Write your code here
        /********* Begin *********/
        // Register the user-defined function under the name used in the SQL below
        batchTableEnvironment.registerFunction("splitFun", new SplitFun());
        // Register the table
        batchTableEnvironment.registerTable("mytable", table);
        // Query using the user-defined function; T(dat, tim) names the two emitted columns
        Table table1 = batchTableEnvironment.sqlQuery("select name,dat,tim from mytable,LATERAL TABLE(splitFun(bir)) as T(dat, tim)");
        // Convert the Table into a DataSet and print it
        DataSet<Result> resultDataSet = batchTableEnvironment.toDataSet(table1, Result.class);
        resultDataSet.print();
        /********* End *********/
    }

    /** Table function that splits a string and emits its first two parts as one row. */
    public static class SplitFun extends TableFunction<Tuple2<String, String>> {
        // Passed to String.split, which treats it as a regular expression; defaults to one space.
        private String separator = " ";

        /** Creates a splitter that uses the default single-space separator. */
        public SplitFun() {
        }

        /**
         * Creates a splitter with a custom separator.
         *
         * @param separator regular expression handed to {@link String#split(String)}
         */
        public SplitFun(String separator) {
            this.separator = separator;
        }

        /**
         * Splits {@code str} and emits {@code (first part, second part)}.
         *
         * <p>A {@code null} input, or one that does not split into at least two parts,
         * emits no row instead of throwing {@code NullPointerException} or
         * {@code ArrayIndexOutOfBoundsException} and failing the job.
         * NOTE(review): with the comma join used in {@code main}, an input row for which
         * nothing is emitted is expected to be absent from the result — confirm this is
         * the desired handling of malformed values.
         *
         * @param str the value to split; may be {@code null}
         */
        public void eval(String str) {
            if (str == null) {
                return;
            }
            String[] datas = str.split(separator); // split the value
            if (datas.length < 2) {
                return;
            }
            // Emit the part before the separator and the part right after it
            collect(new Tuple2<String, String>(datas[0], datas[1]));
        }
    }

    /** Output row of the query; its public fields carry the same names as the selected columns. */
    public static class Result {
        public String name;
        public String dat;
        public String tim;

        /** Renders the row; only {@code name} is wrapped in single quotes. */
        @Override
        public String toString() {
            return " Result{" +
                    "name='" + name + '\'' +
                    ", dat=" + dat +
                    ", tim=" + tim +
                    '}';
        }
    }

    /** One CSV record: a student's name, {@code bir} value and score. */
    public static class Student {
        private String name;
        private int score;
        private String bir;

        public int getScore() {
            return score;
        }

        public void setScore(int score) {
            this.score = score;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getBir() {
            return bir;
        }

        public void setBir(String bir) {
            this.bir = bir;
        }

        /** Renders every field, each value wrapped in single quotes. */
        @Override
        public String toString() {
            // The last label is "bir" so that it names the field actually printed.
            return "Student{" +
                    "name='" + name + '\'' +
                    ", score='" + score + '\'' +
                    ",bir='" + bir + '\'' +
                    '}';
        }
    }
}
6 条评论
感谢博主分享,继续加油୧(๑•̀⌄•́๑)૭
要得要得,你也要加油哦,希望你以后不看参考也可以做出来哦!
好滴好滴∠( ᐛ 」∠)_
古德jio!!还有吗
没有啦
顽张って!୧(๑•̀⌄•́๑)૭