Abstract: This article introduces Flink source operators, which supply data to a Flink job, and explains how to use them.
What Is a Source Operator?
Flink can ingest data from a wide variety of sources and build a DataStream from them for transformation and processing. The origin of the input data is generally called the data source, and the operator that reads it is the source operator. The source is therefore the input end of the whole processing program. The source operators provided officially and by third parties are listed at nightlies.apache.org/flink/flink…; below we pick a few of them and walk through examples.
FileSource: Reading from a File
Reads bounded data from a file; the job stops once the computation finishes. For project setup, see the getting-started post at juejin.cn/post/724149…
The FileSourceDemo class:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build a bounded FileSource that reads input/word.txt line by line
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/word.txt")
                )
                .build();

        // Attach the source (no watermarks needed for bounded data) and print each line
        env
                .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
                .print();

        env.execute();
    }
}
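The source above is bounded: the job reads input/word.txt once and then finishes. The same builder can also watch a path continuously and pick up new files as they appear; a minimal sketch, assuming a hypothetical input directory and an arbitrary 10-second discovery interval (requires java.time.Duration):

// Unbounded variant: rescan the input path every 10 seconds for new files
FileSource<String> streamingFileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("input"))
        .monitorContinuously(Duration.ofSeconds(10)) // switches the source from bounded to streaming mode
        .build();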
Custom Datasets
As the name suggests, you mock up some data yourself to serve as bounded input for the computation, as follows:
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UserDefDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env
                .fromElements(1, 2, 33);                   // read from individual elements
        //      .fromCollection(Arrays.asList(1, 22, 3)); // read from a collection

        source.print();

        env.execute();
    }
}
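As the commented-out line hints, fromCollection accepts any java.util.Collection; a minimal sketch with made-up sample words:

// Feed the stream from a List instead of varargs (requires java.util.Arrays / java.util.List)
List<String> words = Arrays.asList("hello", "flink", "source");
DataStreamSource<String> collectionSource = env.fromCollection(words);
collectionSource.print();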
DataGeneratorSource: A Custom Generator
This mode is well suited to simulating production-like data.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGeneratorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With n parallel subtasks and a maximum value of a, the value range is
        // split into n slices of a/n each. For example, with a maximum of 100 and
        // parallelism 2, each subtask generates 50 numbers:
        // one produces 0-49, the other 50-99.
        env.setParallelism(2);

        /**
         * DataGeneratorSource takes four arguments:
         * 1. A GeneratorFunction implementation: override map(); the input type is fixed to Long
         * 2. A long: the exclusive upper bound of the auto-incrementing sequence (starting at 0); generation stops once it is reached
         * 3. A rate-limiting strategy, e.g. how many records to emit per second
         * 4. The type information of the returned records
         */
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                100,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );

        env
                .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator")
                .print();

        env.execute();
    }
}
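Since GeneratorFunction declares a single abstract map method, the anonymous class above can also be written as a lambda; a minimal sketch with the same bound and rate:

// Equivalent, more compact form: a lambda in place of the anonymous GeneratorFunction
DataGeneratorSource<String> lambdaSource = new DataGeneratorSource<>(
        index -> "Number:" + index,       // index is the auto-incrementing Long
        100,                              // exclusive upper bound of the sequence
        RateLimiterStrategy.perSecond(1), // at most one record per second
        Types.STRING
);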
KafkaSource
This source is the closest fit for a production environment; a simple example follows.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // TODO Read from Kafka: the new Source architecture
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // Kafka broker addresses and ports
                .setGroupId("atguigu")                              // consumer group id
                .setTopics("topic_1")                               // topic to consume
                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserializer for the message value only
                .setStartingOffsets(OffsetsInitializer.latest())    // offset strategy Flink uses when consuming Kafka
                .build();

        env
                // .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                .print();

        env.execute();
    }
}

/**
 * Kafka consumer parameter auto.offset.reset:
 *   earliest: if a committed offset exists, resume from it; otherwise consume from the earliest record
 *   latest:   if a committed offset exists, resume from it; otherwise consume from the latest record
 *
 * Flink KafkaSource offset strategy (OffsetsInitializer, default earliest):
 *   earliest: always consume from the earliest record
 *   latest:   always consume from the latest record
 */
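To get the consumer-group semantics described in the comment block (resume from the committed offset, fall back only when none exists), OffsetsInitializer supports that combination directly; a minimal sketch reusing the broker settings above (OffsetResetStrategy comes from the Kafka client, org.apache.kafka.clients.consumer.OffsetResetStrategy):

// Resume from the group's committed offsets; fall back to the earliest
// record when no offset has been committed yet
KafkaSource<String> resumingSource = KafkaSource.<String>builder()
        .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
        .setGroupId("atguigu")
        .setTopics("topic_1")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();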
RabbitMQ Source
We won't walk through this one in detail; you can configure it by following the official documentation for the connector at nightlies.apache.org/flink/flink… Just wrap the setup in a method:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
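In the snippet, the "..." stands for whatever additional connection settings your broker needs; a minimal sketch of a fully specified config, assuming placeholder credentials and the default virtual host:

// A minimal sketch; every value here is a placeholder for your own setup
final RMQConnectionConfig fullConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5672)          // RabbitMQ's default AMQP port
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .build();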