# life_handle_tool/app.py
#
# (file-viewer metadata carried over from the original paste:
#  178 lines, 5.5 KiB, Python)

from flask import Flask, render_template, jsonify, request
import serial
import serial.tools.list_ports
import threading
import time
import subprocess
import sys
import logging
# The Flask application object; every route below is registered on it.
app = Flask(import_name=__name__)
# Indicator lamp states keyed by lamp name; every lamp starts switched off.
lights = dict.fromkeys(
    ('common', 'alarm', 'up', 'down', 'emergency_stop'),
    False,
)

# Global holding the serial port object (None until a port has been opened).
ser = None

# Signals received from the serial port, oldest first.
all_signals = []

# Control flag for the background reader thread: the reader loops while True.
thread_running = False
def reset_lights():
    """Switch every lamp off by setting each entry of ``lights`` to False.

    The module-level ``lights`` dict is modified in place (it is not
    replaced), so existing references to it observe the change.
    """
    lights.update(dict.fromkeys(lights, False))
def analyze_signals():
    """Recompute the lamp states in ``lights`` from the received signals.

    Reads the module-level ``all_signals`` list (two-character hex strings,
    oldest first) and updates the module-level ``lights`` dict in place.

    Rules, in priority order:

    1. Emergency stop: the last three signals are ``09, 0b, 08`` or
       ``09, 0d, 08`` -> only ``emergency_stop`` is on.
    2. The last signal is ``08`` -> every lamp is off.
    3. Otherwise:
       * ``common`` is on if ``09`` occurs anywhere in the buffer;
       * ``alarm`` is on if the last three signals are ``09, 08, 09``;
       * ``up`` is on if the last signal is ``0b``;
       * ``down`` is on if the last signal is ``0d``.

    The new state is built in a local dict and written to ``lights`` with a
    single ``update`` call. The previous implementation cleared every lamp
    and then switched some on and back off again in separate steps, so a
    request polling ``lights`` from another thread could observe those
    intermediate values.
    """
    signals = all_signals
    # Start from "everything off", covering whatever keys ``lights`` has.
    state = dict.fromkeys(lights, False)

    # Slices never raise on short lists; a slice shorter than the pattern
    # simply compares unequal, so no explicit length checks are needed.
    last_three = signals[-3:]
    last = signals[-1] if signals else None

    if last_three in (['09', '0b', '08'], ['09', '0d', '08']):
        # Emergency stop takes priority over every other rule.
        state['emergency_stop'] = True
        print("急停灯亮")
    elif last == '08':
        print("收到08信号, 所有灯熄灭")
    else:
        state['common'] = '09' in signals
        state['alarm'] = last_three == ['09', '08', '09']
        state['up'] = last == '0b'
        state['down'] = last == '0d'

    # Publish the complete state in one step.
    lights.update(state)
def read_serial():
    """Poll the global serial port and analyze every byte that arrives.

    Runs until the module-level ``thread_running`` flag becomes false.
    Each byte read is converted to its hex string, appended to
    ``all_signals`` and passed through ``analyze_signals()``; afterwards the
    buffer is cut back to its 100 most recent entries. When no port is
    usable or no data is waiting the loop sleeps 50 ms; after a
    ``serial.SerialException`` it logs the error and sleeps 1 s.
    """
    global all_signals
    while thread_running:
        try:
            data_waiting = ser and ser.is_open and ser.in_waiting
            if not data_waiting:
                # Nothing to read right now: back off briefly, then poll again.
                time.sleep(0.05)
                continue

            byte_hex = ser.read().hex()
            print(f"接收到数据: {byte_hex}")
            all_signals.append(byte_hex)
            analyze_signals()

            # Bound the history to the 100 most recent signals.
            if len(all_signals) > 100:
                all_signals = all_signals[-100:]

            print(f"当前灯光状态: {lights}")
            print(f"当前信号数组: {all_signals}")
        except serial.SerialException as err:
            print(f"串口错误: {err}")
            time.sleep(1)
# Logging setup: module-level logger, root logging configured at DEBUG level.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    template_name = 'index.html'
    return render_template(template_name)
@app.route('/get_lights_status')
def get_lights_status():
    """Return the current contents of the ``lights`` dict as JSON."""
    current_state = lights
    return jsonify(current_state)
@app.route('/get_signals')
def get_signals():
    """Return the buffered signal list (``all_signals``) as a JSON array."""
    received = all_signals
    return jsonify(received)
@app.route('/get_ports')
def get_ports():
    """List the serial ports available on this machine.

    Ports are enumerated with pySerial's ``serial.tools.list_ports.comports()``
    (already imported by this module) instead of shelling out to ``wmic`` or
    ``ls``. The shell-based approach depended on external tools being
    installed, returned a 500 on macOS when no ``/dev/tty.*`` entry existed
    (``ls`` exits non-zero on no match), and on Windows extracted the wrong
    token when a device caption contained a parenthesis before ``(COM…)``.

    Returns:
        A JSON array of ``{'device': ..., 'description': ...}`` objects
        (empty when no port is found), or ``{'error': message}`` with HTTP
        status 500 if enumeration fails.
    """
    try:
        ports = []
        for info in serial.tools.list_ports.comports():
            description = info.description
            # pySerial reports 'n/a' when it has no description; fall back
            # to the device name so the entry still has a usable label.
            if not description or description == 'n/a':
                description = info.device
            ports.append({'device': info.device, 'description': description})
        logger.debug("Found ports: %s", ports)
        return jsonify(ports)
    except Exception as e:
        # Route boundary: report the failure to the client instead of crashing.
        logger.error("Error getting ports: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500
# Handle of the reader thread started by the most recent successful
# /set_port request (None until a port has been opened).
_reader_thread = None


@app.route('/set_port', methods=['POST'])
def set_port():
    """Open the requested serial port and start the background reader.

    Expects a JSON object body with a ``port`` entry naming the device.
    Any previously started reader thread is stopped and joined, and a
    previously opened port is closed, before the new port is opened at
    9600 baud with a 1 s read timeout.

    Returns:
        A JSON object ``{'success': bool, 'message': str}``. A missing,
        non-JSON or non-object body is reported as "no port provided"
        rather than raising.
    """
    global ser, thread_running, _reader_thread

    # silent=True yields None for a missing/invalid JSON body instead of
    # raising; the isinstance check also covers bodies such as lists.
    payload = request.get_json(silent=True)
    port = payload.get('port') if isinstance(payload, dict) else None
    if not port:
        return jsonify({'success': False, 'message': '未提供串口地址'})

    try:
        # Stop the old reader and wait until it has really exited. A fixed
        # sleep is not enough: the reader may be inside its 1 s error
        # back-off, wake up after the flag is True again and keep running
        # next to the new thread. The 2 s timeout exceeds the reader's
        # longest single wait (1 s sleep or 1 s read timeout).
        thread_running = False
        if _reader_thread is not None and _reader_thread.is_alive():
            _reader_thread.join(timeout=2.0)
        if ser and ser.is_open:
            ser.close()

        ser = serial.Serial(port, 9600, timeout=1)
        thread_running = True
        _reader_thread = threading.Thread(target=read_serial, daemon=True)
        _reader_thread.start()
        return jsonify({'success': True, 'message': '串口设置成功'})
    except serial.SerialException as e:
        return jsonify({'success': False, 'message': f'串口打开失败: {str(e)}'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'未知错误: {str(e)}'})
@app.route('/test_port', methods=['POST'])
def test_port():
    """Send a test line over the open serial port and report the reply.

    Writes ``b'TEST\\r\\n'`` to the global port, reads one line back, and
    returns a JSON object ``{'success': bool, 'message': str}`` describing
    whether a non-empty reply arrived. Any exception raised while talking
    to the port is reported as a failed test.
    """
    if not (ser and ser.is_open):
        return jsonify({'success': False, 'message': '串口未打开'})
    try:
        ser.write(b'TEST\r\n')
        raw_line = ser.readline()
        reply = raw_line.decode().strip()
        if not reply:
            return jsonify({'success': False, 'message': '未收到响应'})
        return jsonify({'success': True, 'message': f'收到响应: {reply}'})
    except Exception as exc:
        return jsonify({'success': False, 'message': f'测试失败: {str(exc)}'})
if __name__ == '__main__':
    import os

    # The server listens on all interfaces (0.0.0.0). Flask's debug mode
    # enables the Werkzeug interactive debugger, which lets a browser run
    # arbitrary Python code, so it must not be on unconditionally for a
    # network-reachable server. Debug mode is therefore opt-in: set
    # FLASK_DEBUG=1 (or true/yes/on) in the environment to enable it.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(debug=debug_enabled, host='0.0.0.0', port=5888)