Flink中的广播无界流

今天来学习FLink中的广播无界流的变量。

广播变量简介

在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量Broadcast便是解决这种情况的。
我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。

如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

注意因为广播变量是要把dataStream广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题

对于有界流:

Broadcast:Broadcast是通过withBroadcastSet(dataset,string)来注册的

Access:通过getRuntimeContext().getBroadcastVariable(String)访问广播变量

对与无界流:

使用MapStateDescriptor来描述广播流
broadcastStream = dataStream1.boradcaset(mapSateDescriptor);
dataStream2.connect(broadcastStream).process(new BroadcastProcessFunction()).

案例

假设,现在有两个数据流:1)学生的基本信息的数据流;2)学生性别的数据流。

学生的基本信息的数据流:"张三",18,1,"北京"// 姓名:张三,年纪:18,性别:1,地址:北京
学生性别的数据流:
(1,"男") // 1:男 ,2:女,3:*

现在,我们需要将学生基本信息的数据流和性别数据流合并,最终输出完成的学生信息,根据上面的描述,最终的信息展示:

("张三",18,"男","北京")

代码

package com.liuzhuo.broadcast;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class UnBoundBroadCastDemo {

    public static void main(String[] args) throws Exception {

        //1)无界流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2) 获取学生基本信息的数据流:这里使用socket
        DataStreamSource<String> stuInfo = env.socketTextStream("localhost", 6666);


        //3) 获取学生性别的数据流
        DataStreamSource<String> genderInfo = env.socketTextStream("localhost", 7777);

        //4) 设置广播的状态变量
        MapStateDescriptor<String, String> genderState = new MapStateDescriptor("genderInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        //5) 数据流小的变成广播流
        BroadcastStream<String> genderBroadcast = genderInfo.broadcast(genderState);

        //6) 学生基本信息流连接广播流
        SingleOutputStreamOperator<String> result = stuInfo.connect(genderBroadcast).process(new MyBroadCastFunction(genderState));

        //7) 打印
        result.print();

        //8) 执行
        env.execute("无界流的广播状态!");
    }


    static class MyBroadCastFunction extends BroadcastProcessFunction<String, String, String> {

        private MapStateDescriptor<String, String> mapStateDescriptor;

        public MyBroadCastFunction(MapStateDescriptor<String, String> mapStateDescriptor) {
            this.mapStateDescriptor = mapStateDescriptor;
        }

        @Override
        public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            if (!s.isEmpty()) {
                String[] split = s.split(",");
                String gender = split[2];
                String genderName = readOnlyContext.getBroadcastState(mapStateDescriptor).get(gender);
                if (genderName == null) {
                    genderName = "未知";
                }

                collector.collect(split[0].concat(",").concat(split[1].concat(",").concat(genderName).concat(",").concat(split[3])));
            }
        }

        @Override
        public void processBroadcastElement(String s, Context context, Collector<String> collector) throws Exception {
            //处理广播流的信息
            if (!s.isEmpty()) {
                String[] split = s.split(",");
                String gender = split[0];
                String genderName = split[1];
                context.getBroadcastState(mapStateDescriptor).put(gender, genderName);
            }
        }
    }
}

开启socket的环境

启动两个shell环境

第一个为监听6666端口号的socket,第二个为监听7777端口的socket

启动程序

如果报xxx类找不到的问题,请打开idea的运行环境,勾选如下的信息

验证

1)在端口:7777输入:1,男

结果:控制台什么都不打印。

2)在端口号6666:输入:张三,18,1,北京

3)在端口号6666:输入:王五,19,2,北京

4)在端口:7777输入:2,女


  目录