"""Celery tasks that aggregate device counter readings into daily statistics
and keep the mosquito ``DeviceInfo`` table in sync with the counter tables."""
import logging
import os
import time as std_time
from datetime import datetime, time, timedelta
from functools import reduce

import django
import pytz
import requests
from celery import task
from django.db.models import Max

pwd = os.path.dirname(os.path.realpath(__file__))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mosqkiller.settings")
django.setup()

# Model imports must come after django.setup().
from counter.models import DeviceCount as DeviceCount  # noqa: E402
from counter.models import DeviceInfo as CounterDeviceInfor  # noqa: E402
from mosquito.models import MosqPostStatistic, DevicePostStatistic, DeviceInfo, Org  # noqa: E402

logger = logging.getLogger(__name__)

_UTC = pytz.timezone("UTC")

# SECURITY NOTE(review): this key is committed to source control. It can be
# overridden through the AMAP_KEY environment variable; the literal is kept
# only as a backward-compatible fallback and should be rotated/removed.
AMAP_KEY = os.environ.get('AMAP_KEY', 'f9fc42eb1a45f311beeb832740f67b0f')
AMAP_REGEO_URL = 'https://restapi.amap.com/v3/geocode/regeo?output=json&location={},{}&key={}'
# Seconds to wait for the geocoding service before giving up.
AMAP_TIMEOUT = 10


def max_count(x, y):
    """Return whichever of *x* / *y* has the larger ``count`` (``y`` on a tie).

    ``count`` is compared numerically: elsewhere in this module counts are
    wrapped in ``int(...)`` and appear as strings, and comparing strings
    would order them lexicographically (``'999' > '10000'``).
    """
    if int(x.count) > int(y.count):
        return x
    return y


def shortest_date(date, date_list):
    """Return the number of days from *date* back to the closest earlier date.

    Only dates strictly before *date* are considered.

    Raises:
        ValueError: if *date_list* contains no date earlier than *date*.
    """
    return min(
        distance
        for distance in ((date - other).days for other in date_list)
        if distance > 0
    )


def fill_date(ret=None, tz=None):
    """Fill the gaps in a ``{date: value}`` mapping up to today.

    Every missing day between the earliest key and the current date in *tz*
    receives the value of the closest earlier day that was present.

    The mapping is modified in place and also returned. ``None`` is returned
    when *ret* is ``None`` or empty.
    """
    if not ret:
        return None

    first_day = min(ret)
    today = datetime.now(tz).date()

    # Carry the last known value forward in a single pass over the range.
    last_value = ret[first_day]
    for offset in range(1, (today - first_day).days + 1):
        day = first_day + timedelta(days=offset)
        if day in ret:
            last_value = ret[day]
        else:
            ret[day] = last_value
    return ret


def get_daily_statistic(enable_container=False):
    """Compute, for every date that has readings, the sum of the latest counts.

    For each date returned by the raw ``device_count`` query, the latest
    ``DeviceCount`` row per device up to the end of that date is selected and
    the ``count`` values are summed.

    Args:
        enable_container: when true, also record ``(date, count)`` samples
            for every device whose ``chip_type`` is not ``'AIR-V2'``.

    Returns:
        ``(daily_ret, device_container)`` where ``daily_ret`` maps a date to
        the summed count and ``device_container`` maps a device id to a set
        of ``(date, count)`` tuples (empty sets unless *enable_container*).
    """
    device_container = {}
    daily_ret = {}
    qs = DeviceCount.objects.raw('select id, date(data_time) as date from device_count group by date')
    if not qs:
        return daily_ret, device_container

    device_list = {item.device_id for item in DeviceInfo.objects.filter().exclude(chip_type='AIR-V2').distinct()}
    for device_id in device_list:
        device_container[device_id] = set()

    for q in qs:
        # Exclusive upper bound at the next midnight, so that readings taken
        # during the last minute of the day are not dropped.
        day_end = _UTC.localize(datetime.combine(q.date + timedelta(days=1), time.min), is_dst=None)

        # Latest timestamp per device; the rows are then fetched by
        # (device_id, max_time) so that their count can be read.
        latest_per_device = (
            DeviceCount.objects.filter(data_time__lt=day_end)
            .values('device_id')
            .annotate(max_time=Max('data_time'))
        )

        latest_entries = []
        for row in latest_per_device:
            candidates = list(
                DeviceCount.objects.filter(device_id=row['device_id'], data_time=row['max_time'])
            )
            if candidates:
                # Several rows may share the same timestamp: keep the largest count.
                latest_entries.append(reduce(max_count, candidates))

        if not latest_entries:
            continue

        if enable_container:
            for entry in latest_entries:
                samples = device_container.get(entry.device_id)
                if samples is not None:
                    samples.add((entry.data_time.date(), entry.count))

        daily_ret[q.date] = sum(int(entry.count) for entry in latest_entries)

    return daily_ret, device_container


def _iter_daily_increments(totals):
    """Yield ``(date, total, increment)`` for each date of *totals* in order.

    The first increment equals the first total; later increments are the
    difference to the previous date's total.
    """
    previous = None
    for day in sorted(totals):
        total = totals[day]
        if previous is None:
            increment = int(total)
        else:
            increment = int(total) - int(previous)
        yield day, total, increment
        previous = total


def _store_device_statistics(device_container):
    """Write one ``DevicePostStatistic`` row per device and day."""
    for device_id, samples in device_container.items():
        per_day = fill_date(dict(samples), tz=_UTC)
        if per_day is None:
            continue
        org = DeviceInfo.objects.get(device_id=device_id).org
        for day, total, increment in _iter_daily_increments(per_day):
            DevicePostStatistic.objects.update_or_create(
                device_id=device_id,
                date=day,
                defaults={'total': total, 'increment': increment, 'org': org},
            )


@task()
def update_daily_statistic():
    """Recompute and store the daily totals, overall and per device.

    Intermediate shapes:

    daily_ret:
        {datetime.date(2018, 8, 10): 124, datetime.date(2018, 8, 11): 137, ... }
    device_container:
        {'869300034538870': {(datetime.date(2019, 5, 25), '7178'),
                             (datetime.date(2019, 7, 11), '9293') ... }

    Returns:
        The sorted list of dates written to ``MosqPostStatistic``; an empty
        list when there is no data.
    """
    # Aggregate the value for every day that has readings.
    daily_ret, device_container = get_daily_statistic(enable_container=True)

    _store_device_statistics(device_container)

    # Fill in the days without readings, then write to the database.
    full_ret = fill_date(daily_ret, tz=_UTC) if daily_ret else None
    if not full_ret:
        return []

    for day, total, increment in _iter_daily_increments(full_ret):
        MosqPostStatistic.objects.update_or_create(
            date=day,
            defaults={'total': total, 'increment': increment},
        )
    return sorted(full_ret)


def get_location(lon: float, lat: float) -> str:
    """Reverse-geocode a coordinate with the AMap ``regeo`` endpoint.

    Returns the ``formatted_address`` string of the response, or ``''`` when
    the request fails, times out, is rejected by the service, or carries no
    address (the service sends an empty list in that case).
    """
    url = AMAP_REGEO_URL.format(lon, lat, AMAP_KEY)
    try:
        resp = requests.get(url, timeout=AMAP_TIMEOUT)
    except requests.RequestException as exc:
        logger.warning("reverse geocoding request failed for %s,%s: %s", lon, lat, exc)
        return ''
    if resp.status_code != 200:
        return ''

    try:
        data = resp.json()
    except ValueError:
        logger.warning("reverse geocoding returned a non-JSON body for %s,%s", lon, lat)
        return ''

    if not isinstance(data, dict):
        return ''
    if data.get('status') != '1' or data.get('info') != 'OK':
        return ''

    regeocode = data.get('regeocode')
    address = regeocode.get('formatted_address') if isinstance(regeocode, dict) else None
    # address == [] when nothing was found.
    return address if isinstance(address, str) else ''


@task()
def update_mosql_device_location():
    """Refresh the ``location`` field of the mosquito ``DeviceInfo`` table.

    AMap reverse-geocoding quota: 5000 requests/day, 30 concurrent/second.
    """
    for device in DeviceInfo.objects.all():
        if not (device.longitude and device.latitude):
            continue
        location = get_location(device.longitude, device.latitude)
        std_time.sleep(0.1)
        if not location:
            # A failed lookup (network error, exhausted quota, ...) must not
            # wipe a location that was stored earlier.
            continue
        device.location = location
        device.save()


def _latest_device_count(device_id):
    """Return the most recent ``DeviceCount`` row of *device_id*, or ``None``."""
    return DeviceCount.objects.filter(device_id=device_id).order_by('-data_time').first()


@task()
def update_mosq_device_info():
    """Synchronise the mosquito ``DeviceInfo`` table with the counter tables.

    Existing devices get the coordinates of their latest reading; devices
    known only to the counter app are created under the first ``Org``.

    Returns:
        The list of newly created device ids.
    """
    counter_devices = CounterDeviceInfor.objects.values_list('device_id', flat=True)
    mosq_devices = DeviceInfo.objects.values_list('device_id', flat=True)
    delta = set(counter_devices) - set(mosq_devices)

    # Update existing devices.
    for device_id in mosq_devices:
        latest = _latest_device_count(device_id)
        if latest is None:
            continue
        device = DeviceInfo.objects.get(device_id=device_id)
        device.longitude = latest.longitude
        device.latitude = latest.latitude
        device.save()

    # Create new devices.
    default_org = Org.objects.all().first()
    for device_id in delta:
        latest = _latest_device_count(device_id)
        # Coordinates are only known once the device has reported a reading.
        if latest:
            latitude, longitude = latest.latitude, latest.longitude
        else:
            latitude, longitude = None, None
        DeviceInfo.objects.create(
            device_id=device_id,
            org=default_org,
            latitude=latitude,
            longitude=longitude,
        )
    return list(delta)


if __name__ == '__main__':
    a, b = get_daily_statistic()
    print(a)
    print(b)
    # print(a, b)
    # update_mosq_device_info()
    # update_mosql_device_location()