Getting Started with Flink Stream Processing
The Flink Stream Processing API
1. Environment
1.1 getExecutionEnvironment
Creates an execution environment that represents the context in which the current program runs. If the program is invoked standalone, this method returns a local execution environment; if the program is invoked from a command-line client to be submitted to a cluster, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the program is run, which makes it the most common way to create an execution environment.
// batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1.2 createLocalEnvironment
Returns a local execution environment; the parallelism can be specified when calling the method, otherwise it defaults to the number of local CPU cores.
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
1.3 createRemoteEnvironment
Returns a cluster execution environment and submits the given JAR to the remote server; you need to specify the JobManager's hostname and port, along with the JAR files to run on the cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//WordCount.jar");
2. Source
2.1 Collection
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// read data from a collection
DataStreamSource<SensorReading> dataStream = env.fromCollection(Arrays.asList(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_6", 1547718201L, 15.4),
        new SensorReading("sensor_7", 1547718202L, 6.7),
        new SensorReading("sensor_10", 1547718205L, 38.1)));

// read data from individual elements
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 4, 8);

// print the output
dataStream.print("data");
integerDataStreamSource.print("int");

env.execute();
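The listings in this article use a SensorReading POJO that is never shown. A minimal sketch, with fields inferred from the constructor calls and getters used throughout (the class itself is an assumption, not the original code):

// minimal SensorReading POJO assumed by the examples in this article
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // Flink POJOs require a public no-arg constructor
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp + ", temperature=" + temperature + "}";
    }
}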
2.2 File
String path = "src/main/resources/sensor.txt";

DataStreamSource<String> dataStream = env.readTextFile(path);
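readTextFile emits the file line by line as strings. The map examples later split each line on commas, so sensor.txt is assumed to contain CSV lines like the following (sample values borrowed from the collection example above):

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7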
2.3 Kafka Message Queue
Reading from Kafka uses the FlinkKafkaConsumer011 connector, so the flink-connector-kafka-0.11 dependency must be on the classpath.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

// read data from Kafka
DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(),
        properties));
2.4 Custom Source
Besides the built-in sources above, we can also define our own source. All it takes is passing a SourceFunction to addSource, invoked as follows:
DataStreamSource<SensorReading> dataStream = env.addSource(new MySensorSource());
Here we want to generate random sensor readings; the implementation of MySensorSource looks like this:
public static class MySensorSource implements SourceFunction<SensorReading> {
    private boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        HashMap<String, Double> sensorTempMap = new HashMap<>(10);
        // initialize the temperatures of 10 sensors
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }

        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // add a random fluctuation on top of the current temperature
                double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
            }
            // control the emission rate
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
3. Transform
3.1 map
DataStreamSource<String> inputStream = env.readTextFile(path);
...
// map each line to its length
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
});
3.2 flatMap
// flatMap: split each line into fields on commas
DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});
3.3 filter
// keep only the lines that start with "sensor_1"
DataStream<String> filterStream = inputStream.filter(line -> line.startsWith("sensor_1"));
3.4 keyBy
DataStream → KeyedStream: logically splits a stream into disjoint partitions, each partition containing the elements with the same key; internally this is implemented with hash partitioning. A short sketch follows below.
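A minimal sketch of the two common keyBy variants, assuming the dataStream of SensorReading built in the earlier examples:

// key by the "id" POJO field (the string form used in the reduce example below)
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

// equivalent, type-safe variant using a key selector
KeyedStream<SensorReading, String> keyedById = dataStream.keyBy(SensorReading::getId);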
3.5 Rolling Aggregation Operators
These operators aggregate each keyed substream of a KeyedStream (see the sketch after this list):
- sum()
- min()
- max()
- minBy()
- maxBy()
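For instance, a rolling maximum over the temperature field. Note the usual distinction: max() only updates the aggregated field of the emitted record, while maxBy() emits the whole element that contains the maximum (the same applies to min()/minBy()). A minimal sketch on the keyedStream from above:

// rolling maximum of the temperature field per sensor;
// only the temperature field of the emitted record is guaranteed to be the max
DataStream<SensorReading> maxTempStream = keyedStream.max("temperature");

// emits the complete record that currently holds the maximum temperature
DataStream<SensorReading> maxByTempStream = keyedStream.maxBy("temperature");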
3.6 Reduce
KeyedStream → DataStream: an aggregation over a keyed stream that combines the current element with the previously aggregated value to produce a new value. The returned stream contains the result of every aggregation step, not just the final result.
DataStreamSource<String> inputStream = env.readTextFile(path);

DataStream<SensorReading> mapStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});
KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

// keep the latest timestamp together with the running maximum temperature
SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        return new SensorReading(value1.getId(), value2.getTimestamp(),
                Math.max(value1.getTemperature(), value2.getTemperature()));
    }
});

// sample output:
// SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
// SensorReading{id='sensor_1', timestamp=1547718209, temperature=36.3}
reduceStream.print();
env.execute();
3.7 Split/Select
DataStream → SplitStream: splits a DataStream into two or more DataStreams according to some criterion.
SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.
(Note that split/select has been deprecated in more recent Flink releases in favor of side outputs.)
// split the sensor data into two streams by temperature (threshold: 30 degrees)
DataStreamSource<String> inputStream = env.readTextFile(path);

DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});

// tag each element as "high" or "low" around the 30-degree threshold
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highStream = splitStream.select("high");
DataStream<SensorReading> lowStream = splitStream.select("low");
3.8 Connect and CoMap
DataStream, DataStream → ConnectedStreams: connects two streams while preserving their types. After being connected, the two streams are merely placed inside one stream; internally they keep their own data and form, and remain independent of each other.
ConnectedStreams → DataStream: works on ConnectedStreams much like map and flatMap, applying a separate map or flatMap transformation to each of the connected streams.
// connect: convert the high-temperature stream to tuples, connect it with the low-temperature stream, and emit status information
DataStream<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowStream);

SingleOutputStreamOperator<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "high temperature warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "normal");
    }
});
3.9 Union
DataStream → DataStream: unions two or more DataStreams into a new DataStream containing all of their elements.
The streams being unioned must have the same type; Connect allows different types, which can then be reconciled in the subsequent CoMap.
Connect only operates on two streams, whereas Union can operate on more than two.
// allStream is assumed to be another DataStream<SensorReading> defined elsewhere
highStream.union(lowStream, allStream);
Overview