Mosqkiller-API/apps/mosquito/tasks.py

169 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pytz
from functools import reduce
# from collections import OrderedDict
from datetime import datetime, time, timedelta
from django.db.models import Max
from celery import task
import os
import django
pwd = os.path.dirname(os.path.realpath(__file__))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()
from counter.models import DeviceCount
from counter.models import DeviceInfo as D1
from mosquito.models import MosqPostStatistic, DevicePostStatistic, DeviceInfo, Org, WeatherStationDeviceInfo
def max_count(x, y):
if x.count > y.count:
return x
return y
def shortest_date(date, date_list):
distance_list = [(date - _date).days for _date in date_list]
shortest_distance = min(filter(lambda x: x > 0, distance_list))
return shortest_distance
def fill_date(ret=None, tz=None):
date_list = [k for k in ret]
if len(date_list) == 0:
return None
min_date = min(date_list)
max_date = datetime.now(tz).date()
days = (max_date - min_date).days
for d in range(1, days + 1):
date_ahead = min_date + timedelta(days=d)
if date_ahead not in ret:
distance = shortest_date(date_ahead, date_list)
ret[date_ahead] = ret[date_ahead - timedelta(days=distance)]
return ret
def get_daily_statistic(enable_container=False):
device_container = {}
daily_ret = {}
qs = DeviceCount.objects.raw('select id, date(data_time) as date from device_count group by date')
if qs:
tz = pytz.timezone("UTC")
device_list = {item.device_id for item in DeviceInfo.objects.distinct()}
for item in device_list:
device_container[item] = set()
for q in qs:
# 按照 device_id 分组查询最新时间 max_time然后通过id + max_time 获取实例用来得到count
midnight = tz.localize(datetime.combine(q.date, time(23, 59)), is_dst=None)
dc = DeviceCount.objects.filter(data_time__lte=midnight).values('device_id').\
annotate(max_time=Max('data_time'))
daily_queryset = [DeviceCount.objects.filter(device_id=x['device_id'], data_time=x['max_time']) for x in dc]
if daily_queryset:
# 有些相同时间的记录,选取最大值的记录
daily_queryset = [reduce(max_count, entry) if len(entry) > 1 else entry[0] for entry in daily_queryset]
if enable_container:
for device_id in device_list:
device_container[device_id] = device_container[device_id] | \
{(entry.data_time.date(), entry.count)
for entry in daily_queryset if entry.device_id == device_id}
calc_result = sum(map(lambda x: int(x.count), daily_queryset))
date = q.date
daily_ret[date] = calc_result
return daily_ret, device_container
@task()
def update_daily_statistic():
"""
daily_ret: {datetime.date(2018, 8, 10): 124, datetime.date(2018, 8, 11): 137, ... }
device_container: {'869300034538870': {(datetime.date(2019, 5, 25), '7178'), (datetime.date(2019, 7, 11), '9293') ... }
"""
full_ret = None
# 计算每天聚合值
daily_ret, device_container = get_daily_statistic(enable_container=True)
# 计算没有记录的日期,值
if daily_ret:
full_ret = fill_date(daily_ret, tz=pytz.timezone("UTC"))
if device_container:
for device_id in device_container:
device = DeviceInfo.objects.get(device_id=device_id)
org = device.org
container = dict(device_container[device_id])
full_daily_ret = fill_date(container, tz=pytz.timezone("UTC"))
if full_daily_ret is None:
continue
_date_list = [k for k in full_daily_ret]
_date_list.sort()
is_first = True
for d in _date_list:
if is_first:
increment = full_daily_ret[d]
else:
increment = int(full_daily_ret[d]) - int(full_daily_ret[d - timedelta(days=1)])
try:
obj = DevicePostStatistic.objects.get(device_id=device_id, date=d)
except DevicePostStatistic.DoesNotExist:
DevicePostStatistic.objects.create(
device_id=device_id, total=full_daily_ret[d], increment=increment, org=org, date=d)
else:
obj.total = full_daily_ret[d]
obj.increment = increment
obj.org = org
obj.save()
is_first = False
# 写入数据库
if full_ret:
_date_list = [k for k in full_ret]
_date_list.sort()
is_first = True
for d in _date_list:
try:
obj = MosqPostStatistic.objects.get(date=d)
except MosqPostStatistic.DoesNotExist:
obj = None
if is_first:
if obj:
obj.total, obj.increment = full_ret[d], full_ret[d]
obj.save()
else:
MosqPostStatistic.objects.create(date=d, total=full_ret[d], increment=full_ret[d])
else:
increment = full_ret[d] - full_ret[d - timedelta(days=1)]
if obj:
obj.total, obj.increment = full_ret[d], increment
obj.save()
else:
MosqPostStatistic.objects.create(date=d, total=full_ret[d], increment=increment)
is_first = False
return _date_list
@task()
def update_latest_statistic():
pass
@task()
def update_mosq_device_info():
""" 更新mosquito的DeviceInfo表"""
d1 = D1.objects.values_list('device_id', flat=True)
d2 = DeviceInfo.objects.values_list('device_id', flat=True)
s1 = set(d1)
s2 = set(d2)
x = s1 - s2
default_org = Org.objects.all().first()
for device_id in x:
DeviceInfo.objects.create(device_id=device_id, org=default_org)
return list(x)
if __name__ == '__main__':
# a, b = get_daily_statistic()
# print(a, b)
update_daily_statistic()