import os import sys import json import django from typing import Dict from pathlib import Path from enum import Enum from datetime import datetime, timedelta import paho.mqtt.client as mqtt pwd = os.path.dirname(os.path.realpath(__file__)) project_path = Path(pwd).parent.parent if project_path not in sys.path: sys.path.append(str(project_path)) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings") django.setup() from mosquito.models import MosqPost, DeviceInfo, DevicePostStatistic, Org # MQTT 配置 MQTT_BROKER = "8.217.112.255" MQTT_PORT = 1883 DATA_TOPIC = 'solarmosquitolamp/devices/+/data' STATE_TOPIC = 'solarmosquitolamp/devices/+/state' class PowerType(Enum): Load = 'Load' StorageBattery = 'StorageBattery' SolarPanels = 'SolarPanels' ACPower = 'AC Power' DCPower = 'DC Power' # 当我们从MQTT服务器接收到消息时调用此函数 def on_message(client, userdata, message): # 提取消息的主题 topic = message.topic print(f"Message received on topic: {topic}") # 从主题中解析 device_id topic_parts = topic.split('/') device_id = topic_parts[2] payload = message.payload.decode('utf-8') # update and create post_data = json.loads(payload) try: update_device_info(device_id, post_data=post_data) create_mosq_post(device_id, post_data=post_data) update_mosq_device_statistic(device_id, post_data=post_data) print(f"Device: {device_id} update & create post success") except KeyError as e: print(f"Device: {device_id} update & create post failed") print(f"key: {e} is not existed") def update_device_info(device_id: str, post_data: Dict): device = DeviceInfo.objects.filter(device_id=device_id).first() latitude, longitude = post_data['positioning'].split(',') latitude = float(latitude.replace('N ', '')) longitude = float(longitude.replace('E ', '')) count = post_data['count'] signal = post_data['signal'] led_status = post_data['RemoteLEDlightingFixtures']['LED'] energy = post_data['quantityofelectricity'] load = to_string(post_data, power_type=PowerType.Load) storage_battery = to_string(post_data, power_type=PowerType.StorageBattery) solar_panels = to_string(post_data, power_type=PowerType.SolarPanels) ac_power = to_string(post_data, power_type=PowerType.ACPower) dc_power = to_string(post_data, power_type=PowerType.DCPower) if device is None: DeviceInfo.objects.create(device_id=device_id, longitude=longitude, latitude=latitude, count=count, signal=signal, led_status=led_status, energy=energy, load=load, storage_battery=storage_battery, solar_panels=solar_panels, ac_power=ac_power, dc_power=dc_power, org_id=1,) else: device.load = load device.storage_battery = storage_battery device.solar_panels = solar_panels device.ac_power = ac_power device.dc_power = dc_power device.count = count device.signal = signal device.led_status = led_status device.energy = energy device.save() def create_mosq_post(device_id: str, post_data: Dict): load = to_string(post_data, power_type=PowerType.Load) storage_battery = to_string(post_data, power_type=PowerType.StorageBattery) solar_panels = to_string(post_data, power_type=PowerType.SolarPanels) ac_power = to_string(post_data, power_type=PowerType.ACPower) dc_power = to_string(post_data, power_type=PowerType.DCPower) count = post_data['count'] led_status = post_data['RemoteLEDlightingFixtures']['LED'] signal = post_data['signal'] energy = post_data['quantityofelectricity'] MosqPost.objects.create(device_id=device_id, count=count, led_status=led_status, load=load, signal=signal, energy=energy, storage_battery=storage_battery, solar_panels=solar_panels, ac_power=ac_power, dc_power=dc_power) def update_mosq_device_statistic(device_id: str, post_data: Dict): device = DeviceInfo.objects.filter(device_id=device_id).first() datetime_format = "%Y-%m-%d %H:%M:%S" cur_date = datetime.strptime(post_data['time'], datetime_format).date() cur_record = DevicePostStatistic.objects.filter(device_id=device_id, date=cur_date).first() last_record = DevicePostStatistic.objects.filter(device_id=device_id, date=cur_date - timedelta(days=1)).first() if last_record and cur_record: incr = cur_record.total - last_record.total else: incr = 0 mqtt_total = post_data['count'] if cur_record: cur_record.total = mqtt_total cur_record.increment = incr cur_record.save() else: DevicePostStatistic.objects.create(device_id=device_id, date=cur_date, total=mqtt_total, increment=incr, org=device.org) def to_string(post_data: Dict, power_type: PowerType) -> str: _type = power_type.value load_v = post_data[_type]['voltage'] load_c = post_data[_type]['current'] load_p = post_data[_type]['power'] return ",".join([str(load_v), str(load_c), str(load_p)]) # 初始化MQTT客户端 client = mqtt.Client() client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT, 60) # 订阅所有设备的主题 client.subscribe(DATA_TOPIC) # client.subscribe(STATE_TOPIC) # 开始MQTT客户端 client.loop_forever()