MapReduce实现TopN的效果

1、背景

最近在学习Hadoop的MapReduce,此处记录一下如何实现 TopN 的效果,以及在MapReduce中如何实现 自定义分组

2、需求

我们有一份数据,数据中存在如下3个字段,订单编号,订单项订单项价格。 输出的数据,需求如下:

  1. 订单编号与订单编号之间需要正序输出。
  2. 输出每个订单价格最高的2个订单项

3、分析

  1. 订单编号与订单编号之间需要正序输出,那么订单编号必须要作为Key,因为只有Key才有排序操作。
  2. 输出每个订单价格最高的2个订单项: 这个输出是在reduce阶段,并且是每个订单,因此需要根据订单编号进行分组操作(前后2个key比较,相同则为一组),而分组也只有Key才有,因此就需要JavaBean(订单编号、订单项、订单项价格)来作为组合Key
  3. 订单编号与订单编号之间需要正序输出 && 输出每个订单价格最高的2个订单项: 可以看出在Key中的排序规则为:根据订单编号升序,然后根据订单项价格倒序排序, 并且是根据订单编号来分组。
  4. 我们知道默认MapReduce中默认的分区规则是,根据key的hascode来进行分区,而 分区 下是有多个 分组,每个分组调用一次reduce方法。 而我们上方的思路是,根据订单编号来进行分组,当我们Key是JavaBean组合Key时,相同的订单编号所在的JavaBean会被分在一个分组吗,这个不一定,因为JavaBean的hashcode不一定一致,因此就需要我们自定义分区(继承Partitioner类)。此处我们job.setNumReduceTasks设置为1个,因此不考虑这个分区的问题
  5. 一个分区下有多个分组,每个分组调用一次reduce方法。

4、准备数据

4.1 准备数据

20230713000010  item-101    1020230713000010  item-102    3020230713000015  item-151    1020230713000015  item-152    2020230713000010  item-103    2020230713000015  item-153    3020230713000012  item-121    5020230713000012  item-122    1020230713000012  item-123    30

4.2 每行数据格式

订单编号          订单项      订单项价格20230713000012  item-123    30

每行数据的分隔符为空格

4.3 期望输出结果

20230713000010  item-102    3020230713000010  item-103    2020230713000012  item-121    5020230713000012  item-123    3020230713000015  item-153    3020230713000015  item-152    20

5、编码实现

5.1 引入jar包

<dependencies>    <dependency>        <groupId>org.apache.hadoop</groupId>        <artifactId>hadoop-client</artifactId>        <version>3.3.4</version>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.22</version>    </dependency></dependencies> <build>    <plugins>        <plugin>            <groupId>org.apache.maven.plugins</groupId>            <artifactId>maven-jar-plugin</artifactId>            <version>3.2.2</version>            <configuration>                <archive>                    <manifest>                        <addClasspath>true</addClasspath>                        <classpathPrefix>lib/</classpathPrefix>                        <mainClass>com.huan.hadoop.mr.TopNDriver</mainClass>                    </manifest>                </archive>            </configuration>        </plugin>    </plugins></build>

5.2 编写实体类

package com.huan.hadoop.mr; import lombok.Getter;import lombok.Setter;import org.apache.hadoop.io.WritableComparable; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException; /** * 订单数据 * * @author huan.fu * @date 2023/7/13 - 14:20 */@Getter@Setterpublic class OrderVo implements WritableComparable<OrderVo> {    /**     * 订单编号     */    private long orderId;    /**     * 订单项     */    private String itemId;    /**     * 订单项价格     */    private long price;     @Override    public int compareTo(OrderVo o) {        // 排序: 根据 订单编号 升序, 如果订单编号相同,则根据 订单项价格 倒序        int result = Long.compare(this.orderId, o.orderId);        if (result == 0) {            // 等于0说明 订单编号 相同,则需要根据 订单项价格 倒序            result = -Long.compare(this.price, o.price);        }        return result;    }     @Override    public void write(DataOutput out) throws IOException {        // 序列化        out.writeLong(orderId);        out.writeUTF(itemId);        out.writeLong(price);    }     @Override    public void readFields(DataInput in) throws IOException {        // 反序列化        this.orderId = in.readLong();        this.itemId = in.readUTF();        this.price = in.readLong();    }     @Override    public String toString() {        return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice();    }} 
  1. 此处需要实现 WritableComparable接口
  2. 需要编写 排序序列化方法

5.3 编写分组方法

package com.huan.hadoop.mr; import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator; /** * 分组: 订单编号相同说明是同一组,否则是不同的组 * * @author huan.fu * @date 2023/7/13 - 14:30 */public class TopNGroupingComparator extends WritableComparator {     public TopNGroupingComparator() {        // 第二个参数为true: 表示可以通过反射创建实例        super(OrderVo.class, true);    }     @Override    public int compare(WritableComparable a, WritableComparable b) {        // 订单编号 相同说明是同一个对象,否则是不同的对象        return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;    }} 
  1. 实现 WritableComparator接口,自定义分组规则。
  2. 分组是发生在reduce阶段,前后2个key比较,相同则为一组,一组调用一次reduce方法。

5.4 编写 map 方法

package com.huan.hadoop.mr; import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * map 操作: 输出的key为OrderVo, 输出的value为: price * * @author huan.fu * @date 2023/7/13 - 14:28 */public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> {     private final OrderVo outKey = new OrderVo();    private final LongWritable outValue = new LongWritable();     @Override    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException {        // 获取一行数据 20230713000010  item-101    10        String row = value.toString();        // 根据 \t 进行分割        String[] cells = row.split("\\s+");        // 获取订单编号        long orderId = Long.parseLong(cells[0]);        // 获取订单项        String itemId = cells[1];        // 获取订单项价格        long price = Long.parseLong(cells[2]);         // 设置值        outKey.setOrderId(orderId);        outKey.setItemId(itemId);        outKey.setPrice(price);        outValue.set(price);         // 写出        context.write(outKey, outValue);    }} 
  1. map 操作: 输出的key为OrderVo, 输出的value为: price

5.5 编写reduce方法

package com.huan.hadoop.mr; import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现, * 即 订单编号 相同的认为一组 * * @author huan.fu * @date 2023/7/13 - 14:29 */public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {     @Override    protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {        int topN = 0;        // 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的        for (LongWritable price : values) {            topN++;            if (topN > 2) {                break;            }            // 注意: 此处的key每次输出都不一样            context.write(key, NullWritable.get());        }    }} 
  1. reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现,即 订单编号 相同的认为一组.
  2. 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的

5.6 编写driver类

package com.huan.hadoop.mr; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner; /** * @author huan.fu * @date 2023/7/13 - 14:29 */public class TopNDriver extends Configured implements Tool {     public static void main(String[] args) throws Exception {        // 构建配置对象        Configuration configuration = new Configuration();        // 使用 ToolRunner 提交程序        int status = ToolRunner.run(configuration, new TopNDriver(), args);        // 退出程序        System.exit(status);    }     @Override    public int run(String[] args) throws Exception {        // 构建Job对象实例 参数(配置对象,Job对象名称)        Job job = Job.getInstance(getConf(), "topN");        // 设置mr程序运行的主类        job.setJarByClass(TopNDriver.class);        // 设置mr程序运行的 mapper类型和reduce类型        job.setMapperClass(TopNMapper.class);        job.setReducerClass(TopNReducer.class);        // 指定mapper阶段输出的kv数据类型        job.setMapOutputKeyClass(OrderVo.class);        job.setMapOutputValueClass(LongWritable.class);        // 指定reduce阶段输出的kv数据类型,业务mr程序输出的最终类型        job.setOutputKeyClass(OrderVo.class);        job.setOutputValueClass(NullWritable.class);        // 配置本例子中的输入数据路径和输出数据路径,默认输入输出组件为: TextInputFormat和TextOutputFormat        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        // 先删除输出目录(方便本地测试)        FileSystem.get(this.getConf()).delete(new Path(args[1]), true);         // 设置分组        job.setGroupingComparatorClass(TopNGroupingComparator.class);         return job.waitForCompletion(true) ? 0 : 1;    }}
  1. 需要设置分组 job.setGroupingComparatorClass(TopNGroupingComparator.class);

5.7 运行结果

运行结果

6、完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYrLyYpK' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片