Mosqkiller-API/apps/mosquito/tasks.py

216 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time as std_time
import pytz
import requests
from functools import reduce
from datetime import datetime, time, timedelta
from django.db.models import Max
from celery import task
import os
import django
# Absolute directory of this module (not referenced in the rest of this file).
pwd = os.path.dirname(os.path.realpath(__file__))
# Point Django at the project settings unless the environment already names a
# settings module, then initialise Django so the model imports below succeed
# when this file is run outside the normal Django entry points.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()
from counter.models import DeviceCount as DeviceCount
from counter.models import DeviceInfo as CounterDeviceInfor
from mosquito.models import MosqPostStatistic, DevicePostStatistic, DeviceInfo, Org
def max_count(x, y):
    """Return whichever of two records carries the larger ``count``.

    Serves as the folding function for ``functools.reduce`` when several
    records share the same timestamp.

    ``count`` is compared numerically. The counts handled in this module are
    numeric strings (e.g. ``'7178'``); comparing them as text would rank
    ``'999'`` above ``'7178'``.

    :param x: record exposing a ``count`` attribute accepted by ``int()``.
    :param y: record exposing a ``count`` attribute accepted by ``int()``.
    :return: ``x`` when its count is strictly greater, otherwise ``y``
        (``y`` therefore wins ties).
    """
    if int(x.count) > int(y.count):
        return x
    return y
def shortest_date(date, date_list):
    """Return the gap, in whole days, from ``date`` back to the closest earlier entry.

    Only entries of ``date_list`` lying at least one day before ``date`` are
    considered. ``min`` raises ``ValueError`` when there is no such entry.

    :param date: reference date.
    :param date_list: iterable of dates to measure against.
    :return: the smallest positive day difference.
    """
    gaps = ((date - candidate).days for candidate in date_list)
    return min(gap for gap in gaps if gap > 0)
def fill_date(ret=None, tz=None):
    """Forward-fill the missing days of a ``{date: value}`` mapping, in place.

    Every day after the earliest key, up to and including today (evaluated in
    ``tz``), that has no entry receives the value of the closest earlier day
    that does have one.

    :param ret: mapping keyed by ``datetime.date``; it is modified in place.
        ``None`` or an empty mapping means there is nothing to fill.
    :param tz: tzinfo passed to ``datetime.now`` to decide what "today" is;
        ``None`` uses the local naive clock.
    :return: the same ``ret`` object once filled, or ``None`` when ``ret`` is
        ``None`` or empty.
    """
    if not ret:
        return None
    min_date = min(ret)
    max_date = datetime.now(tz).date()
    # Walk the days in order and carry the last known value forward; this gives
    # the same result as searching for the nearest earlier key for every gap,
    # without rescanning the key list each time.
    carried = ret[min_date]
    for offset in range(1, (max_date - min_date).days + 1):
        day = min_date + timedelta(days=offset)
        if day in ret:
            carried = ret[day]
        else:
            ret[day] = carried
    return ret
def get_daily_statistic(enable_container=False):
    """Sum, for every date present in ``device_count``, each device's latest count.

    For each date returned by the raw query, the newest ``DeviceCount`` row of
    every device with ``data_time`` up to the end of that day is selected
    (``max_count`` picks the row with the largest count when several rows share
    that timestamp) and the selected counts are summed.

    :param enable_container: when true, also record the per-device snapshots.
    :return: ``(daily_ret, device_container)``.
        ``daily_ret`` maps each date to the summed count.
        ``device_container`` maps every ``DeviceInfo`` device id to a set of
        ``(row date, count)`` tuples; the sets stay empty unless
        ``enable_container`` is true. Rows of devices that are not listed in
        ``DeviceInfo`` contribute to the sums but are not recorded here.
    """
    device_container = {}
    daily_ret = {}
    qs = DeviceCount.objects.raw('select id, date(data_time) as date from device_count group by date')
    if not qs:
        return daily_ret, device_container

    tz = pytz.timezone("UTC")
    device_list = {item.device_id for item in DeviceInfo.objects.distinct()}
    for item in device_list:
        device_container[item] = set()

    for q in qs:
        # Use the very last instant of the day as the cut-off. A 23:59:00
        # cut-off would push rows from the final minute into the next day's
        # snapshot and could yield two different counts for the same date.
        # NOTE(review): the day boundary is taken in UTC while date() in the
        # raw SQL is evaluated by the database - confirm both agree.
        end_of_day = tz.localize(datetime.combine(q.date, time.max), is_dst=None)
        # Group by device_id to find each device's newest timestamp, then load
        # the rows at (device_id, max_time) to read their count.
        newest_per_device = (
            DeviceCount.objects
            .filter(data_time__lte=end_of_day)
            .values('device_id')
            .annotate(max_time=Max('data_time'))
        )
        candidates = [
            list(DeviceCount.objects.filter(device_id=x['device_id'], data_time=x['max_time']))
            for x in newest_per_device
        ]
        # Several rows may share the same timestamp: keep the one with the
        # largest count. Empty groups are skipped.
        latest_rows = [reduce(max_count, rows) for rows in candidates if rows]
        if not latest_rows:
            continue

        if enable_container:
            for entry in latest_rows:
                if entry.device_id in device_container:
                    device_container[entry.device_id].add((entry.data_time.date(), entry.count))

        daily_ret[q.date] = sum(int(entry.count) for entry in latest_rows)
    return daily_ret, device_container
def _iter_daily_increments(totals):
    """Yield ``(day, total, increment)`` for each day of ``totals`` in date order.

    The first day's increment is its total, unchanged; every later increment is
    the integer difference from the preceding day's total.
    """
    previous = None
    for position, day in enumerate(sorted(totals)):
        total = totals[day]
        if position == 0:
            increment = total
        else:
            # Diff against the previous entry in the sorted sequence, not
            # against "day - 1", so a gap in the dates cannot raise KeyError.
            increment = int(total) - int(previous)
        yield day, total, increment
        previous = total


@task()
def update_daily_statistic():
    """Recompute and store the per-device and overall daily statistics.

    Shapes of the intermediate data returned by ``get_daily_statistic``:

    daily_ret: {datetime.date(2018, 8, 10): 124, datetime.date(2018, 8, 11): 137, ... }
    device_container: {'869300034538870': {(datetime.date(2019, 5, 25), '7178'), (datetime.date(2019, 7, 11), '9293') ... }

    Days without a record are filled in with ``fill_date`` before the rows are
    written. One ``DevicePostStatistic`` row is upserted per device and day,
    and one ``MosqPostStatistic`` row per day.

    :return: the sorted dates written to ``MosqPostStatistic``; an empty list
        when there was nothing to write.
    """
    written_dates = []
    # Aggregate the value of every day.
    daily_ret, device_container = get_daily_statistic(enable_container=True)

    for device_id, samples in device_container.items():
        # Fill in the days that have no record for this device.
        full_daily_ret = fill_date(dict(samples), tz=pytz.timezone("UTC"))
        if full_daily_ret is None:
            continue
        org = DeviceInfo.objects.get(device_id=device_id).org
        for day, total, increment in _iter_daily_increments(full_daily_ret):
            DevicePostStatistic.objects.update_or_create(
                device_id=device_id,
                date=day,
                defaults={'total': total, 'increment': increment, 'org': org},
            )

    # Write the overall totals to the database.
    if daily_ret:
        full_ret = fill_date(daily_ret, tz=pytz.timezone("UTC"))
        for day, total, increment in _iter_daily_increments(full_ret):
            MosqPostStatistic.objects.update_or_create(
                date=day,
                defaults={'total': total, 'increment': increment},
            )
            written_dates.append(day)
    return written_dates
def get_location(lon: int, lat: int) -> str:
    """Reverse-geocode a coordinate pair into a formatted address via the Amap REST API.

    Every failure is reported as an empty string: network error or timeout,
    non-200 response, body that is not JSON, API status other than
    ``'1'``/``'OK'``, or an address that is missing or not a string (the API
    can return an empty list in that field).

    :param lon: longitude, formatted as-is into the ``location`` query value.
    :param lat: latitude, formatted as-is into the ``location`` query value.
        NOTE(review): both are annotated ``int`` but coordinates are presumably
        fractional - confirm against the model fields.
    :return: the formatted address, or ``''`` when it cannot be resolved.
    """
    # NOTE(review): API key is hard-coded in source; consider moving it to
    # settings or the environment.
    key = 'f9fc42eb1a45f311beeb832740f67b0f'
    url = 'https://restapi.amap.com/v3/geocode/regeo?output=json&location={},{}&key={}'.format(lon, lat, key)
    try:
        # Without a timeout a stalled connection would block the caller forever.
        resp = requests.get(url, timeout=10)
    except requests.RequestException:
        return ''
    if resp.status_code != 200:
        return ''
    try:
        data = resp.json()
    except ValueError:
        return ''
    if not isinstance(data, dict):
        return ''
    if data.get('status') != '1' or data.get('info') != 'OK':
        return ''
    regeocode = data.get('regeocode')
    address = regeocode.get('formatted_address') if isinstance(regeocode, dict) else None
    # The API can return a list (e.g. []) when no address is found.
    return address if isinstance(address, str) else ''
@task()
def update_mosql_device_location():
    """Refresh the ``location`` field of every mosquito ``DeviceInfo`` row.

    Devices without both a longitude and a latitude are skipped. A failed
    lookup (empty result) leaves the stored location untouched so it does not
    overwrite a previously resolved address.

    Amap reverse-geocoding quota: 5000 requests/day, 30 concurrent requests/second.
    """
    for device in DeviceInfo.objects.all():
        if not (device.longitude and device.latitude):
            continue
        location = get_location(device.longitude, device.latitude)
        # Pause after every request to stay well below the API rate limit.
        std_time.sleep(0.1)
        if not location:
            # Lookup failed or returned no address: keep the existing value.
            continue
        device.location = location
        device.save()
def _latest_device_count(device_id):
    """Return the newest ``DeviceCount`` row of ``device_id``, or ``None`` if it has none."""
    return DeviceCount.objects.filter(device_id=device_id).order_by('-data_time').first()


@task()
def update_mosq_device_info():
    """Synchronise the mosquito ``DeviceInfo`` table with the counter app.

    Existing devices get their coordinates refreshed from their newest
    ``DeviceCount`` row. Devices known to the counter app but absent from
    ``DeviceInfo`` are created under the first ``Org`` with the coordinates of
    their newest row. Devices without any ``DeviceCount`` row are skipped in
    both passes, because there are no coordinates to copy.

    :return: list of the device ids that were created.
    """
    counter_ids = set(CounterDeviceInfor.objects.values_list('device_id', flat=True))
    mosq_ids = set(DeviceInfo.objects.values_list('device_id', flat=True))
    delta = counter_ids - mosq_ids

    # update existing device
    for device_id in mosq_ids:
        counter_device = _latest_device_count(device_id)
        if counter_device is None:
            continue
        ds = DeviceInfo.objects.get(device_id=device_id)
        ds.longitude = counter_device.longitude
        ds.latitude = counter_device.latitude
        ds.save()

    # create new device
    default_org = Org.objects.all().first()
    created = []
    for device_id in delta:
        counter_device = _latest_device_count(device_id)
        if counter_device is None:
            # Same guard as the update pass: no count row, no coordinates.
            continue
        DeviceInfo.objects.create(device_id=device_id,
                                  org=default_org,
                                  latitude=counter_device.latitude,
                                  longitude=counter_device.longitude)
        created.append(device_id)
    return created
if __name__ == '__main__':
    # Manual entry point: when the file is executed as a script, only the
    # location refresh is called directly. The other calls are kept commented
    # out for ad-hoc runs.
    # a, b = get_daily_statistic()
    # print(a, b)
    # update_mosq_device_info()
    update_mosql_device_location()