hadoop上传文件功能实例代码

发布时间 - 2026-01-11 03:25:07    点击率:

hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。 

然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR。其实,这个调度平台就是使用了quartz。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。 

首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase。 

然后,还要知道hdfs中块(block)的概念,默认为64MB。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用Hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。 

然后,再就是要知道namenode和datanode,这个在之前的博文已经介绍过,下面看看cm环境中hdfs的管理者(namenode)和工作者(datanode),如下 

在yarn环境中是可以有多个nameNode的。此环境中没有SecondaryNameNode,当然也可以有。 

好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。

一、java实现上传本地文件至hdfs

这里,可以直接使用hdfs提供的java api即可实现,代码如下:

package com.bjpowernode.hdfs.local;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * ClassName:UploadLocalFileToHdfs <br/>
 * Function: 本地文件上传至hdfs. <br/>
 * Date:  2016年3月28日 下午10:06:05 <br/>
 * @author qiyongkang
 * @version 
 * @since JDK 1.6
 * @see  
 */
public class UploadLocalFileToHdfs {
 public static void main(String[] args) {
  Configuration conf = new Configuration();
  String localDir = "/home/qiyongkang";
  String hdfsDir = "/qiyongkang";
  try{
   Path localPath = new Path(localDir);
   Path hdfsPath = new Path(hdfsDir);
   FileSystem hdfs = FileSystem.get(conf);
   hdfs.copyFromLocalFile(localPath, hdfsPath);
  }catch(Exception e){
   e.printStackTrace();
  }
 }
}

注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。 

打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,然后执行hadoop fs -ls /qiyongkang便可看到: 

二、java实现上传ftp上的文件至hdfs

首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。 

其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。 

下面,直接贴出代码:

package com.bjpowernode.hdfs.ftp;
import java.io.InputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
 * ClassName:UploadFtpFileToHdfs <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason: TODO ADD REASON. <br/>
 * Date: 2016年3月28日 下午10:50:37 <br/>
 * 
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class UploadFtpFileToHdfs {
 public static void main(String[] args) {
  Configuration conf = new Configuration();
  loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
 }
 /**
  * 
  * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
  *
  * @author qiyongkang
  * @param ip
  * @param username
  * @param password
  * @param filePath
  * @param outputPath
  * @param conf
  * @return
  * @since JDK 1.6
  */
 private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,
   String outputPath, Configuration conf) {
  FTPClient ftp = new FTPClient();
  InputStream inputStream = null;
  FSDataOutputStream outputStream = null;
  boolean flag = true;
  try {
   ftp.connect(ip);
   ftp.login(username, password);
   ftp.setFileType(FTP.BINARY_FILE_TYPE);
   ftp.setControlEncoding("UTF-8");
   int reply = ftp.getReplyCode();
   if (!FTPReply.isPositiveCompletion(reply)) {
    ftp.disconnect();
   }
   FTPFile[] files = ftp.listFiles(filePath);
   FileSystem hdfs = FileSystem.get(conf);
   for (FTPFile file : files) {
    if (!(file.getName().equals(".") || file.getName().equals(".."))) {
     inputStream = ftp.retrieveFileStream(filePath + file.getName());
     outputStream = hdfs.create(new Path(outputPath + file.getName()));
     IOUtils.copyBytes(inputStream, outputStream, conf, false);
     if (inputStream != null) {
      inputStream.close();
      ftp.completePendingCommand();
     }
    }
   }
   ftp.disconnect();
  } catch (Exception e) {
   flag = false;
   e.printStackTrace();
  }
  return flag;
 }
}

然后同样打包上传后执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到: 

总结

以上所述是小编给大家介绍的hadoop上传文件功能实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!


# hadoop上传文件  # java  # hadoop  # Hadoop Combiner使用方法详解  # hadoop动态增加和删除节点方法介绍  # Hadoop编程基于MR程序实现倒排索引示例  # hadoop重新格式化HDFS步骤解析  # ASP.NET实现Hadoop增删改查的示例代码  # 浅谈七种常见的Hadoop和Spark项目案例  # VMware虚拟机下hadoop1.x的安装方法  # Hadoop MultipleOutputs输出到多个文件中的实现方法  # 上传  # 传至  # 可以使用  # 不可能  # 给大家  # 便可  # 小编  # 下午  # 使用了  # 运行环境  # 好了  # 多个  # 才是  # 在此  # 就不  # 工作流  # 我们可以  # 有多少  # 不存在  # 太过 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: Laravel Docker环境搭建教程_Laravel Sail使用指南  详解免费开源的DotNet二维码操作组件ThoughtWorks.QRCode(.NET组件介绍之四)  如何在宝塔面板中创建新站点?  北京企业网站设计制作公司,北京铁路集团官方网站?  b2c电商网站制作流程,b2c水平综合的电商平台?  Android滚轮选择时间控件使用详解  Laravel如何创建自定义Facades?(详细步骤)  装修招标网站设计制作流程,装修招标流程?  Laravel中DTO是什么概念_在Laravel项目中使用数据传输对象(DTO)  如何快速启动建站代理加盟业务?  如何在建站之星网店版论坛获取技术支持?  ,交易猫的商品怎么发布到网站上去?  香港服务器网站测试全流程:性能评估、SEO加载与移动适配优化  Laravel如何升级到最新的版本_Laravel版本升级流程与兼容性处理  Windows10如何删除恢复分区_Win10 Diskpart命令强制删除分区  jQuery validate插件功能与用法详解  如何在阿里云购买域名并搭建网站?  如何将凡科建站内容保存为本地文件?  如何快速搭建个人网站并优化SEO?  如何在万网利用已有域名快速建站?  PHP的CURL方法curl_setopt()函数案例介绍(抓取网页,POST数据)  Firefox Developer Edition开发者版本入口  如何彻底卸载建站之星软件?  EditPlus中的正则表达式实战(5)  Laravel如何实现登录错误次数限制_Laravel自带LoginThrottles限流配置【方法】  Laravel如何生成API文档?(Swagger/OpenAPI教程)  Laravel项目如何进行性能优化_Laravel应用性能分析与优化技巧大全  Laravel如何与Inertia.js和Vue/React构建现代单页应用  edge浏览器无法安装扩展 edge浏览器插件安装失败【解决方法】  Win11关机界面怎么改_Win11自定义关机画面设置【工具】  Laravel怎么生成URL_Laravel路由命名与URL生成函数详解  Laravel如何使用Guzzle调用外部接口_Laravel发起HTTP请求与JSON数据解析【详解】  Laravel如何处理JSON字段的查询和更新_Laravel JSON列操作与查询技巧  黑客入侵网站服务器的常见手法有哪些?  Python文件操作最佳实践_稳定性说明【指导】  如何在Tomcat中配置并部署网站项目?  linux写shell需要注意的问题(必看)  JavaScript如何实现音频处理_Web Audio API如何工作?  如何用好域名打造高点击率的自主建站?  消息称 OpenAI 正研发的神秘硬件设备或为智能笔,富士康代工  Bootstrap整体框架之CSS12栅格系统  jquery插件bootstrapValidator表单验证详解  如何快速生成专业多端适配建站电话?  成都品牌网站制作公司,成都营业执照年报网上怎么办理?  利用 Google AI 进行 YouTube 视频 SEO 描述优化  手机怎么制作网站教程步骤,手机怎么做自己的网页链接?  如何为不同团队 ID 动态生成多个“认领值班”按钮  网站制作软件免费下载安装,有哪些免费下载的软件网站?  Laravel如何清理系统缓存命令_Laravel清除路由配置及视图缓存的方法【总结】  EditPlus中的正则表达式 实战(4)