Mosqkiller-API/apps/mosquito/mqtt_task.py

164 lines
5.8 KiB
Python

import os
import sys
import json
import django
from typing import Dict
from pathlib import Path
from enum import Enum
from datetime import datetime
import paho.mqtt.client as mqtt
pwd = os.path.dirname(os.path.realpath(__file__))
project_path = Path(pwd).parent.parent
if project_path not in sys.path:
sys.path.append(str(project_path))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()
from mosquito.models import MosqPost, DeviceInfo, DevicePostStatistic, Org
# MQTT 配置
MQTT_BROKER = "8.217.112.255"
MQTT_PORT = 1883
DATA_TOPIC = 'solarmosquitolamp/devices/+/data'
STATE_TOPIC = 'solarmosquitolamp/devices/+/state'
class PowerType(Enum):
Load = 'Load'
StorageBattery = 'StorageBattery'
SolarPanels = 'SolarPanels'
ACPower = 'AC Power'
DCPower = 'DC Power'
# 当我们从MQTT服务器接收到消息时调用此函数
def on_message(client, userdata, message):
# 提取消息的主题
topic = message.topic
print(f"Message received on topic: {topic}")
# 从主题中解析 device_id
topic_parts = topic.split('/')
device_id = topic_parts[2]
payload = message.payload.decode('utf-8')
# update and create
post_data = json.loads(payload)
try:
update_device_info(device_id, post_data=post_data)
create_mosq_post(device_id, post_data=post_data)
update_mosq_device_statistic(device_id, post_data=post_data)
print(f"Device: {device_id} update & create post success")
except KeyError as e:
print(f"Device: {device_id} update & create post failed")
print(f"key: {e} is not existed")
def update_device_info(device_id: str, post_data: Dict):
device = DeviceInfo.objects.filter(device_id=device_id).first()
latitude, longitude = post_data['positioning'].split(',')
latitude = float(latitude.replace('N ', ''))
longitude = float(longitude.replace('E ', ''))
count = post_data['count']
signal = post_data['signal']
led_status = post_data['RemoteLEDlightingFixtures']['LED']
energy = post_data['quantityofelectricity']
load = to_string(post_data, power_type=PowerType.Load)
storage_battery = to_string(post_data, power_type=PowerType.StorageBattery)
solar_panels = to_string(post_data, power_type=PowerType.SolarPanels)
ac_power = to_string(post_data, power_type=PowerType.ACPower)
dc_power = to_string(post_data, power_type=PowerType.DCPower)
if device is None:
DeviceInfo.objects.create(device_id=device_id,
longitude=longitude,
latitude=latitude,
count=count,
signal=signal,
led_status=led_status,
energy=energy,
load=load,
storage_battery=storage_battery,
solar_panels=solar_panels,
ac_power=ac_power,
dc_power=dc_power,
org_id=1,)
else:
device.load = load
device.storage_battery = storage_battery
device.solar_panels = solar_panels
device.ac_power = ac_power
device.dc_power = dc_power
device.count = count
device.signal = signal
device.led_status = led_status
device.energy = energy
device.save()
def create_mosq_post(device_id: str, post_data: Dict):
load = to_string(post_data, power_type=PowerType.Load)
storage_battery = to_string(post_data, power_type=PowerType.StorageBattery)
solar_panels = to_string(post_data, power_type=PowerType.SolarPanels)
ac_power = to_string(post_data, power_type=PowerType.ACPower)
dc_power = to_string(post_data, power_type=PowerType.DCPower)
count = post_data['count']
led_status = post_data['RemoteLEDlightingFixtures']['LED']
signal = post_data['signal']
energy = post_data['quantityofelectricity']
MosqPost.objects.create(device_id=device_id,
count=count,
led_status=led_status,
load=load,
signal=signal,
energy=energy,
storage_battery=storage_battery,
solar_panels=solar_panels,
ac_power=ac_power,
dc_power=dc_power)
def update_mosq_device_statistic(device_id: str, post_data: Dict):
datetime_format = "%Y-%m-%d %H:%M:%S"
cur_date = datetime.strptime(post_data['time'], datetime_format).date()
old_record = DevicePostStatistic.objects.filter(device_id=device_id, date=cur_date).first()
org = Org.objects.get(id=1)
incr = 0
cur_total = post_data['count']
if old_record:
device = DeviceInfo.objects.filter(device_id=device_id).first()
if device:
org = device.org
incr = cur_total - old_record.total
DevicePostStatistic.objects.create(device_id=device_id,
date=cur_date,
total=cur_total,
increment=incr,
org=org)
def to_string(post_data: Dict, power_type: PowerType) -> str:
_type = power_type.value
load_v = post_data[_type]['voltage']
load_c = post_data[_type]['current']
load_p = post_data[_type]['power']
return ",".join([str(load_v), str(load_c), str(load_p)])
# 初始化MQTT客户端
client = mqtt.Client()
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# 订阅所有设备的主题
client.subscribe(DATA_TOPIC)
# client.subscribe(STATE_TOPIC)
# 开始MQTT客户端
client.loop_forever()