import os import re import json import struct from datetime import datetime # 常量定义 STEPS = [ {"desc": "1. [基础] 按电源键开机", "cmd": "power_on"}, {"desc": "2. [基础] 按电源键关机", "cmd": "power_off"}, *[{"desc": f"3.{i+1} [制冷] 设置{i+16}℃", "cmd": f"cool_{i+16}"} for i in range(5)],#基础温度为16 *[{"desc": f"4.{i+1} [制热] 设置{i+16}℃", "cmd": f"heat_{i+16}"} for i in range(5)], {"desc": "5.1 [模式] 制冷模式", "cmd": "mode_cool"}, {"desc": "5.2 [模式] 除湿模式", "cmd": "mode_dry"}, {"desc": "5.3 [模式] 送风模式", "cmd": "mode_fan"}, {"desc": "5.4 [模式] 上下风", "cmd": "up_down"},#上下风 {"desc": "6.1 [风速] 左右风", "cmd": "left_right"},#左右风 {"desc": "6.2 [风速] 低速", "cmd": "fan_low"}, {"desc": "6.3 [风速] 中速", "cmd": "fan_mid"}, {"desc": "6.4 [风速] 高速", "cmd": "fan_high"}, {"desc": "7.1 [特殊] 超强风速", "cmd": "turbo_on"},#超强风速 {"desc": "7.2 [特殊] 静音风速", "cmd": "turbo_off"},#静音风速 {"desc": "7.5 [特殊] 睡眠模式开", "cmd": "sleep_on"}, {"desc": "7.6 [特殊] 睡眠模式关", "cmd": "sleep_off"}, ] def validate_signal(data): """增强版信号验证""" pattern = r""" uint16_t\s+rawData\[(\d+)\]\s*=\s* # 匹配数组声明 {([\d,\s]+)};?[\s\S]*? # 捕获数组内容 Protocol\s*=\s*(\w+) # 协议类型 (?:.*?(\d+)\s+bits)? # 可选位数匹配 """ match = re.search(pattern, data, re.VERBOSE | re.IGNORECASE) if not match: return False, None, None, None try: arr_length = int(match.group(1)) raw_data = list(map(int, match.group(2).split(','))) protocol = match.group(3).upper() bits = int(match.group(4)) if match.group(4) else len(raw_data)*16 # 数据长度校验 if len(raw_data) != arr_length: print(f"数据长度不匹配: 声明{arr_length} 实际{len(raw_data)}") return False, None, None, None return raw_data, protocol, bits, len(data) except Exception as e: print(f"解析错误: {str(e)}") return False, None, None, None def analyze_signals(collected_data): """执行控制变量分析""" analysis = {"protocols": {}} # 按协议分组 protocol_groups = {} for cmd, data in collected_data.items(): proto = data['protocol'] protocol_groups.setdefault(proto, []).append(data) # 分析每个协议 for proto, group in protocol_groups.items(): proto_analysis = { "sample_count": len(group), "fixed_bits": [], "variable_bits": [], "template": [] } # 位级分析 max_length = max(len(d["raw"]) for d in group) bit_analysis = [[] for _ in range(max_length)] for data in group: for i, val in enumerate(data["raw"]): bit_analysis[i].append(val) # 识别固定/可变位 for idx, values in enumerate(bit_analysis): unique_vals = set(values) if len(unique_vals) == 1: proto_analysis["fixed_bits"].append({ "position": idx, "value": values[0] }) else: proto_analysis["variable_bits"].append({ "position": idx, "values": list(unique_vals) }) # 生成协议模板 proto_analysis["template"] = [ "V" if any(idx == b["position"] for b in proto_analysis["variable_bits"]) else group[0]["raw"][idx] for idx in range(len(group[0]["raw"])) ] analysis["protocols"][proto] = proto_analysis return analysis def collect_signals(): """主采集流程""" collected = {} print("\n空调红外信号采集系统启动") for step_num, step in enumerate(STEPS, 1): while True: print(f"\n[{step_num}/{len(STEPS)}] {step['desc']}") print("请输入红外信号数据(输入'exit'退出):") data = input().strip() if data.lower() == 'exit': if input("确认退出?(y/n): ").lower() == 'y': save_data(collected) return valid, raw, proto, bits = validate_signal(data) if valid: collected[step['cmd']] = { "raw": raw, "protocol": proto, "bits": bits, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } print(f"√ 已记录:{step['cmd']}") break print("× 格式错误!需包含:\n1.完整C数组 2.协议信息 3.正确数据格式") analysis = analyze_signals(collected) save_data(collected, analysis) print(f"\n采集完成!有效数据:{len(collected)}条") def convert_to_bin(json_path): """将JSON配置转换为二进制格式""" with open(json_path, 'r') as f: config = json.load(f) bin_data = bytearray() for cmd in config['raw_data'].values(): # 协议类型(4字节) + 位数(4字节) + 时间戳(12字节) bin_data.extend(struct.pack('