摘要:本文主要描述flink
在接收到上游数据后,使用转换算子,可以如何进行数据转换,得到我们想要的数据,章节主要分为基本转换算子、聚合算子、用户自定义函数、物理分区算子、分流、合流。
基本转换算子
map
map
是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。下面例子把字符串转为Int类型
public class StrToIntMapFunction implements MapFunction<String,Integer> {
@Override
public Integer map(String value) throws Exception {
return Integer.valueOf(value);
}
}
filter
数据过滤,过滤出我们想要的数据,下面我们过滤出数据大于10的数字
public class NumberFilter implements FilterFunction<Integer> {
@Override
public boolean filter(Integer value) throws Exception {
return value > 10;
}
}
flatMap
扁平数据转换,我们可以把一条数据转换为多条数据。下面我们把大于10的数据拆分为两条。
public class NumberFlatMapFunction implements FlatMapFunction<String,Integer> {
@Override
public void flatMap(String value, Collector<Integer> out) throws Exception {
Integer v = Integer.parseInt(value);
// 大于10的分为10和它的差
if(v > 10){
out.collect(10);
out.collect(v - 10);
}else{
// 小于10的就输出零
out.collect(0);
}
}
}
完整案例
ConvertStreamDemo
public class ConvertStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);
SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = lineDs.map(new StrToIntMapFunction());
// 输出一
SingleOutputStreamOperator<Integer> gt10 = integerSingleOutputStreamOperator.filter(new NumberFilter());
gt10.print("输出一");
// 输出二
SingleOutputStreamOperator<Integer> flatMapDatas = lineDs.flatMap(new NumberFlatMapFunction());
flatMapDatas.print("输出二");
env.execute();
}
}
特殊的RichXXXFunction
基本上每个算子都有
RichFunction
,它提供了更加强大的支持,下面用Filter
的做一个介绍,其他的大同小异
NumberRichFilterFunction
public class NumberRichFilterFunction extends RichFilterFunction<Integer> {
/**
* 该方法在算子初始化的时候只执行一次,
* @param parameters The configuration containing the parameters attached to the contract.
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 该方法可以获取到我们的运行上下文,比如获取配置参数等等都可以用该方法
this.getRuntimeContext();
}
/**
* 在整个应用关闭的时候执行一次
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
}
@Override
public boolean filter(Integer value) throws Exception {
return false;
}
}
要看效果把上面的
Filter
替换了即可。
聚合算子
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并一-这就是所谓的“聚合”(Aggregation),类似于 MapReduce 中的 reduce操作。主要包含按键分区、简单聚合算子和规约聚合(自定义聚合)。
按键分区keyBy
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键 (key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。“基于不同的 key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key 的数据,都将被发往同一个分区。
StrKeyByProcess
分区后的数据处理
public class StrKeyByProcess extends KeyedProcessFunction<String,String,String> {
private static final Logger log = LoggerFactory.getLogger(StrKeyByProcess.class);
/** 统计每个单词出现次数 */
ValueState<Integer> count;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
count = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVcState", Types.INT));
}
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {
if(count.value() == null){
count.update(1);
}else{
count.update(count.value() +1 );
}
log.info("单词{}出现{}次",value,count.value());
}
}
测试StrKeyedStreamDemo
public class StrKeyedStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);
// 分组,KeySelector让使用什么来分组
KeyedStream<String, String> stringStringKeyedStream = lineDs.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
// 分组后的数据处理
stringStringKeyedStream.process(new StrKeyByProcess());
env.execute();
}
}
测试结果
简单聚合count、sum、max、min
等等
定义一个实体类
public class Student {
/** ID */
private String id;
/** 名称 */
private String name;
/** 年龄 */
private Integer age;
/** 有多少钱 */
private Integer money;
public Student() {
}
public Student(String id, String name, Integer age, Integer money) {
this.id = id;
this.name = name;
this.age = age;
this.money = money;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public Integer getMoney() {
return money;
}
public void setMoney(Integer money) {
this.money = money;
}
@Override
public String toString() {
return "Student{" +
"id='" + id + ''' +
", name='" + name + ''' +
", age=" + age +
", money=" + money +
'}';
}
}
SimpleAggrDemo
public class SimpleAggrDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Student> sensorDS = env.fromElements(
new Student("1", "zhangsan", 10,1000),
new Student("2", "lisi", 11,32),
new Student("3", "wangwu", 10,4000),
new Student("4", "zhaoliu", 9,0)
);
KeyedStream<Student, Integer> sensorKS = sensorDS
.keyBy(new KeySelector<Student, Integer>() {
@Override
public Integer getKey(Student value) throws Exception {
return value.getAge();
}
});
/**
* TODO 简单聚合算子
* 1、 keyby之后才能调用
* 2、 分组内的聚合:对同一个key的数据进行聚合
*/
// 传位置索引的,适用于 Tuple类型,POJO不行
// SingleOutputStreamOperator<Student> result = sensorKS.sum(2);
// SingleOutputStreamOperator<Student> result = sensorKS.sum("money");
/**
* max\maxby的区别: 同min
* max:只会取比较字段的最大值,非比较字段保留第一次的值
* maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值
*/
// SingleOutputStreamOperator<Student> result = sensorKS.max("money");
// SingleOutputStreamOperator<Student> result = sensorKS.min("money");
SingleOutputStreamOperator<Student> result = sensorKS.maxBy("money");
// SingleOutputStreamOperator<Student> result = sensorKS.minby("money");
result.print();
env.execute();
}
}
结果
规约聚合
ReduceDemo
public class ReduceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Student> sensorDS = env.fromElements(
new Student("1", "zhangsan", 10,1000),
new Student("2", "lisi", 11,32),
new Student("3", "wangwu", 10,4000),
new Student("4", "zhaoliu", 9,0)
);
KeyedStream<Student, Integer> sensorKS = sensorDS
.keyBy(new KeySelector<Student, Integer>() {
@Override
public Integer getKey(Student value) throws Exception {
return value.getAge();
}
});
// 我们求分组的金钱总数
// * 1、keyby之后调用
// * 2、输入类型 = 输出类型,类型不能变
// * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
// * 4、reduce方法中的两个参数
// * value1: 之前的计算结果,存状态
// * value2: 现在来的数据
//
SingleOutputStreamOperator<Student> result = sensorKS.reduce(new ReduceFunction<Student>() {
@Override
public Student reduce(Student value1, Student value2) throws Exception {
return new Student(value1.getId(), value1.getName(), value1.getAge(), value1.getMoney() + value2.getMoney());
}
});
result.print();
env.execute();
}
}
结果
用户自定义函数
就是一些常用的
Function
,上面的filter、map
都是这类函数,下面举例常用的
RichFunctionStreamDemo
public class RichFunctionStreamDemo {
private static final Logger log = LoggerFactory.getLogger(RichFunctionStreamDemo.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> lineDs = env.socketTextStream("192.168.137.100",7777);
SingleOutputStreamOperator<Integer> outputStreamOperator = lineDs.map(new RichMapFunction<String, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
log.info("只执行一次");
}
@Override
public Integer map(String value) throws Exception {
return Integer.valueOf(value);
}
});
outputStreamOperator.print("打印");
env.execute();
}
}
执行结果
物理分区算子
常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
系统自带的分区算子案例
public class PartitionStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);
// shuffle随机分区: random.nextInt(下游算子并行度)
// socketDS.shuffle().print();
// rebalance轮询:nextChannelToSendTo = (nextChannelToSendTo + 1) % 下游算子并行度
// 如果是 数据源倾斜的场景, source后,调用rebalance,就可以解决 数据源的 数据倾斜
// socketDS.rebalance().print();
//rescale缩放: 实现轮询, 局部组队,比rebalance更高效
// socketDS.rescale().print();
// broadcast 广播: 发送给下游所有的子任务
// socketDS.broadcast().print();
// global 全局: 全部发往 第一个子任务
// return 0;
socketDS.global().print();
// keyby: 按指定key去发送,相同key发往同一个子任务
// one-to-one: Forward分区器
// 总结: Flink提供了 7种分区器+ 1种自定义
env.execute();
}
}
自定义分区
public class PartitionCustomStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(2);
DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);
socketDS
.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
// key是传入的值,numPartitions是下游有几个分区
return Integer.parseInt(key) % numPartitions;
}
}, r -> r)
.print();
env.execute();
}
}
结果如下
分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
filter
分流
public class SplitByFilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("192.168.137.100",7777);
// 缺点: 同一个数据,要被处理两遍(调用两次filter),数据流量发送太多会是瓶颈
SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);
SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);
even.print("偶数流");
odd.print("奇数流");
env.execute();
}
}
结果
自定义分流
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("192.168.137.100",7777);
/**
* 创建OutputTag对象
* 第一个参数: 标签名
* 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
*/
OutputTag<String> s1Tag = new OutputTag<>("out1", TypeInformation.of(new TypeHint<String>() {}));
OutputTag<String> s2Tag = new OutputTag<>("out2", TypeInformation.of(new TypeHint<String>() {}));
SingleOutputStreamOperator<String> process = sensorDS
.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (value.startsWith("out1")) {
// 如果是 s1,放到侧输出流s1中
/**
* 上下文ctx 调用ouput,将数据放入侧输出流
* 第一个参数: Tag对象
* 第二个参数: 放入侧输出流中的 数据
*/
ctx.output(s1Tag, value);
} else if (value.startsWith("out2")) {
// 如果是 s2,放到侧输出流s2中
ctx.output(s2Tag, value);
} else {
// 非s1、s2的数据,放到主流中
out.collect(value);
}
}
}
);
// 从主流中,根据标签 获取 侧输出流
SideOutputDataStream<String> s1 = process.getSideOutput(s1Tag);
SideOutputDataStream<String> s2 = process.getSideOutput(s2Tag);
// 打印主流
process.print("主流-非out1、out2");
//打印 侧输出流
s1.printToErr("out1");
s2.printToErr("out2");
env.execute();
}
}
结果
合流
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理所以 Flink 中合流的操作会更加普遍,对应的 API也更加丰富
union
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
DataStreamSource<String> source3 = env.fromElements("111", "222", "333");
/**
* union:合并数据流
* 1、 流的数据类型必须一致
* 2、 一次可以合并多条流
*/
// DataStream<Integer> union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
DataStream<Integer> union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
union.print();
env.execute();
}
}
connect
为了处理更加灵活,连接操作允许流的数据类型不同但我们知道一个DataStrcam中的数据只能有唯一的类型所以连接得到的并不是DataStream,而是一个“连接流连接流可以看成是两条流形式上的“统一”,被放在了个同一个流中;事实上内部仍保持各白的数据形式不变,彼此之间是相互独立的。要想得到新的DataStrcamm,还需要进一步定义一个“同处理” (co-process) 转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。
普通的connect
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Integer> source1 = env
.socketTextStream("192.168.137.100", 7777)
.map(i -> Integer.parseInt(i));
DataStreamSource<String> source2 = env.socketTextStream("192.168.137.100", 8888);
/**
*
* 1、一次只能连接 2条流
* 2、流的数据类型可以不一样
* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
*/
ConnectedStreams<Integer, String> connect = source1.connect(source2);
SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return "来源于数字流:" + value.toString();
}
@Override
public String map2(String value) throws Exception {
return "来源于字母流:" + value;
}
});
result.print();
env.execute();
}
}
合流过后再keyby
两条数据流可以不一样,但是在
keyby
后需要输出一样的数据,下面的例子展示了数据库的联表查询。
ConnectKeybyDemo
public class ConnectKeybyDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
Tuple2.of(1, "a1"),
Tuple2.of(1, "a2"),
Tuple2.of(2, "b"),
Tuple2.of(3, "c")
);
DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
Tuple3.of(1, "aa1", 1),
Tuple3.of(1, "aa2", 2),
Tuple3.of(2, "bb", 1),
Tuple3.of(3, "cc", 1)
);
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
// 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
/**
* 实现互相匹配的效果: 两条流,,不一定谁的数据先来
* 1、每条流,有数据来,存到一个变量中
* hashmap
* =》key=id,第一个字段值
* =》value=List<数据>
* 2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
*/
SingleOutputStreamOperator<String> process = connectKeyby.process(
new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
// 每条流定义一个hashmap,用来存数据
Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();
/**
* 第一条流的处理逻辑
* @throws Exception
*/
@Override
public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// 1. s1的数据来了,就存到变量中
if (!s1Cache.containsKey(id)) {
// 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
s1Values.add(value);
s1Cache.put(id, s1Values);
} else {
// 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
s1Cache.get(id).add(value);
}
// 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
if (s2Cache.containsKey(id)) {
for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
}
}
}
/**
* 第二条流的处理逻辑
* @throws Exception
*/
@Override
public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// 1. s2的数据来了,就存到变量中
if (!s2Cache.containsKey(id)) {
// 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
s2Values.add(value);
s2Cache.put(id, s2Values);
} else {
// 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
s2Cache.get(id).add(value);
}
// 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
if (s1Cache.containsKey(id)) {
for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
}
}
}
}
);
process.print();
env.execute();
}
}