207 lines
7.3 KiB
Python
207 lines
7.3 KiB
Python
import os
|
||
import re
|
||
import json
|
||
import struct
|
||
from datetime import datetime
|
||
|
||
# 常量定义
|
||
STEPS = [
|
||
{"desc": "1. [基础] 按电源键开机", "cmd": "power_on"},
|
||
{"desc": "2. [基础] 按电源键关机", "cmd": "power_off"},
|
||
*[{"desc": f"3.{i+1} [制冷] 设置{i+16}℃", "cmd": f"cool_{i+16}"} for i in range(5)],#基础温度为16
|
||
*[{"desc": f"4.{i+1} [制热] 设置{i+16}℃", "cmd": f"heat_{i+16}"} for i in range(5)],
|
||
{"desc": "5.1 [模式] 制冷模式", "cmd": "mode_cool"},
|
||
{"desc": "5.2 [模式] 除湿模式", "cmd": "mode_dry"},
|
||
{"desc": "5.3 [模式] 送风模式", "cmd": "mode_fan"},
|
||
{"desc": "5.4 [模式] 上下风", "cmd": "up_down"},#上下风
|
||
{"desc": "6.1 [风速] 左右风", "cmd": "left_right"},#左右风
|
||
{"desc": "6.2 [风速] 低速", "cmd": "fan_low"},
|
||
{"desc": "6.3 [风速] 中速", "cmd": "fan_mid"},
|
||
{"desc": "6.4 [风速] 高速", "cmd": "fan_high"},
|
||
{"desc": "7.1 [特殊] 超强风速", "cmd": "turbo_on"},#超强风速
|
||
{"desc": "7.2 [特殊] 静音风速", "cmd": "turbo_off"},#静音风速
|
||
{"desc": "7.5 [特殊] 睡眠模式开", "cmd": "sleep_on"},
|
||
{"desc": "7.6 [特殊] 睡眠模式关", "cmd": "sleep_off"},
|
||
|
||
|
||
|
||
|
||
]
|
||
|
||
def validate_signal(data):
|
||
"""增强版信号验证"""
|
||
pattern = r"""
|
||
uint16_t\s+rawData\[(\d+)\]\s*=\s* # 匹配数组声明
|
||
{([\d,\s]+)};?[\s\S]*? # 捕获数组内容
|
||
Protocol\s*=\s*(\w+) # 协议类型
|
||
(?:.*?(\d+)\s+bits)? # 可选位数匹配
|
||
"""
|
||
match = re.search(pattern, data, re.VERBOSE | re.IGNORECASE)
|
||
|
||
if not match:
|
||
return False, None, None, None
|
||
|
||
try:
|
||
arr_length = int(match.group(1))
|
||
raw_data = list(map(int, match.group(2).split(',')))
|
||
protocol = match.group(3).upper()
|
||
bits = int(match.group(4)) if match.group(4) else len(raw_data)*16
|
||
|
||
# 数据长度校验
|
||
if len(raw_data) != arr_length:
|
||
print(f"数据长度不匹配: 声明{arr_length} 实际{len(raw_data)}")
|
||
return False, None, None, None
|
||
|
||
return raw_data, protocol, bits, len(data)
|
||
except Exception as e:
|
||
print(f"解析错误: {str(e)}")
|
||
return False, None, None, None
|
||
|
||
def analyze_signals(collected_data):
|
||
"""执行控制变量分析"""
|
||
analysis = {"protocols": {}}
|
||
|
||
# 按协议分组
|
||
protocol_groups = {}
|
||
for cmd, data in collected_data.items():
|
||
proto = data['protocol']
|
||
protocol_groups.setdefault(proto, []).append(data)
|
||
|
||
# 分析每个协议
|
||
for proto, group in protocol_groups.items():
|
||
proto_analysis = {
|
||
"sample_count": len(group),
|
||
"fixed_bits": [],
|
||
"variable_bits": [],
|
||
"template": []
|
||
}
|
||
|
||
# 位级分析
|
||
max_length = max(len(d["raw"]) for d in group)
|
||
bit_analysis = [[] for _ in range(max_length)]
|
||
|
||
for data in group:
|
||
for i, val in enumerate(data["raw"]):
|
||
bit_analysis[i].append(val)
|
||
|
||
# 识别固定/可变位
|
||
for idx, values in enumerate(bit_analysis):
|
||
unique_vals = set(values)
|
||
if len(unique_vals) == 1:
|
||
proto_analysis["fixed_bits"].append({
|
||
"position": idx,
|
||
"value": values[0]
|
||
})
|
||
else:
|
||
proto_analysis["variable_bits"].append({
|
||
"position": idx,
|
||
"values": list(unique_vals)
|
||
})
|
||
|
||
# 生成协议模板
|
||
proto_analysis["template"] = [
|
||
"V" if any(idx == b["position"] for b in proto_analysis["variable_bits"])
|
||
else group[0]["raw"][idx]
|
||
for idx in range(len(group[0]["raw"]))
|
||
]
|
||
|
||
analysis["protocols"][proto] = proto_analysis
|
||
|
||
return analysis
|
||
|
||
def collect_signals():
|
||
"""主采集流程"""
|
||
collected = {}
|
||
print("\n空调红外信号采集系统启动")
|
||
|
||
for step_num, step in enumerate(STEPS, 1):
|
||
while True:
|
||
print(f"\n[{step_num}/{len(STEPS)}] {step['desc']}")
|
||
print("请输入红外信号数据(输入'exit'退出):")
|
||
data = input().strip()
|
||
|
||
if data.lower() == 'exit':
|
||
if input("确认退出?(y/n): ").lower() == 'y':
|
||
save_data(collected)
|
||
return
|
||
|
||
valid, raw, proto, bits = validate_signal(data)
|
||
if valid:
|
||
collected[step['cmd']] = {
|
||
"raw": raw,
|
||
"protocol": proto,
|
||
"bits": bits,
|
||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
}
|
||
print(f"√ 已记录:{step['cmd']}")
|
||
break
|
||
print("× 格式错误!需包含:\n1.完整C数组 2.协议信息 3.正确数据格式")
|
||
|
||
analysis = analyze_signals(collected)
|
||
save_data(collected, analysis)
|
||
print(f"\n采集完成!有效数据:{len(collected)}条")
|
||
|
||
def convert_to_bin(json_path):
|
||
"""将JSON配置转换为二进制格式"""
|
||
with open(json_path, 'r') as f:
|
||
config = json.load(f)
|
||
|
||
bin_data = bytearray()
|
||
for cmd in config['raw_data'].values():
|
||
# 协议类型(4字节) + 位数(4字节) + 时间戳(12字节)
|
||
bin_data.extend(struct.pack('<I', cmd['protocol']))
|
||
bin_data.extend(struct.pack('<I', cmd['bits']))
|
||
|
||
# 时间编码调整(年-2000压缩到1字节,其他字段范围校验)
|
||
dt = datetime.strptime(cmd['timestamp'], "%Y-%m-%d %H:%M:%S")
|
||
year_byte = dt.year - 2000
|
||
if not (0 <= year_byte <= 255):
|
||
year_byte = max(0, min(year_byte, 255))
|
||
time_bytes = struct.pack('<6B',
|
||
year_byte,
|
||
max(1, min(dt.month, 12)),
|
||
max(1, min(dt.day, 31)),
|
||
dt.hour,
|
||
dt.minute,
|
||
dt.second
|
||
)
|
||
bin_data.extend(time_bytes)
|
||
|
||
bin_path = os.path.splitext(json_path)[0] + '.bin'
|
||
with open(bin_path, 'wb') as f:
|
||
f.write(bin_data)
|
||
print(f"二进制配置已生成: {os.path.basename(bin_path)}")
|
||
|
||
|
||
def save_data(data, analysis=None):
|
||
"""保存数据到文件"""
|
||
# 简化后的配置数据结构
|
||
config = {
|
||
"protocol_info": {
|
||
"protocol_type": "NEC",
|
||
"bits": sum(len(v) for v in data.values())
|
||
},
|
||
"raw_data": data
|
||
}
|
||
|
||
file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "ir_config.json")
|
||
with open(file_path, "w") as f:
|
||
json.dump(config, f, indent=2)
|
||
print("配置已保存至 ir_config.json")
|
||
convert_to_bin(file_path)
|
||
|
||
if __name__ == "__main__":
|
||
print("="*40)
|
||
print("红外信号采集分析系统 v2.0")
|
||
print("操作说明:")
|
||
print("1. 保持设备连接")
|
||
print("2. 按提示操作遥控器")
|
||
print("3. 从串口监视器复制数据")
|
||
print("="*40)
|
||
|
||
try:
|
||
collect_signals()
|
||
except KeyboardInterrupt:
|
||
print("\n操作中断!正在保存数据...")
|
||
save_data(collected if 'collected' in locals() else {})
|