Flink 转换算子(转算数据)

摘要:本文主要描述flink在接收到上游数据后,使用转换算子,可以如何进行数据转换,得到我们想要的数据,章节主要分为基本转换算子、聚合算子、用户自定义函数、物理分区算子、分流、合流。

基本转换算子

map

map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。下面例子把字符串转为Int类型

public class StrToIntMapFunction implements MapFunction<String,Integer> {
    

    @Override

    public Integer map(String value) throws Exception {
        return Integer.valueOf(value);
    }

    
}

filter

数据过滤,过滤出我们想要的数据,下面我们过滤出数据大于10的数字

public class NumberFilter implements FilterFunction<Integer> {











    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 10;
    }

}

flatMap

扁平数据转换,我们可以把一条数据转换为多条数据。下面我们把大于10的数据拆分为两条。

public class NumberFlatMapFunction implements FlatMapFunction<String,Integer> {











    @Override

    public void flatMap(String value, Collector<Integer> out) throws Exception {
        Integer v = Integer.parseInt(value);
        // 大于10的分为10和它的差
        if(v > 10){
            out.collect(10);




            out.collect(v - 10);
        }else{
            // 小于10的就输出零
            out.collect(0);
        }
    }




}

完整案例

ConvertStreamDemo

public class ConvertStreamDemo {











    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());








        DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = lineDs.map(new StrToIntMapFunction());





        // 输出一
        SingleOutputStreamOperator<Integer> gt10 = integerSingleOutputStreamOperator.filter(new NumberFilter());
        gt10.print("输出一");



        // 输出二
        SingleOutputStreamOperator<Integer> flatMapDatas = lineDs.flatMap(new NumberFlatMapFunction());
        flatMapDatas.print("输出二");
        env.execute();
    }
}

image.png

image.png

特殊的RichXXXFunction

基本上每个算子都有RichFunction,它提供了更加强大的支持,下面用Filter的做一个介绍,其他的大同小异

NumberRichFilterFunction

public class NumberRichFilterFunction extends RichFilterFunction<Integer> {











    /**
     * 该方法在算子初始化的时候只执行一次,
     * @param parameters The configuration containing the parameters attached to the contract.
     * @throws Exception
     */
    @Override

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 该方法可以获取到我们的运行上下文,比如获取配置参数等等都可以用该方法
        this.getRuntimeContext();
    }






    /**
     * 在整个应用关闭的时候执行一次
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
    }



    @Override
    public boolean filter(Integer value) throws Exception {
        return false;
    }
}

要看效果把上面的Filter替换了即可。

聚合算子

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并一-这就是所谓的“聚合”(Aggregation),类似于 MapReduce 中的 reduce操作。主要包含按键分区、简单聚合算子和规约聚合(自定义聚合)。

按键分区keyBy

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键 (key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。“基于不同的 key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key 的数据,都将被发往同一个分区。

StrKeyByProcess分区后的数据处理

public class StrKeyByProcess extends KeyedProcessFunction<String,String,String> {











    private static final Logger log = LoggerFactory.getLogger(StrKeyByProcess.class);




    /** 统计每个单词出现次数 */
    ValueState<Integer> count;




    @Override

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVcState", Types.INT));
    }



    @Override
    public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {
        if(count.value() == null){
            count.update(1);
        }else{
            count.update(count.value() +1 );
        }
        log.info("单词{}出现{}次",value,count.value());
    }

}

测试StrKeyedStreamDemo

public class StrKeyedStreamDemo {











    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());







        DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);
        // 分组,KeySelector让使用什么来分组
        KeyedStream<String, String> stringStringKeyedStream = lineDs.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        // 分组后的数据处理
        stringStringKeyedStream.process(new StrKeyByProcess());









        env.execute();
    }


}


测试结果

image.png

简单聚合count、sum、max、min等等

定义一个实体类

public class Student {











    /** ID */
    private String id;
    /** 名称 */
    private String name;
    /** 年龄 */
    private Integer age;
    /** 有多少钱 */
    private Integer money;


    public Student() {
    }






    public Student(String id, String name, Integer age, Integer money) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.money = money;
    }




    public String getId() {
        return id;
    }



    public void setId(String id) {
        this.id = id;
    }




    public String getName() {
        return name;
    }



    public void setName(String name) {
        this.name = name;
    }


    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }


    public Integer getMoney() {
        return money;
    }

    public void setMoney(Integer money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id='" + id + ''' +
                ", name='" + name + ''' +
                ", age=" + age +
                ", money=" + money +
                '}';
    }
}

SimpleAggrDemo
public class SimpleAggrDemo {











    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);










        DataStreamSource<Student> sensorDS = env.fromElements(
                new Student("1", "zhangsan", 10,1000),
                new Student("2", "lisi", 11,32),
                new Student("3", "wangwu", 10,4000),
                new Student("4", "zhaoliu", 9,0)
        );

















        KeyedStream<Student, Integer> sensorKS = sensorDS
                .keyBy(new KeySelector<Student, Integer>() {
                    @Override
                    public Integer getKey(Student value) throws Exception {
                        return value.getAge();
                    }

                });



        /**
         * TODO 简单聚合算子
         *  1、 keyby之后才能调用
         *  2、 分组内的聚合:对同一个key的数据进行聚合
         */
        // 传位置索引的,适用于 Tuple类型,POJO不行
//        SingleOutputStreamOperator<Student> result = sensorKS.sum(2);
//        SingleOutputStreamOperator<Student> result = sensorKS.sum("money");



        /**
         *   max\maxby的区别: 同min
         *       max:只会取比较字段的最大值,非比较字段保留第一次的值
         *       maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值
         */
//        SingleOutputStreamOperator<Student> result = sensorKS.max("money");
//        SingleOutputStreamOperator<Student> result = sensorKS.min("money");
        SingleOutputStreamOperator<Student> result = sensorKS.maxBy("money");
//        SingleOutputStreamOperator<Student> result = sensorKS.minby("money");


        result.print();



        env.execute();
    }

}
结果

image.png

规约聚合

ReduceDemo
public class ReduceDemo {











    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);










        DataStreamSource<Student> sensorDS = env.fromElements(
                new Student("1", "zhangsan", 10,1000),
                new Student("2", "lisi", 11,32),
                new Student("3", "wangwu", 10,4000),
                new Student("4", "zhaoliu", 9,0)
        );

















        KeyedStream<Student, Integer> sensorKS = sensorDS
                .keyBy(new KeySelector<Student, Integer>() {
                    @Override
                    public Integer getKey(Student value) throws Exception {
                        return value.getAge();
                    }

                });



        // 我们求分组的金钱总数
        //         * 1、keyby之后调用
        //         * 2、输入类型 = 输出类型,类型不能变
        //         * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
        //         * 4、reduce方法中的两个参数
        //         *     value1: 之前的计算结果,存状态
        //         *     value2: 现在来的数据
        //
        SingleOutputStreamOperator<Student> result = sensorKS.reduce(new ReduceFunction<Student>() {
            @Override
            public Student reduce(Student value1, Student value2) throws Exception {
                return new Student(value1.getId(), value1.getName(), value1.getAge(), value1.getMoney() + value2.getMoney());
            }
        });

        result.print();


        env.execute();
    }


}
结果

image.png

用户自定义函数

就是一些常用的Function,上面的filter、map都是这类函数,下面举例常用的

RichFunctionStreamDemo

public class RichFunctionStreamDemo {











    private static final Logger log = LoggerFactory.getLogger(RichFunctionStreamDemo.class);











    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());





        DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);


        SingleOutputStreamOperator<Integer> outputStreamOperator = lineDs.map(new RichMapFunction<String, Integer>() {



            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                log.info("只执行一次");
            }






            @Override
            public Integer map(String value) throws Exception {
                return Integer.valueOf(value);
            }
        });

        outputStreamOperator.print("打印");


        env.execute();
    }




}

执行结果

image.png

物理分区算子

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

系统自带的分区算子案例

public class PartitionStreamDemo {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());





        env.setParallelism(2);






        DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);





        // shuffle随机分区: random.nextInt(下游算子并行度)
//        socketDS.shuffle().print();


        // rebalance轮询:nextChannelToSendTo = (nextChannelToSendTo + 1) % 下游算子并行度
        // 如果是 数据源倾斜的场景, source后,调用rebalance,就可以解决 数据源的 数据倾斜
//        socketDS.rebalance().print();



        //rescale缩放: 实现轮询, 局部组队,比rebalance更高效
//        socketDS.rescale().print();








        // broadcast 广播:  发送给下游所有的子任务
//        socketDS.broadcast().print();

        // global 全局: 全部发往 第一个子任务
        // return 0;
        socketDS.global().print();


        // keyby: 按指定key去发送,相同key发往同一个子任务
        // one-to-one: Forward分区器




        // 总结: Flink提供了 7种分区器+ 1种自定义

        env.execute();
    }
}

自定义分区

public class PartitionCustomStreamDemo {
    

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(2);






        DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);





        socketDS
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        // key是传入的值,numPartitions是下游有几个分区
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, r -> r)
                .print();






        env.execute();
    }


}


结果如下

image.png

分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

filter分流

public class SplitByFilterDemo {











    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());








        env.setParallelism(1);




        DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);




        // 缺点: 同一个数据,要被处理两遍(调用两次filter),数据流量发送太多会是瓶颈
        SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);
        SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);








        even.print("偶数流");
        odd.print("奇数流");











        env.execute();
    }


}


结果

image.png

自定义分流

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());





        env.setParallelism(1);






        SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("192.168.137.100",7777);









        /**
         * 创建OutputTag对象
         * 第一个参数: 标签名
         * 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
         */

        OutputTag<String> s1Tag = new OutputTag<>("out1", TypeInformation.of(new TypeHint<String>() {}));
        OutputTag<String> s2Tag = new OutputTag<>("out2", TypeInformation.of(new TypeHint<String>() {}));





        SingleOutputStreamOperator<String> process = sensorDS
                .process(
                        new ProcessFunction<String, String>() {
                            @Override
                            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                                if (value.startsWith("out1")) {
                                    // 如果是 s1,放到侧输出流s1中
                                    /**
                                     * 上下文ctx 调用ouput,将数据放入侧输出流
                                     * 第一个参数: Tag对象
                                     * 第二个参数: 放入侧输出流中的 数据
                                     */
                                    ctx.output(s1Tag, value);
                                } else if (value.startsWith("out2")) {
                                    // 如果是 s2,放到侧输出流s2中



                                    ctx.output(s2Tag, value);
                                } else {
                                    // 非s1、s2的数据,放到主流中
                                    out.collect(value);
                                }

                            }
                        }
                );
        // 从主流中,根据标签 获取 侧输出流
        SideOutputDataStream<String> s1 = process.getSideOutput(s1Tag);
        SideOutputDataStream<String> s2 = process.getSideOutput(s2Tag);




        // 打印主流
        process.print("主流-非out1、out2");

        //打印 侧输出流
        s1.printToErr("out1");
        s2.printToErr("out2");

        env.execute();
    }
}

结果

image.png

合流

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理所以 Flink 中合流的操作会更加普遍,对应的 API也更加丰富

union

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);







        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
        DataStreamSource<String> source3 = env.fromElements("111", "222", "333");




        /**
         * union:合并数据流
         * 1、 流的数据类型必须一致
         * 2、 一次可以合并多条流
         */

//        DataStream<Integer> union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
        DataStream<Integer> union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
        union.print();








        env.execute();
    }
}

connect

为了处理更加灵活,连接操作允许流的数据类型不同但我们知道一个DataStrcam中的数据只能有唯一的类型所以连接得到的并不是DataStream,而是一个“连接流连接流可以看成是两条流形式上的“统一”,被放在了个同一个流中;事实上内部仍保持各白的数据形式不变,彼此之间是相互独立的。要想得到新的DataStrcamm,还需要进一步定义一个“同处理” (co-process) 转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。

普通的connect

public class ConnectDemo {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);













        SingleOutputStreamOperator<Integer> source1 = env
                .socketTextStream("192.168.137.100", 7777)
                .map(i -> Integer.parseInt(i));


        DataStreamSource<String> source2 = env.socketTextStream("192.168.137.100", 8888);






        /**
         * 
         * 1、一次只能连接 2条流
         * 2、流的数据类型可以不一样
         * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
         */
        ConnectedStreams<Integer, String> connect = source1.connect(source2);


        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "来源于数字流:" + value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于字母流:" + value;
            }
        });



        result.print();


        env.execute();
    }
}

合流过后再keyby

两条数据流可以不一样,但是在keyby后需要输出一样的数据,下面的例子展示了数据库的联表查询。

ConnectKeybyDemo

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);







        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );






        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);


        // 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);


        /**
         * 实现互相匹配的效果:  两条流,,不一定谁的数据先来
         *  1、每条流,有数据来,存到一个变量中
         *      hashmap
         *      =》key=id,第一个字段值
         *      =》value=List<数据>
         *  2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
         */
        SingleOutputStreamOperator<String> process = connectKeyby.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // 每条流定义一个hashmap,用来存数据
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();



                    /**
                     * 第一条流的处理逻辑
                     * @throws Exception
                     */
                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // 1. s1的数据来了,就存到变量中
                        if (!s1Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                            s1Values.add(value);
                            s1Cache.put(id, s1Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s1Cache.get(id).add(value);
                        }

                        //  2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
                            }
                        }

                    }

                    /**
                     * 第二条流的处理逻辑
                     * @throws Exception
                     */
                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // 1. s2的数据来了,就存到变量中
                        if (!s2Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                            s2Values.add(value);
                            s2Cache.put(id, s2Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s2Cache.get(id).add(value);
                        }

                        // 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
                            }
                        }
                    }
                }
        );

        process.print();


        env.execute();
    }
}

image.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY1wlcbJ' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片