国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

解決esp8266 Mircopython OTA 遠程升級方案

Shonim / 3948人閱讀

摘要:刪除用戶完成刪除用戶失敗加載用戶。。。。燒錄用戶錯誤燒錄用戶完畢,正在重啟。。。注意升級代碼通過傳輸時代碼過長時需要進行分段發送,本代碼僅演示未分段代碼的升級過程。測試過程中發現發送中文會亂碼。

? ? ? ? 對于ESP8266的開發,在arduino平臺上的開發庫非常多,arduino上也可以找到esp8266?OTA的許多解決方案,最近突然對Mircopython好奇起來,想通過Mircopython寫一下esp8266運行的程序,然后過程中查找了許多的資料都沒有看到Mircopython平臺上如何OTA升級esp8266固件,于是自己胡亂做了一個用起來還不錯的替代方案,給愛好者們提供一個小小的參考思路,直接進入正題。

????????我是使用uPyCraft燒錄以及編輯代碼的,下載uPyCraft鏈接: https://pan.baidu.com/s/1HARl7J3fy0J11I_FHG8fCg 提取碼: 5u4k?

? ? ? ? 第一次插入開發版會自動提示燒錄Mircopython固件,選擇好設置點擊OK,或者通過菜單Tools→BurnFirmware進行燒錄。

????????

?用uPyCraft工具燒完成后,device文件夾為esp8266寄存器中的根目錄,device目錄下僅有一個boot.py文件,次文件為esp8266每次啟動時必定執行一次的文件,隨后上傳simple.py以及index0.py文件,simple.py文件為MQTT連接庫,可以在從目錄uPy_lib/umqtt中直接找到拖拽到device目錄。以下代碼我也會全部給出。

?boot.py:

# This file is executed on every boot (including wake-boot from deepsleep)#import esp#esp.osdebug(None)import gcimport os#import webrepl#webrepl.start()gc.collect()base = "0"try:  file = open("base.ini","r")  base = file.read()  file.close()except:  file = open("base.ini","w")  file.write("0")  file.close()  if base == "1":  print("加載用戶1。。。。")  try:    os.remove("index0.py")    print("刪除用戶0完成")  except:    print("刪除用戶0失敗")  import index1  index1.connectWiFi()else :  print("加載用戶0。。。。")  try:    os.remove("index1.py")    print("刪除用戶1完成")  except:    print("刪除用戶1失敗")  import index0  index0.connectWiFi()

simple.py:

import usocket as socketimport ustruct as struct#from ubinascii import hexlifyclass MQTTException(Exception):    passclass MQTTClient:  def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):    if port == 0:      port = 8883 if ssl else 1883    self.client_id = client_id    self.sock = None    self.addr = socket.getaddrinfo(server, port)[0][-1]    self.ssl = ssl    self.ssl_params = ssl_params    self.pid = 0    self.cb = None    self.user = user    self.pswd = password    self.keepalive = keepalive    self.lw_topic = None    self.lw_msg = None    self.lw_qos = 0    self.lw_retain = False  def _send_str(self, s):    self.sock.write(struct.pack("!H", len(s)))    self.sock.write(s)  def _recv_len(self):    n = 0    sh = 0    while 1:      b = self.sock.read(1)[0]      n |= (b & 0x7f) << sh      if not b & 0x80:        return n      sh += 7  def set_callback(self, f):    self.cb = f  def set_last_will(self, topic, msg, retain=False, qos=0):    assert 0 <= qos <= 2    assert topic    self.lw_topic = topic    self.lw_msg = msg    self.lw_qos = qos    self.lw_retain = retain  def connect(self, clean_session=True):    self.sock = socket.socket()    self.sock.connect(self.addr)    if self.ssl:      import ussl      self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)    msg = bytearray(b"/x10/0/0/x04MQTT/x04/x02/0/0")    msg[1] = 10 + 2 + len(self.client_id)    msg[9] = clean_session << 1    if self.user is not None:      msg[1] += 2 + len(self.user) + 2 + len(self.pswd)      msg[9] |= 0xC0    if self.keepalive:      assert self.keepalive < 65536      msg[10] |= self.keepalive >> 8      msg[11] |= self.keepalive & 0x00FF    if self.lw_topic:      msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)      msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3      msg[9] |= self.lw_retain << 5    self.sock.write(msg)    #print(hex(len(msg)), hexlify(msg, ":"))    self._send_str(self.client_id)    if self.lw_topic:      self._send_str(self.lw_topic)      self._send_str(self.lw_msg)    if self.user is not None:      self._send_str(self.user)      self._send_str(self.pswd)    resp = self.sock.read(4)    assert resp[0] == 0x20 and resp[1] == 0x02    if resp[3] != 0:      raise MQTTException(resp[3])    return resp[2] & 1  def disconnect(self):    self.sock.write(b"/xe0/0")    self.sock.close()  def ping(self):    self.sock.write(b"/xc0/0")  def publish(self, topic, msg, retain=False, qos=0):    pkt = bytearray(b"/x30/0/0/0")    pkt[0] |= qos << 1 | retain    sz = 2 + len(topic) + len(msg)    if qos > 0:      sz += 2    assert sz < 2097152    i = 1    while sz > 0x7f:      pkt[i] = (sz & 0x7f) | 0x80      sz >>= 7      i += 1    pkt[i] = sz    #print(hex(len(pkt)), hexlify(pkt, ":"))    self.sock.write(pkt, i + 1)    self._send_str(topic)    if qos > 0:      self.pid += 1      pid = self.pid      struct.pack_into("!H", pkt, 0, pid)      self.sock.write(pkt, 2)    self.sock.write(msg)    if qos == 1:      while 1:        op = self.wait_msg()        if op == 0x40:          sz = self.sock.read(1)          assert sz == b"/x02"          rcv_pid = self.sock.read(2)          rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]          if pid == rcv_pid:            return    elif qos == 2:      assert 0  def subscribe(self, topic, qos=0):    assert self.cb is not None, "Subscribe callback is not set"    pkt = bytearray(b"/x82/0/0/0")    self.pid += 1    struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)    #print(hex(len(pkt)), hexlify(pkt, ":"))    self.sock.write(pkt)    self._send_str(topic)    self.sock.write(qos.to_bytes(1, "little"))    while 1:      op = self.wait_msg()      if op == 0x90:        resp = self.sock.read(4)        #print(resp)        assert resp[1] == pkt[2] and resp[2] == pkt[3]        if resp[3] == 0x80:          raise MQTTException(resp[3])        return  # Wait for a single incoming MQTT message and process it.  # Subscribed messages are delivered to a callback previously  # set by .set_callback() method. Other (internal) MQTT  # messages processed internally.  def wait_msg(self):    res = self.sock.read(1)    self.sock.setblocking(True)    if res is None:      return None    if res == b"":      raise OSError(-1)    if res == b"/xd0":  # PINGRESP      sz = self.sock.read(1)[0]      assert sz == 0      return None    op = res[0]    if op & 0xf0 != 0x30:      return op    sz = self._recv_len()    topic_len = self.sock.read(2)    topic_len = (topic_len[0] << 8) | topic_len[1]    topic = self.sock.read(topic_len)    sz -= topic_len + 2    if op & 6:      pid = self.sock.read(2)      pid = pid[0] << 8 | pid[1]      sz -= 2    msg = self.sock.read(sz)    self.cb(topic, msg)    if op & 6 == 2:      pkt = bytearray(b"/x40/x02/0/0")      struct.pack_into("!H", pkt, 2, pid)      self.sock.write(pkt)    elif op & 6 == 4:      assert 0  # Checks whether a pending message from server is available.  # If not, returns immediately with None. Otherwise, does  # the same processing as wait_msg.  def check_msg(self):    self.sock.setblocking(False)    return self.wait_msg()

index0.py:

import osimport time import networkfrom simple import MQTTClientimport machinefrom machine import Pin#wifissid="XXXXX"#WiFi名稱passwd="XXXXXXXXX"#WiFi密碼#mqttclient_id = "mricopython_"mserver = "XXXXX.com"#mqtt服務器IPport=1883M_subscribe = b"py"#設備訂閱的主題M_subscribe_ota = M_subscribe + b"u"#設備訂閱的OTA主題M_topic = M_subscribe + b"r"#設備推送消息的主題client = Nonewlan = Noneled=Pin(2, Pin.OUT)led2=Pin(5, Pin.OUT, value=1)def connectWiFi():  #連接WiFi  wlan = network.WLAN(network.STA_IF)  wlan.active(True)  if not wlan.isconnected():    print("connecting to network...")    wlan.connect(ssid, passwd)      while not wlan.isconnected():      pass  print("network config:", wlan.ifconfig())  connect_mqtt()#處理OTA版本信息def ota(txt):  file = open("base.ini","r")  base = file.read()  file.close()  if base == "1":    try:      f = open("index0.py","a")      f.write(txt)      f.close()          file = open("base.ini","w")      file.write("0")      file.close()      print("燒錄用戶0完畢,正在重啟。。。")      machine.reset()     except:      print("燒錄用戶0錯誤")      pass  else:    try:          f = open("index1.py","a")      f.write(txt)      f.close()      file = open("base.ini","w")      file.write("1")      file.close()      print("燒錄用戶1完畢,正在重啟。。。")      machine.reset()    except:      print("燒錄用戶1錯誤")      pass  #MQTT信息處理def sub_cb(topic_b, msg_b):  #轉換格式后打印bytes.decode(),str.decode()  topic = bytes.decode(topic_b)  msg = bytes.decode(msg_b)  print(topic,":",msg)  # 判斷MQTT信息作出相應動作  #string.spilt(" ")分組  if topic == bytes.decode(M_subscribe):    if msg=="1":      led.off()    if msg=="0":      led.on()  elif topic == bytes.decode(M_subscribe_ota):    ota(msg)    #連接MQTTdef connect_mqtt():   try:    t = str(time.ticks_us())    print("client:")    c = MQTTClient(client_id + t + str(time.ticks_us()),mserver)    c.set_callback(sub_cb)    c.connect()    c.subscribe(M_subscribe)#訂閱信息主題    c.subscribe(M_subscribe_ota)#訂閱OTA升級主題    print("連接MQTT成功,已訂閱" + str(M_subscribe))    c.publish(M_topic,b"連接MQTT成功") #推送信息#主循環,程序在此做死循環    while True:      c.check_msg() #接收MQTT信息      #c.publish(M_topic,b"版本1") #推送信息  except:    print("重新連接MQTT")    connect_mqtt()if __name__ == "__main__":  connectWiFi()

思路是boot.py通過讀取系統生成的base.ini里的內容判斷需要加載用戶0 “index0.py”或者是用戶1? “index1.py”進行運行,升級代碼通過mqtt傳輸到esp8266上,esp8266接受到自定義的OTA升級主題發送的代碼后,通過判斷base.ini生成新的index0.py或者index1.py,并刪除舊的index.py。

? ? ? ? 注意:升級代碼通過mqtt傳輸時代碼過長時需要進行分段發送,本代碼僅演示未分段代碼的升級過程。測試過程中發現MQTT.fx發送中文會亂碼。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/121628.html

相關文章

  • 【基于Arduino IDE平臺開發ESP8266連接巴法云】

    摘要:教程傳送門基于平臺開發連接巴法云簡介實驗準備硬件軟件實驗步驟點燈實驗發送溫濕度指令升級總結關于巴法云專注于開源,智造,創新,分享。 Arduino教程傳送門????...

    不知名網友 評論0 收藏0
  • (超簡單)ESP8266深度睡眠模式下遠程采集溫濕度信息

    摘要:超簡單深度睡眠模式下遠程采集溫濕度信息項目背景相關技術深度睡眠模式溫濕度采集數據收發前后端實現后端前端項目背景自己用收納箱做了一個用于存放打印耗材的干燥箱,想用閑置的開發板和溫濕度傳感器做一個遠程溫濕度監測的小項目。 ...

    pkhope 評論0 收藏0
  • 01.ESP8266開發方式知多少

    摘要:開發方式是樂鑫為開發者提供的物聯應開發平臺,包括基礎平臺以及上層應開發示例,如智能燈智能開關等。指令開發方式作為芯片,指令開發也是必不可少的。開發方式即,意為運行在單片機上的。 ...

    sushi 評論0 收藏0
  • Arduino uno r3 使用 ESP8266 UART-WiFi 透傳模塊

    摘要:查詢附近名稱密碼連接路由器的查看路由器分配給模組的地址例如設置單連接設置透傳模式建立的服務器開始發送數據進入發送模式發送數據注意退出透傳,直接發送。取消發送新行五參考模塊指令入門指南透傳簡單使用模塊指令匯總一、所需硬件材料 1.ESP8266:01s某寶上3、5塊錢 2.杜邦線:某寶幾塊錢一組40P,這里只需要三根,用于連接 樹莓派與繼電器 ?? 3.燒錄器 ...

    amuqiao 評論0 收藏0
  • 用Docker容器進行IoT開發

    摘要:大多數的硬件公司很難提供能夠正常運行的。這個容器在共享。這將使很重要的數據能夠非常容易的從輸入到你的容器中。如果你想在容器內運行這個項目是我在時做的。希望愛特梅爾公司和德州儀器將來也使用。 隨著Iot新的硬件平臺和開發板的不斷更新, SDK交付越來越多的轉向零碎化以及按需組裝。大多數的硬件公司很難提供能夠正常運行的Software Development Kits (SDKs)。 Do...

    glumes 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<