Mosqkiller-API/apps/mosquito/api/views.py

155 lines
6.2 KiB
Python

import re
import pytz
from rest_framework.generics import (
ListAPIView,
RetrieveAPIView,
CreateAPIView
)
from django.utils.timezone import timedelta, datetime
from rest_framework.response import Response
from rest_framework import filters
from rest_framework.pagination import PageNumberPagination
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.permissions import IsAuthenticated
from rest_framework.filters import SearchFilter, OrderingFilter
from django.db.models import Q
from counter.mixins.role import RoleMixin, DeviceListMixin, WeatherStationDeviceListMixin
from ..models import Mosquito, MosqPost, DeviceTempLog, WeatherLog
from .pagination import PostLimitOffsetPagination, DeviceLogHistoryPagination, WeatherlogHistoryPagination
from .serializers import (
MosqListSerializer,
MosqPostListSerializer,
DeviceTempLogSerializer,
WeatherLogSerializer,
)
class DeviceLogPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'limit'
page_query_param = 'page'
max_page_size = 1000
class MosquitoListAPIView(ListAPIView):
serializer_class = MosqListSerializer
permission_classes = [IsAuthenticated]
filter_backends = [SearchFilter, OrderingFilter]
pagination_class = PostLimitOffsetPagination
search_fields = ['name', 'device_id', 'region']
def get_queryset(self, *args, **kwargs):
queryset_list = Mosquito.objects.all()
query = self.request.GET.get('q')
if query:
queryset_list = queryset_list.filter(
Q(name__icontains=query) |
Q(device_id__icontains=query) |
Q(region__icontains=query)
).distinct()
return queryset_list
class MosquitoPostListAPIView(ListAPIView):
serializer_class = MosqPostListSerializer
permission_classes = [IsAuthenticated, ]
filter_backends = [SearchFilter, OrderingFilter]
pagination_class = PostLimitOffsetPagination
search_fields = ['mosq__name', 'mosq__region']
def get_queryset(self, *args, **kwargs):
queryset_list = MosqPost.objects.all()
query = self.request.GET.get('q')
if query:
queryset_list = queryset_list.filter(
Q(mosq__name__contains=query) |
Q(mosq__region=query)
).distinct()
return queryset_list
class DeviceTempLogListAPIView(ListAPIView, RoleMixin, DeviceListMixin):
serializer_class = DeviceTempLogSerializer
permission_classes = [IsAuthenticated]
filter_backends = (DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter)
pagination_class = DeviceLogHistoryPagination
search_fields = ['device_id']
filterset_fields = ['device_id']
ordering_fields = ['create_time']
def get_queryset(self, *args, **kwargs):
user_roles = self.get_role()
start = self.request.GET.get('start')
end = self.request.GET.get('end')
queryset = DeviceTempLog.objects.get_queryset().order_by('create_time')
if 'staff' in user_roles or 'manager' in user_roles:
devices = self.get_device_list()
device_ids = [device.device_id for device in devices if device.org == self.request.user.org]
queryset = queryset.filter(device_id__in=device_ids)
last_day = self.request.GET.get('last_day')
if last_day == '1':
t = datetime.now().replace(tzinfo=pytz.timezone('UTC')) - timedelta(days=1)
queryset = queryset.filter(last_time__gte=t).filter(last_time__minute=0)
if start and end:
t_pattern = re.compile(r'^2\d{3}-\d{1,2}-\d{1,2}$')
start_m = t_pattern.match(start)
end_m = t_pattern.match(end)
if not start_m or not end_m:
return Response({'detail': 'start or end must be format "yyyy-mm-dd"'})
start = datetime.strptime(start, '%Y-%m-%d')
end = datetime.strptime(end, '%Y-%m-%d')
queryset = queryset.filter(last_time__gte=start, last_time__lt=end + timedelta(days=1))
return queryset
class WeatherLogListAPIView(ListAPIView, RoleMixin, WeatherStationDeviceListMixin):
serializer_class = WeatherLogSerializer
permission_classes = [IsAuthenticated]
filter_backends = (DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter)
pagination_class = WeatherlogHistoryPagination
search_fields = ['device_id']
filterset_fields = ['device_id']
ordering_fields = ['data_time']
def get_queryset(self, *args, **kwargs):
user_roles = self.get_role()
device = self.request.GET.get('device')
start = self.request.GET.get('start')
end = self.request.GET.get('end')
queryset = WeatherLog.objects.get_queryset().order_by('-data_time')
if 'staff' in user_roles or 'manager' in user_roles:
device_list = self.get_device_list()
device_ids = [query.device_id for query in device_list if query.org == self.request.user.org]
queryset = queryset.filter(device_id__in=device_ids)
if device:
# device_id or device_name
qs = queryset.filter(device_id__icontains=device)
if qs.count() > 0:
queryset = qs
else:
qs = WeatherLog.objects.filter(device_name__contains=device)
device_ids = [device.device_id for device in qs]
queryset = queryset.filter(device_id__in=device_ids)
last_day = self.request.GET.get('last_day')
if last_day == '1':
t = datetime.now().replace(tzinfo=pytz.timezone('UTC')) - timedelta(days=1)
queryset = queryset.filter(data_time__gte=t).filter(data_time__minute=0)
if start and end:
t_pattern = re.compile(r'^2\d{3}-\d{1,2}-\d{1,2}$')
start_m = t_pattern.match(start)
end_m = t_pattern.match(end)
if not start_m or not end_m:
return Response({'detail': 'start or end must be format "yyyy-mm-dd"'})
start = datetime.strptime(start, '%Y-%m-%d')
end = datetime.strptime(end, '%Y-%m-%d')
queryset = queryset.filter(data_time__gte=start, data_time__lt=end + timedelta(days=1))
return queryset