from flask import Flask, render_template, jsonify, request
import serial
import serial.tools.list_ports
import threading
import time
import subprocess
import sys
import logging

app = Flask(__name__)

# Current state of every indicator light; all start switched off.
lights = {
    'common': False,
    'alarm': False,
    'up': False,
    'down': False,
    'emergency_stop': False
}

# Handle of the currently opened serial port (None until one is selected).
ser = None

# History of received signals, stored as the hex text of each byte read.
all_signals = []

# Polled by the reader thread; setting it to False makes the loop exit.
thread_running = False


def reset_lights():
    """Switch every light off by mutating the shared ``lights`` dict in place."""
    for name in lights:
        lights[name] = False


def analyze_signals():
    """Recompute ``lights`` from the signal history in ``all_signals``.

    Rules, checked in this order:

    * history ends with 09,0b,08 or 09,0d,08 -> only the emergency-stop
      light is on and the history is cleared;
    * the latest signal is 08 -> every light is off;
    * otherwise ``common`` is on if 09, 0b or 0d occurs anywhere in the
      history, ``up``/``down`` are on only while the latest signal is
      0b/0d, and a history ending with 09,08,09 turns ``alarm`` on and
      clears the history.
    """
    # Start from "all off" so each rule below only has to switch lights on.
    reset_lights()

    last_three = all_signals[-3:]
    latest = all_signals[-1] if all_signals else None

    # Emergency stop takes priority over everything else.
    if last_three in (['09', '0b', '08'], ['09', '0d', '08']):
        lights['emergency_stop'] = True
        print("急停灯亮")
        all_signals.clear()
        return

    # A trailing 08 means "everything off"; the lights were reset above.
    if latest == '08':
        print("收到08信号, 所有灯熄灭")
        return

    seen = set(all_signals)
    lights['common'] = bool(seen & {'09', '0b', '0d'})
    # Direction lights follow only the most recent signal.
    lights['up'] = latest == '0b'
    lights['down'] = latest == '0d'

    # NOTE(review): the indentation of this file was lost; this assumes the
    # history is cleared only after the alarm pattern was detected (mirroring
    # the emergency-stop branch) -- confirm against the intended protocol.
    if last_three == ['09', '08', '09']:
        lights['alarm'] = True
        all_signals.clear()


def read_serial():
    """Reader-thread loop: poll the serial port and feed ``analyze_signals``.

    Runs until ``thread_running`` becomes False. Each byte read is appended
    to ``all_signals`` as hex text, the lights are recomputed, and the
    history is capped at the 100 most recent entries.
    """
    while thread_running:
        try:
            if ser and ser.is_open and ser.in_waiting:
                data = ser.read().hex()
                print(f"接收到数据: {data}")
                all_signals.append(data)
                analyze_signals()
                if len(all_signals) > 100:
                    # Trim in place so every holder of the list sees the
                    # same object instead of a rebound global.
                    del all_signals[:-100]
                print(f"当前灯光状态: {lights}")
                print(f"当前信号数组: {all_signals}")
            else:
                # Nothing to read: back off briefly instead of busy-waiting.
                time.sleep(0.05)
        except serial.SerialException as e:
            print(f"串口错误: {e}")
            time.sleep(1)


# Logging configuration
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


@app.route('/')
def index():
    """Render the main page template."""
    return render_template('index.html')


@app.route('/get_lights_status')
def get_lights_status():
    """Return the current ``lights`` dict as JSON."""
    return jsonify(lights)


@app.route('/get_signals')
def get_signals():
    """Return the recorded signal history as a JSON list."""
    return jsonify(all_signals)


@app.route('/get_ports')
def get_ports():
    """List candidate serial ports for the current platform as JSON.

    Each entry has the shape ``{'device': ..., 'description': ...}``.
    Any failure is logged and reported as ``{'error': ...}`` with HTTP 500.
    """
    import glob
    import re

    try:
        ports = []
        if sys.platform.startswith('win'):
            result = subprocess.check_output(
                ['wmic', 'path', 'Win32_PnPEntity', 'Get', 'DeviceID,Caption'],
                universal_newlines=True,
            )
            for line in result.split('\n'):
                # Captions may contain other parentheses, e.g.
                # "Intel(R) ... (COM3)", so match the COM group explicitly
                # instead of taking the first parenthesised text.
                found = re.search(r'\((COM\d+)\)', line)
                if found:
                    ports.append({'device': found.group(1),
                                  'description': line.strip()})
        elif sys.platform.startswith(('linux', 'darwin')):
            pattern = ('/dev/tty*' if sys.platform.startswith('linux')
                       else '/dev/tty.*')
            # glob avoids spawning a shell and simply yields an empty list
            # when nothing matches (``ls`` exits non-zero in that case).
            ports = [{'device': path, 'description': path}
                     for path in sorted(glob.glob(pattern))]
        else:
            raise OSError(f"Unsupported operating system: {sys.platform}")

        logger.debug("Found ports: %s", ports)
        return jsonify(ports)
    except Exception as e:
        logger.exception("Error getting ports: %s", e)
        return jsonify({'error': str(e)}), 500


@app.route('/set_port', methods=['POST'])
def set_port():
    """Open the serial port named in the JSON body and start the reader thread.

    Expects ``{"port": "<device>"}``. Always answers with a JSON object
    containing ``success`` and ``message``.
    """
    global ser, thread_running

    # A missing, malformed or non-object body is treated as "no port given"
    # instead of raising before the error handling below is reached.
    data = request.get_json(silent=True)
    port = data.get('port') if isinstance(data, dict) else None
    if not port:
        return jsonify({'success': False, 'message': '未提供串口地址'})

    try:
        if ser and ser.is_open:
            # Ask the running reader loop to stop and give it time to leave
            # its current iteration before the port is closed under it.
            thread_running = False
            time.sleep(0.5)
            ser.close()

        ser = serial.Serial(port, 9600, timeout=1)
        thread_running = True
        threading.Thread(target=read_serial, daemon=True).start()
        return jsonify({'success': True, 'message': '串口设置成功'})
    except serial.SerialException as e:
        return jsonify({'success': False, 'message': f'串口打开失败: {str(e)}'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'未知错误: {str(e)}'})


@app.route('/test_port', methods=['POST'])
def test_port():
    """Write ``TEST\\r\\n`` to the open port and report the line read back."""
    global ser
    if not ser or not ser.is_open:
        return jsonify({'success': False, 'message': '串口未打开'})

    try:
        ser.write(b'TEST\r\n')
        response = ser.readline().decode().strip()
        if response:
            return jsonify({'success': True, 'message': f'收到响应: {response}'})
        return jsonify({'success': False, 'message': '未收到响应'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'测试失败: {str(e)}'})


@app.route('/clear_signal_data', methods=['POST'])
def clear_signal_data():
    """Clear the recorded signal history and switch every light off.

    Operates on the shared ``all_signals`` and ``lights`` objects that the
    status endpoints serve, so the cleared state is visible to clients.
    """
    all_signals.clear()
    reset_lights()
    return jsonify({"success": True})


if __name__ == '__main__':
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # interactive Werkzeug debugger on every network interface; confirm this
    # is only ever run on a trusted network.
    app.run(debug=True, host='0.0.0.0', port=5888)