Flink Source Operators (Data Sources)

Abstract: This article introduces Flink's source operators, which supply data to a Flink job, and shows how to use them.

What Is a Source Operator?

Flink can ingest data from a wide range of sources and build a DataStream from it for transformation and processing. The input is generally called the data source (data source), and the operator that reads it is the source operator (source operator). The source is therefore the input end of the whole processing program. The source operators provided officially and by third parties are listed at nightlies.apache.org/flink/flink…; below we pick a few of them to walk through.
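Every example below follows the same anatomy: with the unified Source API, a source is attached via env.fromSource(), which takes the source object, a watermark strategy, and an operator name; legacy connectors such as the RabbitMQ one at the end still go through env.addSource() with a SourceFunction. Here is a minimal sketch of that anatomy, using the built-in NumberSequenceSource purely as an illustrative stand-in for any source from the connector page:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceAnatomyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every new-style source is attached the same way:
        // fromSource(source, watermarkStrategy, sourceName)
        env
                .fromSource(
                        new NumberSequenceSource(1, 10),  // the Source implementation
                        WatermarkStrategy.noWatermarks(), // how event-time watermarks are generated
                        "number-sequence"                 // operator name shown in the web UI
                )
                .print();

        env.execute();
    }
}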

FileSource: Reading from a File

FileSource reads bounded data from a file; once the computation finishes, the job stops. For project setup, see the getting-started post at juejin.cn/post/724149…

FileSourceDemo

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build a FileSource that reads the file line by line as Strings
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/word.txt")
                )
                .build();

        env
                .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
                .print();

        env.execute();
    }
}
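FileSource is not limited to a one-shot bounded read. If the builder's monitorContinuously() option is set, the source keeps scanning the path for new files and the job becomes unbounded. A minimal sketch, where the one-second discovery interval is an arbitrary choice for illustration:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ContinuousFileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Same builder as above, but the source now re-scans the directory
        // for newly arrived files instead of stopping after one pass.
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("input"))
                .monitorContinuously(Duration.ofSeconds(1)) // file-discovery interval
                .build();

        env
                .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "continuous-filesource")
                .print();

        env.execute();
    }
}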

Custom Data Sets

As the name suggests, you mock up some data yourself to serve as bounded input, as follows:

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UserDefDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = env
                .fromElements(1, 2, 33);                   // read from individual elements
//                .fromCollection(Arrays.asList(1, 22, 3)); // read from a collection

        source.print();
        env.execute();
    }
}
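When you need unbounded mock input rather than a fixed collection, the socket text source is a handy debugging tool (it is not fault-tolerant, so keep it out of production). A minimal sketch; the host and port are arbitrary, and you can feed it from a terminal with nc -lk 7777:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Reads newline-separated text from a TCP socket; runs until the socket closes.
        env.socketTextStream("localhost", 7777)
                .print();

        env.execute();
    }
}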

DataGeneratorSource: A Custom Data Generator

This source is well suited to simulating production-like data.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGeneratorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With n parallel subtasks and a maximum value of a, the range is split
        // into n shares of a/n each. For example, with a maximum of 100 and
        // parallelism 2, each subtask generates 50 values: one gets 0-49, the other 50-99.
        env.setParallelism(2);

        /**
         * The data generator source takes four arguments:
         *   1. A GeneratorFunction implementation: override map(); the input type is fixed to Long
         *   2. A long: the exclusive upper bound of the auto-incrementing sequence (starting
         *      from 0); generation stops once it is reached
         *   3. A rate-limiting strategy, e.g. how many records to emit per second
         *   4. The type information of the returned records
         */
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                100,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );

        env
                .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator")
                .print();

        env.execute();
    }
}
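GeneratorFunction has a single abstract method, so the anonymous class above can also be written as a lambda; a sketch of that variant (drop-in replacement for the dataGeneratorSource assignment above). RateLimiterStrategy also offers noOp() and perCheckpoint(...) factories besides perSecond(...):

// Equivalent to the anonymous GeneratorFunction above, written as a lambda.
GeneratorFunction<Long, String> generatorFunction = value -> "Number:" + value;

DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
        generatorFunction,
        100,
        RateLimiterStrategy.perSecond(1),
        Types.STRING
);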

KafkaSource

This is the source you are most likely to use in production; a simple example follows.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read from Kafka using the new Source architecture
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // Kafka broker addresses and ports
                .setGroupId("atguigu")  // consumer group id
                .setTopics("topic_1")   // topic(s) to consume
                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserializer for the record value
                .setStartingOffsets(OffsetsInitializer.latest())    // offset strategy for consuming from Kafka
                .build();

        env
//                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                .print();

        env.execute();
    }
}

/**
 * Kafka consumer parameter:
 *   auto.offset.reset
 *     earliest: if a committed offset exists, resume from it; otherwise consume from the earliest record
 *     latest  : if a committed offset exists, resume from it; otherwise consume from the latest record
 *
 * Flink's KafkaSource offset strategy (OffsetsInitializer, default: earliest):
 *     earliest: always consume from the earliest record
 *     latest  : always consume from the latest record
 */
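If you want the classic Kafka-consumer behavior described in the comment above (resume from the group's committed offset, falling back only when none exists), OffsetsInitializer can express that too. A sketch of the common starting-offset options; the timestamp value is illustrative:

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class OffsetsInitializerSketch {
    public static void main(String[] args) {
        // Always start from the earliest / latest record, ignoring committed offsets:
        OffsetsInitializer fromEarliest = OffsetsInitializer.earliest();
        OffsetsInitializer fromLatest = OffsetsInitializer.latest();

        // Resume from the group's committed offset; fall back to EARLIEST if none
        // exists (mirrors a plain Kafka consumer with auto.offset.reset=earliest):
        OffsetsInitializer fromCommitted =
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);

        // Start from the first record whose timestamp is >= the given epoch millis:
        OffsetsInitializer fromTimestamp = OffsetsInitializer.timestamp(1_700_000_000_000L);
    }
}

Any of these can be passed to setStartingOffsets() in the builder above.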

RabbitMQ Source

We won't cover this one in detail; you can follow the official documentation to configure it, e.g. this source's page at nightlies.apache.org/flink/flink…

Wrapping the following snippet in a method of your own is all that's needed:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
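As the comment in the snippet notes, checkpointing must be enabled for the exactly-once or at-least-once guarantees to hold; the interval is elided in the docs snippet above. A hedged sketch of what enabling it might look like (the 5-second interval is an arbitrary illustration and should be tuned for your workload):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds with exactly-once semantics.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
    }
}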
