172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import django
|
|
from typing import Dict
|
|
from pathlib import Path
|
|
from enum import Enum
|
|
from datetime import datetime, timedelta
|
|
import paho.mqtt.client as mqtt
|
|
|
|
|
|
# Make the Django project importable when this file is run as a standalone
# script: the project root is two directories above the one holding this file.
pwd = os.path.dirname(os.path.realpath(__file__))
project_path = Path(pwd).parent.parent
# sys.path holds plain strings, so a Path object never compares equal to any
# entry; compare the string form so the root is not appended a second time.
if str(project_path) not in sys.path:
    sys.path.append(str(project_path))

# The settings module must be configured before django.setup() so the ORM
# models can be imported afterwards.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()
|
|
|
|
from mosquito.models import MosqPost, DeviceInfo, DevicePostStatistic, Org
|
|
|
|
# MQTT configuration: broker address and the per-device topic filters.
MQTT_BROKER = "8.217.112.255"
MQTT_PORT = 1883
# "+" is the MQTT single-level wildcard; it occupies the topic segment that
# on_message reads as the device id.
_DEVICE_TOPIC_PREFIX = "solarmosquitolamp/devices/+"
DATA_TOPIC = f"{_DEVICE_TOPIC_PREFIX}/data"
STATE_TOPIC = f"{_DEVICE_TOPIC_PREFIX}/state"
|
|
|
|
|
|
class PowerType(Enum):
    """Power channels reported in a device payload.

    Each member's value is the key under which ``to_string`` looks up that
    channel's readings in the decoded message.
    """

    Load = "Load"
    StorageBattery = "StorageBattery"
    SolarPanels = "SolarPanels"
    # These two payload keys contain a space, unlike the member names.
    ACPower = "AC Power"
    DCPower = "DC Power"
|
|
|
|
|
|
# Called whenever a message is received from the MQTT broker.
def on_message(client, userdata, message):
    """Parse one MQTT message and persist the device data it carries.

    The device id is the third ``/``-separated segment of the topic. The
    payload is decoded as UTF-8 JSON and handed to ``update_device_info``,
    ``create_mosq_post`` and ``update_mosq_device_statistic`` in that order.

    A malformed message (unexpected topic, undecodable payload, payload that
    is not a JSON object, missing key or invalid value) is reported on stdout
    and dropped, so that one bad message does not raise out of the callback
    and interrupt the client loop.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        message: The received message; ``topic`` and ``payload`` are read.
    """
    # Extract the topic of the message.
    topic = message.topic
    print(f"Message received on topic: {topic}")

    # Parse the device id out of the topic.
    topic_parts = topic.split('/')
    if len(topic_parts) < 3:
        print(f"Unexpected topic format: {topic}")
        return
    device_id = topic_parts[2]

    try:
        post_data = json.loads(message.payload.decode('utf-8'))
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError, so this covers bad bytes as well as bad JSON.
        print(f"Device: {device_id} sent an undecodable payload: {e}")
        return

    if not isinstance(post_data, dict):
        print(f"Device: {device_id} sent a payload that is not a JSON object")
        return

    # update and create
    try:
        update_device_info(device_id, post_data=post_data)
        create_mosq_post(device_id, post_data=post_data)
        update_mosq_device_statistic(device_id, post_data=post_data)
        print(f"Device: {device_id} update & create post success")
    except KeyError as e:
        print(f"Device: {device_id} update & create post failed")
        print(f"key: {e} is not existed")
    except (ValueError, TypeError) as e:
        # Raised for values of the wrong shape, e.g. an unparsable position
        # or timestamp, or a nested section that is not an object.
        print(f"Device: {device_id} update & create post failed")
        print(f"invalid value: {e}")
|
|
|
|
|
|
def _parse_coordinate(raw: str, positive: str, negative: str) -> float:
    """Convert one hemisphere-prefixed coordinate such as ``"N 22.5"`` to a float.

    A leading ``positive`` letter is dropped; a leading ``negative`` letter is
    dropped and the value negated. A bare number is parsed as-is.

    Raises:
        ValueError: If the remaining text is not a valid float.
    """
    text = raw.strip()
    sign = 1.0
    if text[:1] == positive:
        text = text[1:]
    elif text[:1] == negative:
        text = text[1:]
        sign = -1.0
    return sign * float(text)


def update_device_info(device_id: str, post_data: Dict):
    """Create or refresh the ``DeviceInfo`` row for ``device_id``.

    Reads the counters, signal, LED status, energy and the five power
    channels from ``post_data``. If no row exists for the device, one is
    created with the parsed position and ``org_id=1``; otherwise the existing
    row's readings are overwritten and saved.

    NOTE(review): the position is only stored when the row is created; an
    existing device keeps whatever position it already has. Confirm this is
    intended.

    Args:
        device_id: Device identifier taken from the MQTT topic.
        post_data: Decoded message payload.

    Raises:
        KeyError: If a required key is missing from ``post_data``.
        ValueError: If ``positioning`` is not two comma-separated coordinates.
    """
    device = DeviceInfo.objects.filter(device_id=device_id).first()

    positioning = post_data['positioning']
    if positioning:
        raw_latitude, raw_longitude = positioning.split(',')
        # "S" and "W" are accepted as the negative counterparts of "N" and
        # "E"; previously such a fix failed float() with a ValueError.
        latitude = _parse_coordinate(raw_latitude, 'N', 'S')
        longitude = _parse_coordinate(raw_longitude, 'E', 'W')
    else:
        latitude, longitude = None, None

    # Readings written on both the create and the update path. The lookups
    # keep their original order so the first missing key reported is the same.
    readings = {
        'count': post_data['count'],
        'signal': post_data['signal'],
        'led_status': post_data['RemoteLEDlightingFixtures']['LED'],
        'energy': post_data['quantityofelectricity'],
        'load': to_string(post_data, power_type=PowerType.Load),
        'storage_battery': to_string(post_data, power_type=PowerType.StorageBattery),
        'solar_panels': to_string(post_data, power_type=PowerType.SolarPanels),
        'ac_power': to_string(post_data, power_type=PowerType.ACPower),
        'dc_power': to_string(post_data, power_type=PowerType.DCPower),
    }

    if device is None:
        DeviceInfo.objects.create(device_id=device_id,
                                  longitude=longitude,
                                  latitude=latitude,
                                  org_id=1,
                                  **readings)
    else:
        for field_name, value in readings.items():
            setattr(device, field_name, value)
        device.save()
|
|
|
|
|
|
def create_mosq_post(device_id: str, post_data: Dict):
    """Insert one ``MosqPost`` row holding the readings of a single message.

    Args:
        device_id: Device identifier taken from the MQTT topic.
        post_data: Decoded message payload.

    Raises:
        KeyError: If a required key is missing from ``post_data``.
    """
    # Model field name -> power channel whose readings are stored in it.
    power_channels = {
        'load': PowerType.Load,
        'storage_battery': PowerType.StorageBattery,
        'solar_panels': PowerType.SolarPanels,
        'ac_power': PowerType.ACPower,
        'dc_power': PowerType.DCPower,
    }
    row = {
        field_name: to_string(post_data, power_type=channel)
        for field_name, channel in power_channels.items()
    }
    row['count'] = post_data['count']
    row['led_status'] = post_data['RemoteLEDlightingFixtures']['LED']
    row['signal'] = post_data['signal']
    row['energy'] = post_data['quantityofelectricity']

    MosqPost.objects.create(device_id=device_id, **row)
|
|
|
|
|
|
def update_mosq_device_statistic(device_id: str, post_data: Dict):
    """Create or update the per-day ``DevicePostStatistic`` row for a device.

    The day is taken from ``post_data['time']`` and the day's total from
    ``post_data['count']``. The increment is the difference between the
    incoming total and the previous day's stored total, or 0 when there is no
    row for the previous day.

    Args:
        device_id: Device identifier taken from the MQTT topic.
        post_data: Decoded message payload.

    Raises:
        KeyError: If ``time`` or ``count`` is missing from ``post_data``.
        ValueError: If ``time`` does not match ``%Y-%m-%d %H:%M:%S``.
    """
    datetime_format = "%Y-%m-%d %H:%M:%S"
    cur_date = datetime.strptime(post_data['time'], datetime_format).date()
    # assumes count is a number comparable with the stored total -- TODO confirm
    mqtt_total = post_data['count']

    cur_record = DevicePostStatistic.objects.filter(device_id=device_id, date=cur_date).first()
    last_record = DevicePostStatistic.objects.filter(device_id=device_id, date=cur_date - timedelta(days=1)).first()

    # Use the incoming total, not cur_record.total: the stored value is the
    # one from the previous message, so the increment would lag one message
    # behind and would stay 0 for the first message of the day.
    if last_record:
        incr = mqtt_total - last_record.total
    else:
        incr = 0

    if cur_record:
        cur_record.total = mqtt_total
        cur_record.increment = incr
        cur_record.save()
    else:
        # The device row is only needed for its org when creating a new row.
        # NOTE(review): this still raises AttributeError if the device has no
        # DeviceInfo row; on_message creates that row first.
        device = DeviceInfo.objects.filter(device_id=device_id).first()
        DevicePostStatistic.objects.create(device_id=device_id,
                                           date=cur_date,
                                           total=mqtt_total,
                                           increment=incr,
                                           org=device.org)
|
|
|
|
|
|
def to_string(post_data: Dict, power_type: PowerType) -> str:
    """Serialise one power channel's readings as ``"voltage,current,power"``.

    Args:
        post_data: Decoded message payload.
        power_type: Channel to read; its ``value`` is the payload key.

    Returns:
        The channel's voltage, current and power, each passed through
        ``str`` and joined with commas.

    Raises:
        KeyError: If the channel or one of its three readings is missing.
    """
    channel_key = power_type.value
    readings = [post_data[channel_key][name] for name in ('voltage', 'current', 'power')]
    return ",".join(str(reading) for reading in readings)
|
|
|
|
|
|
def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to the device data topic once the broker accepts a connection.

    Subscribing here rather than once after ``connect()`` means the
    subscription is issued again on every reconnect made by
    ``loop_forever()``.

    Args:
        client: The MQTT client that connected.
        userdata: User data attached to the client (unused).
        flags: Connection flags sent by the broker (unused).
        rc: Connection result; 0 means the connection was accepted.
        properties: Extra argument some callback variants pass (unused).
    """
    if rc != 0:
        print(f"MQTT connection failed with result code: {rc}")
        return
    # Subscribe to the data topic of every device.
    # STATE_TOPIC is not subscribed at present.
    client.subscribe(DATA_TOPIC)


# Initialise the MQTT client.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, MQTT_PORT, 60)

# Start the MQTT client; this call blocks and processes network traffic.
client.loop_forever()
|