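Joins in Flink

Introduction

The previous article covered broadcast stream variables in Flink. This one looks at Flink's Join operation. The difference from an ordinary join between two tables is that Flink performs the join over a time window.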

Flink currently supports two kinds of join: Window Join and Interval Join.
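This article concentrates on Window Join. As a rough illustration of the other variant, the sketch below shows the matching rule behind an Interval Join in plain Java: two elements are paired when their keys are equal and the second element's timestamp lies within a bounded interval around the first element's timestamp. It does not use Flink at all, and the names IntervalJoinSketch, Event and intervalJoin are hypothetical, chosen only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the interval-join matching rule (no Flink dependency).
// IntervalJoinSketch, Event and intervalJoin are hypothetical names used only here.
class IntervalJoinSketch {

    // A keyed element carrying a timestamp in milliseconds.
    static final class Event {
        final long timestampMs;
        final int key;
        final String value;

        Event(long timestampMs, int key, String value) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.value = value;
        }
    }

    // Pairs a with b when the keys are equal and
    // a.timestampMs + lowerMs <= b.timestampMs <= a.timestampMs + upperMs.
    static List<String> intervalJoin(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event a : left) {
            for (Event b : right) {
                boolean sameKey = a.key == b.key;
                boolean inInterval = b.timestampMs >= a.timestampMs + lowerMs
                        && b.timestampMs <= a.timestampMs + upperMs;
                if (sameKey && inInterval) {
                    out.add(a.value + "," + b.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event(5000, 1, "a"));
        List<Event> right = Arrays.asList(
                new Event(3000, 1, "b1"),   // exactly on the lower bound: matches
                new Event(6500, 1, "b2"),   // past the upper bound: no match
                new Event(5500, 2, "b3"),   // different key: no match
                new Event(5500, 1, "b4"));  // inside the interval: matches
        // Interval of [-2 s, +1 s] around each left element.
        for (String joined : intervalJoin(left, right, -2000, 1000)) {
            System.out.println(joined);     // prints "a,b1" then "a,b4"
        }
    }
}
```

The nested loop is only meant to make the rule readable; a streaming engine has to buffer elements per key and discard them once they can no longer match.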

Window Join: operates on Flink windows. It joins the elements of the two streams that fall into the same window and share the same key.

The general shape of a Window Join is:

input1.join(input2)
  .where(<KeySelector>)     // which field of input1 to use as the key
  .equalTo(<KeySelector>)   // which field of input2 to use as the key
  .window(<WindowAssigner>) // the WindowAssigner
  [.trigger(<Trigger>)]     // the Trigger (optional)
  [.evictor(<Evictor>)]     // the Evictor (optional)
  .apply(<JoinFunction>)    // the JoinFunction
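To make the "same window, same key" rule concrete, here is a minimal plain-Java sketch of what a tumbling Window Join computes. It is not Flink code: explicit timestamps stand in for the time Flink would assign, the window start is computed under the assumption of a zero offset and non-negative timestamps, and WindowJoinSketch, Event, windowStart and windowJoin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of tumbling-window join semantics (no Flink dependency).
// WindowJoinSketch, Event, windowStart and windowJoin are hypothetical names.
class WindowJoinSketch {

    // A keyed element carrying a timestamp in milliseconds.
    static final class Event {
        final long timestampMs;
        final int key;
        final String value;

        Event(long timestampMs, int key, String value) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.value = value;
        }
    }

    // Start of the tumbling window containing the timestamp
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Pairs every left element with every right element that has the same key
    // and falls into the same tumbling window.
    static List<String> windowJoin(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key == r.key;
                boolean sameWindow =
                        windowStart(l.timestampMs, sizeMs) == windowStart(r.timestampMs, sizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event(500, 1, "a1"), new Event(3500, 1, "a2"));
        List<Event> right = Arrays.asList(
                new Event(2900, 1, "b1"),   // window [0, 3000), key 1
                new Event(2000, 2, "b2"),   // window [0, 3000), key 2
                new Event(3100, 1, "b3"));  // window [3000, 6000), key 1
        for (String joined : windowJoin(left, right, 3000)) {
            System.out.println(joined);     // prints "a1,b1" then "a2,b3"
        }
    }
}
```

Note that an element with no partner in its window (b2 here) produces nothing, which is the inner-join behaviour the example later in this article relies on.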

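Example

Background

We have two unbounded streams to join: the first carries student gender data, and the second carries basic student information. The example joins the two streams on the gender id.

Code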

package com.liuzhuo.join;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class UnboundedJoinStream {

    public static void main(String[] args) throws Exception {


        //1) Execution environment for the unbounded streams
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2) Add a custom unbounded source that emits one record every three seconds
        DataStreamSource<Tuple2<Integer, String>> genderStream = env.addSource(new MySourceFunction());

        //3) Unbounded socket stream; each line is expected to be name,age,genderId,address
        SingleOutputStreamOperator<Tuple4<String, Integer, Integer, String>> stuInfoStream = env.socketTextStream("localhost", 8888)
                .filter(stuInfo -> !stuInfo.isEmpty())
                .map(stuInfo -> {
                    String[] split = stuInfo.split(",");
                    return new Tuple4<String, Integer, Integer, String>(split[0], Integer.valueOf(split[1]), Integer.valueOf(split[2]), split[3]);
                }).returns(TypeInformation.of(new TypeHint<Tuple4<String,Integer, Integer,String>>(){}));

        //4) Join the student stream with the gender stream on the gender id
        DataStream<String> result = stuInfoStream.join(genderStream)
                .where(stuInfo -> stuInfo.f2)
                .equalTo(genderInfo -> genderInfo.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .apply(new MyFlatJoinFunction());


        //5) Print
        result.print("stream join");

        //6) Execute
        env.execute("Join of two streams");
    }

    static class MySourceFunction extends RichSourceFunction<Tuple2<Integer, String>> {

        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {

            List<Tuple2<Integer, String>> sources = Arrays.asList(new Tuple2<Integer, String>(1, "male"), new Tuple2<Integer, String>(2, "female"), new Tuple2<Integer, String>(3, "*"));
            int i = 0;
            while (isRunning) {

                Tuple2<Integer, String> data = sources.get(i++);

                sourceContext.collect(data);

                if (i >= sources.size()) {
                    i = 0;
                }

                //Sleep for three seconds
                TimeUnit.SECONDS.sleep(3);
            }
        }

        @Override
        public void cancel() {
            this.isRunning = false;
        }
    }

    static class MyFlatJoinFunction implements FlatJoinFunction<Tuple4<String, Integer, Integer, String>, Tuple2<Integer, String>, String> {

        @Override
        public void join(Tuple4<String, Integer, Integer, String> first, Tuple2<Integer, String> second, Collector<String> out) throws Exception {
            String name = first.f0;
            Integer age = first.f1;
            String gender = second.f1;
            String address = first.f3;
            out.collect(name.concat(",").concat(String.valueOf(age)).concat(",").concat(gender).concat(",").concat(address));
        }
    }
}
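The socket stream in the code above expects each line to hold four comma-separated fields (name, age, gender id, address), and its map step throws an exception on anything else. The following plain-Java sketch shows one way such a line could be checked before use, and what the joined output line looks like. StudentLineSketch, parse and format are hypothetical helpers that are not part of the original job, and the sample record is made up for illustration.

```java
// Plain-Java sketch of the record format used in the example (no Flink dependency).
// StudentLineSketch, parse and format are hypothetical helpers, not part of the job above.
class StudentLineSketch {

    // Splits "name,age,genderId,address" into four trimmed fields.
    // Returns null when the line is malformed instead of throwing.
    static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.split(",");
        if (parts.length != 4) {
            return null;
        }
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        try {
            Integer.parseInt(parts[1]); // age must be an integer
            Integer.parseInt(parts[2]); // gender id must be an integer
        } catch (NumberFormatException e) {
            return null;
        }
        return parts;
    }

    // Same field order as the output of MyFlatJoinFunction: name,age,gender,address.
    static String format(String[] student, String genderLabel) {
        return student[0] + "," + student[1] + "," + genderLabel + "," + student[3];
    }

    public static void main(String[] args) {
        String[] student = parse("Tom,18,1,Beijing"); // made-up sample line
        // "male" stands for the label the gender source emits for id 1.
        System.out.println(format(student, "male"));  // prints "Tom,18,male,Beijing"
        System.out.println(parse("Tom,18") == null);  // prints "true": too few fields
    }
}
```

In the Flink job, a check of this kind could sit in the filter step so that malformed lines are dropped rather than failing the job; whether that is desirable depends on how strictly the input should be validated.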

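Result

In the code above, the source sends one record every three seconds, and the tumbling window is three seconds long.

A gender record is sent every three seconds. To verify the join, type student records with a matching gender id into the terminal. A record whose gender id has no match in the same window produces no output.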
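Because the gender source cycles through its ids one at a time, a typed record only finds a partner while the matching id is in the current window. The sketch below models that cycle in plain Java under an idealized assumption: the source emits exactly one id per period, starting at time zero, with no drift. Real emission times and window boundaries will not line up this neatly. GenderCycleSketch and genderIdAt are hypothetical names.

```java
// Idealized model of the gender source in the example (no Flink dependency).
// GenderCycleSketch and genderIdAt are hypothetical names; real emission times drift.
class GenderCycleSketch {

    // Id emitted during the period that contains elapsedMs, assuming the source
    // emits ids 1..idCount in order, one per period, starting at elapsedMs = 0.
    static int genderIdAt(long elapsedMs, long periodMs, int idCount) {
        long emissionIndex = elapsedMs / periodMs;   // number of full periods that have passed
        return (int) (emissionIndex % idCount) + 1;  // ids are 1-based and repeat cyclically
    }

    public static void main(String[] args) {
        long periodMs = 3000; // matches the 3-second sleep in the source
        for (long t = 0; t <= 9000; t += 3000) {
            System.out.println(t + " ms -> gender id " + genderIdAt(t, periodMs, 3));
        }
        // prints:
        // 0 ms -> gender id 1
        // 3000 ms -> gender id 2
        // 6000 ms -> gender id 3
        // 9000 ms -> gender id 1
    }
}
```

Under this model each id comes around once every nine seconds, so a student record with gender id 2 that is typed while id 1 is current finds no partner and is dropped.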


  Please credit the source when reposting: 解忧杂货店, Joins in Flink
