69 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			69 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						||
# -*- coding: utf-8 -*-
 | 
						||
"""
 | 
						||
向Hi3861设备发送JSON命令
 | 
						||
"""
 | 
						||
 | 
						||
import socket
 | 
						||
import json
 | 
						||
import time
 | 
						||
import pyttsx3
 | 
						||
import threading
 | 
						||
 | 
						||
target_ip = "192.168.43.12"
 | 
						||
target_port = 8081
 | 
						||
 | 
						||
def speak_text(text):
 | 
						||
    """
 | 
						||
    使用文本转语音播放文本
 | 
						||
    每次调用都创建新的引擎实例以避免并发问题
 | 
						||
    """
 | 
						||
    def _speak():
 | 
						||
        try:
 | 
						||
            if text and text.strip():  # 确保文本不为空
 | 
						||
                # 在线程内部创建新的引擎实例
 | 
						||
                engine = pyttsx3.init()
 | 
						||
                # 设置语音速度
 | 
						||
                engine.setProperty('rate', 150)
 | 
						||
                # 设置音量(0.0到1.0)
 | 
						||
                engine.setProperty('volume', 0.8)
 | 
						||
                
 | 
						||
                engine.say(text)
 | 
						||
                engine.runAndWait()
 | 
						||
                
 | 
						||
                # 清理引擎
 | 
						||
                engine.stop()
 | 
						||
                del engine
 | 
						||
        except Exception as e:
 | 
						||
            print(f"语音播放失败: {e}")
 | 
						||
    
 | 
						||
    # 在新线程中播放语音,避免阻塞
 | 
						||
    speech_thread = threading.Thread(target=_speak)
 | 
						||
    speech_thread.daemon = True
 | 
						||
    speech_thread.start()
 | 
						||
 | 
						||
def send_command(cmd, text):
 | 
						||
    #cmd为1,道闸打开十秒后关闭,oled显示字符串信息(默认使用及cmd为4)
 | 
						||
    #cmd为2,道闸舵机向打开方向旋转90度,oled上不显示(仅在qt界面手动开闸时调用)
 | 
						||
    #cmd为3,道闸舵机向关闭方向旋转90度,oled上不显示(仅在qt界面手动关闸时调用)
 | 
						||
    #cmd为4,oled显示字符串信息,道闸舵机不旋转
 | 
						||
 | 
						||
    command = {
 | 
						||
        "cmd": cmd,
 | 
						||
        "text": text
 | 
						||
    }
 | 
						||
    
 | 
						||
    json_command = json.dumps(command, ensure_ascii=False)
 | 
						||
    try:
 | 
						||
        # 创建UDP socket
 | 
						||
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 | 
						||
        sock.sendto(json_command.encode('utf-8'), (target_ip, target_port))
 | 
						||
        
 | 
						||
        # 发送命令后播放语音
 | 
						||
        if text and text.strip():
 | 
						||
            speak_text(text)
 | 
						||
            
 | 
						||
    except Exception as e:
 | 
						||
        print(f"发送命令失败: {e}")
 | 
						||
    finally:
 | 
						||
        sock.close() |