"""Celery tasks that build daily post statistics from DeviceCount records
and keep mosquito's DeviceInfo table in sync with counter's."""
from datetime import datetime, time, timedelta
from functools import reduce

import pytz
from celery import task
from django.db.models import Max

from counter.models import DeviceCount
from counter.models import DeviceInfo as D1
from mosquito.models import MosqPostStatistic, DevicePostStatistic, DeviceInfo, Org


def max_count(x, y):
    """Return whichever of two DeviceCount rows has the larger ``count``.

    Counts are compared numerically via ``int()`` (the callers also convert
    with ``int()``), so string-typed counts are not compared
    lexicographically (where "9" > "10"). On a tie ``y`` is returned.
    """
    if int(x.count) > int(y.count):
        return x
    return y


def shortest_date(date, date_list):
    """Return the number of days from ``date`` back to the nearest earlier date.

    Only entries of ``date_list`` strictly before ``date`` are considered.

    Raises:
        ValueError: if no entry of ``date_list`` is earlier than ``date``.
    """
    distance_list = [(date - _date).days for _date in date_list]
    return min(distance for distance in distance_list if distance > 0)


def fill_date(ret=None, tz=None):
    """Forward-fill missing days in ``ret`` (a ``{date: value}`` dict).

    Every day from the earliest key through the later of "today in ``tz``"
    and the latest key is given a value. A missing day copies the value of
    the closest earlier day that was present in ``ret``. ``ret`` is modified
    in place and returned. ``None`` yields a new empty dict; an empty dict
    is returned unchanged.
    """
    if not ret:
        return {} if ret is None else ret
    known_dates = sorted(ret)
    start = known_dates[0]
    # Extend to the latest key as well, so keys after "today" (clock/tz skew)
    # still produce a contiguous range; callers index ``d - 1 day``.
    end = max(datetime.now(tz).date(), known_dates[-1])
    last_value = ret[start]
    for offset in range(1, (end - start).days + 1):
        day = start + timedelta(days=offset)
        if day in ret:
            last_value = ret[day]
        else:
            ret[day] = last_value
    return ret


def get_daily_statistic(enable_container=False):
    """Aggregate DeviceCount rows per UTC day.

    For every date that has DeviceCount rows, take each device's most recent
    row up to the end of that day. Sum those rows' counts.

    Args:
        enable_container: when true, also record per-device
            ``(date_of_latest_row, count)`` pairs.

    Returns:
        ``(daily_ret, device_container)``. ``daily_ret`` maps date -> summed
        count. ``device_container`` maps device_id -> set of
        ``(date, count)`` pairs; the sets stay empty unless
        ``enable_container`` is true. Both dicts are empty when there is
        no data.
    """
    device_container = {}
    daily_ret = {}
    # NOTE(review): selecting ``id`` alongside ``GROUP BY date`` is rejected by
    # strict SQL modes (e.g. MySQL ONLY_FULL_GROUP_BY) - confirm against the DB.
    # list() makes the emptiness check reliable on every Django version.
    day_rows = list(DeviceCount.objects.raw(
        'select id, date(data_time) as date from device_count group by date'))
    if not day_rows:
        return daily_ret, device_container

    tz = pytz.timezone("UTC")
    device_ids = set(DeviceCount.objects.values_list('device_id', flat=True).distinct())
    device_container = {device_id: set() for device_id in device_ids}

    for row in day_rows:
        # Exclusive upper bound at the start of the next UTC day, so readings
        # between 23:59:00 and 23:59:59 still count towards this day.
        day_end = tz.localize(
            datetime.combine(row.date + timedelta(days=1), time(0, 0)), is_dst=None)
        # Per device: find the latest data_time up to day_end, then fetch the
        # row(s) at that instant to read the count.
        latest_per_device = (
            DeviceCount.objects.filter(data_time__lt=day_end)
            .values('device_id')
            .annotate(max_time=Max('data_time'))
        )
        daily_entries = []
        for item in latest_per_device:
            candidates = list(DeviceCount.objects.filter(
                device_id=item['device_id'], data_time=item['max_time']))
            if candidates:
                # Several rows may share the same timestamp; keep the largest count.
                daily_entries.append(reduce(max_count, candidates))
        if not daily_entries:
            continue

        if enable_container:
            for entry in daily_entries:
                device_container.setdefault(entry.device_id, set()).add(
                    (entry.data_time.date(), entry.count))
        daily_ret[row.date] = sum(int(entry.count) for entry in daily_entries)
    return daily_ret, device_container


def _save_device_statistic(device_id, totals_by_date):
    """Upsert one DevicePostStatistic row per date for a single device."""
    # NOTE(review): raises DeviceInfo.DoesNotExist if the device has not been
    # copied into mosquito's DeviceInfo yet (see update_mosq_device_info).
    org = DeviceInfo.objects.get(device_id=device_id).org
    for day in sorted(totals_by_date):
        DevicePostStatistic.objects.update_or_create(
            device_id=device_id,
            date=day,
            defaults={'total': totals_by_date[day], 'org': org},
        )


def _save_mosq_statistic(totals_by_date):
    """Upsert MosqPostStatistic rows with each day's total and daily increment.

    ``totals_by_date`` must cover a contiguous range of days (as produced by
    ``fill_date``). Returns the sorted list of dates written.
    """
    dates = sorted(totals_by_date)
    previous_total = None
    for day in dates:
        total = totals_by_date[day]
        # The first day has no predecessor, so its increment is its total.
        increment = total if previous_total is None else total - previous_total
        MosqPostStatistic.objects.update_or_create(
            date=day,
            defaults={'total': total, 'increment': increment},
        )
        previous_total = total
    return dates


@task()
def update_daily_statistic():
    """Recompute and store per-device and overall daily statistics.

    Days with no records are filled with the previous day's values.

    Returns:
        The sorted list of dates written to MosqPostStatistic, or ``[]``
        when there is no DeviceCount data.
    """
    # Compute the aggregate value for each day.
    daily_ret, device_container = get_daily_statistic(enable_container=True)
    utc = pytz.timezone("UTC")

    for device_id, readings in device_container.items():
        if not readings:
            continue
        # Fill in days that have no record for this device, then persist.
        _save_device_statistic(device_id, fill_date(dict(readings), tz=utc))

    if not daily_ret:
        return []
    # Fill in days without records, then write the aggregate to the database.
    return _save_mosq_statistic(fill_date(daily_ret, tz=utc))


@task()
def update_latest_statistic():
    """Placeholder task; currently does nothing."""
    pass


@task()
def update_mosq_device_info():
    """Update mosquito's DeviceInfo table.

    Creates a mosquito DeviceInfo row for every device_id present in
    counter's DeviceInfo but missing from mosquito's. Each new row is
    assigned ``Org.objects.all().first()``, which is None when there are
    no orgs.
    """
    counter_ids = set(D1.objects.values_list('device_id', flat=True))
    mosq_ids = set(DeviceInfo.objects.values_list('device_id', flat=True))
    default_org = Org.objects.all().first()
    for device_id in counter_ids - mosq_ids:
        DeviceInfo.objects.create(device_id=device_id, org=default_org)