侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

基于python实现聊天室程序

Python  /  管理员 发布于 7年前   210

本文实例为大家分享了python实现简单聊天室的具体代码,供大家参考,具体内容如下

刚刚接触python编程,又从接触java开始一直对socket模块感兴趣,所以就做了一个聊天室的小程序。

该程序由客户端与服务器构成,使用UDP服务,服务器端绑定本地IP和端口,客户端由系统随机选择端口。

实现了群发、私发、点对点文件互传功能。

客户端自建了一个类继承了Cmd模块,使用自定义的命令command进行操作,调用相应的do_command方法。

使用json模块进行消息的封装序列化,在接收方进行解析。

客户端代码如下:

import socketimport threadingimport jsonimport osfrom cmd import Cmd  class Client(Cmd): """ 客户端 """ prompt = '>>>' intro = '[Welcome] 简易聊天室客户端(Cli版)\n' + '[Welcome] 输入help来获取帮助\n' buffersize = 1024  def __init__(self, host):  """  构造  """  super().__init__()  self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # self.__id = None  self.__nickname = None  self.__host = host  self.thread_recv = None  self.threadisalive = False  # 是否在接收文件  self.recvfile = False  # 是否在发送文件  self.sendfile = False  self.filesize = None  self.sendfilesize = None   # 接收文件包计数  self.filecount = None  # 接收文件名  self.filename = None  # 发送文件名  self.sendfilename = None   # 发送者  self.filefrom = None  # 接收者  self.fileto = None   # 接收文件流  self.file_recv = None  # 发送文件流  self.file_send = None   # 接收文件地址  self.filefrom_addr = None  # 发送文件地址  self.fileto_addr = None  def __receive_message_thread(self):  """  接受消息线程  """  while self.threadisalive:   # noinspection PyBroadException   try:    buffer, addr = self.__socket.recvfrom(1024)    '''    文件流由发送端直接发送,不经过服务器,故当发送端发来的消息时,将收到的数据存入文件    '''    if (addr != self.__host) & (addr == self.filefrom_addr) & self.recvfile:     self.file_recv.write(buffer)     self.filecount += 1     if self.filecount * 1024 >= self.filesize:      self.file_recv.close()      print(self.filename, 'is received.')      self.recvfile = False     continue     js = json.loads(buffer.decode())     # 若接收的数据为消息信息,则显示    if js['type'] == 'message':     print(js['message'])     # 若接收的数据为文件发送请求,则存储文件信息,并显示    elif js['type'] == 'filequest':     if self.recvfile:      self.__socket.sendto(json.dumps({       'type': 'fileres',       'fileres': 'no',       'nickname': self.__nickname,       'who': js['nickname'],       'errormessage': 'is transfroming files.',      }).encode(), self.__host)      continue     filename = js['filename']     who = js['nickname']     filesize = js['filesize']     self.recvfile = True     self.filesize = filesize     self.filename = filename     self.filecount = 0     self.filefrom = who     self.filefrom_addr = (js['send_ip'], js['send_port'])      print('[system]:', who, ' send a file(',       filename, ') to you. receive? ')     # 接受的数据为请求回复,若同意接收则存储服务器发来的接收方的地址,并开启发送线程    elif js['type'] == 'fileres':     if js['fileres'] == 'yes':      print(js['recv_ip'], js['recv_port'])      self.fileto_addr = (js['recv_ip'], js['recv_port'])      thread = threading.Thread(       target=self.__send_file_thread)      thread.start()     else:      print(js['nickname'], js['errormessage'])      self.sendfile = False    except Exception as e:    print(e)    print('[Client] 无法从服务器获取数据')  def __send_broadcast_message_thread(self, message):  """  发送广播消息线程  :param message: 消息内容  """  self.__socket.sendto(json.dumps({   'type': 'broadcast',   'nickname': self.__nickname,   'message': message,  }).encode(), self.__host)  def __send_file_thread(self):  """  发送文件线程  :param message: 消息内容  """  filecount = 0  print('[system]', 'sending the file...')  while filecount * 1024 <= self.sendfilesize:   self.__socket.sendto(    self.file_send.read(1024), self.fileto_addr)   filecount += 1  self.file_send.close()  self.sendfile = False  print('[system]', 'the file is sended.')  def __send_whisper_message_thread(self, who, message):  """  发送私发消息线程  :param message: 消息内容  """  self.__socket.sendto(json.dumps({   'type': 'sendto',   'who': who,   'nickname': self.__nickname,   'message': message  }).encode(), self.__host)  def send_exit(self):  self.__socket.sendto(json.dumps({   'type': 'offline',   'nickname': self.__nickname,  }).encode(), self.__host)   def start(self):  """  启动客户端  """  self.cmdloop()  def do_login(self, args):  """  登录聊天室  :param args: 参数  """  nickname = args.split(' ')[0]   # 将昵称发送给服务器,获取用户id  self.__socket.sendto(json.dumps({   'type': 'login',   'nickname': nickname,  }).encode(), self.__host)  # 尝试接受数据   buffer = self.__socket.recvfrom(1300)[0].decode()  obj = json.loads(buffer)  if obj['login'] == 'success':   self.__nickname = nickname   print('[Client] 成功登录到聊天室')   self.threadisalive = True   # 开启子线程用于接受数据   self.thread_recv = threading.Thread(    target=self.__receive_message_thread)   self.thread_recv.setDaemon(True)   self.thread_recv.start()  else:   print('[Client] 无法登录到聊天室', obj['errormessage'])  def do_send(self, args):  """  发送消息  :param args: 参数  """  if self.__nickname is None:   print('请先登录!login nickname')   return  message = args  # 开启子线程用于发送数据  thread = threading.Thread(   target=self.__send_broadcast_message_thread, args=(message, ))  thread.setDaemon(True)  thread.start()  def do_sendto(self, args):  """  发送私发消息  :param args: 参数  """  if self.__nickname is None:   print('请先登录!login nickname')   return  who = args.split(' ')[0]  message = args.split(' ')[1]  # # 显示自己发送的消息  # print('[' + str(self.__nickname) + '(' + str(self.__id) + ')' + ']', message)  # 开启子线程用于发送数据  thread = threading.Thread(   target=self.__send_whisper_message_thread, args=(who, message))  thread.setDaemon(True)  thread.start()  def do_catusers(self, arg):  if self.__nickname is None:   print('请先登录!login nickname')   return  catmessage = json.dumps({'type': 'catusers'})  self.__socket.sendto(catmessage.encode(), self.__host)  def do_catip(self, args):  if self.__nickname is None:   print('请先登录!login nickname')   return  who = args  catipmessage = json.dumps({'type': 'catip', 'who': who})  self.__socket.sendto(catipmessage.encode(), self.__host)  def do_help(self, arg):  """  帮助  :param arg: 参数  """  command = arg.split(' ')[0]  if command == '':   print('[Help] login nickname - 登录到聊天室,nickname是你选择的昵称')   print('[Help] send message - 发送消息,message是你输入的消息')   print('[Help] sendto who message - 私发消息,who是用户名,message是你输入的消息')   print('[Help] catusers - 查看所有用户')   print('[Help] catip who - 查看用户IP,who为用户名')   print('[Help] sendfile who filedir - 向某用户发送文件,who为用户名,filedir为文件路径')   print('[Help] getfile filename who yes/no - 接收文件,filename 为文件名,who为发送者,yes/no为是否接收')  elif command == 'login':   print('[Help] login nickname - 登录到聊天室,nickname是你选择的昵称')  elif command == 'send':   print('[Help] send message - 发送消息,message是你输入的消息')  elif command == 'sendto':   print('[Help] sendto who message - 发送私发消息,message是你输入的消息')  else:   print('[Help] 没有查询到你想要了解的指令')  def do_exit(self, arg): # 以do_*开头为命令  print("Exit")  self.send_exit()  try:   self.threadisalive = False   self.thread_recv.join()  except Exception as e:   print(e)  # self.__socket.close()  def do_sendfile(self, args):  who = args.split(' ')[0]  filepath = args.split(' ')[1]  filename = filepath.split('\\')[-1]  # 判断是否在发送文件  if self.sendfile:   print('you are sending files, please try later.')   return  if not os.path.exists(filepath):   print('the file is not exist.')   return  filesize = os.path.getsize(filepath)  # print(who, filename, filesize)   self.sendfile = True  self.fileto = who  self.sendfilename = filename  self.sendfilesize = filesize  self.file_send = open(filepath, 'rb')   self.__socket.sendto(json.dumps({   'type': 'filequest',   'nickname': self.__nickname,   'filename': self.sendfilename,   'filesize': self.sendfilesize,   'who': self.fileto,   'send_ip': '',   'send_port': '',  }).encode(), self.__host)   print('request send...')   # fileres = self.__socket.recvfrom(1024)[0].decode()  # js = json.loads(fileres)  def do_getfile(self, args):  filename = args.split(' ')[0]  who = args.split(' ')[1]  ch = args.split(' ')[2]  # print(self.filename is not None, filename, self.filename, who, self.filefrom)  if (self.filename is not None) & (filename == self.filename) & (who == self.filefrom):   if ch == 'yes':    self.file_recv = open(self.filename, 'wb')    self.__socket.sendto(json.dumps({     'type': 'fileres',     'fileres': 'yes',     'nickname': self.__nickname,     'who': who,     'recv_ip': '',     'recv_port': '',    }).encode(), self.__host)    print('you agree to reveive the file(', filename, ') from', who)    else:    self.__socket.sendto(json.dumps({     'type': 'fileres',     'fileres': 'no',     'nickname': self.__nickname,     'errormessage': 'deny the file.',     'who': who,     'recv_ip': '',     'recv_port': '',    }).encode(), self.__host)    print('you deny to reveive the file(', filename, ') from', who)    self.recvfile = False  else:   print('the name or sender of the file is wrong.')  c = Client(('127.0.0.1', 12346))c.start()

服务器端主要进行消息的分类转发处理,用户列表、地址列表的维护。

服务器端代码如下:

import socketimport json obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)obj.bind(('127.0.0.1', 12346)) conn_list = []user_list = [] while True: try:  receive_data, client_address = obj.recvfrom(1024)  js = json.loads(receive_data.decode())  # 登录消息  if js['type'] == 'login':    nickname = str(js['nickname'])   if nickname in user_list:    obj.sendto(json.dumps({'login': 'fail',          'errormessage': 'the nickname is exists'}).encode(),       client_address)   else:    # 向其他用户发送通知    for i in range(len(conn_list)):     obj.sendto(json.dumps(      {       'type': 'message',       'message': '[system]' + nickname + '已登录.'      }).encode(), conn_list[i])    user_list.append(nickname)    conn_list.append(client_address)    print(nickname, client_address, '登录成功!')    obj.sendto(json.dumps({'login': 'success',          'nickname': nickname}).encode(), client_address)   # 群发消息  elif js['type'] == 'broadcast':   message = js['message']   nickname = js['nickname']   for i in range(len(conn_list)):    obj.sendto(json.dumps(     {      'type': 'message',      'message': nickname + ':' + message     }).encode(), conn_list[i])   # 私发消息  elif js['type'] == 'sendto':   who = js['who']   nickname = js['nickname']   message = js['message']   # 检查用户是否存在   if who not in user_list:    obj.sendto(json.dumps(     {      'type': 'message',      'message': who + ' not exist or not online.please try later.'     }).encode(),     client_address)   else:    obj.sendto(json.dumps(     {      'type': 'message',      'message': nickname + ' whisper to you: ' + message     }).encode(),     conn_list[user_list.index(who)])   # 查看用户列表  elif js['type'] == 'catusers':   users = json.dumps(user_list)   obj.sendto(json.dumps(    {     'type': 'message',     'message': users,    }).encode(),    client_address)   # 查看用户IP  elif js['type'] == 'catip':   who = js['who']   if who not in user_list:    obj.sendto(json.dumps(     {      'type': 'message',      'message': who + ' not exist or not online.please try later.'     }).encode(),     client_address)   else:    addr = json.dumps(conn_list[user_list.index(who)])    obj.sendto(json.dumps(     {      'type': 'message',      'message': addr,     }).encode(),     client_address)   # 离线消息  elif js['type'] == 'offline':   user_list.remove(js['nickname'])   conn_list.remove(client_address)   obj.sendto(    (js['nickname'] + 'offline.').encode(),    client_address)   # 向其他用户发送通知   for i in range(len(conn_list)):    obj.sendto(json.dumps(     {      'type': 'message',      'message': '[system]' + nickname + '下线了.'     }).encode(), conn_list[i])   # 发送文件请求  elif js['type'] == 'filequest':   who = js['who']   if who not in user_list:    obj.sendto(json.dumps(     {      'type': 'message',      'message': who + ' not exist or not online.please try later.'     }).encode(),     client_address)   else:    js['send_ip'] = client_address[0]    js['send_port'] = client_address[1]    obj.sendto(json.dumps(js).encode(),       conn_list[user_list.index(who)])    print(js['nickname'], 'request to send file to', who)   # 发送文件请求回复  elif js['type'] == 'fileres':   who = js['who']   if js['fileres'] == 'yes':    js['recv_ip'] = client_address[0]    js['recv_port'] = client_address[1]    print(js['nickname'], 'agree to receive file from', js['who'])   else:    print(js['nickname'], 'deny to receive file from', js['who'])   obj.sendto(json.dumps(js).encode(),      conn_list[user_list.index(who)])  except Exception as e:  print(e)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    python实现点对点聊天程序
    下一条:
    Python中return self的用法详解
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客