Flink合流与分流

1.Flink转换算子之合流(Union/Connect)

合流就是将多个流合并成一个流。

1、基于Union

注意: Union可以将两个或多个同数据类型的流合并成一个流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.lxk.test;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.log4j.Logger;

import java.util.Random;

public class TestUnion {
   public static boolean runingFlag = true;
   public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
       //env.setParallelism(1);

       DataStreamSource<Tuple3<Integer, String, String>> ADataStream = env.addSource(new SourceFunction<Tuple3<Integer, String, String>>() {
           @Override
           public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
               while (runingFlag) {
                   ctx.collect(new Tuple3<Integer, String, String>( 1, "111","A"));
                   Thread.sleep(500);
              }
          }
           @Override
           public void cancel() {
               runingFlag = false;
          }
      });
       DataStreamSource<Tuple3<Integer, String, String>> BDataStream = env.addSource(new SourceFunction<Tuple3<Integer, String, String>>() {
           @Override
           public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
               while (runingFlag) {
                   ctx.collect(new Tuple3<Integer, String, String>(1, "222","B"));
                   Thread.sleep(500);
              }
          }
           @Override
           public void cancel() {
               runingFlag = false;
          }
      });
       DataStreamSource<Tuple3<Integer, String, String>> CDataStream = env.addSource(new SourceFunction<Tuple3<Integer, String, String>>() {
           @Override
           public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
               while (runingFlag) {
                   ctx.collect(new Tuple3<Integer, String, String>(1, "333","C"));
                   Thread.sleep(500);
              }
          }

           @Override
           public void cancel() {
               runingFlag = false;
          }
      });

       DataStream<Tuple3<Integer, String, String>> union = ADataStream.union(BDataStream).union(CDataStream);
       union.print();

       env.execute("测试合流");
  }
}

测试结果:

img

2、基于Connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.lxk.test;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.log4j.Logger;

import java.util.Random;

public class TestConnect {
   public static boolean runingFlag = true;

   public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
       //env.setParallelism(1);

       DataStreamSource<Tuple3<Integer, String, String>> ADataStream = env.addSource(new SourceFunction<Tuple3<Integer, String, String>>() {
           @Override
           public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
               while (runingFlag) {
                   ctx.collect(new Tuple3<Integer, String, String>(1, "111", "A"));
                   Thread.sleep(500);
              }
          }

           @Override
           public void cancel() {
               runingFlag = false;
          }
      });
       DataStreamSource<Tuple3<String, Integer, String>> BDataStream = env.addSource(new SourceFunction<Tuple3<String, Integer, String>>() {
           @Override
           public void run(SourceContext<Tuple3<String, Integer, String>> ctx) throws Exception {
               while (runingFlag) {
                   ctx.collect(new Tuple3<String, Integer, String>("B", 2, "222"));
                   Thread.sleep(500);
              }
          }

           @Override
           public void cancel() {
               runingFlag = false;
          }
      });


       ConnectedStreams<Tuple3<Integer, String, String>, Tuple3<String, Integer, String>> connect = ADataStream.connect(BDataStream);
       // 方式一:用CoMap处理合并后的流
       SingleOutputStreamOperator<String> map01 = connect.map(new CoMapFunction<Tuple3<Integer, String, String>, Tuple3<String, Integer, String>, String>() {
           //定义第一个流的处理逻辑
           @Override
           public String map1(Tuple3<Integer, String, String> tuple3) throws Exception {
               return tuple3.f0.toString() + "-" + tuple3.f1 + "-" + tuple3.f2;
          }

           //定义第二个流的处理逻辑
           @Override
           public String map2(Tuple3<String, Integer, String> tuple3) throws Exception {
               return tuple3.f0.toString() + "-" + tuple3.f1 + "-" + tuple3.f2;
          }
      });

       // 方式二:用CoFlatMap处理合并后的流
       SingleOutputStreamOperator<String> map02 = connect.flatMap(new CoFlatMapFunction<Tuple3<Integer, String, String>, Tuple3<String, Integer, String>, String>() {
           @Override
           public void flatMap1(Tuple3<Integer, String, String> tuple3, Collector<String> collector) throws Exception {
               collector.collect(tuple3.f0.toString() + "-" + tuple3.f1 + "-" + tuple3.f2);
          }

           @Override
           public void flatMap2(Tuple3<String, Integer, String> tuple3, Collector<String> collector) throws Exception {
               collector.collect(tuple3.f0 + "-" + tuple3.f1 + "-" + tuple3.f2);
          }
      });

       map01.print("map01:");
       map02.print("map02:");

       env.execute("测试connect合流");
  }
}

测试结果:

img

注意:

  1. Connect只能用来合并两条流,且这两条流的数据类型可以不同。
  2. Connect合并后,可用map中的CoMapFunction或flatMap中的CoFlatMapFunction来对合并流中的每个流进行处理。

2. 分流(Split/Side)

分流可以将一个流拆分成多个流。

2.1基于Split...Select...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.bigdata.flink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
* Author: Wang Pei
* Summary:
* 分流:基于Split-Select
*/
@Slf4j
public class SplitStreamBySplit {
   public static void main(String[] args) throws Exception{

       /**运行环境*/
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       /**输入数据源*/
       DataStreamSource<Tuple3<String, String, String>> source = env.fromElements(
               new Tuple3<>("productID1", "click", "user_1"),
               new Tuple3<>("productID1", "click", "user_2"),
               new Tuple3<>("productID1", "browse", "user_1"),
               new Tuple3<>("productID2", "browse", "user_1"),
               new Tuple3<>("productID2", "click", "user_2"),
               new Tuple3<>("productID2", "click", "user_1")
      );

       /**1、定义拆分逻辑*/
       SplitStream<Tuple3<String, String, String>> splitStream = source.split(new OutputSelector<Tuple3<String, String, String>>() {
           @Override
           public Iterable<String> select(Tuple3<String, String, String> value) {

               ArrayList<String> output = new ArrayList<>();
               if (value.f0.equals("productID1")) {
                   output.add("productID1");

              } else if (value.f0.equals("productID2")) {
                   output.add("productID2");
              }

               return output;

          }
      });

       /**2、将流真正拆分出来*/
       splitStream.select("productID1").print();

       env.execute();
  }
}

注意:

  1. Split...Select...中,Split只是给流中的数据打上标记,并没有将流真正拆分;需要通过Select算子才能把打了标记的子流真正取出来。
  2. Split...Select...不能连续分流。即不能Split...Select...Split,但可以如Split...Select...Filter...Split
  3. Split...Select...已经过时,推荐使用更灵活的侧路输出(Side-Output),如下。

2.2 基于Side-Output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.bigdata.flink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


/**
* Author: Wang Pei
* Summary:
* 分流:基于SideOutput(侧路输出)
*/
@Slf4j
public class SplitStreamBySideOutput {
   public static void main(String[] args) throws Exception{

       /**运行环境*/
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       /**输入数据源*/
       DataStreamSource<Tuple3<String, String, String>> source = env.fromElements(
               new Tuple3<>("productID1", "click", "user_1"),
               new Tuple3<>("productID1", "click", "user_2"),
               new Tuple3<>("productID1", "browse", "user_1"),
               new Tuple3<>("productID2", "browse", "user_1"),
               new Tuple3<>("productID2", "click", "user_2"),
               new Tuple3<>("productID2", "click", "user_1")
      );

       /**1、定义OutputTag*/
       OutputTag<Tuple3<String, String, String>> sideOutputTag = new OutputTag<Tuple3<String, String, String>>("side-output-tag"){};

       /**2、在ProcessFunction中处理主流和分流*/
       SingleOutputStreamOperator<Tuple3<String, String, String>> processedStream = source.process(new ProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
           @Override
           public void processElement(Tuple3<String, String, String> value, Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {

               //侧流-只输出特定数据
               if (value.f0.equals("productID1")) {
                   ctx.output(sideOutputTag, value);

               //主流
              }else {
                   out.collect(value);
              }

          }
      });

       //获取主流
       processedStream.print();
       //获取侧流
       processedStream.getSideOutput(sideOutputTag).print();

       env.execute();
  }
}