251 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			251 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						||
# -*- coding: utf-8 -*-
 | 
						||
"""
 | 
						||
道闸控制模块
 | 
						||
负责与Hi3861设备通信,控制道闸开关
 | 
						||
"""
 | 
						||
 | 
						||
import socket
 | 
						||
import json
 | 
						||
import time
 | 
						||
from datetime import datetime, timedelta
 | 
						||
from PyQt5.QtCore import QObject, pyqtSignal, QThread
 | 
						||
 | 
						||
 | 
						||
class GateControlThread(QThread):
 | 
						||
    """道闸控制线程,用于异步发送命令"""
 | 
						||
    command_sent = pyqtSignal(str, bool)  # 信号:命令内容,是否成功
 | 
						||
    
 | 
						||
    def __init__(self, ip, port, command):
 | 
						||
        super().__init__()
 | 
						||
        self.ip = ip
 | 
						||
        self.port = port
 | 
						||
        self.command = command
 | 
						||
    
 | 
						||
    def run(self):
 | 
						||
        """发送命令到Hi3861设备"""
 | 
						||
        try:
 | 
						||
            # 创建UDP socket
 | 
						||
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
						||
            
 | 
						||
            # 发送命令
 | 
						||
            json_command = json.dumps(self.command, ensure_ascii=False)
 | 
						||
            sock.sendto(json_command.encode('utf-8'), (self.ip, self.port))
 | 
						||
            
 | 
						||
            # 发出成功信号
 | 
						||
            self.command_sent.emit(json_command, True)
 | 
						||
            
 | 
						||
        except Exception as e:
 | 
						||
            # 发出失败信号
 | 
						||
            self.command_sent.emit(f"发送失败: {e}", False)
 | 
						||
        finally:
 | 
						||
            sock.close()
 | 
						||
 | 
						||
 | 
						||
class GateController(QObject):
 | 
						||
    """道闸控制器"""
 | 
						||
    
 | 
						||
    # 信号
 | 
						||
    log_message = pyqtSignal(str)  # 日志消息
 | 
						||
    gate_opened = pyqtSignal(str)  # 道闸打开信号,附带车牌号
 | 
						||
    
 | 
						||
    def __init__(self, ip="192.168.43.12", port=8081):
 | 
						||
        super().__init__()
 | 
						||
        self.ip = ip
 | 
						||
        self.port = port
 | 
						||
        self.last_pass_times = {}  # 记录车牌上次通过时间
 | 
						||
        self.thread_pool = []  # 线程池
 | 
						||
        
 | 
						||
    def send_command(self, cmd, text=""):
 | 
						||
        """
 | 
						||
        发送命令到道闸
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            cmd: 命令类型 (1-4)
 | 
						||
            text: 显示文本
 | 
						||
            
 | 
						||
        返回:
 | 
						||
            bool: 是否发送成功
 | 
						||
        """
 | 
						||
        # 创建JSON命令
 | 
						||
        command = {
 | 
						||
            "cmd": cmd,
 | 
						||
            "text": text
 | 
						||
        }
 | 
						||
        
 | 
						||
        # 创建并启动线程发送命令
 | 
						||
        thread = GateControlThread(self.ip, self.port, command)
 | 
						||
        thread.command_sent.connect(self.on_command_sent)
 | 
						||
        thread.start()
 | 
						||
        self.thread_pool.append(thread)
 | 
						||
        
 | 
						||
        # 记录日志
 | 
						||
        cmd_desc = {
 | 
						||
            1: "自动开闸(10秒后关闭)",
 | 
						||
            2: "手动开闸",
 | 
						||
            3: "手动关闸",
 | 
						||
            4: "仅显示信息"
 | 
						||
        }
 | 
						||
        self.log_message.emit(f"发送命令: {cmd_desc.get(cmd, '未知命令')} - {text}")
 | 
						||
        
 | 
						||
        return True
 | 
						||
    
 | 
						||
    def on_command_sent(self, message, success):
 | 
						||
        """命令发送结果处理"""
 | 
						||
        if success:
 | 
						||
            self.log_message.emit(f"命令发送成功: {message}")
 | 
						||
        else:
 | 
						||
            self.log_message.emit(f"命令发送失败: {message}")
 | 
						||
    
 | 
						||
    def auto_open_gate(self, plate_number):
 | 
						||
        """
 | 
						||
        自动开闸(检测到白名单车牌时调用)
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            plate_number: 车牌号
 | 
						||
        """
 | 
						||
        # 获取当前时间
 | 
						||
        current_time = datetime.now()
 | 
						||
        time_diff_str = ""
 | 
						||
        
 | 
						||
        # 检查是否是第一次通行
 | 
						||
        if plate_number in self.last_pass_times:
 | 
						||
            # 第二次或更多次通行,计算时间差
 | 
						||
            last_time = self.last_pass_times[plate_number]
 | 
						||
            time_diff = current_time - last_time
 | 
						||
            
 | 
						||
            # 格式化时间差
 | 
						||
            total_seconds = int(time_diff.total_seconds())
 | 
						||
            minutes = total_seconds // 60
 | 
						||
            seconds = total_seconds % 60
 | 
						||
            
 | 
						||
            if minutes > 0:
 | 
						||
                time_diff_str = f" {minutes}min{seconds}sec"
 | 
						||
            else:
 | 
						||
                time_diff_str = f" {seconds}sec"
 | 
						||
            
 | 
						||
            # 计算时间差后清空之前记录的时间点
 | 
						||
            del self.last_pass_times[plate_number]
 | 
						||
            log_msg = f"检测到白名单车牌: {plate_number},自动开闸{time_diff_str},已清空时间记录"
 | 
						||
        else:
 | 
						||
            # 第一次通行,只记录时间,不计算时间差
 | 
						||
            self.last_pass_times[plate_number] = current_time
 | 
						||
            log_msg = f"检测到白名单车牌: {plate_number},首次通行,已记录时间"
 | 
						||
        
 | 
						||
        # 发送开闸命令
 | 
						||
        display_text = f"{plate_number} 通行{time_diff_str}"
 | 
						||
        self.send_command(1, display_text)
 | 
						||
        
 | 
						||
        # 发出信号
 | 
						||
        self.gate_opened.emit(plate_number)
 | 
						||
        
 | 
						||
        # 记录日志
 | 
						||
        self.log_message.emit(log_msg)
 | 
						||
    
 | 
						||
    def manual_open_gate(self):
 | 
						||
        """手动开闸"""
 | 
						||
        self.send_command(2, "")
 | 
						||
        self.log_message.emit("手动开闸")
 | 
						||
    
 | 
						||
    def manual_close_gate(self):
 | 
						||
        """手动关闸"""
 | 
						||
        self.send_command(3, "")
 | 
						||
        self.log_message.emit("手动关闸")
 | 
						||
    
 | 
						||
    def display_message(self, text):
 | 
						||
        """仅显示信息,不控制道闸"""
 | 
						||
        self.send_command(4, text)
 | 
						||
        self.log_message.emit(f"显示信息: {text}")
 | 
						||
    
 | 
						||
    def deny_access(self, plate_number):
 | 
						||
        """
 | 
						||
        拒绝通行(检测到非白名单车牌时调用)
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            plate_number: 车牌号
 | 
						||
        """
 | 
						||
        self.send_command(4, f"{plate_number} 禁止通行")
 | 
						||
        self.log_message.emit(f"检测到非白名单车牌: {plate_number},拒绝通行")
 | 
						||
 | 
						||
 | 
						||
class WhitelistManager(QObject):
 | 
						||
    """白名单管理器"""
 | 
						||
    
 | 
						||
    # 信号
 | 
						||
    whitelist_changed = pyqtSignal(list)  # 白名单变更信号
 | 
						||
    
 | 
						||
    def __init__(self):
 | 
						||
        super().__init__()
 | 
						||
        self.whitelist = []  # 白名单车牌列表
 | 
						||
    
 | 
						||
    def add_plate(self, plate_number):
 | 
						||
        """
 | 
						||
        添加车牌到白名单
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            plate_number: 车牌号
 | 
						||
            
 | 
						||
        返回:
 | 
						||
            bool: 是否添加成功
 | 
						||
        """
 | 
						||
        if not plate_number or plate_number in self.whitelist:
 | 
						||
            return False
 | 
						||
        
 | 
						||
        self.whitelist.append(plate_number)
 | 
						||
        self.whitelist_changed.emit(self.whitelist.copy())
 | 
						||
        return True
 | 
						||
    
 | 
						||
    def remove_plate(self, plate_number):
 | 
						||
        """
 | 
						||
        从白名单移除车牌
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            plate_number: 车牌号
 | 
						||
            
 | 
						||
        返回:
 | 
						||
            bool: 是否移除成功
 | 
						||
        """
 | 
						||
        if plate_number in self.whitelist:
 | 
						||
            self.whitelist.remove(plate_number)
 | 
						||
            self.whitelist_changed.emit(self.whitelist.copy())
 | 
						||
            return True
 | 
						||
        return False
 | 
						||
    
 | 
						||
    def edit_plate(self, old_plate, new_plate):
 | 
						||
        """
 | 
						||
        编辑白名单中的车牌
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            old_plate: 原车牌号
 | 
						||
            new_plate: 新车牌号
 | 
						||
            
 | 
						||
        返回:
 | 
						||
            bool: 是否编辑成功
 | 
						||
        """
 | 
						||
        if old_plate in self.whitelist and new_plate not in self.whitelist:
 | 
						||
            index = self.whitelist.index(old_plate)
 | 
						||
            self.whitelist[index] = new_plate
 | 
						||
            self.whitelist_changed.emit(self.whitelist.copy())
 | 
						||
            return True
 | 
						||
        return False
 | 
						||
    
 | 
						||
    def is_whitelisted(self, plate_number):
 | 
						||
        """
 | 
						||
        检查车牌是否在白名单中
 | 
						||
        
 | 
						||
        参数:
 | 
						||
            plate_number: 车牌号
 | 
						||
            
 | 
						||
        返回:
 | 
						||
            bool: 是否在白名单中
 | 
						||
        """
 | 
						||
        return plate_number in self.whitelist
 | 
						||
    
 | 
						||
    def get_whitelist(self):
 | 
						||
        """获取白名单副本"""
 | 
						||
        return self.whitelist.copy()
 | 
						||
    
 | 
						||
    def clear_whitelist(self):
 | 
						||
        """清空白名单"""
 | 
						||
        self.whitelist.clear()
 | 
						||
        self.whitelist_changed.emit(self.whitelist.copy()) |