CodeRap

V1

2022/03/20阅读:45主题:默认主题

从0到1成为Flink源码Contributor之Flink编程模型(3)

前置要求

  • Java技术体系
  • Junit单元测试
  • Idea软件使用
  • Flink自建版本custom-test

带有Transformation的HelloWorld程序

我们在上一篇文章的基础上增加一个MapFunction的算子操作,并运行起来

@Test
public void testFlinkHelloWorld2() throws Exception {
    DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080);
  
    SingleOutputStreamOperator<Long> map = lines.map(((line -> Long.parseLong(line))));
  
    map.print();
  
    streamExecutionEnvironment.execute();
}

该流式HelloWorld程序主要是从8080网络端口读取数据,并通过MapFunction算子操作把字符串数据转换成Long型数据,最后输出

我们看一下的算子流图长什么样子 可以看出,该算子流图与我们的代码一一对应,通过Source进行了Map的Transformation转换,然后通过Sink输出结果

那么Source、Transformation、Sink到底是什么?

Flink编程模型

正如上一篇文章说的Flink的应用程序编程无外呼三部曲 图片官网连接: https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/overview

简单来说就是通过Source、Transformation、Sink等算子表示的Transformation之间的转换,最终在API层面的表现就是DataStream之间的转换

那么我们具体看看Source、Transformation、Sink到底是什么 Source: 数据源,用于从集合或外部系统接入数据源,其本质是实现SourceFunction被Operator封装并组成的Transformation(源码) Transformation: 数据转换的各种操作,可以将数据转换计算成你想要的数据,其本质是一系统被Operator封装的ProcessFunction并组成的Transformation(源码) Sink: 接收器,将转换计算后的数据发送到控制台或输入到外部系统,其本质是实现SinkFunction被Operaotr封装并组成的Transformation(源码) 虽然在源码层面Source、Tranforamtion和Sink都是Transformation,但为了概念不混淆,对外声明上我们还是按照Souce、Transformation和Sink来加以区别,毕竟Source与Sink与普通的正常的Transformation实现还是有自己的不同之处

Fink Source

那么Flink的Source有哪些类型呢? 总体上可以分为内置的Source与自定义的Source,但本质上都是直接或间隔调用StreamExecutionEnvironment类的addSource方法、实现了SourceFunction并被Operator所封装再组成的Transformation、最后对外提供使用的DataStreamSource 内置的Flink的Source主要有:

1) fromElements(...): 来自元素集合
2) fromCollection(...): 来自集合数据
3) fromParallelCollection(...): 来自并行集合数据
4) readTextFile(...)/readFile(...)/readFileStream(...): 来自文件
5) socketTextStream(...): 来自文本网络流
6) createInput(...)/createFileInput(...): 来自支持InputSplit的InputFormat类型,例如flink-connector-jdbc的JdbcRowDataInputFormat

从并发的角度看,Flink的Source有两种实现方法,一种是不支持并发,例如socketTextStream,另一种是支持并发,例如Kafka的多分区形式,具体源码实现可以参考后文

Flink Source背后的Transformation主要是 LegacySourceTransformation 与 SourceTransformation

Flink Transformation

Flink的Transformation种类繁多,主要是基于DataStream或KeyedStream(本质也是DataStream但提供更丰富的keyBy后的算子操作类型) 具体有以下这些:

1) Map: 最简单的转换之一,一对一处理,输入是一条数据流数据,输出的也是一条数据流数据
2) FlatMap: 一对多处理,输入一条数据流数据,输出0条或多条数据流数据
3) Filter: 对数据流数据进行过滤操作,符合条件的数据流数据输向下游,不符合条件的则过滤条不给下游流处理
4) KeyBy: 基于 key 对流进行分区,分区操作有多种类型,分区函数也有多种
5) Reduce: 对数据流作聚合操作汇总出一条数据流数据,常用的 average/sum/min/max/count 等方法都可使用 reduce 可实现
6) Window: 允许按横切的时间或数据流条数对 KeyedStream 进行分组聚合统计并输出汇总数据流
7) WindowAll: 在非分区数据流上进行Window操作
8) Union: 将两个或多个数据流组合在一起再进行算子操作
9) Window Join: 根据 key 分区将同一个 window 的两个数据流 Join 起来
...

具体链接可以参考官网: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview

Flink Sink

Flink的Sink主要向外部系统输出数据,内置的Sink种类不多,都是通过DataStream操作并返回DataStreamSink,主要有:

1) print(): 最常用的,用于向控制台输出,一般用于单元测试或调试程序等
2) writeAsText(...): 写出成文本文件
3) writeAsCsv(...): 写出成Csv文件
4) writeToSocket(...): 向网络流输出数据
...

Flink Sink背后的Transformation主要是 LegacySinkTransformation 与 SinkTransformation

查看Flink的Transformation是怎么进行数据转换操作并判断数据流数据是否向下游输出的

新的HelloWorld的FlatMap流式应用程序如下:

@Test
public void testFlinkHelloWorld3() throws Exception {
    DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080);
  
    SingleOutputStreamOperator<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            if (value.contains(",")) {
                String[] values = value.split(",");
                for (String temp : values) {
                    out.collect(temp);
                }
            } else {
                out.collect(value);
            }
        }
    });
  
    flatMap.print();
  
    streamExecutionEnvironment.execute();
}

程序很简单,就是控制台中输入的字符串中如果函数逗号的,则拆分成多条字符串并向下游输出多条数据,如果没有则原封不动的向下游输出数据


欢迎大家关注

分类:

后端

标签:

后端

作者介绍

CodeRap
V1