from enum import Enum
from typing import Optional

import pytz
from django.utils.timezone import datetime
from rest_framework import serializers

from counter.models import DeviceCount, DeviceInfo
from mosquito.models import DeviceInfo as MosquitoDeviceInfo
from counter.api.serializers import get_device_latest_by_cache, get_vol

# Chip type that marks a second-generation device in the checks below.
_CHIP_AIR_V2 = 'AIR-V2'

# An AIR-V2 device is reported offline once its last connection is older
# than this many seconds.
_ONLINE_TIMEOUT_SECONDS = 300


def _elapsed_hours(since):
    """Return the whole hours elapsed between *since* and the current UTC time.

    NOTE(review): assumes *since* is a timezone-aware datetime; subtracting a
    naive value from the aware "now" raises TypeError -- confirm against the
    model field definition.
    """
    diff = datetime.now(tz=pytz.utc) - since
    return diff.days * 24 + diff.seconds // 3600


class LedHealthStatus(Enum):
    """Human-readable LED health grades; the values are the display strings."""

    Good = '良好'
    General = '一般'
    Bad = '较差'
    Unknown = '未知'

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value


class LedStatusMobileSerializer(serializers.ModelSerializer):
    """Serialize LED usage figures for a counter ``DeviceInfo``.

    The matching ``MosquitoDeviceInfo`` row is looked up in
    ``get_device_name`` and cached on ``self.cur_device``; the other method
    fields read that cache, so ``device_name`` must stay ahead of them in
    ``Meta.fields``.
    """

    device_name = serializers.SerializerMethodField()
    working_hours = serializers.SerializerMethodField()
    energy = serializers.SerializerMethodField()

    class Meta:
        model = DeviceInfo
        fields = [
            'device_id',
            'device_name',
            'working_hours',
            'energy',
        ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cur_device = None

    def get_device_name(self, obj) -> Optional[str]:
        """Look up the mosquito device record and return its name, or None."""
        device = MosquitoDeviceInfo.objects.filter(device_id=obj.device_id).first()
        # Always overwrite the cache (even with None): with many=True the same
        # serializer instance handles every item, so a miss must not leave the
        # previous item's record behind.
        self.cur_device = device
        return device.device_name if device else None

    def get_working_hours(self, obj):
        """Return whole hours since the cached device's launch time, or None."""
        device = self.cur_device
        if not device or device.launch_time is None:
            return None
        return _elapsed_hours(device.launch_time)

    def get_energy(self, obj):
        """Return the remaining LED lifetime as an integer percentage, or None.

        None is returned when the working hours are unknown or when the
        device has no usable (non-zero) ``led_lifetime``.
        """
        working_hours = self.get_working_hours(obj)
        # Compare against None explicitly: 0 elapsed hours is a valid reading
        # and means the full lifetime remains.
        if working_hours is None:
            return None
        lifetime = self.cur_device.led_lifetime
        if not lifetime:
            return None
        used = working_hours / lifetime
        return int((1 - used) * 100)


class DeviceInfoMobileSerializer(serializers.ModelSerializer):
    """Serialize a counter ``DeviceInfo`` together with mosquito-side details.

    Per-object state is cached on the serializer while one object is being
    rendered: ``get_device_name`` stores the ``MosquitoDeviceInfo`` row in
    ``self.mosq_device_info`` and ``get_count`` stores the latest cached
    reading in ``self.latest``. Fields that read those caches must therefore
    come after ``device_name`` / ``count`` in ``Meta.fields``.
    """

    device_name = serializers.SerializerMethodField()
    chip_type = serializers.SerializerMethodField()
    remote_control_enabled = serializers.SerializerMethodField()
    status = serializers.SerializerMethodField()
    signal = serializers.SerializerMethodField()
    count = serializers.SerializerMethodField()
    energy = serializers.SerializerMethodField()
    coordinate = serializers.SerializerMethodField()
    location = serializers.SerializerMethodField()
    location_group = serializers.SerializerMethodField()
    image_url = serializers.SerializerMethodField()
    icon_url = serializers.SerializerMethodField()
    led_status = serializers.SerializerMethodField()
    led_health = serializers.SerializerMethodField()

    class Meta:
        model = DeviceInfo
        fields = [
            'device_id',
            'device_name',
            'chip_type',
            'remote_control_enabled',
            'status',
            'count',
            'signal',
            'energy',
            'image_url',
            'icon_url',
            'led_status',
            'led_health',
            'coordinate',
            'location',
            'location_group',
        ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.latest = None
        self.counter_device = None
        self.mosq_device_info = None

    def get_device_name(self, obj) -> Optional[str]:
        """Look up the mosquito device record and return its name, or None."""
        # A single first() replaces the former count() + first() pair.
        device = MosquitoDeviceInfo.objects.filter(device_id=obj.device_id).first()
        # Always overwrite the cache (even with None): with many=True the same
        # serializer instance handles every item, so a miss must not leave the
        # previous item's record behind.
        self.mosq_device_info = device
        return device.device_name if device else None

    def get_chip_type(self, obj):
        """Return the device's chip type unchanged."""
        return obj.chip_type

    def get_remote_control_enabled(self, obj):
        """Return True only for AIR-V2 devices."""
        return obj.chip_type == _CHIP_AIR_V2

    def get_status(self, obj) -> int:
        """Return 1 when the device counts as online, 0 when offline.

        AIR-V2 devices with a recorded ``last_connect`` are online if that
        timestamp lies within ``_ONLINE_TIMEOUT_SECONDS`` of now; every other
        case falls back to ``obj.online``.
        """
        # Distinguish first-generation and second-generation devices.
        if obj.chip_type == _CHIP_AIR_V2:
            info = self.mosq_device_info
            if info and info.last_connect:
                gap = abs(datetime.now(pytz.utc) - info.last_connect)
                # total_seconds() covers the whole gap; timedelta.seconds
                # drops the days component and would report a device last
                # seen over a day ago as online.
                return 0 if gap.total_seconds() > _ONLINE_TIMEOUT_SECONDS else 1
        return obj.online

    def get_coordinate(self, obj):
        """Return ``[longitude, latitude]`` or None when either is unset."""
        info = self.mosq_device_info
        if info:
            lon, lat = info.longitude, info.latitude
            # Falsy values (None or 0) are treated as "no coordinate".
            if lon and lat:
                return [lon, lat]
        return None

    def get_location(self, obj):
        """Return the mosquito record's location, or None without a record."""
        if self.mosq_device_info:
            return self.mosq_device_info.location
        return None

    def get_location_group(self, obj):
        """Return the mosquito record's location group, or None without a record."""
        if self.mosq_device_info:
            return self.mosq_device_info.location_group
        return None

    def get_count(self, obj):
        """Return the device count and refresh ``self.latest`` for this object."""
        self.latest = get_device_latest_by_cache(obj.device_id)
        # Distinguish first-generation and second-generation devices.
        if obj.chip_type == _CHIP_AIR_V2:
            info = self.mosq_device_info
            if info and info.count is not None:
                return str(info.count)
        if self.latest:
            return self.latest['count']
        return '0'

    def get_signal(self, obj):
        """Return the signal reading, falling back to the latest ``csq`` or '0'."""
        # Distinguish first-generation and second-generation devices.
        if obj.chip_type == _CHIP_AIR_V2:
            info = self.mosq_device_info
            if info and info.signal is not None:
                return str(info.signal)
        if self.latest:
            return self.latest['csq']
        return '0'

    def get_energy(self, obj):
        """Return the energy reading for the device.

        AIR-V2 devices use the mosquito record's ``energy`` when present.
        Otherwise the latest ``vol`` is used: values up to 100 are formatted
        as a percentage, larger values are delegated to ``get_vol``. Without
        any reading the result is ``'0%'``.
        """
        # Distinguish first-generation and second-generation devices.
        if obj.chip_type == _CHIP_AIR_V2:
            info = self.mosq_device_info
            if info and info.energy is not None:
                return str(info.energy)
        if self.latest:
            vol = float(self.latest['vol'])
            if vol <= 100:
                return '{}{}'.format(round(vol, 1), '%')
            return get_vol(obj.device_id, self.latest['vol'])
        return '0%'

    def get_image_url(self, obj):
        """Return the mosquito record's image URL, or None without a record."""
        if self.mosq_device_info:
            return self.mosq_device_info.image_url
        return None

    def get_icon_url(self, obj):
        """Return the mosquito record's icon URL, or None without a record."""
        if self.mosq_device_info:
            return self.mosq_device_info.icon_url
        return None

    def get_led_status(self, obj):
        """Return the mosquito record's LED status, or None without a record."""
        if self.mosq_device_info:
            return self.mosq_device_info.led_status
        return None

    def get_led_health(self, obj):
        """Grade LED health from the fraction of its lifetime already used.

        Up to 30% used is Good, up to 60% is General, anything above is Bad.
        Unknown is returned when there is no mosquito record, no launch time,
        or no usable (non-zero) ``led_lifetime``.
        """
        info = self.mosq_device_info
        # Guard the inputs of the division below instead of raising.
        if not info or info.launch_time is None or not info.led_lifetime:
            return LedHealthStatus.Unknown.value
        rate = _elapsed_hours(info.launch_time) / info.led_lifetime
        if rate <= 0.3:
            return LedHealthStatus.Good.value
        if rate <= 0.6:
            return LedHealthStatus.General.value
        return LedHealthStatus.Bad.value