import os
import sys
import json
import pytz
import django
from typing import Dict
from pathlib import Path
from enum import Enum
from datetime import datetime, timedelta

import paho.mqtt.client as mqtt

pwd = os.path.dirname(os.path.realpath(__file__))
project_path = Path(pwd).parent.parent
# sys.path holds strings, so compare the string form; comparing the Path
# object itself never matches and would append a duplicate entry every run.
if str(project_path) not in sys.path:
    sys.path.append(str(project_path))

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()

# Must come after django.setup() so the app registry is ready.
from mosquito.models import MosqPost, DeviceInfo, DevicePostStatistic, Org

# MQTT configuration
MQTT_BROKER = "8.217.112.255"
MQTT_PORT = 1883
DATA_TOPIC = 'solarmosquitolamp/devices/+/data'
STATE_TOPIC = 'solarmosquitolamp/devices/+/state'


class PowerType(Enum):
    """Payload keys of the power sections read by ``to_string``."""

    Load = 'Load'
    StorageBattery = 'StorageBattery'
    SolarPanels = 'SolarPanels'
    ACPower = 'AC Power'
    DCPower = 'DC Power'


def on_message(client, userdata, message):
    """Handle one MQTT message received from the broker.

    The topic is expected to look like
    ``solarmosquitolamp/devices/<device_id>/<data|state>``.

    * ``state`` messages refresh ``last_connect`` of an already known device.
    * any other message is treated as a data post: the device row is
      created/updated, a ``MosqPost`` is stored and the daily statistic is
      refreshed.

    A malformed message is reported and skipped instead of raising, so a
    single bad payload cannot stop the listener loop.
    """
    topic = message.topic
    print(f"Message received on topic: {topic}")

    # Parse device_id and the message kind out of the topic.
    topic_parts = topic.split('/')
    device_id = topic_parts[2]
    topic_postfix = topic_parts[3]

    try:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        post_data = json.loads(message.payload.decode('utf-8'))
    except ValueError as e:
        print(f"Device: {device_id} payload is not valid JSON: {e}")
        return

    if topic_postfix == 'state':
        device = DeviceInfo.objects.filter(device_id=device_id).first()
        if device:
            try:
                ts = post_data['Time Stamp']
                tz = pytz.timezone("Asia/Shanghai")
                # Convert the epoch value directly into the target zone.
                # fromtimestamp() without tz uses the server's local zone, so
                # localizing its result is only right on a Shanghai-zoned host.
                device.last_connect = datetime.fromtimestamp(ts, tz)
            except (KeyError, TypeError, ValueError, OverflowError, OSError) as e:
                print(f"Device: {device_id} state post failed: {e!r}")
                return
            device.save()
            print(f"Device: {device_id} state post success")
        return

    try:
        update_device_info(device_id, post_data=post_data)
        create_mosq_post(device_id, post_data=post_data)
        update_mosq_device_statistic(device_id, post_data=post_data)
        print(f"Device: {device_id} update & create post success")
    except KeyError as e:
        print(f"Device: {device_id} update & create post failed")
        print(f"key: {e} is not existed")
    except (TypeError, ValueError) as e:
        # e.g. unparsable coordinates or an unexpected 'time' format.
        print(f"Device: {device_id} update & create post failed")
        print(f"malformed value: {e}")


def _common_fields(post_data: Dict) -> Dict:
    """Extract the model fields shared by ``DeviceInfo`` and ``MosqPost``.

    Raises:
        KeyError: if a required key is missing from ``post_data``.
    """
    return {
        'count': post_data['count'],
        'signal': post_data['signal'],
        'led_status': post_data['RemoteLEDlightingFixtures']['LED'],
        'energy': post_data['quantityofelectricity'],
        'load': to_string(post_data, power_type=PowerType.Load),
        'storage_battery': to_string(post_data, power_type=PowerType.StorageBattery),
        'solar_panels': to_string(post_data, power_type=PowerType.SolarPanels),
        'ac_power': to_string(post_data, power_type=PowerType.ACPower),
        'dc_power': to_string(post_data, power_type=PowerType.DCPower),
    }


def update_device_info(device_id: str, post_data: Dict):
    """Create the ``DeviceInfo`` row for ``device_id`` or refresh its readings.

    Raises:
        KeyError: if a required key is missing from ``post_data``.
        ValueError: if ``positioning`` is not ``"<lat>,<lon>"`` with numeric
            parts (after the ``'N '`` / ``'E '`` prefixes are removed).
    """
    device = DeviceInfo.objects.filter(device_id=device_id).first()

    positioning = post_data['positioning']
    if positioning:
        latitude, longitude = positioning.split(',')
        latitude = float(latitude.replace('N ', ''))
        longitude = float(longitude.replace('E ', ''))
    else:
        latitude, longitude = None, None

    fields = _common_fields(post_data)

    if device is None:
        # New devices are attached to org_id=1
        # (presumably the default organisation - TODO confirm).
        DeviceInfo.objects.create(
            device_id=device_id,
            longitude=longitude,
            latitude=latitude,
            org_id=1,
            **fields,
        )
        return

    # NOTE(review): the position is only stored on creation; an existing
    # device keeps its stored coordinates. Confirm this is intended.
    for field_name, value in fields.items():
        setattr(device, field_name, value)
    device.save()


def create_mosq_post(device_id: str, post_data: Dict):
    """Store one ``MosqPost`` row built from a data payload.

    Raises:
        KeyError: if a required key is missing from ``post_data``.
    """
    MosqPost.objects.create(device_id=device_id, **_common_fields(post_data))


def update_mosq_device_statistic(device_id: str, post_data: Dict):
    """Create or update the ``DevicePostStatistic`` row of the payload's day.

    ``total`` is set to the reported ``count``. ``increment`` is that count
    minus the previous day's ``total``, or 0 when the previous day has no
    record.

    Raises:
        KeyError: if ``time`` or ``count`` is missing from ``post_data``.
        ValueError: if ``time`` is not formatted as ``%Y-%m-%d %H:%M:%S``.
    """
    datetime_format = "%Y-%m-%d %H:%M:%S"
    cur_date = datetime.strptime(post_data['time'], datetime_format).date()
    mqtt_total = post_data['count']

    cur_record = DevicePostStatistic.objects.filter(
        device_id=device_id, date=cur_date).first()
    last_record = DevicePostStatistic.objects.filter(
        device_id=device_id, date=cur_date - timedelta(days=1)).first()

    # Use the incoming count, not the total stored by the previous message;
    # otherwise the increment always lags one message behind and is 0 for the
    # first message of a day even when yesterday's record exists.
    if last_record:
        incr = int(mqtt_total) - last_record.total
    else:
        incr = 0

    if cur_record:
        cur_record.total = mqtt_total
        cur_record.increment = incr
        cur_record.save()
    else:
        # on_message() calls update_device_info() first, so the device row is
        # expected to exist at this point.
        device = DeviceInfo.objects.filter(device_id=device_id).first()
        DevicePostStatistic.objects.create(
            device_id=device_id,
            date=cur_date,
            total=mqtt_total,
            increment=incr,
            org=device.org,
        )


def to_string(post_data: Dict, power_type: PowerType) -> str:
    """Return ``"<voltage>,<current>,<power>"`` for one power section.

    Raises:
        KeyError: if the section or one of its three keys is missing.
    """
    section = post_data[power_type.value]
    return ",".join(str(section[key]) for key in ('voltage', 'current', 'power'))


def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to the device topics every time a connection is established.

    Subscribing here rather than once after ``connect()`` renews the
    subscriptions after an automatic reconnect. ``properties`` is optional so
    the callback can be invoked with either four or five arguments.
    """
    # Subscribe to the topics of all devices.
    client.subscribe(DATA_TOPIC)
    client.subscribe(STATE_TOPIC)


# Initialise the MQTT client.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, MQTT_PORT, 60)

# Run the MQTT client loop (blocks forever).
client.loop_forever()