Hadoop streaming详细介绍

发布时间 - 2026-01-11 00:11:32    点击率:

Hadoop streaming

Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口。所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写。

streamming是天然适用于文字处理的(text processing),当然,也仅适用纯文本的处理,对于需要对象和序列化的场景,hadoop streaming无能为力。它力图使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。以下是steaming的一些特点:

  1. Map函数的输入是通过stand input一行一行的接收数据的。(不像Java API,通过InputFormat类做预处理,使得Map函数的输入是有Key和value的)
  2. Map函数的output则必须限定为key-value pair,key和value之间用\t分开。(MapReduce框架在处理intermediate的Map输出时,必须做sort和partition,即shuffle)
  3. Reduce函数的input是Map函数的output也是key-value pair,key和value之间用\t分开。

常用的Streaming编程语言:

  1. bash shell
  2. ruby
  3. python

Ruby

下面是一个Ruby编写的MapReduce程序的示例:

map

max_temperature_map.rb:

ruby 
#!/usr/bin/env ruby 
STDIN.each_line do |line| 
val = line 
year, temp, q = val[15,4], val[87,5], val[92,1] 
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) 
end 
  • 从标准输入读入一行data。
  • 处理数据之后,生成一个键值对,用\t分隔,输出到标准输出

reduce

max_temperature_reduce.rb:

ruby 
#!/usr/bin/env ruby 
last_key, max_val = nil, -1000000 
STDIN.each_line do |line| 
key, val = line.split("\t") 
if last_key && last_key != key 
puts "#{last_key}\t#{max_val}" 
last_key, max_val = key, val.to_i 
else 
last_key, max_val = key, [max_val, val.to_i].max 
end 
end 
puts "#{last_key}\t#{max_val}" if last_key 
  1. 从标准输入读入一行数据
  2. 数据是用\t分隔的键值对
  3. 数据是被MapReduce根据key排序之后顺序一行一行读入
  4. reduce函数对数据进行处理,并输出,输出仍是用\t分隔的键值对

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
  1. hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoop streaming
  2. hadoop-*-streaming.jar会将input里的文件,一行一行的输出到标准输出。
  3. 用-mapper指定Map函数。类似于通过管道将数据传给rb文件: data|ch02/src/main/ruby/max_temperature_map.rb
  4. -reducer指定Reduce函数。

Python

Map

#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)

Reduce

#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py\
-reducer ch02/src/main/ruby/max_temperature_reduce.py

Bash shell

Map

#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
gunzip -c $file >> $target.all
echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
  1. 这里的-D mapred.reduce.tasks=0将reduce task观掉,因此也不需要设置-reducer
  2. 只使用Mapper,可以通过MapReduce帮助我们并行的完成一些平时只能串行的shell脚本
  3. 注意这里的-file,在集群模式下,需要并行运行时,需要-file把文件传输到其他节点

Combiner

在streaming模式下,仍然可以运行Combiner,两种方法:

  1. 通过Java编写一个combiner的函数,并使用-combiner option
  2. 以命令行的管道模式完成combiner的任务

这里具体解释第二种方法:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb

注意看-mapper这一行,通关管道的方式,把mapper的临时输出文件(intermediate file,Map完成后的临时文件)作为输入,送到sort进行排序,然后送到reduce脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到Reduce

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# Hadoop  # streaming详细介绍  # streaming  # streaming详解  # Hadoop组件简介  # Hadoop-3.1.2完全分布式环境搭建过程图文详解(Windows 10)  # Hadoop源码分析一架构关系简介  # 键值  # 类似于  # 编程语言  # 是一个  # 也不  # 是有  # 模式下  # 两种  # 可以用  # 适用于  # 并在  # 可以通过  # 希望能  # 不像  # 仍是  # 使我  # 种方法  # 发送到  # 谢谢大家  # 会将 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: 图片制作网站免费软件,有没有免费的网站或软件可以将图片批量转为A4大小的pdf?  Laravel怎么配置自定义表前缀_Laravel数据库迁移与Eloquent表名映射【步骤】  Laravel Blade组件怎么用_Laravel可复用视图组件的创建与使用  Android okhttputils现在进度显示实例代码  专业企业网站设计制作公司,如何理解商贸企业的统一配送和分销网络建设?  Swift中循环语句中的转移语句 break 和 continue  进行网站优化必须要坚持的四大原则  使用spring连接及操作mongodb3.0实例  百度输入法ai组件怎么删除 百度输入法ai组件移除工具  Laravel怎么进行数据库回滚_Laravel Migration数据库版本控制与回滚操作  如何利用DOS批处理实现定时关机操作详解  如何用5美元大硬盘VPS安全高效搭建个人网站?  Android Socket接口实现即时通讯实例代码  Laravel如何使用Laravel Vite编译前端_Laravel10以上版本前端静态资源管理【教程】  Laravel Livewire是什么_使用Laravel Livewire构建动态前端界面  Android滚轮选择时间控件使用详解  jQuery 常见小例汇总  Android 常见的图片加载框架详细介绍  微信小程序 canvas开发实例及注意事项  Laravel如何部署到服务器_线上部署Laravel项目的完整流程与步骤  网站制作大概要多少钱一个,做一个平台网站大概多少钱?  Laravel怎么处理异常_Laravel自定义异常处理与错误页面教程  javascript中数组(Array)对象和字符串(String)对象的常用方法总结  重庆市网站制作公司,重庆招聘网站哪个好?  深圳网站制作培训,深圳哪些招聘网站比较好?  Python数据仓库与ETL构建实战_Airflow调度流程详解  如何快速生成专业多端适配建站电话?  Laravel如何处理跨站请求伪造(CSRF)保护_Laravel表单安全机制与令牌校验  如何解决hover在ie6中的兼容性问题  免费的流程图制作网站有哪些,2025年教师初级职称申报网上流程?  如何快速生成高效建站系统源代码?  Python文本处理实践_日志清洗解析【指导】  车管所网站制作流程,交警当场开简易程序处罚决定书,在交警网站查询不到怎么办?  Laravel如何使用软删除(Soft Deletes)功能_Eloquent软删除与数据恢复方法  北京的网站制作公司有哪些,哪个视频网站最好?  PHP 实现电台节目表的智能时间匹配与今日/明日轮播逻辑  如何快速打造个性化非模板自助建站?  JavaScript如何实现音频处理_Web Audio API如何工作?  Laravel如何创建自定义中间件?(Middleware代码示例)  高防网站服务器:DDoS防御与BGP线路的AI智能防护方案  Laravel如何自定义错误页面(404, 500)?(代码示例)  Linux系统运维自动化项目教程_Ansible批量管理实战  图册素材网站设计制作软件,图册的导出方式有几种?  如何在阿里云完成域名注册与建站?  Android利用动画实现背景逐渐变暗  canvas 画布在主流浏览器中的尺寸限制详细介绍  Laravel怎么实现验证码(Captcha)功能  Laravel怎么实现模型属性转换Casting_Laravel自动将JSON字段转为数组【技巧】  如何自定义safari浏览器工具栏?个性化设置safari浏览器界面教程【技巧】  JavaScript数据类型有哪些_如何准确判断一个变量的类型