#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 向Hi3861设备发送JSON命令 """ import socket import json import time import pyttsx3 import threading target_ip = "192.168.43.12" target_port = 8081 def speak_text(text): """ 使用文本转语音播放文本 每次调用都创建新的引擎实例以避免并发问题 """ def _speak(): try: if text and text.strip(): # 确保文本不为空 # 在线程内部创建新的引擎实例 engine = pyttsx3.init() # 设置语音速度 engine.setProperty('rate', 150) # 设置音量(0.0到1.0) engine.setProperty('volume', 0.8) engine.say(text) engine.runAndWait() # 清理引擎 engine.stop() del engine except Exception as e: print(f"语音播放失败: {e}") # 在新线程中播放语音,避免阻塞 speech_thread = threading.Thread(target=_speak) speech_thread.daemon = True speech_thread.start() def send_command(cmd, text): #cmd为1,道闸打开十秒后关闭,oled显示字符串信息(默认使用及cmd为4) #cmd为2,道闸舵机向打开方向旋转90度,oled上不显示(仅在qt界面手动开闸时调用) #cmd为3,道闸舵机向关闭方向旋转90度,oled上不显示(仅在qt界面手动关闸时调用) #cmd为4,oled显示字符串信息,道闸舵机不旋转 command = { "cmd": cmd, "text": text } json_command = json.dumps(command, ensure_ascii=False) try: # 创建UDP socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.sendto(json_command.encode('utf-8'), (target_ip, target_port)) # 发送命令后播放语音 if text and text.strip(): speak_text(text) except Exception as e: print(f"发送命令失败: {e}") finally: sock.close()