lsf/sdk/RaspberryPi/main_sdk.py

346 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# /***********************************************************
# * author: xiaoY [物美智能 wumei-smart]
# * create: 2022-05-10
# * emailqimint@outlook.com
# * source:https://github.com/kerwincui/wumei-smart
# * board:raspberry 4b
# ***********************************************************/
import time
import requests
import json
from aes import AEScryptor,AES
import paho.mqtt.client as mqtt
import random
import threading #导入线程模块,用作定时器
#################################################################
# 需要连接好外部网络
#################################################################
# 作为python的AES的iv,应该为16位字节型数据
wumei_iv = b"wumei-smart-open"
#发布监测数据的最大次数
monitorCount =5
# 发布监测数据的间隔默认5秒。 使用esp8266单片机时服务器传来的间隔单位为毫秒本程序由于定时运行需要的是秒将转化为秒如需毫秒运行自行更改程序
monitorInterval =5
# NTP地址用于获取时间,可选的修改为自己部署项目的地址)
ntpServer = "http://120.24.218.158:8080/iot/tool/ntp?deviceSendTime="
# 连接成功标志位
g_rc=-1
#全局变量,管理定时监测
global t2
# 设备信息配置
deviceNum = "DW43CI6RM8GMG23H"
userId = "1"
productId = "4"
firmwareVersion = "1.0"
# 经度和纬度可选,如果产品使用设备定位,则必须传
latitude=0
longitude=0
# Mqtt配置
mqttHost = "120.24.218.158"
mqttPort = 1883
mqttUserName = "wumei-smart"
mqttPwd = "P261I5G3RY3MCIGG"
# 作为python的AES的key,应该为16位字节型数据
mqttSecret = b"K2IB784BM0O01GG6"
# 产品启用授权码则authCode不能为空
authCode=""
# 订阅的主题
prefix = "/" + productId + "/" + deviceNum
sOtaTopic = prefix + "/ota/get"
sNtpTopic = prefix + "/ntp/get"
sPropertyTopic = prefix + "/property/get"
sFunctionTopic = prefix + "/function/get"
sPropertyOnline = prefix + "/property-online/get"
sFunctionOnline = prefix + "/function-online/get"
sMonitorTopic = prefix + "/monitor/get"
# 发布的主题
pInfoTopic = prefix + "/info/post"
pNtpTopic = prefix + "/ntp/post"
pPropertyTopic = prefix + "/property/post"
pFunctionTopic = prefix + "/function/post"
pMonitorTopic = prefix + "/monitor/post"
pEventTopic = prefix + "/event/post"
# 初始化,连接 设备mqtt客户端Id格式为认证类型(E=加密、S=简单) & 设备编号 & 产品ID & 用户ID
clientId = "E&" + deviceNum + "&" + productId +"&" + userId
client=mqtt.Client(clientId)
#加密 (AES-CBC-128-pkcs5padding)
def encrypt(plain_data,wumei_key,wumei_iv):
aes=AEScryptor(wumei_key,AES.MODE_CBC,wumei_iv,paddingMode="PKCS5Padding",characterSet='utf-8')
rData=aes.encryptFromString(plain_data)
printMsg("密码(已加密)"+rData.toBase64())
return rData.toBase64()
#回调函数。当尝试与MQTT broker 建立连接时,触发该函数。
#client 是本次连接的客户端实例。
#userdata 是用户的信息,一般为空。但如果有需要,也可以通过 user_data_set 函数设置。
#flags 保存服务器响应标志的字典
#rc 是响应码。
# 0: 连接成功
# 1: 连接失败-不正确的协议版本
# 2: 连接失败-无效的客户端标识符
# 3: 连接失败-服务器不可用
# 4: 连接失败-错误的用户名或密码
# 5: 连接失败-未授权
# 6-255: 未定义.
#一般情况下我们只需要关注rc响应码是否为0就可以了。
def on_connect(client,userdata,flags,rc):
if rc==0:
printMsg("连接成功")
# 放在on_connect下可以保证重连重新订阅
# 订阅(OTA、NTP、属性、功能、实时监测)
client.subscribe(sOtaTopic, 1)
client.subscribe(sNtpTopic, 1)
client.subscribe(sPropertyTopic, 1)
client.subscribe(sFunctionTopic, 1)
client.subscribe(sPropertyOnline, 1)
client.subscribe(sFunctionOnline, 1)
client.subscribe(sMonitorTopic, 1)
printMsg("订阅主题:" + sOtaTopic)
printMsg("订阅主题:" + sNtpTopic)
printMsg("订阅主题:" + sPropertyTopic)
printMsg("订阅主题:" + sFunctionTopic)
printMsg("订阅主题:" + sPropertyOnline)
printMsg("订阅主题:" + sFunctionOnline)
printMsg("订阅主题:" + sMonitorTopic)
# 发布设备信息
publishInfo()
global g_rc
g_rc=0
else:
printMsg("连接失败rc="+str(rc))
printMsg("3秒后重连...")
time.sleep(3)
connectMqtt()
# 物模型-属性处理
def processProperty(payload):
data=json.loads(payload)
for item in data:
# 匹配云端定义的属性(不包含属性中的监测数据)
id = item["id"]
value=item["value"]
printMsg(str(id)+":"+str(value))
# 最后发布属性,服务端订阅存储(重要)
publishProperty(json.dumps(data))
# 物模型-功能处理
def processFunction(payload):
data=json.loads(payload)
for item in data:
# 匹配云端定义的功能
id = item["id"]
value=item["value"]
if(id=="switch"):
printMsg("开关 switch"+ str(value))
elif(id=="gear"):
printMsg("档位 gear"+ str(value))
elif(id=="light_color"):
printMsg("灯光颜色 light_color"+ str(value))
elif(id=="message"):
printMsg("屏显消息 message"+ str(value))
elif(id=="report_monitor"):
msg=randomPropertyData();
printMsg("订阅到上报监测数据指令,上报数据:")
printMsg(msg)
publishProperty(msg)
# 最后发布属性,服务端订阅存储(重要)
publishProperty(json.dumps(data))
# 回调函数在客户端订阅的主题上接收到消息时调用“message”变量是一个MQTT消息描述所有消息特征
def on_message(client,userdata,msg):
printMsg("接收数据:"+msg.topic+" "+str(msg.payload))
if(msg.topic==sOtaTopic):
printMsg("订阅到设备升级指令...")
jsonData=json.loads(msg.payload)
newVersion = jsonData["version"]
downloadUrl = jsonData["downloadUrl"]
printMsg("固件版本:"+newVersion)
printMsg("下载地址:"+downloadUrl)
elif(msg.topic==sNtpTopic):
printMsg("订阅到NTP时间...");
jsonData=json.loads(msg.payload)
deviceSendTime = jsonData["deviceSendTime"]
serverSendTime = jsonData["serverSendTime"]
serverRecvTime = jsonData["serverRecvTime"]
deviceRecvTime = round(time.time()*1000)
now = (serverSendTime + serverRecvTime + deviceRecvTime - deviceSendTime) / 2
printMsg("当前时间:"+str(round(now)))
elif(msg.topic==sPropertyTopic or msg.topic==sPropertyOnline):
printMsg("订阅到属性指令...")
processProperty(msg.payload)
elif(msg.topic==sFunctionTopic or msg.topic==sFunctionOnline):
printMsg("订阅到功能指令...")
processFunction(msg.payload)
elif(msg.topic==sMonitorTopic):
# python全局变量的使用
global t2
global monitorCount
global monitorInterval
printMsg("订阅到实时监测指令...")
jsonData=json.loads(msg.payload)
monitorCount = jsonData["count"]
monitorInterval = jsonData["interval"]/1000
t2.cancel()
t2=threading.Timer(monitorInterval,timing_publishMonitor)
t2.start()
# 1.发布设备信息
def publishInfo():
# rssi值 树莓派中暂时不处理wifi信号问题
# 信号强度信号极好4格[-55— 0]信号好3格[-70— -55]信号一般2格[-85— -70]信号差1格[-100— -85]
# status值 1-未激活2-禁用3-在线4-离线)
doc={"rssi":1,"firmwareVersion":firmwareVersion,"status":3,"userId":userId,"longitude":longitude,"latitude":latitude,"summary":{"name":"device","chip":"esp8266","author":"kerwincui","version":1.6,"create":"2022 - 06 - 06"}}
# client.publish('raspberry/topic',payload=i,qos=0,retain=False)
jsonData=json.dumps(doc)
printMsg("发布设备信息:"+pInfoTopic+" "+jsonData)
client.publish(pInfoTopic,jsonData)
# 2.发布时钟同步信,用于获取当前时间(可选)
def publishNtp():
data={"deviceSendTime":round(time.time()*1000)}
jsonData=json.dumps(data)
printMsg("发布NTP信息"+jsonData)
client.publish(pNtpTopic,jsonData)
# 3.发布属性
# msg 接收格式json
def publishProperty(msg):
printMsg("发布属性:" + msg)
client.publish(pPropertyTopic, msg)
# 4.发布功能
def publishFunction( msg):
printMsg("发布功能:" + msg)
client.publish(pFunctionTopic, msg)
# 5.发布事件
def publishEvent():
objTmeperature={"id":"height_temperature","value":40,"remark":"温度过高警告"}
objException={"id":"exception","value":"异常消息消息内容XXXXXXXX","remark":"设备发生错误"}
data=[objTmeperature,objException]
jsonData=json.dumps(data)
printMsg("发布事件:"+jsonData)
client.publish(pEventTopic,jsonData)
# 6.发布实时监测数据
def publishMonitor():
msg=randomPropertyData()
# 发布为实时监测数据,不会存储
printMsg("发布实时监测数据:"+msg)
client.publish(pMonitorTopic,msg)
# 随机生成监测值
def randomPropertyData():
# 匹配云端定义的监测数据,随机数代替监测结果
# random.randint(0,10) #生成数据包括0,10
# random.uniform(30,60)生成数据为浮点型
objTmeperature={"id":"temperature","value":str(round(random.uniform(10,30),2)),"remark":""}
objHumidity={"id":"humidity","value":str(round(random.uniform(30,60),2)),"remark":""}
objCo2={"id":"co2","value":str(random.randint(400,1000)),"remark":""}
objBrightness={"id":"brightness","value":str(random.randint(1000,10000)),"remark":""}
printMsg("随机生成监测数据值:")
data=[objTmeperature,objHumidity,objCo2,objBrightness]
print(json.dumps(data))
return json.dumps(data)
#连接mqtt
def connectMqtt():
printMsg("连接Mqtt服务器")
# 生成mqtt认证密码(设备加密认证密码加密格式为mqtt密码 & 过期时间 & 授权码,其中授权码为可选)
password = generationPwd()
encryptPassword=encrypt(password,mqttSecret,wumei_iv)
client.username_pw_set(mqttUserName,encryptPassword)
client.on_connect=on_connect
client.on_message=on_message
client.connect(mqttHost,mqttPort,10)
#打印提示信息
def printMsg(msg):
print("[{}] {}".format(time.strftime("%Y-%m-%d %H:%M:%S"),msg))
# 生成密码
def generationPwd():
try:
doc=json.loads(getTime())
except:
printMsg("Json解析失败")
exit()
deviceSendTime = doc["deviceSendTime"]
serverSendTime = doc["serverSendTime"]
serverRecvTime = doc["serverRecvTime"]
deviceRecvTime = round(time.time()*1000)
now = (serverSendTime + serverRecvTime + deviceRecvTime - deviceSendTime) / 2
expireTime = int(now + 1 * 60 * 60 * 1000)
# 密码加密格式为mqtt密码 & 过期时间 & 授权码(可选),如果产品启用了授权码就必须加上
password=""
if(authCode == ""):
password = mqttPwd + "&" + str(expireTime, 0)
else:
password = mqttPwd + "&" + str(expireTime, 0) + "&" + authCode
printMsg("密码(未加密):" + password)
return password
# HTTP获取时间
def getTime():
try:
r=requests.get(ntpServer+str(round(time.time()*1000)))
if(r.status_code>0):
if(r.status_code==200 or r.status_code==301):
printMsg("获取时间成功data:"+r.text)
return r.text
else:
printMsg("获取时间失败error:"+r.status_code)
except:
printMsg("连接Http失败")
# 定时上报属性
def timing_publishProperty():
printMsg("执行定时上报")
#发布事件
publishEvent()
#发布时钟同步
publishNtp()
# 发布属性(监测值)
msg=randomPropertyData()
publishProperty(msg)
t1=threading.Timer(60,timing_publishProperty)
t1.start()
# 定时上报监测数据
def timing_publishMonitor():
global monitorCount
monitorCount=monitorCount-1
printMsg("执行监测")
publishMonitor()
if(monitorCount>0):
t2=threading.Timer(monitorInterval,timing_publishMonitor)
t2.start()
if __name__ == '__main__':
connectMqtt()
client.loop_start()
printMsg("等待连接MQTT")
while(g_rc!=0):
print("-",end=" ")
time.sleep(1)
t1=threading.Timer(60,timing_publishProperty)
t1.setDaemon(True) #当主线程被关闭后,子线程也关闭
t1.start()
t2=threading.Timer(monitorInterval,timing_publishMonitor)
t2.setDaemon(True) #当主线程被关闭后,子线程也关闭
t2.start()
while True:
time.sleep(10) #定时上报、检测上报都是线程执行,主线程可以做自己的任务