#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 道闸控制模块 负责与Hi3861设备通信,控制道闸开关 """ import socket import json import time from datetime import datetime, timedelta from PyQt5.QtCore import QObject, pyqtSignal, QThread class GateControlThread(QThread): """道闸控制线程,用于异步发送命令""" command_sent = pyqtSignal(str, bool) # 信号:命令内容,是否成功 def __init__(self, ip, port, command): super().__init__() self.ip = ip self.port = port self.command = command def run(self): """发送命令到Hi3861设备""" try: # 创建UDP socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 发送命令 json_command = json.dumps(self.command, ensure_ascii=False) sock.sendto(json_command.encode('utf-8'), (self.ip, self.port)) # 发出成功信号 self.command_sent.emit(json_command, True) except Exception as e: # 发出失败信号 self.command_sent.emit(f"发送失败: {e}", False) finally: sock.close() class GateController(QObject): """道闸控制器""" # 信号 log_message = pyqtSignal(str) # 日志消息 gate_opened = pyqtSignal(str) # 道闸打开信号,附带车牌号 def __init__(self, ip="192.168.43.12", port=8081): super().__init__() self.ip = ip self.port = port self.last_pass_times = {} # 记录车牌上次通过时间 self.thread_pool = [] # 线程池 def send_command(self, cmd, text=""): """ 发送命令到道闸 参数: cmd: 命令类型 (1-4) text: 显示文本 返回: bool: 是否发送成功 """ # 创建JSON命令 command = { "cmd": cmd, "text": text } # 创建并启动线程发送命令 thread = GateControlThread(self.ip, self.port, command) thread.command_sent.connect(self.on_command_sent) thread.start() self.thread_pool.append(thread) # 记录日志 cmd_desc = { 1: "自动开闸(10秒后关闭)", 2: "手动开闸", 3: "手动关闸", 4: "仅显示信息" } self.log_message.emit(f"发送命令: {cmd_desc.get(cmd, '未知命令')} - {text}") return True def on_command_sent(self, message, success): """命令发送结果处理""" if success: self.log_message.emit(f"命令发送成功: {message}") else: self.log_message.emit(f"命令发送失败: {message}") def auto_open_gate(self, plate_number): """ 自动开闸(检测到白名单车牌时调用) 参数: plate_number: 车牌号 """ # 获取当前时间 current_time = datetime.now() time_diff_str = "" # 检查是否是第一次通行 if plate_number in self.last_pass_times: # 第二次或更多次通行,计算时间差 last_time = self.last_pass_times[plate_number] time_diff = current_time - last_time # 格式化时间差 total_seconds = int(time_diff.total_seconds()) minutes = total_seconds // 60 seconds = total_seconds % 60 if minutes > 0: time_diff_str = f" {minutes}min{seconds}sec" else: time_diff_str = f" {seconds}sec" # 计算时间差后清空之前记录的时间点 del self.last_pass_times[plate_number] log_msg = f"检测到白名单车牌: {plate_number},自动开闸{time_diff_str},已清空时间记录" else: # 第一次通行,只记录时间,不计算时间差 self.last_pass_times[plate_number] = current_time log_msg = f"检测到白名单车牌: {plate_number},首次通行,已记录时间" # 发送开闸命令 display_text = f"{plate_number} 通行{time_diff_str}" self.send_command(1, display_text) # 发出信号 self.gate_opened.emit(plate_number) # 记录日志 self.log_message.emit(log_msg) def manual_open_gate(self): """手动开闸""" self.send_command(2, "") self.log_message.emit("手动开闸") def manual_close_gate(self): """手动关闸""" self.send_command(3, "") self.log_message.emit("手动关闸") def display_message(self, text): """仅显示信息,不控制道闸""" self.send_command(4, text) self.log_message.emit(f"显示信息: {text}") def deny_access(self, plate_number): """ 拒绝通行(检测到非白名单车牌时调用) 参数: plate_number: 车牌号 """ self.send_command(4, f"{plate_number} 禁止通行") self.log_message.emit(f"检测到非白名单车牌: {plate_number},拒绝通行") class WhitelistManager(QObject): """白名单管理器""" # 信号 whitelist_changed = pyqtSignal(list) # 白名单变更信号 def __init__(self): super().__init__() self.whitelist = [] # 白名单车牌列表 def add_plate(self, plate_number): """ 添加车牌到白名单 参数: plate_number: 车牌号 返回: bool: 是否添加成功 """ if not plate_number or plate_number in self.whitelist: return False self.whitelist.append(plate_number) self.whitelist_changed.emit(self.whitelist.copy()) return True def remove_plate(self, plate_number): """ 从白名单移除车牌 参数: plate_number: 车牌号 返回: bool: 是否移除成功 """ if plate_number in self.whitelist: self.whitelist.remove(plate_number) self.whitelist_changed.emit(self.whitelist.copy()) return True return False def edit_plate(self, old_plate, new_plate): """ 编辑白名单中的车牌 参数: old_plate: 原车牌号 new_plate: 新车牌号 返回: bool: 是否编辑成功 """ if old_plate in self.whitelist and new_plate not in self.whitelist: index = self.whitelist.index(old_plate) self.whitelist[index] = new_plate self.whitelist_changed.emit(self.whitelist.copy()) return True return False def is_whitelisted(self, plate_number): """ 检查车牌是否在白名单中 参数: plate_number: 车牌号 返回: bool: 是否在白名单中 """ return plate_number in self.whitelist def get_whitelist(self): """获取白名单副本""" return self.whitelist.copy() def clear_whitelist(self): """清空白名单""" self.whitelist.clear() self.whitelist_changed.emit(self.whitelist.copy())