Hadoop MapReduce多输出详细介绍

发布时间 - 2026-01-10 22:06:30    点击率:

Hadoop MapReduce多输出

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制或让每个reducer输出多个文件。MapReduce为此提供了MultipleOutputFormat类。

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的mapper)创建多个文件。采用name-r-nnnnn形式的文件名用于map输出,name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指名块号的整数(从0开始)。块号保证从不同块(mapper或者reducer)写的输出在相同名字情况下不会冲突。

1. 重定义输出文件名

我们可以对输出的文件名进行控制。考虑这样一个需求:按男女性别来区分度假订单数据。这需要运行一个作业,作业的输出是男女各一个文件,此文件包含男女性别的所有数据记录。

这个需求可以使用MultipleOutputs来实现:

package com.sjf.open.test;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.sjf.open.utils.ConfigUtil;
/**
 * Created by xiaosi on 16-11-7.
 */
public class VacationOrderBySex extends Configured implements Tool {
  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(new VacationOrderBySex(), args);
    System.exit(status);
  }
  public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {
    public String fInputPath = "";
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      if(fInputPath.contains("vacation_hot_country_order")){
        String[] params = line.split("\t");
        String sex = params[2];
        if(StringUtils.isBlank(sex)){
          return;
        }
        context.write(new Text(sex.toLowerCase()), value);
      }
    }
  }
  public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        multipleOutputs.write(NullWritable.get(), value, key.toString());
      }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      multipleOutputs.close();
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("./run <input> <output>");
      System.exit(1);
    }
    String inputPath = args[0];
    String outputPath = args[1];
    int numReduceTasks = 16;
    Configuration conf = this.getConf();
    conf.setBoolean("mapred.output.compress", true);
    conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    Job job = Job.getInstance(conf);
    job.setJobName("vacation_order_by_jifeng.si");
    job.setJarByClass(VacationOrderBySex.class);
    job.setMapperClass(VacationOrderBySexMapper.class);
    job.setReducerClass(VacationOrderBySexReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setNumReduceTasks(numReduceTasks);
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }
}

在生成输出的reduce中,在setup()方法中构造一个MultipleOutputs的实例并将它赋予一个实例变量。在reduce()方法中使用MultipleOutputs实例来写输出,而不是context。write()方法作用于键,值和名字。这里使用的是性别作为名字,因此最后产生的输出名称的形式为sex-r-nnnnn:

-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev   88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev   60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz

我们可以看到在输出文件中不仅有我们想要的输出文件类型,还有part-r-nnnnn形式的文件,但是文件内没有信息,这是程序默认的输出文件。所以我们在指定输出文件名称时(name-r-nnnnn),不要指定name为part,因为它已经被使用为默认值了。

2. 多目录输出

在MultipleOutputs的write()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录。例如,我们改动上面的需求:按男女性别来区分度假订单数据,不同性别数据位于不同子目录(例如:sex=f/part-r-00000)。

 public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        String basePath = String.format("sex=%s/part", key.toString());
        multipleOutputs.write(NullWritable.get(), value, basePath);
      }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      multipleOutputs.close();
    }
  }

后产生的输出名称的形式为sex=f/part-r-nnnnn或者sex=m/part-r-nnnnn:

-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
drwxr-xr-x  - wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f
drwxr-xr-x  - wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m

3. 延迟输出

FileOutputFormat的子类会产生输出文件(part-r-nnnnn),即使文件是空的,也会产生。我们有时候不想要这些空的文件,我们可以使用LazyOutputFormat进行处理。它是一个封装输出格式,可以指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法即可:

Configuration conf = this.getConf();
Job job = Job.getInstance(conf);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

再次检查一下我们的输出文件(第一个例子):

sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/
Found 3 items
-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev   88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev   60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# Hadoop  # MapReduce多输出  # MapReduce多输出详解  # Hadoop MapReduce实现单词计数(Word Count)  # hadoop之MapReduce框架原理  # Hadoop之Mapreduce序列化  # Java大数据开发Hadoop MapReduce  # Java/Web调用Hadoop进行MapReduce示例代码  # 用PHP和Shell写Hadoop的MapReduce程序  # MySQL数据文件直接通过拷贝备份与恢复的操作方法  # 多个  # 子类  # 因为它  # 可以使用  # 别来  # 的是  # 是一个  # 这是  # 放在  # 也会  # 第一个  # 是由  # 它是  # 我们可以  # 希望能  # 这样一个  # 可以看到  # 要对  # 写到  # 将它 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: Laravel如何生成PDF或Excel文件_Laravel文档导出工具与使用教程  如何正确选择百度移动适配建站域名?  如何在阿里云完成域名注册与建站?  Laravel Telescope怎么调试_使用Laravel Telescope进行应用监控与调试  详解免费开源的.NET多类型文件解压缩组件SharpZipLib(.NET组件介绍之七)  千问怎样用提示词获取健康建议_千问健康类提示词注意事项【指南】  Laravel如何实现多级无限分类_Laravel递归模型关联与树状数据输出【方法】  如何在阿里云虚拟机上搭建网站?步骤解析与避坑指南  如何在云主机上快速搭建多站点网站?  Android Socket接口实现即时通讯实例代码  Laravel怎么进行数据库事务处理_Laravel DB Facade事务操作确保数据一致性  JS经典正则表达式笔试题汇总  如何制作一个表白网站视频,关于勇敢表白的小标题?  如何在阿里云域名上完成建站全流程?  北京网站制作的公司有哪些,北京白云观官方网站?  Laravel怎么使用artisan命令缓存配置和视图  瓜子二手车官方网站在线入口 瓜子二手车网页版官网通道入口  Laravel API路由如何设计_Laravel构建RESTful API的路由最佳实践  Laravel如何发送邮件和通知_Laravel邮件与通知系统发送步骤  Laravel Eloquent关联是什么_Laravel模型一对一与一对多关系精讲  Laravel怎么自定义错误页面_Laravel修改404和500页面模板  如何正确下载安装西数主机建站助手?  如何在宝塔面板创建新站点?  如何基于云服务器快速搭建网站及云盘系统?  Laravel如何发送邮件_Laravel Mailables构建与发送邮件的简明教程  教你用AI将一段旋律扩展成一首完整的曲子  如何使用 jQuery 正确渲染 Instagram 风格的标签列表  黑客入侵网站服务器的常见手法有哪些?  如何用IIS7快速搭建并优化网站站点?  Windows驱动无法加载错误解决方法_驱动签名验证失败处理步骤  常州企业网站制作公司,全国继续教育网怎么登录?  个人网站制作流程图片大全,个人网站如何注销?  英语简历制作免费网站推荐,如何将简历翻译成英文?  如何使用 Go 正则表达式精准提取括号内首个纯字母标识符(忽略数字与嵌套)  标题:Vue + Vuex 项目中正确使用 JWT 进行身份认证的实践指南  Laravel与Inertia.js怎么结合_使用Laravel和Inertia构建现代单页应用  Laravel如何生成和使用数据填充?(Seeder和Factory示例)  Claude怎样写结构化提示词_Claude结构化提示词写法【教程】  Laravel怎么配置自定义表前缀_Laravel数据库迁移与Eloquent表名映射【步骤】  手机网站制作平台,手机靓号代理商怎么制作属于自己的手机靓号网站?  PHP正则匹配日期和时间(时间戳转换)的实例代码  Laravel用户密码怎么加密_Laravel Hash门面使用教程  香港服务器网站推广:SEO优化与外贸独立站搭建策略  佐糖AI抠图怎样调整抠图精度_佐糖AI精度调整与放大细化操作【攻略】  音响网站制作视频教程,隆霸音响官方网站?  Edge浏览器如何截图和滚动截图_微软Edge网页捕获功能使用教程【技巧】  如何快速搭建自助建站会员专属系统?  免费视频制作网站,更新又快又好的免费电影网站?  Windows10如何删除恢复分区_Win10 Diskpart命令强制删除分区  如何为不同团队 ID 动态生成多个独立按钮