北京网站设计制作关键词优化,外贸网店系统,如何创建网站,html写一个心形网页常用处理函数 处理函数概述 基本处理函数ProcessFunction介绍使用示例 按键分区处理函数KeyedProcessFunction介绍定时器Timer和定时服务TimerService使用示例其他 窗口处理函数ProcessWindowFunction介绍ProcessAllWindowFunction介绍使用示例 流的合并处理函数CoProcessFunct… 常用处理函数 处理函数概述 基本处理函数ProcessFunction介绍使用示例 按键分区处理函数KeyedProcessFunction介绍定时器Timer和定时服务TimerService使用示例其他 窗口处理函数ProcessWindowFunction介绍ProcessAllWindowFunction介绍使用示例 流的合并处理函数CoProcessFunction介绍使用示例 流的联结处理函数窗口联结 JoinFunction间隔联结 ProcessJoinFunction迟到数据的处理 广播流处理函数KeyedBroadcastProcessFunctionBroadcastProcessFunction使用示例 处理函数
概述 处理函数Processing Function是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理允许开发人员编写自定义逻辑以执行各种操作如转换、聚合、筛选、连接等并在处理后生成输出数据流。 对于数据流都可以直接调用.process()方法进行自定义处理传入的参数就叫作处理函数也可以把它划分为转换算子。 基本处理函数 ProcessFunction是最基本的处理函数基于DataStream直接调用.process()时作为参数传入 ProcessFunction介绍
ProcessFunction是一个抽象类它继承AbstractRichFunction有两个泛型类型参数
1.输入的数据类型2.处理完成之后输出数据类型内部单独定义了两个方法
1.必须要实现的抽象方法.processElement()2.一个非抽象方法.onTimer()ProcessFunction类如下
/*** 处理流元素的函数** 对于输入流中的每个元素调用processElement(ObjectProcessFunction.ContextCollector) 可以产生零个或多个元素作为输出* 还可以通过提供的ProcessFunction.Context查询时间和设置计时器** 对于触发计时器将调用onTimer(longProcessFunction.OnTimerContextCollector) 可以再次产生零个或多个元素作为输出并注册其他计时器** param I 输入元素的类型* param O 输出元素的类型*/
PublicEvolving
public abstract class ProcessFunctionI, O extends AbstractRichFunction {private static final long serialVersionUID 1L;/*** 处理输入流中的一个元素对于流中的每个元素都会调用一次** 可以使用输出零个或多个元素收集器参数并使用更新内部状态或设置计时器ProcessFunction.Context参数** param value 输入值类型与流中数据类型一致* param ctx ProcessFunction的内部抽象类Context表示当前运行的上下文可以获取当前时间戳用于查询时间和注册定时器的定时服务* param out 用于返回结果值的收集器与out.collect()方法向下游发数据类似*/public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;/*** 当使用设置计时器时调用TimerService* * 只有在注册好的定时器触发的时候才会调用而定时器是通过定时服务TimerService来注册的* * 事件时间语义下就是由水位线watermark来触发* * 也可以自定义数据按照时间分组、定时触发计算输出结果实现类似窗口window的功能** param timestamp 触发计时器的时间戳指设定好的触发时间* param ctx 上下文* param out 用于返回结果值的收集器*/public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}
}使用示例 基本处理函数ProcessFunction的使用与基本的转换操作类似直接基于DataStream调用.process()方法传入一个ProcessFunction作为参数用来定义处理逻辑。 具体举例使用示例如下 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceInteger stream env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, -6));/*** 创建OutputTag对象* 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)*/OutputTagInteger evenTag new OutputTag(even, Types.INT);OutputTagInteger oddTag new OutputTag(odd, Types.INT);// 使用process算子SingleOutputStreamOperatorInteger process stream.process(new ProcessFunctionInteger, Integer() {Overridepublic void processElement(Integer value, Context ctx, CollectorInteger out) throws Exception {if (value 0) {if (value % 2 0) {// 偶数放到侧输出流evenTag中// 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据ctx.output(evenTag, value);} else if (value % 2 1) {// 奇数放到侧输出流oddTag中ctx.output(oddTag, value);}} else {// 负数 数据放到主流中out.collect(value);}}});// 在主流中根据标签 获取 侧输出流SideOutputDataStreamInteger even process.getSideOutput(evenTag);SideOutputDataStreamInteger odd process.getSideOutput(oddTag);// 打印主流process.printToErr(主流-负数-job);//打印 侧输出流even.print(偶数-job);odd.print(奇数-job);env.execute();}奇数-job:1 1
偶数-job:2 2
奇数-job:1 3
偶数-job:2 4
奇数-job:1 5
主流-负数-job:2 -6按键分区处理函数 KeyedProcessFunction对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入。要想使用定时器必须基于KeyedStream KeyedProcessFunction介绍 KeyedProcessFunction与ProcessFunction的定义几乎完全一样区别只是在于类型参数多了一个K这是当前按键分区的key的类型。 按键分区处理函数接口如下
public abstract class KeyedProcessFunctionK, I, O extends AbstractRichFunction {public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}
} 定时器Timer和定时服务TimerService 另外在KeyedStream中是支持使用定时服务TimerService可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册定时事件。 在onTimer()方法中可以实现定时处理的逻辑而它触发的前提是之前曾经注册过定时器、并且现在已经到了触发时间。 注册定时器的功能是通过上下文中提供的定时服务来实现的。
// 获取定时服务
TimerService timerService ctx.timerService();TimerService是Flink关于时间和定时器的基础服务接口对应的操作主要有三个获取当前时间注册定时器以及删除定时器具体方法如下
// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线事件时间
long currentWatermark();// 注册处理时间定时器当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器当水位线超过time时触发
void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);注意 尽管处理函数中都可以访问TimerService不过只有基于KeyedStream的处理函数才能去调用注册和删除定时器的方法 使用示例 直接基于keyBy之后的KeyedStream直接调用.process()方法传入KeyedProcessFunction的实现类参数 必须实现processElement()抽象方法用来处理流中的每一个数据必须实现非抽象方法onTimer()用来定义定时器触发时的回调操作public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorTuple2String, Integer streamSource env.socketTextStream(IP, 8086)// 将输入数据转换为Tuple2.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// keyBy分区KeyedStreamTuple2String, Integer, String keyByStream streamSource.keyBy(a - a.f0);// 按键分区处理函数SingleOutputStreamOperatorInteger process keyByStream.process(new KeyedProcessFunctionString, Tuple2String, Integer, Integer() {/*** 来一条数据调用一次* param value 当前数据* param ctx 上下文* param out 收集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorInteger out) throws Exception {//获取当前数据的keyString currentKey ctx.getCurrentKey();
p();// 获取定时服务TimerService timerService ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime ctx.timestam// 注册事件时间定时器timerService.registerEventTimeTimer(3000L);System.out.println(key: currentKey 当前数据: value 当前时间: currentEventTime 注册一个3s定时器);/*** 时间进展到定时器注册的时间调用该方法* param timestamp 定时器被触发时的时间* param ctx 上下文* param out 采集器*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorInteger out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key: currentKey 时间: timestamp 定时器触发);}});process.print();env.execute();}其他
1.注册一个事件时间的定时器
事件时间定时器通过watermark来触发即watermark 注册的时间水印watermark 当前最大事件时间 - 等待时间 -1ms例子等待3s3s定时器事件时间6s 则watermark 6s - 3s -1ms 2.99s,不会触发3s的定时器// 数据中提取出来的事件时间
Long currentEventTime ctx.timestam
// 注册事件时间定时器
timerService.registerEventTimeTimer(3000L);
System.out.println(key: currentKey 当前数据: value 当前时间: currentEventTime 注册一个3s定时器);输入数据如下当输入7时水位线是7-34s-1ms3.99s即水位线超过定时器3s执行触发回调操作
nc -lk 8086
key1,1
key1,2
key2,3
key2,4
key1,5
key2,6
key1,7控制台输出
key: key1 当前数据: (key1,1) 当前时间: 1000 注册一个3s定时器
key: key1 当前数据: (key1,2) 当前时间: 2000 注册一个3s定时器
key: key2 当前数据: (key2,3) 当前时间: 3000 注册一个3s定时器
key: key2 当前数据: (key2,4) 当前时间: 4000 注册一个3s定时器
key: key1 当前数据: (key1,5) 当前时间: 5000 注册一个3s定时器
key: key2 当前数据: (key2,6) 当前时间: 6000 注册一个3s定时器
key: key1 当前数据: (key1,7) 当前时间: 7000 注册一个3s定时器
key: key1 时间: 3000 定时器触发
key: key2 时间: 3000 定时器触发注意 TimerService会以键和时间戳为标准对定时器进行去重因此对于每个key和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次 2.注册一个处理时间的定时器
long currentTs timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(currentTs 3000L);System.out.println(key: currentKey 当前数据: value 当前时间: currentTs 注册一个3s后的定时器);输入测试数据如下
key1,1
key2,2当注册一个处理时间的定时器3s后定时器会触发操作
key: key1 当前数据: (key1,1) 当前时间: 1688136512301 注册一个3s后的定时器
key: key2 当前数据: (key2,2) 当前时间: 1688136514179 注册一个3s后的定时器
key: key1 时间: 1688136515301 定时器触发
key: key2 时间: 1688136517179 定时器触发3.获取process当前watermark
long currentWatermark timerService.currentWatermark();
System.out.println(当前数据: value 当前watermark: currentWatermark);key1,1
key1,2
key1,3结论每次process处理watermark是指上一条数据的事件时间-等待时间例如3-2-1ms-1001
当前数据(key1,1),当前watermark-9223372036854775808
当前数据(key1,2),当前watermark-2001
当前数据(key1,3),当前watermark-10014.删除一个处理时间定时器 // 注册处理时间定时器long currentTs timerService.currentProcessingTime();long timer currentTs 3000;timerService.registerProcessingTimeTimer(timer);System.out.println(key: currentKey 当前数据: value 当前时间: currentTs 注册一个3s后的定时器);// 在3000毫秒后删除处理时间定时器if(key1.equals(currentKey)){timerService.deleteProcessingTimeTimer(timer)}输入测试数据
key1,1
key2,2控制台输出结果
key: key1 当前数据: (key1,1) 当前时间: 1688138104565 注册一个3s后的定时器
key: key2 当前数据: (key2,2) 当前时间: 1688138106441 注册一个3s后的定时器
key: key2 时间: 1688138109441 定时器触发窗口处理函数 窗口处理函数就是一种典型的全窗口函数它是基于WindowedStream直接调用.process()方法 窗口处理函数有2个
1.ProcessWindowFunction 开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入必须是keyBy的数据流 2.ProcessAllWindowFunction 同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入必须是非keyBy的数据流 ProcessWindowFunction介绍
ProcessWindowFunction既是处理函数又是全窗口函数具体接口如下 /*** ProcessWindowFunction它有四个类型参数* param IN 数据流中窗口任务的输入数据类型* param OUT 窗口任务进行计算之后的输出数据类型* param KEY 数据中键key的类型* param W 窗口的类型是Window的子类型。一般情况下我们定义时间窗口W就是TimeWindow*/public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {/*** 处理数据的核心方法process()方法** param key 窗口做统计计算基于的键也就是之前keyBy用来分区的字段* param context 当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类Context* param elements 窗口收集到用来计算的所有数据这是一个可迭代的集合类型* param out 用来发送数据输出计算结果的收集器类型为Collector* throws Exception*/public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;/*** 主要是进行窗口的清理工作* 如果自定义了窗口状态那么必须在clear()方法中进行显式地清除避免内存溢出* param context 当前窗口进行计算的上下文* throws Exception*/public void clear(Context context) throws Exception {}}ProcessAllWindowFunction介绍 ProcessAllWindowFunction的用法类似不过它是基于AllWindowedStream也就是对没有keyBy的数据流直接开窗并调用.process()方法 stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction())使用示例
以使用ProcessWindowFunction为例说明 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8086);// 将输入数据转换为(key, value)元组DataStreamTuple2String, Integer dataStream source.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2 map(String s) throws Exception {int number Integer.parseInt(s);String key number % 2 0 ? key1 : key2;Tuple2 tuple2 new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// 将数据流按键分组并定义滚动窗口处理时间窗口DataStreamString resultStream dataStream.keyBy(tuple - tuple.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowFunction());resultStream.print();env.execute(ProcessWindowFunction Example);}public static class MyProcessWindowFunction extends ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow {Overridepublic void process(String key, Context context, IterableTuple2String, Integer elements, CollectorString out) {int sum 0;for (Tuple2String, Integer element : elements) {sum element.f1;}out.collect(Key: key , Window: context.window() , Sum: sum);}}流的合并处理函数 CoProcessFunction是合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入 CoProcessFunction介绍 调用.process()时传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法 CoProcessFunction类具体结构如下
/*** 用于同时处理两个连接的流* 它允许定义自定义处理逻辑以处理来自两个不同输入流的事件并生成输出** param IN1 第一个输入流的元素类型* param IN2 第二个输入流的元素类型* param OUT 输出元素的类型*/
public abstract class CoProcessFunctionIN1, IN2, OUT extends AbstractRichFunction {/*** 处理第一个输入流的元素** param value 第一个输入流的元素* param ctx 用于访问上下文信息例如事件时间和状态的Context对象* param out 用于发射输出元素的Collector对象* throws Exception 处理时可能抛出的异常*/public abstract void processElement1(IN1 value, Context ctx, CollectorOUT out) throws Exception;/*** 处理第二个输入流的元素** param value 第二个输入流的元素* param ctx 用于访问上下文信息可以使用Context对象来访问事件时间、水位线和状态等上下文信息* param out 用于发射输出元素的Collector对象* throws Exception 处理时可能抛出的异常*/public abstract void processElement2(IN2 value, Context ctx, CollectorOUT out) throws Exception;/*** 当定时器触发时调用的方法。可以重写这个方法来执行基于时间的操作** param timestamp 触发定时器的时间戳* param ctx 用于访问上下文信息如事件时间和状态的OnTimerContext对象* param out 用于发射输出元素的Collector对象* throws Exception 处理时可能抛出的异常*/public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) throws Exception {}
}
使用示例
假设有两个输入流将这两个流合并计算得到每个key对应的合计并输出结果流 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer source1 env.fromElements(Tuple2.of(key1, 1), Tuple2.of(key2, 4), Tuple2.of(key1, 2));DataStreamSourceTuple2String, Integer source2 env.fromElements(Tuple2.of(key1, 3), Tuple2.of(key2, 5), Tuple2.of(key2, 6));ConnectedStreamsTuple2String, Integer, Tuple2String, Integer connect source1.connect(source2);// 进行keyby操作将key相同数据放到一起ConnectedStreamsTuple2String, Integer, Tuple2String, Integer connectKeyby connect.keyBy(s1 - s1.f0, s2 - s2.f0);/*** 对2个流中相同key的值求和*/SingleOutputStreamOperatorString process connectKeyby.process(new CoProcessFunctionTuple2String, Integer, Tuple2String, Integer, String() {MapString, Integer map new HashMap();/*** 第一条流的处理逻辑* param value 第一条流的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement1(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {String key value.f0;if (!map.containsKey(key)) {// 如果key不存在则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算获取上一次put的值 本次的值Integer total map.get(key) value.f1;map.put(key, total);}out.collect(processElement1 key key value value total map.get(key));}/*** 第二条流的处理逻辑* param value 第二条流的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement2(Tuple2String, Integer value, Context ctx, CollectorString out) throws Exception {String key value.f0;if (!map.containsKey(key)) {// 如果key不存在则将值直接put进mapmap.put(key, value.f1);} else {// key存在,则计算获取上一次put的值 本次的值Integer total map.get(key) value.f1;map.put(key, total);}out.collect(processElement2 key key value value total map.get(key));}});process.print();env.execute();}3 processElement1 key key2 value (key2,4)total 4
4 processElement1 key key1 value (key1,1)total 1
4 processElement2 key key1 value (key1,3)total 4
4 processElement1 key key1 value (key1,2)total 6
3 processElement2 key key2 value (key2,5)total 9
3 processElement2 key key2 value (key2,6)total 15流的联结处理函数 JoinFunction 和 ProcessJoinFunction 是 Flink 中用于执行窗口连接操作的两个不同接口 窗口联结 JoinFunction Flink为基于一段时间的双流合并专门提供了一个窗口联结算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。 JoinFunction接口如下 /*** 联接通过在指定的键上联接两个数据集的元素来组合它们,每对连接元素都调用此函数* * 默认情况下连接严格遵循SQL中 “inner join” 的语义** param IN1 第一个输入中元素的类型* param IN2 第二个输入中元素的类型* param OUT 结果元素的类型*/public interface JoinFunctionIN1, IN2, OUT extends Function, Serializable {/*** join方法每对联接的元素调用一次** param first 来自第一个输入的元素* param second 来自第二个输入的元素* return 生成的元素*/OUT join(IN1 first, IN2 second) throws Exception;}具体语法格式如下
/*** 1.调用DataStream的.join()方法来合并两条流得到一个JoinedStreams* 2.通过.where()和.equalTo()方法指定两条流中联结的key。注意两者相同的元素如果在同一窗口中才可以匹配起来* 3.通过.window()开窗口并调用.apply()传入联结窗口函数进行处理计算*/
stream1.join(stream2)// where()参数是KeySelector键选择器用来指定第一条流中的key.where(KeySelector)// equalTo()传入KeySelector则指定第二条流中的key.equalTo(KeySelector)// window()传入窗口分配器.window(WindowAssigner)// apply()看作实现一个特殊的窗口函数,只能调用.apply()。传入JoinFunction是一个函数类接口使用时需要实现内部的.join()方法方法有两个参数分别表示两条流中成对匹配的数据。.apply(JoinFunction)示例如下 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 生成数据源1DataStreamSourceTuple2String, Integer streamSource1 env.fromElements(Tuple2.of(a, 1), Tuple2.of(a, 2), Tuple2.of(b, 3), Tuple2.of(c, 4));// 定义 使用 Watermark策略SingleOutputStreamOperatorTuple2String, Integer stream1 streamSource1.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// 生成数据源2DataStreamSourceTuple2String, Integer streamSource2 env.fromElements(Tuple2.of(a, 1), Tuple2.of(a, 2), Tuple2.of(b, 3), Tuple2.of(c, 4), Tuple2.of(d, 5), Tuple2.of(e, 6));// 定义 使用 Watermark策略SingleOutputStreamOperatorTuple2String, Integer stream2 streamSource2.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));/*** 根据keyby的key进行匹配关联** 注意落在同一个时间窗口范围内才能匹配*/DataStreamString join stream1.join(stream2)// stream1的keyby.where(r1 - r1.f0)// stream2的keyby.equalTo(r2 - r2.f0)// 传入窗口分配器.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 传入JoinFunction函数类接口实现内部的.join()方法方法有两个参数分别表示两条流中成对匹配的数据.apply(new JoinFunctionTuple2String, Integer, Tuple2String, Integer, String() {/*** 关联上的数据调用join方法* param first stream1的数据* param second stream2的数据*/Overridepublic String join(Tuple2String, Integer first, Tuple2String, Integer second) throws Exception {return stream1 数据: first 关联 stream2 数据 second;}});join.print();env.execute();}执行结果如下
stream1 数据: (a,1) 关联 stream2 数据 (a,1)
stream1 数据: (a,1) 关联 stream2 数据 (a,2)
stream1 数据: (a,2) 关联 stream2 数据 (a,1)
stream1 数据: (a,2) 关联 stream2 数据 (a,2)
stream1 数据: (c,4) 关联 stream2 数据 (c,4)
stream1 数据: (b,3) 关联 stream2 数据 (b,3)间隔联结 ProcessJoinFunction Interval Join即间隔联结它是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔看这期间是否有来自另一条流的数据匹配。 ProcessJoinFunction接口情况如下
/*** 处理两个连接流的关联操作的抽象类* 该类允许定义自定义的处理逻辑以在连接两个流时处理匹配的元素** param IN1 第一个输入流的元素类型* param IN2 第二个输入流的元素类型* param OUT 输出元素的类型*/
public interface ProcessJoinFunctionIN1, IN2, OUT {/*** 处理连接两个流的元素** param left 第一个输入流的元素* param right 第二个输入流的元素* param ctx 用于访问上下文信息的 Context 对象* param out 用于发射输出元素的 Collector 对象* throws Exception 处理时可能抛出的异常*/void processElement(IN1 left, IN2 right, Context ctx, CollectorOUT out) throws Exception;
}间隔联结使用语法如下
// 第一条流进行KeyedStream
stream1.keyBy(KeySelector)// 得到KeyedStream之后调用.intervalJoin()合并两条流传入一个KeyedStream参数两者key类型应该一致最终得到一个IntervalJoin类型.intervalJoin(stream2.keyBy(KeySelector))// 通过.between()方法指定间隔的上下界.between(Time.milliseconds(-2), Time.milliseconds(1))// 调用.process()方法定义对匹配数据对的处理操作传入一个处理函数.process (new ProcessJoinFunctionInteger, Integer, String(){Overridepublic void processElement(Integer left, Integer right, Context ctx, CollectorString out) {out.collect(left , right);}});使用示例如下 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 生成数据源1DataStreamSourceTuple2String, Integer streamSource1 env.fromElements(Tuple2.of(a, 1), Tuple2.of(a, 2), Tuple2.of(b, 3), Tuple2.of(c, 4));// 定义 使用 Watermark策略SingleOutputStreamOperatorTuple2String, Integer stream1 streamSource1.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// 生成数据源2DataStreamSourceTuple2String, Integer streamSource2 env.fromElements(Tuple2.of(a, 1), Tuple2.of(a, 2), Tuple2.of(b, 3), Tuple2.of(c, 4), Tuple2.of(d, 5), Tuple2.of(e, 6));// 定义 使用 Watermark策略SingleOutputStreamOperatorTuple2String, Integer stream2 streamSource2.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// 对2条流分别做keyby,key就是关联条件KeyedStreamTuple2String, Integer, String keyedStream1 stream1.keyBy(r1 - r1.f0);KeyedStreamTuple2String, Integer, String keyedStream2 stream2.keyBy(r2 - r2.f0);// 执行间隔联结keyedStream1.intervalJoin(keyedStream2).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunctionTuple2String, Integer, Tuple2String, Integer, String() {/*** 当两条流数据匹配上时调用这个方法* param left stream1的数据* param right stream2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple2String, Integer right, Context ctx, CollectorString out) throws Exception {// 关联的数据out.collect(stream1 数据: left 关联 stream2 数据 right);}}).print();env.execute();}stream1 数据: (a,1) 关联 stream2 数据 (a,1)
stream1 数据: (a,1) 关联 stream2 数据 (a,2)
stream1 数据: (a,2) 关联 stream2 数据 (a,2)
stream1 数据: (a,2) 关联 stream2 数据 (a,1)
stream1 数据: (b,3) 关联 stream2 数据 (b,3)
stream1 数据: (c,4) 关联 stream2 数据 (c,4)迟到数据的处理 窗口间隔联结处理函数可以实现对迟到数据的处理 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer streamSource1 env.socketTextStream(112.74.96.150, 8086).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple2String, Integer streamSource2 env.socketTextStream(112.74.96.150, 8087).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split(,);return Tuple2.of(split[0], Integer.valueOf(split[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));// 对2条流分别做keyby,key就是关联条件KeyedStreamTuple2String, Integer, String keyedStream1 streamSource1.keyBy(r1 - r1.f0);KeyedStreamTuple2String, Integer, String keyedStream2 streamSource2.keyBy(r2 - r2.f0);// 定义 标记操作符的侧面输出OutputTagTuple2String, Integer keyedStream1OutputTag new OutputTag(keyedStream1, Types.TUPLE(Types.STRING, Types.INT));OutputTagTuple2String, Integer keyedStream2OutputTag new OutputTag(keyedStream2, Types.TUPLE(Types.STRING, Types.INT));// 执行间隔联结SingleOutputStreamOperatorString process keyedStream1.intervalJoin(keyedStream2)// 指定间隔的上界、下界的偏移负号代表时间往前正号代表时间往后// 若keyedStream1中某事件时间为5则其水位线是5-32其上界是 5-23 下界是527 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据.between(Time.seconds(-2), Time.seconds(2))// 将streamSource1迟到数据放入侧输出流.sideOutputLeftLateData(keyedStream1OutputTag)// 将streamSource2迟到数据放入侧输出流.sideOutputRightLateData(keyedStream2OutputTag)// 对匹配数据对的处理操作 只能处理 join上的数据.process(new ProcessJoinFunctionTuple2String, Integer, Tuple2String, Integer, String() {/*** 当两条流数据匹配上时调用这个方法* param left stream1的数据* param right stream2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple2String, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(stream1 数据: left 关联 stream2 数据 right);}});process.print(主流);process.getSideOutput(keyedStream1OutputTag).printToErr(streamSource1迟到数据);process.getSideOutput(keyedStream2OutputTag).printToErr(streamSource2迟到数据);env.execute();}1.2条流数据匹配 若keyedStream1中某事件时间为5则其水位线是5-32其上界是 5-23 下界是527 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据 nc -lk 8086
key1,5nc -lk 8087
key1,3
key1,7
key1,8主流 stream1 数据: (key1,5) 关联 stream2 数据 (key1,3)
主流 stream1 数据: (key1,5) 关联 stream2 数据 (key1,7)2.keyedStream2迟到数据 此时keyedStream1中水位线是5-32keyedStream2中水位线是8-35多并行度下水位线取最小即取水位线2
在keyedStream2输入事件时间1
nc -lk 8087
key1,3
key1,7
key1,8
key1,1事件时间1 水位线2且事件时间1被keyedStream1的事件时间5的上界5-23与下界527不包含即数据不匹配且streamSource2数据迟到
streamSource2迟到数据 (key1,1)3.keyedStream1迟到数据
keyedStream1输入事件时间7
nc -lk 8086
key1,5
key1,7此时匹配到streamSource2中的8、7
主流 stream1 数据: (key1,7) 关联 stream2 数据 (key1,8)
主流 stream1 数据: (key1,7) 关联 stream2 数据 (key1,7)此时keyedStream1的水位线是7-34keyedStream2的水位线是8-35多并行度下水位线取最小即取水位线4
keyedStream1输入事件时间3 nc -lk 8086
key1,5
key1,7
key1,3事件时间3 水位线4且事件时间3被keyedStream2的事件时间3的上界3-21与下界325包含即数据匹配且streamSource1数据迟到
streamSource1迟到数据 (key1,3)广播流处理函数 用于连接一个主数据流和多个广播数据流。可以实现processElement 方法来处理主数据流的每个元素同时可以处理广播数据流通常用于数据广播和连接。 广播流处理函数有2个
1.BroadcastProcessFunction 广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接之后的产物 2.KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个KeyedStream与广播流做连接 KeyedBroadcastProcessFunction
/*** param KS 输入键控流的键类型* param IN1 键控 (非广播) 端的输入类型* param IN2 广播端的输入类型* param OUT 运算符的输出类型*/
public abstract class KeyedBroadcastProcessFunctionKS, IN1, IN2, OUTextends BaseBroadcastProcessFunction {private static final long serialVersionUID -2584726797564976453L;/*** (非广播) 的键控流中的每个元素调用此方法** param value 流元素* param ctx 允许查询元素的时间戳、查询当前处理/事件时间以及以只读访问迭代广播状态* param out 将结果元素发出*/public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final CollectorOUT out) throws Exception;/*** 针对broadcast stream中的每个元素调用该方法** param value stream元素* param ctx 上下文 许查询元素的时间戳、查询当前处理/事件时间和更新广播状态* param out 将结果元素发射到*/public abstract void processBroadcastElement(final IN2 value, final Context ctx, final CollectorOUT out) throws Exception;/*** 当使用TimerService设置的计时器触发时调用** param timestamp 触发计时器的时间戳* param ctx 上下文* param out 返回结果值的收集器*/public void onTimer(final long timestamp, final OnTimerContext ctx, final CollectorOUT out)throws Exception {}
}BroadcastProcessFunction BroadcastProcessFunction与KeyedBroadcastProcessFunction类似不过它是基于AllWindowedStream也就是对没有keyBy的数据流直接开窗并调用.process()方法 public abstract class BroadcastProcessFunctionIN1, IN2, OUT extends BaseBroadcastProcessFunction {public abstract void processElement( final IN1 value, final ReadOnlyContext ctx, final CollectorOUT out) throws Exception;public abstract void processBroadcastElement(final IN2 value, final Context ctx, final CollectorOUT out) throws Exception;}使用示例
以使用KeyedBroadcastProcessFunction为例说明
public class KeyedBroadcastProcessFunctionExample {/*** 主流 数据对象*/DataAllArgsConstructorNoArgsConstructorpublic static class MainRecord {private String key;private int value;}/*** 广播流 数据对象*/DataAllArgsConstructorNoArgsConstructorpublic static class BroadcastRecord {private String configKey;private int configValue;}/*** 结果 数据对象*/DataAllArgsConstructorNoArgsConstructorpublic static class ResultRecord {private String key;private int result;}// 使用给定的名称和给定的类型信息新建一个MapStateDescriptorstatic MapStateDescriptorString, Integer mapStateDescriptor new MapStateDescriptor(broadcastState, String.class, Integer.class);public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 创建主数据流DataStreamMainRecord mainStream env.fromElements(new MainRecord(A, 10),new MainRecord(B, 20),new MainRecord(A, 30));// 创建广播数据流DataStreamBroadcastRecord broadcastStream env.fromElements(new BroadcastRecord(config, 5));// 将广播数据流转化为 BroadcastStreamBroadcastStreamBroadcastRecord broadcast broadcastStream.broadcast(mapStateDescriptor);// 使用 KeyedBroadcastProcessFunction 连接主数据流和广播数据流DataStreamResultRecord resultStream mainStream.keyBy(new MainRecordKeySelector()).connect(broadcast).process(new MyKeyedBroadcastProcessFunction());resultStream.print();env.execute(KeyedBroadcastProcessFunction Example);}/*** 使用提供的键对其运算符状态进行分区*/public static class MainRecordKeySelector implements KeySelectorMainRecord, String {Overridepublic String getKey(MainRecord mainRecord) {return mainRecord.getKey();}}/****/public static class MyKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunctionString, MainRecord, BroadcastRecord, ResultRecord {Overridepublic void processBroadcastElement(BroadcastRecord value, Context ctx, CollectorResultRecord out) throws Exception {// 通过上下文获取广播状态BroadcastStateString, Integer broadcastState ctx.getBroadcastState(mapStateDescriptor);// 处理广播数据流中的每个元素更新广播状态broadcastState.put(value.getConfigKey(), value.getConfigValue());}Overridepublic void processElement(MainRecord value, ReadOnlyContext ctx, CollectorResultRecord out) throws Exception {// 在 processElement 中访问广播状态ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(mapStateDescriptor);// 从广播状态中获取配置值Integer configValue broadcastState.get(config);// 注意刚启动时可能是数据流的第1 2 3...条数据先来 不是广播流先来if (configValue null) {return;}System.out.println(String.format(主数据流的Key: %s, value: %s, 广播更新结果: %s, value.key, value.value, value.value configValue));// 根据配置值和主数据流中的元素执行处理逻辑int result value.getValue() configValue;// 发出结果记录out.collect(new ResultRecord(value.getKey(), result));}}
}
主数据流的Key: A, value: 10, 广播更新结果: 15
主数据流的Key: B, value: 20, 广播更新结果: 25
2 KeyedBroadcastProcessFunctionExample.ResultRecord(keyB, result25)
7 KeyedBroadcastProcessFunctionExample.ResultRecord(keyA, result15)
主数据流的Key: A, value: 30, 广播更新结果: 35
7 KeyedBroadcastProcessFunctionExample.ResultRecord(keyA, result35)