python3实现ftp服务功能(客户端)

发布时间 - 2026-01-11 00:21:12    点击率:

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

客户端 main代码:

#Author by Andy
#_*_ coding:utf-8 _*_
'''
This program is used to create a ftp client

'''
import socket,os,json,time,hashlib,sys
class Ftp_client(object):
 def __init__(self):
  self.client = socket.socket()
 def help(self):
  msg = '''useage:
  ls
  pwd
  cd dir(example: / .. . /var)
  put filename
  rm filename
  get filename
  mkdir directory name
  '''
  print(msg)
 def connect(self,addr,port):
  self.client.connect((addr,port))
 def auth(self):
  m = hashlib.md5()
  username = input("请输入用户名:").strip()

  m.update(input("请输入密码:").strip().encode())
  password = m.hexdigest()
  user_info = {
   'action':'auth',
   'username':username,
   'password':password}
  self.client.send(json.dumps(user_info).encode('utf-8'))
  server_response = self.client.recv(1024).decode()
  # print(server_response)
  return server_response
 def interactive(self):
  while True:
   msg = input(">>>:").strip()
   if not msg:
    print("不能发送空内容!")
    continue
   cmd = msg.split()[0]
   if hasattr(self,cmd):
    func = getattr(self,cmd)
    func(msg)
   else:
    self.help()
    continue
 def put(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   if os.path.isfile(filename):
    filesize = os.stat(filename).st_size
    file_info = {
     "action":"put",
     "filename":filename,
     "size":filesize,
     "overriding":'True'
    }
    self.client.send( json.dumps(file_info).encode('utf-8') )
    #防止粘包,等待服务器确认。
    request_code = {
     '200': 'Ready to recceive data!',
     '210': 'Not ready to received data!'
    }
    server_response = self.client.recv(1024).decode()
    if server_response == '200':
     f = open(filename,"rb")
     send_size = 0
     start_time = time.time()
     for line in f:
      self.client.send(line)
      send_size += len(line)
      send_percentage = int((send_size / filesize) * 100)
      while True:
       progress = ('\r已上传%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
        'utf-8')
       os.write(1, progress)
       sys.stdout.flush()
       time.sleep(0.0001)
       break
     else:
      end_time = time.time()
      time_use = int(end_time - start_time)
      print("\nFile %s has been sent successfully!" % filename)
      print('\n平均下载速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
      f.close()
    else:
     print("Sever isn't ready to receive data!")
     time.sleep(10)
     start_time = time.time()
     f = open(filename, "rb")
     send_size = 0
     for line in f:
       self.client.send(line)
       send_size += len(line)
       # print(send_size)
       while True:
        send_percentage = int((send_size / filesize) * 100)
        progress = ('\r已上传%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
         'utf-8')
        os.write(1, progress)
        sys.stdout.flush()
        # time.sleep(0.0001)
        break
     else:
      end_time = time.time()
      time_use = int(end_time - start_time)
      print("File %s has been sent successfully!" % filename)
      print('\n平均下载速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
      f.close()
   else:
    print("File %s is not exit!" %filename)
  else:
   self.help()
 def ls(self,*args):
  cmd_split = args[0].split()
  # print(cmd_split)
  if len(cmd_split) > 1:
   path = cmd_split[1]
  elif len(cmd_split) == 1:
   path = '.'
  request_info = {
   'action': 'ls',
   'path': path
  }
  self.client.send(json.dumps(request_info).encode('utf-8'))
  sever_response = self.client.recv(1024).decode()
  print(sever_response)
 def pwd(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) == 1:
   request_info = {
    'action': 'pwd',
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(1024).decode()
   print(server_response)
  else:
   self.help()
 def get(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   file_info = {
     "action": "get",
     "filename": filename,
     "overriding": 'True'
    }
   self.client.send(json.dumps(file_info).encode('utf-8'))
   server_response = self.client.recv(1024).decode() #服务器反馈文件是否存在
   self.client.send('0'.encode('utf-8'))
   if server_response == '0':
    file_size = int(self.client.recv(1024).decode())
    # print(file_size)
    self.client.send('0'.encode('utf-8')) #确认开始传输数据
    if os.path.isfile(filename):
     filename = filename+'.new'
    f = open(filename,'wb')
    receive_size = 0
    m = hashlib.md5()
    start_time = time.time()
    while receive_size < file_size:
     if file_size - receive_size > 1024: # 还需接收不止1次
      size = 1024
     else:
      size = file_size - receive_size
     data = self.client.recv(size)
     m.update(data)
     receive_size += len(data)
     data_percent=int((receive_size / file_size) * 100)
     f.write(data)
     progress = ('\r已下载%sMB(%s%%)' %(round(receive_size/102400,2),data_percent)).encode('utf-8')
     os.write(1,progress)
     sys.stdout.flush()
     time.sleep(0.0001)

    else:
     end_time = time.time()
     time_use = int(end_time - start_time)
     print('\n平均下载速度%s MB/s'%(round(round(receive_size/102400,2)/time_use,2)))
     Md5_server = self.client.recv(1024).decode()
     Md5_client = m.hexdigest()
     print('文件校验中,请稍候...')
     time.sleep(0.3)
     if Md5_server == Md5_client:
      print('文件正常。')
     else:
      print('文件与服务器MD5值不符,请确认!')
   else:
    print('File not found!')
    client.interactive()
  else:
   self.help()
 def rm(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   filename = cmd_split[1]
   request_info = {
    'action':'rm',
    'filename': filename,
    'prompt':'Y'
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(10240).decode()
   request_code = {
    '0':'confirm to deleted',
    '1':'cancel to deleted'
   }

   if server_response == '0':
    confirm = input("请确认是否真的删除该文件:")
    if confirm == 'Y' or confirm == 'y':
     self.client.send('0'.encode("utf-8"))
     print(self.client.recv(1024).decode())
    else:
     self.client.send('1'.encode("utf-8"))
     print(self.client.recv(1024).decode())
   else:
    print('File not found!')
    client.interactive()


  else:
   self.help()
 def cd(self,*args):
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   path = cmd_split[1]
  elif len(cmd_split) == 1:
   path = '.'
  request_info = {
   'action':'cd',
   'path':path
  }
  self.client.send(json.dumps(request_info).encode("utf-8"))
  server_response = self.client.recv(10240).decode()
  print(server_response)
 def mkdir(self,*args):
  request_code = {
   '0': 'Directory has been made!',
   '1': 'Directory is aleady exist!'
  }
  cmd_split = args[0].split()
  if len(cmd_split) > 1:
   dir_name = cmd_split[1]
   request_info = {
    'action':'mkdir',
    'dir_name': dir_name
   }
   self.client.send(json.dumps(request_info).encode("utf-8"))
   server_response = self.client.recv(1024).decode()
   if server_response == '0':
    print('Directory has been made!')
   else:
    print('Directory is aleady exist!')
  else:
   self.help()
 # def touch(self,*args):
def run():
 client = Ftp_client()
 # client.connect('10.1.2.3',6969)
 Addr = input("请输入服务器IP:").strip()
 Port = int(input("请输入端口号:").strip())
 client.connect(Addr,Port)
 while True:
  if client.auth() == '0':
   print("Welcome.....")
   client.interactive()
   break
  else:
   print("用户名或密码错误!")
   continue

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


# python3  # ftp  # 客户端  # python3实现ftp服务功能(服务端 For Linux)  # 详解Python下ftp上传文件linux服务器  # Python FTP两个文件夹间的同步实例代码  # Python基于FTP模块实现ftp文件上传操作示例  # python2.7实现FTP文件下载功能  # Python实现的FTP通信客户端与服务器端功能示例  # 1 行 Python 代码快速实现 FTP 服务器  # Python搭建FTP服务器的方法示例  # Python实现基于多线程、多用户的FTP服务器与客户端功能完整实例  # python实现FTP服务器服务的方法  # 详解Python3的TFTP文件传输  # 请输入  # 下载速度  # 请确认  # 上传  # 大家分享  # 请稍候  # 还需  # 该文件  # 具体内容  # 大家多多  # 是否存在  # 端口号  # Port  # strip  # input  # username  # password  # hexdigest  # update 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: Laravel Admin后台管理框架推荐_Laravel快速开发后台工具  Laravel软删除怎么实现_Laravel Eloquent SoftDeletes功能使用教程  东莞专业网站制作公司有哪些,东莞招聘网站哪个好?  如何在不使用负向后查找的情况下匹配特定条件前的换行符  谷歌浏览器下载文件时中断怎么办 Google Chrome下载管理修复  香港服务器网站搭建教程-电商部署、配置优化与安全稳定指南  javascript如何操作浏览器历史记录_怎样实现无刷新导航  Laravel如何处理异常和错误?(Handler示例)  悟空识字怎么关闭自动续费_悟空识字取消会员自动扣费步骤  logo在线制作免费网站在线制作好吗,DW网页制作时,如何在网页标题前加上logo?  Laravel怎么连接多个数据库_Laravel多数据库连接配置  Laravel如何升级到最新的版本_Laravel版本升级流程与兼容性处理  Laravel Eloquent性能优化技巧_Laravel N+1查询问题解决  Laravel怎么使用Session存储数据_Laravel会话管理与自定义驱动配置【详解】  Laravel如何实现全文搜索_Laravel Scout集成Algolia或Meilisearch教程  Laravel的契約(Contracts)是什么_深入理解Laravel Contracts与依赖倒置  Laravel Eloquent:优雅地将关联模型字段扁平化到主模型中  Laravel请求验证怎么写_Laravel Validator自定义表单验证规则教程  如何快速生成凡客建站的专业级图册?  JavaScript中的标签模板是什么_它如何扩展字符串功能  微信小程序 wx.uploadFile无法上传解决办法  php后缀怎么变mp4格式错误_修改扩展名提示格式不对怎么办【技巧】  Laravel storage目录权限问题_Laravel文件写入权限设置  米侠浏览器网页背景异常怎么办 米侠显示修复  jQuery 常见小例汇总  Laravel如何发送邮件_Laravel Mailables构建与发送邮件的简明教程  laravel怎么通过契约(Contracts)编程_laravel契约(Contracts)编程方法  php读取心率传感器数据怎么弄_php获取max30100的心率值【指南】  Microsoft Edge如何解决网页加载问题 Edge浏览器加载问题修复  如何在云主机快速搭建网站站点?  Laravel如何使用Livewire构建动态组件?(入门代码)  香港网站服务器数量如何影响SEO优化效果?  如何在阿里云香港服务器快速搭建网站?  EditPlus中的正则表达式 实战(1)  Laravel怎么实现验证码功能_Laravel集成验证码库防止机器人注册  详解vue.js组件化开发实践  php结合redis实现高并发下的抢购、秒杀功能的实例  Win11应用商店下载慢怎么办 Win11更改DNS提速下载【修复】  JavaScript模板引擎Template.js使用详解  Laravel如何处理跨站请求伪造(CSRF)保护_Laravel表单安全机制与令牌校验  Windows家庭版如何开启组策略(gpedit.msc)?(安装方法)  php在windows下怎么调试_phpwindows环境调试操作说明【操作】  怎样使用JSON进行数据交换_它有什么限制  linux写shell需要注意的问题(必看)  香港服务器选型指南:免备案配置与高效建站方案解析  如何在云指建站中生成FTP站点?  Laravel怎么实现搜索功能_Laravel使用Eloquent实现模糊查询与多条件搜索【实例】  高端云建站费用究竟需要多少预算?  图片制作网站免费软件,有没有免费的网站或软件可以将图片批量转为A4大小的pdf?  html文件怎么打开证书错误_https协议的html打开提示不安全【指南】