
1. Environment

1.1 getExecutionEnvironment

Creates an execution environment that represents the context of the current program. If the program is invoked standalone, this method returns a local execution environment; if it is invoked from a command-line client to be submitted to a cluster, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the program is run, which makes it the most commonly used way to create an execution environment.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.2 createLocalEnvironment

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

1.3 createRemoteEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");

2. Source

2.1 Collection

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// read data from a collection
DataStreamSource<SensorReading> dataStream = env.fromCollection(Arrays.asList(
    new SensorReading("sensor_1", 1547718199L, 35.8),
    new SensorReading("sensor_6", 1547718201L, 15.4),
    new SensorReading("sensor_7", 1547718202L, 6.7),
    new SensorReading("sensor_10", 1547718205L, 38.1)));

DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 4, 8);

// print the output
dataStream.print("data");
integerDataStreamSource.print("int");

env.execute();

2.2 File

String path = "src/main/resources/sensor.txt";

DataStreamSource<String> dataStream = env.readTextFile(path);

2.3 Kafka message queue

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

// read data from Kafka
DataStreamSource<String> sensor = env.addSource(
    new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

2.4 Custom Source

Besides the sources above, we can also define our own source. All that is needed is to pass in a SourceFunction. It is invoked as follows:

DataStreamSource<SensorReading> dataStream = env.addSource(new MySensorSource());

We want to generate sensor readings randomly; a concrete implementation of MySensorSource looks like this:

public static class MySensorSource implements SourceFunction<SensorReading> {
    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        HashMap<String, Double> sensorTempMap = new HashMap<>(10);
        // set initial temperatures for 10 sensors
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }

        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // random fluctuation around the current temperature
                double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
            }
            // control the emission rate
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

3. Transform

3.1 map


DataStreamSource<String> inputStream = env.readTextFile(path);
...
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
});

3.2 flatMap

// flatMap: split each line into fields by comma
DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});

3.3 filter

DataStream<String> filterStream = inputStream.filter(line -> line.startsWith("sensor_1"));

3.4 keyBy


DataStream → KeyedStream: logically splits a stream into disjoint partitions; each partition contains all elements with the same key. Internally this is implemented with hashing.
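The hash-based routing can be illustrated with a plain-Java sketch. This is not Flink's actual algorithm (Flink uses a murmur hash over key groups); it only shows the core idea that the same key always lands in the same partition:

```java
import java.util.*;

public class KeyBySketch {

    // Simplified stand-in for the key-to-subtask assignment:
    // hash the key and map it onto one of `parallelism` partitions.
    static int partitionFor(String key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_6", "sensor_1", "sensor_7");
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (String id : ids) {
            partitions.computeIfAbsent(partitionFor(id, 4), p -> new ArrayList<>()).add(id);
        }
        // both "sensor_1" records end up in the same partition
        System.out.println(partitions);
    }
}
```

Because the partition is a pure function of the key, all state for one key lives on one subtask, which is what makes the keyed aggregations below possible.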

3.5 Rolling Aggregation

These operators aggregate each substream of a KeyedStream:

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
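The difference between min() and minBy() is easy to miss: min() only tracks the minimum of the chosen field while the other fields keep the values of the first record, whereas minBy() returns the entire record that holds the minimum. The following is a plain-Java sketch of the two reduce semantics, not the Flink API itself; SensorReading here is a minimal stand-in for the POJO used elsewhere in these notes:

```java
public class MinVsMinBy {

    static class SensorReading {
        final String id; final long timestamp; final double temperature;
        SensorReading(String id, long timestamp, double temperature) {
            this.id = id; this.timestamp = timestamp; this.temperature = temperature;
        }
    }

    // min(): rolling minimum of the aggregated field only;
    // the other fields stay those of the first record for the key.
    static SensorReading min(SensorReading acc, SensorReading in) {
        return new SensorReading(acc.id, acc.timestamp,
                Math.min(acc.temperature, in.temperature));
    }

    // minBy(): returns the whole record that holds the minimum.
    static SensorReading minBy(SensorReading acc, SensorReading in) {
        return in.temperature < acc.temperature ? in : acc;
    }

    public static void main(String[] args) {
        SensorReading a = new SensorReading("sensor_1", 1547718199L, 35.8);
        SensorReading b = new SensorReading("sensor_1", 1547718201L, 15.4);
        // min() keeps a's timestamp, minBy() returns b unchanged
        System.out.println(min(a, b).timestamp);    // 1547718199
        System.out.println(minBy(a, b).timestamp);  // 1547718201
    }
}
```

Use minBy()/maxBy() when the whole record matters (e.g., which reading was hottest), and min()/max() when only the aggregated value itself is needed.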

3.6 Reduce

KeyedStream → DataStream: an aggregation over a grouped data stream that merges the current element with the last aggregated result to produce a new value. The returned stream contains the result of every aggregation step, not just the final result of the last one.

DataStreamSource<String> inputStream = env.readTextFile(path);

DataStream<SensorReading> mapStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

// rolling reduce: keep the max temperature seen so far, with the latest timestamp
SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        return new SensorReading(value1.getId(), value2.getTimestamp(),
                Math.max(value1.getTemperature(), value2.getTemperature()));
    }
});

// SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
// SensorReading{id='sensor_1', timestamp=1547718209, temperature=36.3}
reduceStream.print();
env.execute();

3.7 Split/Select


DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some criterion.


SplitStream → DataStream: obtains one or more DataStreams from a SplitStream.

// split the sensor stream into two streams, using 30 degrees as the threshold
DataStreamSource<String> inputStream = env.readTextFile(path);

DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});
// tag each reading as "high" or "low" around the 30-degree threshold
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highStream = splitStream.select("high");
DataStream<SensorReading> lowStream = splitStream.select("low");

3.8 Connect and CoMap


DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their types. After Connect, the two streams are merely placed in the same stream; internally each keeps its own data and form unchanged, and the two remain independent of each other.


ConnectedStreams → DataStream: operates on a ConnectedStreams with the same functionality as map and flatMap, applying a separate map/flatMap transformation to each of the two streams.

// connect: convert the high-temperature stream to tuples, connect it with the low-temperature stream, and emit status info
DataStream<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowStream);

SingleOutputStreamOperator<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "high temperature warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "normal");
    }
});

3.9 Union


DataStream → DataStream: unions two or more DataStreams, producing a new DataStream that contains all elements of the input streams.

  • Before a union, the streams must have the same type; Connect allows different types, which are then unified in the subsequent coMap.

  • Connect can only operate on two streams; union can operate on many.

// union accepts any number of streams of the same element type
highStream.union(lowStream, allStream);

Overview

(figure: overview of the DataStream transformation operators)