Mosqkiller-API/apps/mosquito/tasks.py

140 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pytz
from functools import reduce
# from collections import OrderedDict
from datetime import datetime, time, timedelta
from django.db.models import Max
from celery import task
from counter.models import DeviceCount
from counter.models import DeviceInfo as D1
from mosquito.models import MosqPostStatistic, DevicePostStatistic, DeviceInfo, Org
def max_count(x, y):
if x.count > y.count:
return x
return y
def shortest_date(date, date_list):
distance_list = [(date - _date).days for _date in date_list]
shortest_distance = min(filter(lambda x: x > 0, distance_list))
return shortest_distance
def fill_date(ret=None, tz=None):
date_list = [k for k in ret]
min_date = min(date_list)
max_date = datetime.now(tz).date()
days = (max_date - min_date).days
for d in range(1, days + 1):
date_ahead = min_date + timedelta(days=d)
if date_ahead not in ret:
distance = shortest_date(date_ahead, date_list)
ret[date_ahead] = ret[date_ahead - timedelta(days=distance)]
return ret
def get_daily_statistic(enable_container=False):
device_container = {}
daily_ret = {}
queryset = DeviceCount.objects.raw('select id, date(data_time) as date from device_count group by date')
if queryset:
tz = pytz.timezone("UTC")
device_list = {item.device_id for item in DeviceCount.objects.distinct()}
for item in device_list:
device_container[item] = set()
for q in queryset:
# 按照 device_id 分组查询最新时间 max_time然后通过id + max_time 获取实例用来得到count
midnight = tz.localize(datetime.combine(q.date, time(23, 59)), is_dst=None)
daily_queryset = [DeviceCount.objects.filter(device_id=x['device_id'], data_time=x['max_time'])
for x in DeviceCount.objects.filter(data_time__lte=midnight).values(
'device_id').annotate(max_time=Max('data_time'))]
if daily_queryset:
daily_queryset = [reduce(max_count, entry) if len(entry) > 1 else entry[0] for entry in daily_queryset]
if enable_container:
for device_id in device_list:
device_container[device_id] = device_container[device_id] | \
{(entry.data_time.date(), entry.count)
for entry in daily_queryset if entry.device_id == device_id}
calc_result = sum(map(lambda x: int(x.count), daily_queryset))
date = q.date
daily_ret[date] = calc_result
return daily_ret, device_container
@task()
def update_daily_statistic():
full_ret = None
# 计算每天聚合值
daily_ret, device_container = get_daily_statistic(enable_container=True)
# 计算没有记录的日期,值
if daily_ret:
full_ret = fill_date(daily_ret, tz=pytz.timezone("UTC"))
if device_container:
for device_id in device_container:
device = DeviceInfo.objects.get(device_id=device_id)
org = device.org
container = dict(device_container[device_id])
full_daily_ret = fill_date(container, tz=pytz.timezone("UTC"))
_date_list = [k for k in full_daily_ret]
_date_list.sort()
for d in _date_list:
try:
obj = DevicePostStatistic.objects.get(device_id=device_id, date=d)
except DevicePostStatistic.DoesNotExist:
obj = None
if obj:
obj.total = full_daily_ret[d]
obj.org = org
obj.save()
else:
DevicePostStatistic.objects.create(device_id=device_id, total=full_daily_ret[d], org=org, date=d)
# 写入数据库
if full_ret:
_date_list = [k for k in full_ret]
_date_list.sort()
is_first = True
for d in _date_list:
try:
obj = MosqPostStatistic.objects.get(date=d)
except MosqPostStatistic.DoesNotExist:
obj = None
if is_first:
if obj:
obj.total, obj.increment = full_ret[d], full_ret[d]
obj.save()
else:
MosqPostStatistic.objects.create(date=d, total=full_ret[d], increment=full_ret[d])
else:
increment = full_ret[d] - full_ret[d - timedelta(days=1)]
if obj:
obj.total, obj.increment = full_ret[d], increment
obj.save()
else:
MosqPostStatistic.objects.create(date=d, total=full_ret[d], increment=increment)
is_first = False
return _date_list
@task()
def update_latest_statistic():
pass
@task()
def update_mosq_device_info():
""" 更新mosquito的DeviceInfo表"""
d1 = D1.objects.values('device_id', flat=True)
d2 = DeviceInfo.objects.values('device_id', flat=True)
s1 = set(d1)
s2 = set(d2)
x = s1 - s2
default_org = Org.objects.all().first()
for device_id in x:
DeviceInfo.objects.create(device_id=device_id, org=default_org)