import os import sys import json import django from typing import Dict from pathlib import Path from enum import Enum import paho.mqtt.client as mqtt pwd = os.path.dirname(os.path.realpath(__file__)) project_path = Path(pwd).parent.parent if project_path not in sys.path: sys.path.append(str(project_path)) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings") django.setup() from mosquito.models import MosqPost, DeviceInfo # MQTT 配置 MQTT_BROKER = "8.217.112.255" MQTT_PORT = 1883 DATA_TOPIC = 'solarmosquitolamp/devices/+/data' STATE_TOPIC = 'solarmosquitolamp/devices/+/state' class PowerType(Enum): Load = 'Load' StorageBattery = 'StorageBattery' SolarPanels = 'SolarPanels' ACPower = 'AC Power' DCPower = 'DC Power' # 当我们从MQTT服务器接收到消息时调用此函数 def on_message(client, userdata, message): # 提取消息的主题 topic = message.topic print(f"Message received on topic: {topic}") # 从主题中解析 device_id topic_parts = topic.split('/') device_id = topic_parts[2] payload = message.payload.decode('utf-8') # update and create post_data = json.loads(payload) try: update_device_info(device_id, post_data=post_data) create_mosq_post(device_id, post_data=post_data) print(f"Device: {device_id} update & create post success") except KeyError as e: print(f"Device: {device_id} update & create post failed") print(f"key: {e} is not existed") def update_device_info(device_id: str, post_data: Dict): device = DeviceInfo.objects.filter(device_id=device_id).first() if device: device.load = to_string(post_data, power_type=PowerType.Load) device.storage_battery = to_string(post_data, power_type=PowerType.StorageBattery) device.solar_panels = to_string(post_data, power_type=PowerType.SolarPanels) device.ac_power = to_string(post_data, power_type=PowerType.ACPower) device.dc_power = to_string(post_data, power_type=PowerType.DCPower) device.count = post_data['count'] device.signal = post_data['signal'] device.led_status = post_data['RemoteLEDlightingFixtures']['LED'] device.energy = post_data['quantityofelectricity'] device.save() def create_mosq_post(device_id: str, post_data: Dict): load = to_string(post_data, power_type=PowerType.Load) storage_battery = to_string(post_data, power_type=PowerType.StorageBattery) solar_panels = to_string(post_data, power_type=PowerType.SolarPanels) ac_power = to_string(post_data, power_type=PowerType.ACPower) dc_power = to_string(post_data, power_type=PowerType.DCPower) count = post_data['count'] led_status = post_data['RemoteLEDlightingFixtures']['LED'] signal = post_data['signal'] energy = post_data['quantityofelectricity'] MosqPost.objects.create(device_id=device_id, count=count, led_status=led_status, load=load, signal=signal, energy=energy, storage_battery=storage_battery, solar_panels=solar_panels, ac_power=ac_power, dc_power=dc_power) def to_string(post_data: Dict, power_type: PowerType) -> str: _type = power_type.value load_v = post_data[_type]['voltage'] load_c = post_data[_type]['current'] load_p = post_data[_type]['power'] return ",".join([str(load_v), str(load_c), str(load_p)]) # 初始化MQTT客户端 client = mqtt.Client() client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT, 60) # 订阅所有设备的主题 client.subscribe(DATA_TOPIC) # client.subscribe(STATE_TOPIC) # 开始MQTT客户端 client.loop_forever()