侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

python使用MQTT给硬件传输图片的实现方法

Python  /  管理员 发布于 7年前   248

最近因需要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:


协议为:

需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。
消息(MQTT Payload) 格式:Web服务器-------->BASE:

反馈:BASE---------> Web服务器:

如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

程序流程图

根据上面的协议,可以得到如下的流程图:


代码如下:

# encoding:utf-8from flask import Flask, jsonifyfrom flask_restful import Api, Resource, reqparsefrom PIL import Imagefrom io import BytesIOimport requestsimport os, logging, timeimport paho.mqtt.client as mqttimport structfrom flask_cors import *# 日志配置信息logging.basicConfig(  level=logging.INFO,  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',)class Mqtt(object):  def __init__(self, img_data, size):    self.MQTTHOST = '*******'    self.MQTTPORT = "******"    # 订阅和发送的主题    self.topic_from_base = 'mqttTestSub'    self.topic_to_base = 'mqttTestPub'    self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))    self.client = mqtt.Client(self.client_id)    # 完成链接后的回掉函数    self.client.on_connect = self.on_connect    # 图片大小    self.size = size    # 用于跳出死循环,结束任务    self.finished = None    # 包的编号    self.index = 0    # 将收到的图片数据按大小分成列表    self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]    # 记录发布后的数据,用于监控时延    self.pub_time = 0    self.header_to_base = 0xffffeeee    self.header_from_base = 0xeeeeffff    # 功能标识    self.function_begin = 0x01    self.function_doing = 0x02    self.function_finished = 0x03    # 包的完整和非完整状态    self.whole_package = 0x01    self.bad_package = 0x00    # 头信息的格式,小端模式    self.format_to_base = "<Lbhh"    self.format_from_base = "<Lbhb"    # 如果重发包时,用于检查是否重发第一个包    self.first = True    # 如果重发包时,用于检查是否重发最后一个包    self.last = False    self.begin_data = 'image.jpg;' + str(self.size)  # 链接mqtt服务器函数  def on_mqtt_connect(self):    self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)    self.client.loop_start()  # 链接完成后的回调函数  def on_connect(self, client, userdata, flags, rc):    logging.info("+++ Connected with result code {} +++".format(str(rc)))    self.client.subscribe(self.topic_from_base)  # 订阅函数  def subscribe(self):    self.client.subscribe(self.topic_from_base, 1)    # 消息到来处理函数    self.client.on_message = self.on_message  # 接收到信息后的回调函数  def on_message(self, client, userdata, msg):    # 如果接受第一个包则不需要重发第一个    self.first = False    # 将接受到的包进行解压,得到一个元组    base_tuple = struct.unpack(self.format_from_base, msg.payload)    logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))    logging.info("+++ package_number is {}, package_status_from_base is {} +++"           .format(base_tuple[2], base_tuple[3]))    # 检查接受到信息的头部是否正确    if base_tuple[0] == self.header_from_base:      logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))      # 是否完成传输,如果完成则退出      if base_tuple[1] == self.function_finished:        logging.info("+++ finish work +++")        self.finished = 1        self.client.disconnect()      else:        # 是否是最后一个包        if self.index == len(self.image_data_list) - 1:          self.publish('finished', self.function_finished)          self.last = True          logging.info("+++ finished_data_to_base is finished+++")        else:          # 如果接收到的包不是 0x03则进行传送数据          if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:logging.info("+++ package_number is {}, package_status_from_base is {} +++"       .format(base_tuple[2],base_tuple[3]))# 如果数据的反馈中,包的状态是1则继续发下一个包if base_tuple[3] == self.whole_package:  self.publish(self.index, self.function_doing)  logging.info("+++ data_to_base is finished+++")  self.index += 1# 如果数据的反馈中,包的状态是0则重发数据包elif base_tuple[3] == self.bad_package:  re_package_number = base_tuple[2]  self.publish(re_package_number-1, self.function_doing)  logging.info("+++ re_data_to_base is finished+++")else:  logging.info("+++ package_status_from_base is not 0 or 1 +++")  self.client.disconnect()          else:logging.info("+++ function_identifier is illegal +++")self.client.disconnect()    else:      logging.info("+++ header_from_base is illegal +++")      self.client.disconnect()  # 数据发送函数  def publish(self, index, fuc):    # 看是否是最后一个包    if index == 'finished':      length = 0      package_number = 0      data = b''    else:      length = len(self.image_data_list[index])      package_number = index      data = self.image_data_list[index]    # 打包数据头信息    buffer = struct.pack(      self.format_to_base,      self.header_to_base,      fuc,      package_number,      length    )    to_base_data = buffer + data    # mqtt发送    self.client.publish(      self.topic_to_base,      to_base_data    )    self.pub_time = time.time()  # 发送第一个包函数  def publish_begin(self):    buffer = struct.pack(      self.format_to_base,      self.header_to_base,      self.function_begin,      0,      len(self.begin_data.encode('utf-8')),    )    begin_data = buffer + self.begin_data.encode('utf-8')    self.client.publish(self.topic_to_base, begin_data)  # 控制函数  def control(self):    self.on_mqtt_connect()    self.publish_begin()    begin_time = time.time()    self.pub_time = time.time()    self.subscribe()    while True:      time.sleep(1)      # 超过5秒重传      date = time.time() - self.pub_time      if date > 5:        # 是否重传第一个包        if self.first == True:          self.publish_begin()          logging.info('+++ this is timeout first_data +++')        # 是否重传最后一个包        elif self.last == True:          self.publish('finished', self.function_finished)          logging.info('+++ this is timeout last_data +++')        else:          self.publish(self.index-1, self.function_doing)          logging.info('+++ this is timeout middle_data +++')      if self.finished == 1:        logging.info('+++ all works is finished+++')        break    print(str(time.time()-begin_time) + 'begin_time - end_time')app = Flask(__name__)api = Api(app)CORS(app, supports_credentials=True)# 接受参数parser = reqparse.RequestParser()parser.add_argument('url', help='mqttImage url', location='args', type=str)class GetImage(Resource):  # 得到参数并从图床下载到本地  def get(self):    args = parser.parse_args()    url = args.get('url')    response = requests.get(url)    # 获取图片    image = Image.open(BytesIO(response.content))    # 存取图片    add = os.path.join(os.path.abspath(''), 'image.jpg')    image.save(add)    # 得到图片大小    size = os.path.getsize(add)    f = open(add, 'rb')    imageData = f.read()    f.close()    # 进行mqtt传输    mqtt = Mqtt(imageData, size)    mqtt.control()    # 删除文件    os.remove(add)    logging.info('*** the result of control is {} ***'.format(1))    return jsonify({      "imageData": 1    })api.add_resource(GetImage, '/image')if __name__ == '__main__':  app.run(debug=True, host='0.0.0.0')

总结

以上所述是小编给大家介绍的python使用MQTT给硬件传输图片的实现方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!


  • 上一条:
    python使用pandas处理大数据节省内存技巧(推荐)
    下一条:
    Python实现的插入排序,冒泡排序,快速排序,选择排序算法示例
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客