第一关:SQL API使用

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class Step1 {

    /**
     * Loads player records from {@code /root/score.csv}, totals each player's score with a
     * Flink SQL query and prints the three highest totals (rounded to one decimal place).
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Write your code here
        /********* Begin *********/

        // Batch execution environment and the Table/SQL environment layered on top of it
        ExecutionEnvironment executionEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(executionEnv);

        // Map every comma-separated line onto a Player; the names follow the column order
        DataSource<Player> players = executionEnv
                .readCsvFile("/root/score.csv")
                .fieldDelimiter(",")
                .pojoType(Player.class,
                        "sj", "player", "cc", "sf", "time", "zg", "qd", "gm", "score");

        // Expose the DataSet to SQL under the name "mytable"
        Table playerTable = tableEnv.fromDataSet(players);
        tableEnv.registerTable("mytable", playerTable);

        // Total score per player, highest first, keeping only the top three
        String topThreeSql = "select player, round(sum(score),1) as sum_score FROM mytable "
                + "GROUP BY player ORDER BY sum_score desc LIMIT 3";
        Table topThree = tableEnv.sqlQuery(topThreeSql);

        // Convert the result table back into Result objects and print them
        tableEnv.toDataSet(topThree, Result.class).print();

        /********* End *********/
    }

    /**
     * One output row. The public field names must match the column aliases selected by the
     * SQL query in {@code main} ({@code player}, {@code sum_score}).
     */
    public static class Result {
        public String player;
        public double sum_score;

        @Override
        public String toString() {
            return new StringBuilder("Result{")
                    .append("player='").append(player).append('\'')
                    .append(", sum_score=").append(sum_score)
                    .append('}')
                    .toString();
        }
    }

    /**
     * One row of score.csv. The field names must match the names passed to
     * {@code pojoType} in {@code main}.
     */
    public static class Player {
        private String sj;      // season
        private String player;  // player name
        private int cc;         // games played
        private int sf;         // games started
        private double time;    // playing time
        private double zg;      // assists
        private double qd;      // steals
        private double gm;      // blocks
        private double score;   // points

        // ---- getters ----

        public String getSj() {
            return sj;
        }

        public String getPlayer() {
            return player;
        }

        public int getCc() {
            return cc;
        }

        public int getSf() {
            return sf;
        }

        public double getTime() {
            return time;
        }

        public double getZg() {
            return zg;
        }

        public double getQd() {
            return qd;
        }

        public double getGm() {
            return gm;
        }

        public double getScore() {
            return score;
        }

        // ---- setters ----

        public void setSj(String sj) {
            this.sj = sj;
        }

        public void setPlayer(String player) {
            this.player = player;
        }

        public void setCc(int cc) {
            this.cc = cc;
        }

        public void setSf(int sf) {
            this.sf = sf;
        }

        public void setTime(double time) {
            this.time = time;
        }

        public void setZg(double zg) {
            this.zg = zg;
        }

        public void setQd(double qd) {
            this.qd = qd;
        }

        public void setGm(double gm) {
            this.gm = gm;
        }

        public void setScore(double score) {
            this.score = score;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Player{");
            text.append("sj='").append(sj).append('\'');
            text.append(", player='").append(player).append('\'');
            text.append(",cc='").append(cc).append('\'');
            text.append(",sf='").append(sf).append('\'');
            text.append(",time='").append(time).append('\'');
            text.append(",zg='").append(zg).append('\'');
            text.append(",qd='").append(qd).append('\'');
            text.append(",gm='").append(gm).append('\'');
            text.append(",score='").append(score).append('\'');
            return text.append('}').toString();
        }
    }
}

第二关:Table API使用

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.GroupedTable;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class Step2 {

    /**
     * Loads player records from {@code /root/score.csv} and, using the Table API, prints each
     * player that has at least one record with {@code score > 32} (each such player once).
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Write your code here
        /********* Begin *********/

        // Create the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the Table environment on top of it
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        // Read the CSV file; the names map the columns, in file order, onto Player fields
        DataSource<Player> playerDataSource = env.readCsvFile("/root/score.csv")
                .fieldDelimiter(",")
                .pojoType(Player.class, "sj", "player",
                        "cc", "sf", "time", "zg", "qd", "gm", "score");

        // Wrap the DataSet as a Table. Table API operators can be applied to it directly,
        // so registering it in the catalog and scanning it straight back is unnecessary.
        Table table = batchTableEnvironment.fromDataSet(playerDataSource);
        // Keep only rows whose score is greater than 32
        Table filtered = table.filter("score>32");
        // Group by player so each qualifying player appears exactly once
        GroupedTable byPlayer = filtered.groupBy("player");
        // Project just the player column
        Table players = byPlayer.select("player");
        // Convert the Table to a DataSet and print it
        DataSet<Result> resultDataSet = batchTableEnvironment.toDataSet(players, Result.class);
        resultDataSet.print();
        /********* End *********/
    }

    /**
     * One output row. The public field name must match the column selected in {@code main}.
     */
    public static class Result {
        public String player;

        @Override
        public String toString() {
            return "Result{" +
                    "player='" + player + '\'' +
                    '}';
        }
    }

    /**
     * One row of score.csv. The field names must match the names passed to
     * {@code pojoType} in {@code main}.
     */
    public static class Player {
        private String sj; // season
        private String player; // player name
        private int cc; // games played
        private int sf; // games started
        private double time; // playing time
        private double zg; // assists
        private double qd; // steals
        private double gm; // blocks
        private double score; // points

        public String getSj() {
            return sj;
        }
        public void setSj(String sj) {
            this.sj = sj;
        }
        public String getPlayer() {
            return player;
        }
        public void setPlayer(String player) {
            this.player = player;
        }
        public int getCc() {
            return cc;
        }
        public void setCc(int cc) {
            this.cc = cc;
        }
        public int getSf() {
            return sf;
        }
        public void setSf(int sf) {
            this.sf = sf;
        }
        public double getTime() {
            return time;
        }
        public void setTime(double time) {
            this.time = time;
        }
        public double getZg() {
            return zg;
        }
        public void setZg(double zg) {
            this.zg = zg;
        }
        public double getQd() {
            return qd;
        }
        public void setQd(double qd) {
            this.qd = qd;
        }
        public double getGm() {
            return gm;
        }
        public void setGm(double gm) {
            this.gm = gm;
        }
        public double getScore() {
            return score;
        }
        public void setScore(double score) {
            this.score = score;
        }

        @Override
        public String toString() {
            return "Player{" +
                    "sj='" + sj + '\'' +
                    ", player='" + player + '\'' +
                    ",cc='" + cc + '\'' +
                    ",sf='" + sf + '\'' +
                    ",time='" + time + '\'' +
                    ",zg='" + zg + '\'' +
                    ",qd='" + qd + '\'' +
                    ",gm='" + gm + '\'' +
                    ",score='" + score + '\'' +
                    '}';
        }
    }
}

第三关:用户自定义函数UDF使用

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Step3 {

    /**
     * Reads taxi trips from the tab-separated {@code /root/data.csv} (header line skipped),
     * formats each trip's timestamp with the {@link DataTimeFormat} UDF and prints the five
     * trips with the smallest formatted time.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(environment);

        // Read the data: tab-delimited, header skipped, columns mapped onto taxi fields in order
        DataSource<taxi> dataSource = environment.readCsvFile("/root/data.csv").fieldDelimiter("\t").
                ignoreFirstLine().pojoType(taxi.class, "TRIP_ID", "CALL_TYPE", "ORIGIN_CALL", "TAXI_ID", "ORIGIN_STAND", "_TIMESTAMP", "POLYLINE");
        // Define the table object
        Table table = tableEnvironment.fromDataSet(dataSource);
        // Write your code here
        /********* Begin *********/

        // Register the user-defined scalar function under the name used in the SQL below
        tableEnvironment.registerFunction("DataTimeFormat", new DataTimeFormat());
        // Register the table
        tableEnvironment.registerTable("mytable", table);
        // Query using the UDF. Zero-padded "yyyy-MM-dd HH:mm:ss" strings sort chronologically,
        // so ordering by the formatted value returns the earliest five trips.
        Table table1 = tableEnvironment.sqlQuery("select TRIP_ID,CALL_TYPE,DataTimeFormat(_TIMESTAMP) as FORMATTIME  from mytable order by FORMATTIME limit 5");
        // Convert the Table to a DataSet and print it
        DataSet<Result> resultDataSet = tableEnvironment.toDataSet(table1, Result.class);
        resultDataSet.print();
       /********* End *********/
    }

    /**
     * Scalar UDF that renders a numeric timestamp string as {@code yyyy-MM-dd HH:mm:ss}.
     * The text is parsed as a {@code long} and passed to {@link Date#Date(long)}, i.e. it is
     * treated as milliseconds since the epoch and formatted in the JVM's default time zone.
     * NOTE(review): if the source column actually holds epoch seconds, the dates will be far
     * too early — confirm against the data before relying on the output.
     */
    public static class DataTimeFormat extends ScalarFunction {

        // Output pattern. SimpleDateFormat is not thread-safe; this presumably relies on each
        // Flink task working with its own copy of the function instance — confirm.
        public SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Formats one timestamp value.
         *
         * @param _TIMESTAMP epoch milliseconds as decimal text; may be null, blank or padded
         *                   with surrounding whitespace
         * @return the formatted date, or {@code null} (SQL NULL) when the input is null/blank
         * @throws NumberFormatException if the trimmed input is not a valid {@code long}
         */
        public String eval(String _TIMESTAMP) {
            // A missing timestamp maps to SQL NULL instead of failing the whole job
            if (_TIMESTAMP == null) {
                return null;
            }
            String digits = _TIMESTAMP.trim();
            if (digits.isEmpty()) {
                return null;
            }
            return simpleDateFormat.format(new Date(Long.parseLong(digits)));
        }
    }

    /**
     * One output row. The public field names must match the columns selected by the SQL
     * query in {@code main}.
     */
    public static class Result {
        public String TRIP_ID;
        public String CALL_TYPE;
        public String FORMATTIME;

        @Override
        public String toString() {
            return "Result{" +
                    "TRIP_ID='" + TRIP_ID + '\'' +
                    ", CALL_TYPE=" + '\'' + CALL_TYPE + '\'' +
                    ", FORMATTIME=" + '\'' + FORMATTIME + '\'' +
                    '}';
        }
    }

    /**
     * One row of data.csv. The field names must match the names passed to
     * {@code pojoType} in {@code main}.
     */
    public static class taxi {
        public String TRIP_ID;
        public String CALL_TYPE;
        public String ORIGIN_CALL;
        public String TAXI_ID;
        public String ORIGIN_STAND;
        public String _TIMESTAMP;
        public String POLYLINE;

        @Override
        public String toString() {
            return "taxi{" +
                    "TRIP_ID='" + TRIP_ID + '\'' +
                    ", CALL_TYPE='" + CALL_TYPE + '\'' +
                    ", ORIGIN_CALL='" + ORIGIN_CALL + '\'' +
                    ", TAXI_ID='" + TAXI_ID + '\'' +
                    ", ORIGIN_STAND='" + ORIGIN_STAND + '\'' +
                    ", TIMESTAMP='" + _TIMESTAMP + '\'' +
                    ", POLYLINE='" + POLYLINE + '\'' +
                    '}';
        }
    }
}

第四关:用户自定义函数 UDTF 的使用

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class FlinkTest {

    /**
     * Reads students from {@code /root/a.csv} (header skipped, first three columns used),
     * splits each birth value into a date part and a time part with the {@link SplitFun}
     * table function, and prints name, date and time.
     *
     * @param args unused
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        // Read the CSV file into a DataSet of Student POJOs
        DataSource<Student> studentDataSource = env.readCsvFile("/root/a.csv").includeFields(true, true, true).fieldDelimiter(",").ignoreFirstLine().pojoType(Student.class, "name", "bir", "score");
        // Create the table object
        Table table = batchTableEnvironment.fromDataSet(studentDataSource);
        // Write your code here
        /********* Begin *********/

        // Register the user-defined table function under the name used in the SQL below
        batchTableEnvironment.registerFunction("splitFun", new SplitFun());
        // Register the table
        batchTableEnvironment.registerTable("mytable", table);
        // Join each row with the rows produced by splitFun(bir), exposed as columns dat and tim
        Table table1 = batchTableEnvironment.sqlQuery("select name,dat,tim from mytable,LATERAL TABLE(splitFun(bir)) as T(dat, tim)");
        // Convert the Table to a DataSet and print it
        DataSet<Result> resultDataSet = batchTableEnvironment.toDataSet(table1, Result.class);
        resultDataSet.print();
        /********* End *********/
    }

    /**
     * Table function that splits a birth value around a literal separator (a single space
     * by default). It emits the text before the first separator and the text between the
     * first and second separators.
     *
     * <p>A null value, or a value that does not contain the separator, produces no row.
     * In the comma (LATERAL TABLE) join used in {@code main}, such input rows are dropped
     * instead of failing the job with an exception.
     */
    public static class SplitFun extends TableFunction<Tuple2<String, String>> {

        // Literal text separating the date part from the time part
        private String separator = " ";

        /** Creates a function that splits on a single space. */
        public SplitFun() {
        }

        /**
         * Creates a function that splits on the given text.
         *
         * @param separator literal separator (matched as plain text, not as a regex)
         * @throws NullPointerException if {@code separator} is null
         */
        public SplitFun(String separator) {
            this.separator = java.util.Objects.requireNonNull(separator, "separator");
        }

        /**
         * Splits {@code str} and emits at most one (dat, tim) tuple.
         *
         * @param str the value to split; may be null
         */
        public void eval(String str) {
            if (str == null) {
                return;
            }
            // Quote the separator so characters such as '.' or '|' are matched literally
            String[] parts = str.split(java.util.regex.Pattern.quote(separator));
            if (parts.length < 2) {
                // No separator present: nothing sensible to emit
                return;
            }
            collect(new Tuple2<String, String>(parts[0], parts[1]));
        }
    }

    /**
     * One output row. The public field names must match the columns selected by the SQL
     * query in {@code main}.
     */
    public static class Result{
        public String name;
        public String dat;
        public String tim;

        @Override
        public String toString() {
            return " Result{" +
                    "name='" + name + '\'' +
                    ", dat=" + dat +
                    ", tim=" + tim +
                    '}';
        }
    }

    /**
     * One row of a.csv. The field names must match the names passed to
     * {@code pojoType} in {@code main}.
     */
    public static class Student{
        private String name;
        private int score;
        private String bir; // birth value, later split by SplitFun

        public int getScore() {
            return score;
        }

        public void setScore(int score) {
            this.score = score;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getBir() {
            return bir;
        }

        public void setBir(String bir) {
            this.bir = bir;
        }

        @Override
        public String toString() {
            // Label the birth field by its real name (it was previously mislabelled "addr")
            return "Student{" +
                    "name='" + name + '\'' +
                    ", score='" + score + '\'' +
                    ",bir='" + bir + '\'' +
                    '}';
        }
    }
}
最后修改:2021 年 07 月 01 日
如果觉得我的文章对你有用,请随意赞赏