Decord

分关键帧记录 #

import numpy as np
from typing import Union, BinaryIO
import decord

def _read_video_key_frames2(
    video_file: Union[str, BinaryIO],
    start_time: int,
    end_time: int
) -> Generator[tuple[np.ndarray, int, int], None, None]:
    """
    获取视频中的关键帧
    
    :param video_file: 视频文件路径/流
    :param start_time: 视频开始时间(毫秒)
    :param end_time: 视频结束时间(毫秒)
    :return: 生成器,产出(帧numpy数组,累计计数,时间戳)
    """
    try:
        # 初始化视频读取器(CPU 解码)
        video = decord.VideoReader(video_file)
    except Exception as e:
        raise RuntimeError(f"Failed to open video with Decord: {str(e)}")
    
    frame_count = 0
    total_frames = len(video)
    print(f"Total frames: {total_frames}")
    
    fps = video.get_avg_fps()
    
    # 警告:部分MP4格式(如警翼.MP4)可能出现读取异常
    for i in range(total_frames):
        try:
            frame = video[i].asnumpy()  # 读取帧数据
        except Exception as e:
            print(f"Skipping corrupted frame {i}: {e}")
            continue
        
        pts_sec = frame.time
        pts_ms = int(round(pts_sec * 1000))
        
        # 时间过滤
        if pts_ms < start_time or pts_ms > end_time:
            continue
        
        # 避免重复帧处理(同一时间点的多帧只保留第一个)
        # if pts_ms == prev_pts:
        #     continue
        # prev_pts = pts_ms
        
        frame_count += 1
        yield (frame, frame_count, pts_ms)