分关键帧记录
#
import numpy as np
from typing import Union, BinaryIO
import decord
def _read_video_key_frames2(
video_file: Union[str, BinaryIO],
start_time: int,
end_time: int
) -> Generator[tuple[np.ndarray, int, int], None, None]:
"""
获取视频中的关键帧
:param video_file: 视频文件路径/流
:param start_time: 视频开始时间(毫秒)
:param end_time: 视频结束时间(毫秒)
:return: 生成器,产出(帧numpy数组,累计计数,时间戳)
"""
try:
# 初始化视频读取器(CPU 解码)
video = decord.VideoReader(video_file)
except Exception as e:
raise RuntimeError(f"Failed to open video with Decord: {str(e)}")
frame_count = 0
total_frames = len(video)
print(f"Total frames: {total_frames}")
fps = video.get_avg_fps()
# 警告:部分MP4格式(如警翼.MP4)可能出现读取异常
for i in range(total_frames):
try:
frame = video[i].asnumpy() # 读取帧数据
except Exception as e:
print(f"Skipping corrupted frame {i}: {e}")
continue
pts_sec = frame.time
pts_ms = int(round(pts_sec * 1000))
# 时间过滤
if pts_ms < start_time or pts_ms > end_time:
continue
# 避免重复帧处理(同一时间点的多帧只保留第一个)
# if pts_ms == prev_pts:
# continue
# prev_pts = pts_ms
frame_count += 1
yield (frame, frame_count, pts_ms)