Mosqkiller-API/apps/mosquito/mqtt_task.py

98 lines
3.1 KiB
Python

import json
import os
import django
from typing import Dict
from enum import Enum
import paho.mqtt.client as mqtt
pwd = os.path.dirname(os.path.realpath(__file__))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()
from mosquito.models import MosqPost, DeviceInfo
# MQTT 配置
MQTT_BROKER = "8.217.112.255"
MQTT_PORT = 1883
DATA_TOPIC = 'solarmosquitolamp/devices/+/data'
STATE_TOPIC = 'solarmosquitolamp/devices/+/state'
class PowerType(Enum):
Load = 'Load'
StorageBattery = 'StorageBattery'
SolarPanels = 'SolarPanels'
ACPower = 'AC Power'
DCPower = 'DC Power'
# 当我们从MQTT服务器接收到消息时调用此函数
def on_message(client, userdata, message):
# 提取消息的主题
topic = message.topic
print(f"Message received on topic: {topic}")
# 从主题中解析 device_id
topic_parts = topic.split('/')
device_id = topic_parts[2]
# 将消息打印到控制台(或处理数据,例如存入数据库)
print(f"Device: {device_id} update & create post")
payload = message.payload.decode('utf-8')
# print(f"Message Payload: {payload}")
# update and create
post_data = json.loads(payload)
update_device_info(device_id, post_data=post_data)
create_mosq_post(device_id, post_data=post_data)
def update_device_info(device_id: str, post_data: Dict):
device = DeviceInfo.objects.filter(device_id=device_id).first()
if device:
device.load = to_string(post_data, power_type=PowerType.Load)
device.storage_battery = to_string(post_data, power_type=PowerType.StorageBattery)
device.solar_panels = to_string(post_data, power_type=PowerType.SolarPanels)
device.ac_power = to_string(post_data, power_type=PowerType.ACPower)
device.dc_power = to_string(post_data, power_type=PowerType.DCPower)
device.save()
def create_mosq_post(device_id: str, post_data: Dict):
load = to_string(post_data, power_type=PowerType.Load)
storage_battery = to_string(post_data, power_type=PowerType.StorageBattery)
solar_panels = to_string(post_data, power_type=PowerType.SolarPanels)
ac_power = to_string(post_data, power_type=PowerType.ACPower)
dc_power = to_string(post_data, power_type=PowerType.DCPower)
count = post_data['count']
MosqPost.objects.create(device_id=device_id,
count=count,
load=load,
storage_battery=storage_battery,
solar_panels=solar_panels,
ac_power=ac_power,
dc_power=dc_power)
def to_string(post_data: Dict, power_type: PowerType) -> str:
_type = power_type.value
load_v = post_data[_type]['voltage']
load_c = post_data[_type]['current']
load_p = post_data[_type]['power']
return ",".join([str(load_v), str(load_c), str(load_p)])
# 初始化MQTT客户端
client = mqtt.Client()
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# 订阅所有设备的主题
client.subscribe(DATA_TOPIC)
# client.subscribe(STATE_TOPIC)
# 开始MQTT客户端
client.loop_forever()