2025-08-28 12:00:56 +08:00

280 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频捕获管理
支持摄像头和视频文件的切换和管理
"""
import cv2
import os
import time
import threading
from typing import Optional, Tuple
class VideoCapture:
"""视频捕获管理类"""
def __init__(self):
"""
初始化视频捕获管理器
"""
self.cap = None
self.is_camera = True
self.video_path = None
self.fps_counter = FPSCounter()
self.frame_lock = threading.Lock()
self.current_frame = None
self.is_running = False
self.video_fps = 30.0 # 视频原始帧率
# 设置视频文件路径
self.video_file_path = os.path.join(os.path.dirname(__file__), "..", "video.mp4")
def start_capture(self, use_camera: int = 1) -> bool:
"""
开始视频捕获
Args:
use_camera: 1使用摄像头0使用视频文件
Returns:
是否成功启动
"""
self.stop_capture()
self.is_camera = bool(use_camera)
try:
if self.is_camera:
# 使用摄像头
self.cap = cv2.VideoCapture(0)
if not self.cap.isOpened():
# 尝试其他摄像头索引
for i in range(1, 5):
self.cap = cv2.VideoCapture(i)
if self.cap.isOpened():
break
else:
print("无法打开摄像头")
return False
# 设置摄像头参数
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.cap.set(cv2.CAP_PROP_FPS, 30)
print("摄像头启动成功")
else:
# 使用视频文件
if not os.path.exists(self.video_file_path):
print(f"视频文件不存在: {self.video_file_path}")
return False
self.cap = cv2.VideoCapture(self.video_file_path)
if not self.cap.isOpened():
print(f"无法打开视频文件: {self.video_file_path}")
return False
# 获取视频原始帧率
self.video_fps = self.cap.get(cv2.CAP_PROP_FPS)
if self.video_fps <= 0:
self.video_fps = 25.0 # 默认帧率
print(f"视频文件加载成功: {self.video_file_path}, FPS: {self.video_fps}")
self.is_running = True
self.fps_counter.reset()
return True
except Exception as e:
print(f"启动视频捕获失败: {e}")
return False
def stop_capture(self):
"""停止视频捕获"""
self.is_running = False
if self.cap is not None:
self.cap.release()
self.cap = None
with self.frame_lock:
self.current_frame = None
print("视频捕获已停止")
def get_frame(self) -> Tuple[Optional[cv2.Mat], float]:
"""
获取当前帧
Returns:
(frame, fps): 当前帧和FPS
"""
if not self.is_running or self.cap is None:
return None, 0.0
try:
ret, frame = self.cap.read()
if not ret:
if not self.is_camera:
# 视频文件播放完毕,重新开始(循环播放)
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
ret, frame = self.cap.read()
if not ret:
return None, 0.0
# 更新FPS计数器
fps = self.fps_counter.update()
# 在帧上绘制FPS信息
frame_with_fps = self._draw_fps(frame, fps)
with self.frame_lock:
self.current_frame = frame_with_fps.copy()
return frame_with_fps, fps
except Exception as e:
print(f"获取帧失败: {e}")
return None, 0.0
def _draw_fps(self, frame: cv2.Mat, fps: float) -> cv2.Mat:
"""
在帧上绘制FPS信息
Args:
frame: 输入帧
fps: 当前FPS
Returns:
绘制了FPS的帧
"""
result_frame = frame.copy()
# FPS文本
fps_text = f"FPS: {fps:.1f}"
# 文本参数
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.7
color = (0, 255, 0) # 绿色
thickness = 2
# 获取文本尺寸
text_size = cv2.getTextSize(fps_text, font, font_scale, thickness)[0]
# 绘制背景矩形
cv2.rectangle(result_frame,
(10, 10),
(20 + text_size[0], 20 + text_size[1]),
(0, 0, 0), -1)
# 绘制FPS文本
cv2.putText(result_frame, fps_text,
(15, 15 + text_size[1]),
font, font_scale, color, thickness)
return result_frame
def get_capture_info(self) -> dict:
"""
获取捕获信息
Returns:
捕获信息字典
"""
info = {
'is_running': self.is_running,
'is_camera': self.is_camera,
'video_path': self.video_file_path if not self.is_camera else None,
'fps': self.fps_counter.get_fps(),
'video_fps': self.video_fps
}
if self.cap is not None:
try:
info['width'] = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
info['height'] = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if not self.is_camera:
info['total_frames'] = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
info['current_frame'] = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
except:
pass
return info
def get_video_fps(self) -> float:
"""
获取视频帧率
Returns:
视频帧率摄像头返回30.0,视频文件返回原始帧率
"""
if self.is_camera:
return 30.0 # 摄像头固定30FPS
else:
return self.video_fps # 视频文件原始帧率
def __del__(self):
"""析构函数"""
self.stop_capture()
class FPSCounter:
"""FPS计数器"""
def __init__(self, window_size: int = 30):
"""
初始化FPS计数器
Args:
window_size: 滑动窗口大小
"""
self.window_size = window_size
self.frame_times = []
self.last_time = time.time()
def update(self) -> float:
"""
更新FPS计数
Returns:
当前FPS
"""
current_time = time.time()
# 添加当前帧时间
self.frame_times.append(current_time)
# 保持窗口大小
if len(self.frame_times) > self.window_size:
self.frame_times.pop(0)
# 计算FPS
if len(self.frame_times) >= 2:
time_diff = self.frame_times[-1] - self.frame_times[0]
if time_diff > 0:
fps = (len(self.frame_times) - 1) / time_diff
return fps
return 0.0
def get_fps(self) -> float:
"""
获取当前FPS
Returns:
当前FPS
"""
if len(self.frame_times) >= 2:
time_diff = self.frame_times[-1] - self.frame_times[0]
if time_diff > 0:
return (len(self.frame_times) - 1) / time_diff
return 0.0
def reset(self):
"""重置计数器"""
self.frame_times.clear()
self.last_time = time.time()