251 lines
7.5 KiB
Python
251 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
道闸控制模块
|
||
负责与Hi3861设备通信,控制道闸开关
|
||
"""
|
||
|
||
import socket
|
||
import json
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from PyQt5.QtCore import QObject, pyqtSignal, QThread
|
||
|
||
|
||
class GateControlThread(QThread):
|
||
"""道闸控制线程,用于异步发送命令"""
|
||
command_sent = pyqtSignal(str, bool) # 信号:命令内容,是否成功
|
||
|
||
def __init__(self, ip, port, command):
|
||
super().__init__()
|
||
self.ip = ip
|
||
self.port = port
|
||
self.command = command
|
||
|
||
def run(self):
|
||
"""发送命令到Hi3861设备"""
|
||
try:
|
||
# 创建UDP socket
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
||
# 发送命令
|
||
json_command = json.dumps(self.command, ensure_ascii=False)
|
||
sock.sendto(json_command.encode('utf-8'), (self.ip, self.port))
|
||
|
||
# 发出成功信号
|
||
self.command_sent.emit(json_command, True)
|
||
|
||
except Exception as e:
|
||
# 发出失败信号
|
||
self.command_sent.emit(f"发送失败: {e}", False)
|
||
finally:
|
||
sock.close()
|
||
|
||
|
||
class GateController(QObject):
|
||
"""道闸控制器"""
|
||
|
||
# 信号
|
||
log_message = pyqtSignal(str) # 日志消息
|
||
gate_opened = pyqtSignal(str) # 道闸打开信号,附带车牌号
|
||
|
||
def __init__(self, ip="192.168.43.12", port=8081):
|
||
super().__init__()
|
||
self.ip = ip
|
||
self.port = port
|
||
self.last_pass_times = {} # 记录车牌上次通过时间
|
||
self.thread_pool = [] # 线程池
|
||
|
||
def send_command(self, cmd, text=""):
|
||
"""
|
||
发送命令到道闸
|
||
|
||
参数:
|
||
cmd: 命令类型 (1-4)
|
||
text: 显示文本
|
||
|
||
返回:
|
||
bool: 是否发送成功
|
||
"""
|
||
# 创建JSON命令
|
||
command = {
|
||
"cmd": cmd,
|
||
"text": text
|
||
}
|
||
|
||
# 创建并启动线程发送命令
|
||
thread = GateControlThread(self.ip, self.port, command)
|
||
thread.command_sent.connect(self.on_command_sent)
|
||
thread.start()
|
||
self.thread_pool.append(thread)
|
||
|
||
# 记录日志
|
||
cmd_desc = {
|
||
1: "自动开闸(10秒后关闭)",
|
||
2: "手动开闸",
|
||
3: "手动关闸",
|
||
4: "仅显示信息"
|
||
}
|
||
self.log_message.emit(f"发送命令: {cmd_desc.get(cmd, '未知命令')} - {text}")
|
||
|
||
return True
|
||
|
||
def on_command_sent(self, message, success):
|
||
"""命令发送结果处理"""
|
||
if success:
|
||
self.log_message.emit(f"命令发送成功: {message}")
|
||
else:
|
||
self.log_message.emit(f"命令发送失败: {message}")
|
||
|
||
def auto_open_gate(self, plate_number):
|
||
"""
|
||
自动开闸(检测到白名单车牌时调用)
|
||
|
||
参数:
|
||
plate_number: 车牌号
|
||
"""
|
||
# 获取当前时间
|
||
current_time = datetime.now()
|
||
time_diff_str = ""
|
||
|
||
# 检查是否是第一次通行
|
||
if plate_number in self.last_pass_times:
|
||
# 第二次或更多次通行,计算时间差
|
||
last_time = self.last_pass_times[plate_number]
|
||
time_diff = current_time - last_time
|
||
|
||
# 格式化时间差
|
||
total_seconds = int(time_diff.total_seconds())
|
||
minutes = total_seconds // 60
|
||
seconds = total_seconds % 60
|
||
|
||
if minutes > 0:
|
||
time_diff_str = f" {minutes}min{seconds}sec"
|
||
else:
|
||
time_diff_str = f" {seconds}sec"
|
||
|
||
# 计算时间差后清空之前记录的时间点
|
||
del self.last_pass_times[plate_number]
|
||
log_msg = f"检测到白名单车牌: {plate_number},自动开闸{time_diff_str},已清空时间记录"
|
||
else:
|
||
# 第一次通行,只记录时间,不计算时间差
|
||
self.last_pass_times[plate_number] = current_time
|
||
log_msg = f"检测到白名单车牌: {plate_number},首次通行,已记录时间"
|
||
|
||
# 发送开闸命令
|
||
display_text = f"{plate_number} 通行{time_diff_str}"
|
||
self.send_command(1, display_text)
|
||
|
||
# 发出信号
|
||
self.gate_opened.emit(plate_number)
|
||
|
||
# 记录日志
|
||
self.log_message.emit(log_msg)
|
||
|
||
def manual_open_gate(self):
|
||
"""手动开闸"""
|
||
self.send_command(2, "")
|
||
self.log_message.emit("手动开闸")
|
||
|
||
def manual_close_gate(self):
|
||
"""手动关闸"""
|
||
self.send_command(3, "")
|
||
self.log_message.emit("手动关闸")
|
||
|
||
def display_message(self, text):
|
||
"""仅显示信息,不控制道闸"""
|
||
self.send_command(4, text)
|
||
self.log_message.emit(f"显示信息: {text}")
|
||
|
||
def deny_access(self, plate_number):
|
||
"""
|
||
拒绝通行(检测到非白名单车牌时调用)
|
||
|
||
参数:
|
||
plate_number: 车牌号
|
||
"""
|
||
self.send_command(4, f"{plate_number} 禁止通行")
|
||
self.log_message.emit(f"检测到非白名单车牌: {plate_number},拒绝通行")
|
||
|
||
|
||
class WhitelistManager(QObject):
|
||
"""白名单管理器"""
|
||
|
||
# 信号
|
||
whitelist_changed = pyqtSignal(list) # 白名单变更信号
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.whitelist = [] # 白名单车牌列表
|
||
|
||
def add_plate(self, plate_number):
|
||
"""
|
||
添加车牌到白名单
|
||
|
||
参数:
|
||
plate_number: 车牌号
|
||
|
||
返回:
|
||
bool: 是否添加成功
|
||
"""
|
||
if not plate_number or plate_number in self.whitelist:
|
||
return False
|
||
|
||
self.whitelist.append(plate_number)
|
||
self.whitelist_changed.emit(self.whitelist.copy())
|
||
return True
|
||
|
||
def remove_plate(self, plate_number):
|
||
"""
|
||
从白名单移除车牌
|
||
|
||
参数:
|
||
plate_number: 车牌号
|
||
|
||
返回:
|
||
bool: 是否移除成功
|
||
"""
|
||
if plate_number in self.whitelist:
|
||
self.whitelist.remove(plate_number)
|
||
self.whitelist_changed.emit(self.whitelist.copy())
|
||
return True
|
||
return False
|
||
|
||
def edit_plate(self, old_plate, new_plate):
|
||
"""
|
||
编辑白名单中的车牌
|
||
|
||
参数:
|
||
old_plate: 原车牌号
|
||
new_plate: 新车牌号
|
||
|
||
返回:
|
||
bool: 是否编辑成功
|
||
"""
|
||
if old_plate in self.whitelist and new_plate not in self.whitelist:
|
||
index = self.whitelist.index(old_plate)
|
||
self.whitelist[index] = new_plate
|
||
self.whitelist_changed.emit(self.whitelist.copy())
|
||
return True
|
||
return False
|
||
|
||
def is_whitelisted(self, plate_number):
|
||
"""
|
||
检查车牌是否在白名单中
|
||
|
||
参数:
|
||
plate_number: 车牌号
|
||
|
||
返回:
|
||
bool: 是否在白名单中
|
||
"""
|
||
return plate_number in self.whitelist
|
||
|
||
def get_whitelist(self):
|
||
"""获取白名单副本"""
|
||
return self.whitelist.copy()
|
||
|
||
def clear_whitelist(self):
|
||
"""清空白名单"""
|
||
self.whitelist.clear()
|
||
self.whitelist_changed.emit(self.whitelist.copy()) |