yolorestart

This commit is contained in:
2025-08-30 11:53:20 +08:00
parent 090c44433f
commit 19176c71b7
26 changed files with 939 additions and 1825 deletions

View File

@@ -1 +0,0 @@
# 工具模块初始化文件

View File

@@ -1,280 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频捕获管理
支持摄像头和视频文件的切换和管理
"""
import cv2
import os
import time
import threading
from typing import Optional, Tuple
class VideoCapture:
"""视频捕获管理类"""
def __init__(self):
"""
初始化视频捕获管理器
"""
self.cap = None
self.is_camera = True
self.video_path = None
self.fps_counter = FPSCounter()
self.frame_lock = threading.Lock()
self.current_frame = None
self.is_running = False
self.video_fps = 30.0 # 视频原始帧率
# 设置视频文件路径
self.video_file_path = os.path.join(os.path.dirname(__file__), "..", "video.mp4")
def start_capture(self, use_camera: int = 1) -> bool:
"""
开始视频捕获
Args:
use_camera: 1使用摄像头0使用视频文件
Returns:
是否成功启动
"""
self.stop_capture()
self.is_camera = bool(use_camera)
try:
if self.is_camera:
# 使用摄像头
self.cap = cv2.VideoCapture(0)
if not self.cap.isOpened():
# 尝试其他摄像头索引
for i in range(1, 5):
self.cap = cv2.VideoCapture(i)
if self.cap.isOpened():
break
else:
print("无法打开摄像头")
return False
# 设置摄像头参数
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.cap.set(cv2.CAP_PROP_FPS, 30)
print("摄像头启动成功")
else:
# 使用视频文件
if not os.path.exists(self.video_file_path):
print(f"视频文件不存在: {self.video_file_path}")
return False
self.cap = cv2.VideoCapture(self.video_file_path)
if not self.cap.isOpened():
print(f"无法打开视频文件: {self.video_file_path}")
return False
# 获取视频原始帧率
self.video_fps = self.cap.get(cv2.CAP_PROP_FPS)
if self.video_fps <= 0:
self.video_fps = 25.0 # 默认帧率
print(f"视频文件加载成功: {self.video_file_path}, FPS: {self.video_fps}")
self.is_running = True
self.fps_counter.reset()
return True
except Exception as e:
print(f"启动视频捕获失败: {e}")
return False
def stop_capture(self):
"""停止视频捕获"""
self.is_running = False
if self.cap is not None:
self.cap.release()
self.cap = None
with self.frame_lock:
self.current_frame = None
print("视频捕获已停止")
def get_frame(self) -> Tuple[Optional[cv2.Mat], float]:
"""
获取当前帧
Returns:
(frame, fps): 当前帧和FPS
"""
if not self.is_running or self.cap is None:
return None, 0.0
try:
ret, frame = self.cap.read()
if not ret:
if not self.is_camera:
# 视频文件播放完毕,重新开始(循环播放)
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
ret, frame = self.cap.read()
if not ret:
return None, 0.0
# 更新FPS计数器
fps = self.fps_counter.update()
# 在帧上绘制FPS信息
frame_with_fps = self._draw_fps(frame, fps)
with self.frame_lock:
self.current_frame = frame_with_fps.copy()
return frame_with_fps, fps
except Exception as e:
print(f"获取帧失败: {e}")
return None, 0.0
def _draw_fps(self, frame: cv2.Mat, fps: float) -> cv2.Mat:
"""
在帧上绘制FPS信息
Args:
frame: 输入帧
fps: 当前FPS
Returns:
绘制了FPS的帧
"""
result_frame = frame.copy()
# FPS文本
fps_text = f"FPS: {fps:.1f}"
# 文本参数
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.7
color = (0, 255, 0) # 绿色
thickness = 2
# 获取文本尺寸
text_size = cv2.getTextSize(fps_text, font, font_scale, thickness)[0]
# 绘制背景矩形
cv2.rectangle(result_frame,
(10, 10),
(20 + text_size[0], 20 + text_size[1]),
(0, 0, 0), -1)
# 绘制FPS文本
cv2.putText(result_frame, fps_text,
(15, 15 + text_size[1]),
font, font_scale, color, thickness)
return result_frame
def get_capture_info(self) -> dict:
"""
获取捕获信息
Returns:
捕获信息字典
"""
info = {
'is_running': self.is_running,
'is_camera': self.is_camera,
'video_path': self.video_file_path if not self.is_camera else None,
'fps': self.fps_counter.get_fps(),
'video_fps': self.video_fps
}
if self.cap is not None:
try:
info['width'] = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
info['height'] = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if not self.is_camera:
info['total_frames'] = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
info['current_frame'] = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
except:
pass
return info
def get_video_fps(self) -> float:
"""
获取视频帧率
Returns:
视频帧率摄像头返回30.0,视频文件返回原始帧率
"""
if self.is_camera:
return 30.0 # 摄像头固定30FPS
else:
return self.video_fps # 视频文件原始帧率
def __del__(self):
"""析构函数"""
self.stop_capture()
class FPSCounter:
"""FPS计数器"""
def __init__(self, window_size: int = 30):
"""
初始化FPS计数器
Args:
window_size: 滑动窗口大小
"""
self.window_size = window_size
self.frame_times = []
self.last_time = time.time()
def update(self) -> float:
"""
更新FPS计数
Returns:
当前FPS
"""
current_time = time.time()
# 添加当前帧时间
self.frame_times.append(current_time)
# 保持窗口大小
if len(self.frame_times) > self.window_size:
self.frame_times.pop(0)
# 计算FPS
if len(self.frame_times) >= 2:
time_diff = self.frame_times[-1] - self.frame_times[0]
if time_diff > 0:
fps = (len(self.frame_times) - 1) / time_diff
return fps
return 0.0
def get_fps(self) -> float:
"""
获取当前FPS
Returns:
当前FPS
"""
if len(self.frame_times) >= 2:
time_diff = self.frame_times[-1] - self.frame_times[0]
if time_diff > 0:
return (len(self.frame_times) - 1) / time_diff
return 0.0
def reset(self):
"""重置计数器"""
self.frame_times.clear()
self.last_time = time.time()