侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

python3实现ftp服务功能(服务端 For Linux)

Python  /  管理员 发布于 7年前   238

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

# Author by Andy# _*_ coding:utf-8 _*_import os, sys, json, hashlib, socketserver, timebase_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))sys.path.append(base_dir)from conf import userdb_setclass Ftp_server(socketserver.BaseRequestHandler): user_home_dir = '' def auth(self, *args):  '''验证用户名及密码'''  cmd_dic = args[0]  username = cmd_dic["username"]  password = cmd_dic["password"]  f = open(userdb_set.userdb_set(), 'r')  user_info = json.load(f)  if username in user_info.keys():   if password == user_info[username]:    self.request.send('0'.encode())    os.chdir('/home/%s' % username)    self.user_home_dir = os.popen('pwd').read().strip()    data = "%s login successed" % username    self.loging(data)   else:    self.request.send('1'.encode())    data = "%s login failed" % username    self.loging(data)    f.close  else:   self.request.send('1'.encode())   data = "%s login failed" % username   self.loging(data)   f.close   ########################################## def get(self, *args):  '''给客户端传输文件'''  request_code = {   '0': 'file is ready to get',   '1': 'file not found!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic["filename"]  if os.path.isfile(filename):   self.request.send('0'.encode('utf-8')) # 确认文件存在   self.request.recv(1024)   self.request.send(str(os.stat(filename).st_size).encode('utf-8'))   self.request.recv(1024)   m = hashlib.md5()   f = open(filename, 'rb')   for line in f:    m.update(line)    self.request.send(line)   self.request.send(m.hexdigest().encode('utf-8'))   print('From server:Md5 value has been sended!')   f.close()  else:   self.request.send('1'.encode('utf-8'))   ########################################### def cd(self, *args):  '''执行cd命令'''  user_current_dir = os.popen('pwd').read().strip()  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  path = cmd_dic['path']  if path.startswith('/'):   if self.user_home_dir in path:    os.chdir(path)    new_dir = os.popen('pwd').read()    user_current_dir = new_dir    self.request.send('Change dir successfully!'.encode("utf-8"))    data = 'Change dir successfully!'    self.loging(data)   elif os.path.exists(path):    self.request.send('Permission Denied!'.encode("utf-8"))    data = 'Permission Denied!'    self.loging(data)   else:    self.request.send('Directory not found!'.encode("utf-8"))    data = 'Directory not found!'    self.loging(data)  elif os.path.exists(path):   os.chdir(path)   new_dir = os.popen('pwd').read().strip()   if self.user_home_dir in new_dir:    self.request.send('Change dir successfully!'.encode("utf-8"))    user_current_dir = new_dir    data = 'Change dir successfully!'    self.loging(data)   else:    os.chdir(user_current_dir)    self.request.send('Permission Denied!'.encode("utf-8"))    data = 'Permission Denied!'    self.loging(data)  else:   self.request.send('Directory not found!'.encode("utf-8"))   data = 'Directory not found!'   self.loging(data)   ########################################### def rm(self, *args):  request_code = {   '0': 'file exist,and Please confirm whether to rm',   '1': 'file not found!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic['filename']  if os.path.exists(filename):   self.request.send('0'.encode("utf-8")) # 确认文件存在   client_response = self.request.recv(1024).decode()   if client_response == '0':    os.popen('rm -rf %s' % filename)    self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))    self.loging('File %s has been deleted!' % filename)   else:    self.request.send(('File %s not deleted!' % filename).encode("utf-8"))    self.loging('File %s not deleted!' % filename)  else:   self.request.send('1'.encode("utf-8"))   ######################################## def pwd(self, *args):  '''执行pwd命令'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  server_response = os.popen('pwd').read().strip().encode("utf-8")  self.request.send(server_response) ############################################# def ls(self, *args):  '''执行ls命名'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  path = cmd_dic['path']  cmd = 'ls -l %s' % path  server_response = os.popen(cmd).read().encode("utf-8")  self.request.send(server_response) ############################################ def put(self, *args):  '''接收客户端文件'''  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  filename = cmd_dic["filename"]  filesize = cmd_dic["size"]  if os.path.isfile(filename):   f = open(filename + '.new', 'wb')  else:   f = open(filename, 'wb')  request_code = {   '200': 'Ready to recceive data!',   '210': 'Not ready to received data!'  }  self.request.send('200'.encode())  receive_size = 0  while True:   if receive_size < filesize:    data = self.request.recv(1024)    f.write(data)    receive_size += len(data)   else:    data = "File %s has been uploaded successfully!" % filename    self.loging(data)    print(data)    break    ################################################ def mkdir(self, *args):  request_code = {   '0': 'Directory has been made!',   '1': 'Directory is aleady exist!'  }  cmd_dic = args[0]  self.loging(json.dumps(cmd_dic))  dir_name = cmd_dic['dir_name']  if os.path.exists(dir_name):   self.request.send('1'.encode("utf-8"))  else:   os.popen('mkdir %s' % dir_name)   self.request.send('0'.encode("utf-8"))   #############################################  def loging(self, data):  '''日志记录'''  localtime = time.asctime(time.localtime(time.time()))  log_file = '/root/ftp/ftpserver/log/server.log'  with open(log_file, 'a', encoding='utf-8') as f:   f.write('%s-->' % localtime + data + '\n')   ############################################## def handle(self):  # print("您本次访问使用的IP为:%s" %self.client_address[0])  # localtime = time.asctime( time.localtime(time.time()))  # print(localtime)  while True:   try:    self.data = self.request.recv(1024).decode() #    # print(self.data)    cmd_dic = json.loads(self.data)    action = cmd_dic["action"]    # print("用户请求%s"%action)    if hasattr(self, action):     func = getattr(self, action)     func(cmd_dic)   except Exception as e:    self.loging(str(e))    breakdef run(): HOST, PORT = '0.0.0.0', 6969 print("The server is started,and listenning at port 6969") server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server) server.serve_forever()if __name__ == '__main__': run()

设置用户口令代码:

#Author by Andy#_*_ coding:utf-8 _*_import os,json,hashlib,sysbase_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))userdb_file = base_dir+"\data\\userdb"# print(userdb_file)def userdb_set(): if os.path.isfile(userdb_file):  # print(userdb_file)  return userdb_file else:  print('请先为您的服务器创建用户!')  user_data = {}  dict={}  Exit_flags = True  while Exit_flags:   username = input("Please input username:")   if username != 'exit':    password = input("Please input passwod:")    if password != 'exit':      user_data.update({username:password})      m = hashlib.md5()      # m.update('hello')      # print(m.hexdigest())      for i in user_data:       # print(i,user_data[i])       m.update(user_data[i].encode())       dict.update({i:m.hexdigest()})    else:     break   else:    break  f = open(userdb_file,'w')  json.dump(dict,f)  f.close() return userdb_file

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    python利用拉链法实现字典方法示例
    下一条:
    python3实现ftp服务功能(客户端)
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go语言中实现字符串可逆性压缩及解压缩功能(0个评论)
    • 使用go + gin + jwt + qrcode实现网站生成登录二维码在app中扫码登录功能(0个评论)
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客