Reid行人重识别

原理 #

在监控拍不到人脸的情况下,ReID可以代替人脸识别来在视频序列中找到我要找到目标对象。那么他的应用就很广了,可以做安防,可以做个人定位,在商场上可以配合推荐系统,搭建出个性化的推荐服务等等

我们利用训练后的网络计算特征从所有搜索到的图像中提取特征,并计算搜索图与地库之间的特征距离。然后根据计算出的距离对它们进行排序。排名越高,相似性越高,上图中,绿色边框的是正确检索的结果,红色边框的是错误检索的结果。

代码仓库:https://github.com/JDAI-CV/fast-reid

提供了针对ReID任务的完整的工具箱,包括训练、评估、微调和模型部署,另外实现了在多个任务中的最先进的模型。

实操 #

模型初始化 #

创建ONNX推理会话,使用CUDA执行提供者

class ReidModel:

    def __init__(self, model_name, model_dir):
        """
        初始化 ReidModel 实例。

        Args:
            model_name (str): Reid 模型的特定名称
            model_dir (str): 包含 Reid ONNX 模型文件的目录路径。
        """
        self._key = b'fm20Za6uii..........AvPdMhxXs='

        img_onnx_model_path = os.path.join(model_dir, "weights.onnx")
        decrypted_model = decrypt_model(img_onnx_model_path, self._key) //模型解密

        img_sess_options = onnxruntime.SessionOptions()
        img_run_options = onnxruntime.RunOptions()
        img_run_options.log_severity_level = 2

        self.input_height = 384
        self.input_width = 128
        self.session = onnxruntime.InferenceSession(decrypted_model,
                                                    sess_options=img_sess_options,
                                                    providers=["CUDAExecutionProvider"])# NVIDIA GPU (CUDA)
        self.input_name = self.session.get_inputs()[0].name

    @staticmethod
    def get_reid_instance() -> ReidModel:
        global _reid_model
        if not _reid_model:
            raise Exception("REID模型未初始化")
        return _reid_model

    @staticmethod
    def initialize_reid(reid_instance: ReidModel):
        global _reid_model
        _reid_model = reid_instance

提取目标图片特征 #

async def reid_image_embedding(image_file: UploadFile = File(None), image_path=Form(None)) -> APIResponse:
    """
    API 端点处理函数:接收图片输入(文件或路径),计算 ReID 特征向量。

    Args:
        image_file (UploadFile, optional): 上传的图片文件。默认为 None。
        image_path (str, optional): 图片的文件路径。默认为 None。

    Returns:
        APIResponse: 包含 ReID 特征向量列表的响应对象。
                     如果出错,响应对象的 error 属性将包含错误信息。
    """
    res = APIResponse()
    try:
        if image_path:
            image = cv2.imread(image_path)
            if image is None:
                raise FileNotFoundError(f"无法读取图片文件: {image_path}")
        elif image_file:
            contents = await image_file.read()
            image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
            # 检查是否成功解码
            if image is None:
                raise ValueError(f"无法解码上传的图片内容。请检查文件格式。")
        else:
            raise ValueError(f"无有效图片,请检查请求参数是否正确")
        features = ReidService.compute_image_features(image)
        res.data = features.tolist()
    except Exception as e:
        res.error = str(e)
        traceback.print_exc()
    return res
 @staticmethod
    def compute_image_features(image_bgr):
        """
       提取给定 BGR 图像的 ReID 特征。
       Args:
           image_bgr (np.ndarray): OpenCV BGR 图像。
       Returns:
           np.ndarray: 归一化后的特征向量,如果出错则返回 None。
       """
        from app.lib.model import ReidModel
        model = ReidModel.get_reid_instance()

        if image_bgr is None or image_bgr.size == 0:
            logger.error("[ReID] 输入图像为空。")
            return None

        # 1. 预处理 使用 OpenCV 进行图像尺寸调整
        processed_image = ReidService.reid_preprocess_image(image_bgr, model.input_height, model.input_width)
        if processed_image is None:
            logger.error("[ReID] 预处理失败。")
            return None

        # 2. ONNX 推理
        try:
            feature = model.session.run(None, {model.input_name: processed_image})[0]
            if feature is None:
                logger.error("[ReID] session.run 返回了 None!")
                return None
            # 检查原始特征是否包含 NaN 或 Inf
            if np.any(np.isnan(feature)) or np.any(np.isinf(feature)):
                logger.warning("ONNX 模型直接输出了 NaN 或 Inf 值!")

        except Exception as e:
            logger.error(f"[ReID] session.run 执行时出错: {e}")
            return None

        # 3. 特征归一化
        normalized_feat = ReidService.normalize_feature(feature, axis=1)
        if normalized_feat is None:
            logger.error("[ReID] 归一化失败")
            return None
        # 再次检查归一化后的特征
        if np.any(np.isnan(normalized_feat)) or np.any(np.isinf(normalized_feat)):
            logger.error("归一化后的 ReID 特征包含 NaN 或 Inf 值!")
            return None

        return normalized_feat

获取yolo数据类别的每个目标Reid特征向量 #

async def reid_yolo_image_embedding(
        # 图片路径
        image_path=Form(None),
        # 上传标准图片文件 (JPEG, PNG 等)
        image_file: UploadFile = File(None),
        # 上传原始 NumPy 数组字节 (需要同时提供形状和数据类型)
        image_raw_bytes: UploadFile = File(None),
        # 如果提供了 image_raw_bytes,此参数必须:表示 NumPy 数组形状的 JSON 字符串
        image_shape_json=Form(None),
        # 如果提供了 image_raw_bytes,此参数必须:表示 NumPy 数组数据类型的字符串, 如果使用 image_raw_bytes,此参数必须:表示 NumPy
        # 数组数据类型的字符串(例如,'uint8','float32')
        image_dtype_str=Form(None),
        # 必需:包含边界框信息的 JSON 字符串 必需:包含对象列表的 JSON 字符串。每个对象必须包含 'id'(字符串)和 'bbox_xyxyn'(4个浮点数的列表 [x1_n, y1_n, x2_n,
        # y2_n])字段。例如,'[{\"id\":\"obj1\", \"bbox_xyxyn\":[0.1,0.2,0.5,0.6]}, {\"id\":\"obj2\", \"bbox_xyxyn\":[0.7,
        # 0.8,0.9,0.95]}]'
        objects_json=Form(...)
) -> APIResponse:
    """
    接收图片输入(来自路径、文件上传或原始字节)和多个边界框坐标 (xyxyn 格式),
    计算 ReID 特征。

    图片输入方式(请提供其中一种):
    1. image_path:服务器端的图片文件路径。
    2. image_file:标准图片文件上传。
    3. image_raw_bytes, image_shape_json, image_dtype_str:原始 NumPy 数组字节及其元数据。

    Args:
        image_path (str, optional): 服务器端的图片文件路径。
        image_file (UploadFile, optional): 上传的图片文件。
        image_raw_bytes (UploadFile, optional): NumPy 数组的原始字节。
        image_shape_json (str, optional): NumPy 数组形状的 JSON 字符串(使用 image_raw_bytes 时必需)。
        image_dtype_str (str, optional): NumPy 数组数据类型的字符串(使用 image_raw_bytes 时必需)。
        objects_json (str): 包含对象及其 'id' 和 'bbox_xyxyn' 的 JSON 字符串。

    Returns:
        APIResponse: 包含 {'id': ..., 'features': [...]} 结果列表或错误信息的响应对象。
    """
    res = APIResponse()
    image: Optional[np.ndarray] = None  # 用于存储加载或重建的 NumPy 图像

    input_method_used = None

    try:
        # 1. 确定图像源并加载/重建图像
        if image_path:
            image = cv2.imread(image_path)
            if image is None:
                raise FileNotFoundError(f"无法读取图片文件: {image_path}. 请检查路径和文件是否存在。")
            input_method_used = "path"

        elif image_file is not None:
            contents = await image_file.read()
            if not contents:
                raise ValueError("上传的图片文件内容为空。")
            np_arr = np.frombuffer(contents, np.uint8)
            image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
            if image is None:
                raise ValueError(
                    f"无法解码上传的图片文件 '{image_file.filename}'. 请检查文件格式是否为标准图片 (JPEG, PNG 等)。")
            input_method_used = "file_upload"

        elif image_raw_bytes is not None:
            if image_shape_json is None or image_dtype_str is None:
                raise ValueError("使用 image_raw_bytes 输入时,image_shape_json 和 image_dtype_str 也是必须的。")

            raw_contents = await image_raw_bytes.read()
            if not raw_contents:
                raise ValueError("上传的原始字节内容为空。")

            try:
                image_shape_tuple = ast.literal_eval(image_shape_json)
                if not isinstance(image_shape_tuple, (tuple, list)):
                    raise ValueError(f"image_shape_json 格式错误,期望列表或元组字符串,实际收到: {image_shape_json}")

                image_dtype = np.dtype(image_dtype_str)

            except (SyntaxError, ValueError, TypeError) as e:
                raise ValueError(f"解析图像形状或数据类型失败: {e}") from e

            expected_size = np.prod(image_shape_tuple) * image_dtype.itemsize
            if len(raw_contents) != expected_size:
                raise ValueError(
                    f"上传字节数 ({len(raw_contents)}) 与根据形状 ({image_shape_tuple}) 和数据类型 ({image_dtype_str}) 计算的预期大小 ({expected_size}) 不匹配。")

            # 重建np数组
            image = np.frombuffer(raw_contents, dtype=image_dtype).reshape(image_shape_tuple)
            input_method_used = "raw_bytes"

        else:
            # No valid image input provided
            raise ValueError(
                "无有效图片输入。请使用 image_path, image_file, 或 (image_raw_bytes, image_shape_json, image_dtype_str) 中的一种方式提供图片。")

        if image is None or image.size == 0:
            raise ValueError("图像加载或重建失败,或结果为空图像。")

        if image.ndim < 2:
            raise ValueError(f"加载/重建的图像 NumPy 数组维度 {image.shape} 不是预期的二维或三维格式。")

        # 2. 解析和验证 objects_json 输入
        objects_to_process: List[BBoxObject] = []
        try:
            objects_list_raw = json.loads(objects_json)
            if not isinstance(objects_list_raw, list):
                raise ValueError("objects_json 应该是一个 JSON 列表。")
            objects_to_process = [BBoxObject(**obj_data) for obj_data in objects_list_raw]

        except (json.JSONDecodeError, ValidationError) as e:
            raise ValueError(f"无效的 objects_json 输入格式: {e}") from e
        except Exception as e:
            raise ValueError(f"解析 objects_json 时发生意外错误: {e}") from e

        #  3. 处理每个检测的对象并计算特征值
        results: List[ReIDResultItem] = []
        processed_count = 0
        for obj in objects_to_process:
            obj_id = obj.id
            bbox_xyxyn = obj.bbox_xyxyn

            try:
                # 根据坐标剪裁
                cropped_image = ReidService.crop_yolo_box(image, bbox_xyxyn)

                # Compute features
                features_np = ReidService.compute_image_features(cropped_image)
                features_list = features_np.tolist()
                results.append(ReIDResultItem(id=obj_id, features=features_list))
                processed_count += 1

            except Exception as e:
                logging.error(f"处理对象 {obj_id} 时发生错误: {e}")
                pass  # Skip

        # 4. Prepare response data
        res.data = [item.dict() for item in results]

    except Exception as e:
        res.error = f"请求处理失败: {str(e)}"
        traceback.print_exc()

    return res

对比特征向量相似度 #

async def reid_compute_cosine_similarity(request: CosineSimilarityRequest) -> APIResponse:
    """
    计算两个 ReID 特征向量的余弦相似度。

    Args:
        request (CosineSimilarityRequest): 包含两个特征向量列表的请求体。

    Returns:
        APIResponse: 包含余弦相似度得分的响应对象。
    """
    res = APIResponse()
    try:
        # 从请求体获取特征列表
        feat1_list = request.feature1
        feat2_list = request.feature2

        # 将列表转换为 NumPy 数组
        # 由于 CosineSimilarityRequest 要求输入是 List[List[float]],
        # 我们可以安全地转换为 NumPy 数组,并假设它的形状是 (1, D)。
        # 然后展平为 (D,) 以便传递给 cosine_similarity。
        try:
            feat1_np = np.array(feat1_list, dtype=np.float32)  # 转换为 numpy 数组
            feat2_np = np.array(feat2_list, dtype=np.float32)

            if feat1_np.ndim != 2 or feat1_np.shape[0] != 1 or feat2_np.ndim != 2 or feat2_np.shape[0] != 1:
                raise ValueError("输入特征向量列表的形状不符合预期(期望 [[...]])")

            # 展平数组为一维向量 (D,)
            feat1_np_flat = feat1_np.flatten()
            feat2_np_flat = feat2_np.flatten()

        except Exception as convert_error:
            # 转换失败通常是输入格式问题
            raise ValueError(f"无法将输入列表转换为 NumPy 数组或形状不正确: {convert_error}")

        # 调用 ReidService 的 cosine_similarity 方法
        # 传递展平后的一维 NumPy 数组
        similarity_score = ReidService.cosine_similarity(feat1_np_flat, feat2_np_flat)
        # similarity_score = ReidService.cosine_similarity(feat1_np, feat2_np)

        # 将结果放入响应对象的 data 字段
        res.data = {"similarity": float(similarity_score)}  # 确保是标准的 float 类型

    except Exception as e:
        res.error = str(e)
        traceback.print_exc()

    return res

完整代码 #

前后逻辑可参照fast-reid/tools/deploy/onnx_interence.py

import logging
import traceback
import cv2
import json
import ast
import numpy as np
from pydantic import BaseModel, ValidationError, Field
from typing import List, Dict, Any, Optional, Union
from app.model.response import APIResponse
from app.services.reid import ReidService
from fastapi import UploadFile, File, Form
from app.model.reid import *


async def reid_image_embedding(image_file: UploadFile = File(None), image_path=Form(None)) -> APIResponse:
    """
    API 端点处理函数:接收图片输入(文件或路径),计算 ReID 特征向量。

    Args:
        image_file (UploadFile, optional): 上传的图片文件。默认为 None。
        image_path (str, optional): 图片的文件路径。默认为 None。

    Returns:
        APIResponse: 包含 ReID 特征向量列表的响应对象。
                     如果出错,响应对象的 error 属性将包含错误信息。
    """
    res = APIResponse()
    try:
        if image_path:
            image = cv2.imread(image_path)
            if image is None:
                raise FileNotFoundError(f"无法读取图片文件: {image_path}")
        elif image_file:
            contents = await image_file.read()
            image = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
            # 检查是否成功解码
            if image is None:
                raise ValueError(f"无法解码上传的图片内容。请检查文件格式。")
        else:
            raise ValueError(f"无有效图片,请检查请求参数是否正确")
        features = ReidService.compute_image_features(image)
        res.data = features.tolist()
    except Exception as e:
        res.error = str(e)
        traceback.print_exc()
    return res


async def reid_yolo_image_embedding(
        # 图片路径
        image_path=Form(None),
        # 上传标准图片文件 (JPEG, PNG 等)
        image_file: UploadFile = File(None),
        # 上传原始 NumPy 数组字节 (需要同时提供形状和数据类型)
        image_raw_bytes: UploadFile = File(None),
        # 如果提供了 image_raw_bytes,此参数必须:表示 NumPy 数组形状的 JSON 字符串
        image_shape_json=Form(None),
        # 如果提供了 image_raw_bytes,此参数必须:表示 NumPy 数组数据类型的字符串, 如果使用 image_raw_bytes,此参数必须:表示 NumPy
        # 数组数据类型的字符串(例如,'uint8','float32')
        image_dtype_str=Form(None),
        # 必需:包含边界框信息的 JSON 字符串 必需:包含对象列表的 JSON 字符串。每个对象必须包含 'id'(字符串)和 'bbox_xyxyn'(4个浮点数的列表 [x1_n, y1_n, x2_n,
        # y2_n])字段。例如,'[{\"id\":\"obj1\", \"bbox_xyxyn\":[0.1,0.2,0.5,0.6]}, {\"id\":\"obj2\", \"bbox_xyxyn\":[0.7,
        # 0.8,0.9,0.95]}]'
        objects_json=Form(...)
) -> APIResponse:
    """
    接收图片输入(来自路径、文件上传或原始字节)和多个边界框坐标 (xyxyn 格式),
    计算 ReID 特征。

    图片输入方式(请提供其中一种):
    1. image_path:服务器端的图片文件路径。
    2. image_file:标准图片文件上传。
    3. image_raw_bytes, image_shape_json, image_dtype_str:原始 NumPy 数组字节及其元数据。

    Args:
        image_path (str, optional): 服务器端的图片文件路径。
        image_file (UploadFile, optional): 上传的图片文件。
        image_raw_bytes (UploadFile, optional): NumPy 数组的原始字节。
        image_shape_json (str, optional): NumPy 数组形状的 JSON 字符串(使用 image_raw_bytes 时必需)。
        image_dtype_str (str, optional): NumPy 数组数据类型的字符串(使用 image_raw_bytes 时必需)。
        objects_json (str): 包含对象及其 'id' 和 'bbox_xyxyn' 的 JSON 字符串。

    Returns:
        APIResponse: 包含 {'id': ..., 'features': [...]} 结果列表或错误信息的响应对象。
    """
    res = APIResponse()
    image: Optional[np.ndarray] = None  # 用于存储加载或重建的 NumPy 图像

    input_method_used = None

    try:
        # 1. 确定图像源并加载/重建图像
        if image_path:
            image = cv2.imread(image_path)
            if image is None:
                raise FileNotFoundError(f"无法读取图片文件: {image_path}. 请检查路径和文件是否存在。")
            input_method_used = "path"

        elif image_file is not None:
            contents = await image_file.read()
            if not contents:
                raise ValueError("上传的图片文件内容为空。")
            np_arr = np.frombuffer(contents, np.uint8)
            image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
            if image is None:
                raise ValueError(
                    f"无法解码上传的图片文件 '{image_file.filename}'. 请检查文件格式是否为标准图片 (JPEG, PNG 等)。")
            input_method_used = "file_upload"

        elif image_raw_bytes is not None:
            if image_shape_json is None or image_dtype_str is None:
                raise ValueError("使用 image_raw_bytes 输入时,image_shape_json 和 image_dtype_str 也是必须的。")

            raw_contents = await image_raw_bytes.read()
            if not raw_contents:
                raise ValueError("上传的原始字节内容为空。")

            try:
                image_shape_tuple = ast.literal_eval(image_shape_json)
                if not isinstance(image_shape_tuple, (tuple, list)):
                    raise ValueError(f"image_shape_json 格式错误,期望列表或元组字符串,实际收到: {image_shape_json}")

                image_dtype = np.dtype(image_dtype_str)

            except (SyntaxError, ValueError, TypeError) as e:
                raise ValueError(f"解析图像形状或数据类型失败: {e}") from e

            expected_size = np.prod(image_shape_tuple) * image_dtype.itemsize
            if len(raw_contents) != expected_size:
                raise ValueError(
                    f"上传字节数 ({len(raw_contents)}) 与根据形状 ({image_shape_tuple}) 和数据类型 ({image_dtype_str}) 计算的预期大小 ({expected_size}) 不匹配。")

            # 重建np数组
            image = np.frombuffer(raw_contents, dtype=image_dtype).reshape(image_shape_tuple)
            input_method_used = "raw_bytes"

        else:
            # No valid image input provided
            raise ValueError(
                "无有效图片输入。请使用 image_path, image_file, 或 (image_raw_bytes, image_shape_json, image_dtype_str) 中的一种方式提供图片。")

        if image is None or image.size == 0:
            raise ValueError("图像加载或重建失败,或结果为空图像。")

        if image.ndim < 2:
            raise ValueError(f"加载/重建的图像 NumPy 数组维度 {image.shape} 不是预期的二维或三维格式。")

        # 2. 解析和验证 objects_json 输入
        objects_to_process: List[BBoxObject] = []
        try:
            objects_list_raw = json.loads(objects_json)
            if not isinstance(objects_list_raw, list):
                raise ValueError("objects_json 应该是一个 JSON 列表。")
            objects_to_process = [BBoxObject(**obj_data) for obj_data in objects_list_raw]

        except (json.JSONDecodeError, ValidationError) as e:
            raise ValueError(f"无效的 objects_json 输入格式: {e}") from e
        except Exception as e:
            raise ValueError(f"解析 objects_json 时发生意外错误: {e}") from e

        #  3. 处理每个检测的对象并计算特征值
        results: List[ReIDResultItem] = []
        processed_count = 0
        for obj in objects_to_process:
            obj_id = obj.id
            bbox_xyxyn = obj.bbox_xyxyn

            try:
                # 根据坐标剪裁
                cropped_image = ReidService.crop_yolo_box(image, bbox_xyxyn)

                # Compute features
                features_np = ReidService.compute_image_features(cropped_image)
                features_list = features_np.tolist()
                results.append(ReIDResultItem(id=obj_id, features=features_list))
                processed_count += 1

            except Exception as e:
                logging.error(f"处理对象 {obj_id} 时发生错误: {e}")
                pass  # Skip

        # 4. Prepare response data
        res.data = [item.dict() for item in results]

    except Exception as e:
        res.error = f"请求处理失败: {str(e)}"
        traceback.print_exc()

    return res


async def reid_compute_cosine_similarity(request: CosineSimilarityRequest) -> APIResponse:
    """
    计算两个 ReID 特征向量的余弦相似度。

    Args:
        request (CosineSimilarityRequest): 包含两个特征向量列表的请求体。

    Returns:
        APIResponse: 包含余弦相似度得分的响应对象。
    """
    res = APIResponse()
    try:
        # 从请求体获取特征列表
        feat1_list = request.feature1
        feat2_list = request.feature2

        # 将列表转换为 NumPy 数组
        # 由于 CosineSimilarityRequest 要求输入是 List[List[float]],
        # 我们可以安全地转换为 NumPy 数组,并假设它的形状是 (1, D)。
        # 然后展平为 (D,) 以便传递给 cosine_similarity。
        try:
            feat1_np = np.array(feat1_list, dtype=np.float32)  # 转换为 numpy 数组
            feat2_np = np.array(feat2_list, dtype=np.float32)

            if feat1_np.ndim != 2 or feat1_np.shape[0] != 1 or feat2_np.ndim != 2 or feat2_np.shape[0] != 1:
                raise ValueError("输入特征向量列表的形状不符合预期(期望 [[...]])")

            # 展平数组为一维向量 (D,)
            feat1_np_flat = feat1_np.flatten()
            feat2_np_flat = feat2_np.flatten()

        except Exception as convert_error:
            # 转换失败通常是输入格式问题
            raise ValueError(f"无法将输入列表转换为 NumPy 数组或形状不正确: {convert_error}")

        # 调用 ReidService 的 cosine_similarity 方法
        # 传递展平后的一维 NumPy 数组
        similarity_score = ReidService.cosine_similarity(feat1_np_flat, feat2_np_flat)
        # similarity_score = ReidService.cosine_similarity(feat1_np, feat2_np)

        # 将结果放入响应对象的 data 字段
        res.data = {"similarity": float(similarity_score)}  # 确保是标准的 float 类型

    except Exception as e:
        res.error = str(e)
        traceback.print_exc()

    return res
from typing import List
from pydantic import BaseModel, Field


class BBoxObject(BaseModel):
    id: str
    # 明确坐标格式和长度
    bbox_xyxyn: List[float] = Field(..., min_items=4, max_items=4,
                                    description="Bounding box in normalized xyxyn format [x1_n, y1_n, x2_n, y2_n]")


class ReIDResultItem(BaseModel):
    id: str
    features: List[List[float]]  # 特征向量列表


class CosineSimilarityRequest(BaseModel):
    """
    计算余弦相似度请求体的数据模型。
    """
    feature1: List[List[float]]  # 第一个特征向量列表 (假设 [[...]])
    feature2: List[List[float]]  # 第二个特征向量列表 (假设 [[...]])
import numpy as np
import cv2
from app.lib.log import logger


class ReidService:

    @staticmethod
    def reid_preprocess_image(image_bgr, image_height, image_width):
        if image_bgr is None or image_bgr.size == 0:
            return None
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        try:
            img_resized = cv2.resize(image_rgb, (image_width, image_height), interpolation=cv2.INTER_CUBIC)# 双三次插值 - 质量最高,速度较慢
        except cv2.error as e:
            logger.error(f"cv2.resize 失败: {e}. 输入图像形状: {image_bgr.shape}")
            return None
        img_tensor = img_resized.astype("float32").transpose(2, 0, 1)[np.newaxis]#将图像转换为深度学习模型所需的张量格式
        return img_tensor

    @staticmethod
    def normalize_feature(nparray, order=2, axis=-1):
        if nparray is None or nparray.size == 0:
            return nparray
        if np.any(np.isnan(nparray)) or np.any(np.isinf(nparray)):
            logger.warning("归一化前的特征包含 NaN 或 Inf。")
        norm = np.linalg.norm(nparray, ord=order, axis=axis, keepdims=True)
        denominator = norm + np.finfo(np.float32).eps
        if np.any(denominator == 0):
            logger.warning("特征向量范数为零。")
            return nparray
        result = nparray / denominator
        if np.any(np.isnan(result)) or np.any(np.isinf(result)):
            logger.warning("归一化后的特征包含 NaN 或 Inf。")
        return result

    @staticmethod
    def cosine_similarity(feat1, feat2):
        if feat1 is None or feat2 is None:
            return 0.0
        feat1 = feat1.flatten()
        feat2 = feat2.flatten()
        if feat1.size == 0 or feat2.size == 0 or feat1.shape != feat2.shape:
            logger.warning(f"特征形状不匹配或为空。f1:{feat1.shape}, f2:{feat2.shape}")
            return 0.0
        if np.any(np.isnan(feat1)) or np.any(np.isnan(feat2)) or np.any(np.isinf(feat1)) or np.any(
                np.isinf(feat2)):
            logger.warning("特征向量含NaN/Inf")
            return 0.0
        dot_product = np.dot(feat1, feat2)
        return np.clip(dot_product, -1.0, 1.0)

    @staticmethod
    def compute_image_features(image_bgr):
        """
       提取给定 BGR 图像的 ReID 特征。
       Args:
           image_bgr (np.ndarray): OpenCV BGR 图像。
       Returns:
           np.ndarray: 归一化后的特征向量,如果出错则返回 None。
       """
        from app.lib.model import ReidModel
        model = ReidModel.get_reid_instance()

        if image_bgr is None or image_bgr.size == 0:
            logger.error("[ReID] 输入图像为空。")
            return None

        # 1. 预处理
        processed_image = ReidService.reid_preprocess_image(image_bgr, model.input_height, model.input_width)
        if processed_image is None:
            logger.error("[ReID] 预处理失败。")
            return None

        # 2. ONNX 推理
        try:
            feature = model.session.run(None, {model.input_name: processed_image})[0]
            if feature is None:
                logger.error("[ReID] session.run 返回了 None!")
                return None
            # 检查原始特征是否包含 NaN 或 Inf
            if np.any(np.isnan(feature)) or np.any(np.isinf(feature)):
                logger.warning("ONNX 模型直接输出了 NaN 或 Inf 值!")

        except Exception as e:
            logger.error(f"[ReID] session.run 执行时出错: {e}")
            return None

        # 3. 特征归一化
        normalized_feat = ReidService.normalize_feature(feature, axis=1)
        if normalized_feat is None:
            logger.error("[ReID] 归一化失败")
            return None
        # 再次检查归一化后的特征
        if np.any(np.isnan(normalized_feat)) or np.any(np.isinf(normalized_feat)):
            logger.error("归一化后的 ReID 特征包含 NaN 或 Inf 值!")
            return None

        return normalized_feat

    @staticmethod
    def crop_yolo_box(image_bgr, yolo_xyxyn):
        """
        根据YOLO的归一化xyxyn坐标裁剪图像区域。

        参数:
            image_bgr (np.ndarray): OpenCV格式的BGR图像,形状为(H, W, C)。
            yolo_xyxyn (tuple/list/np.ndarray): 归一化的YOLO坐标,格式为(x_min_norm, y_min_norm, x_max_norm, y_max_norm)。

        返回:
            np.ndarray: 裁剪后的BGR图像区域。
        """
        orig_h, orig_w = image_bgr.shape[:2]
        # 将yolo_xyxyn解包为 x_min_norm, y_min_norm, x_max_norm, y_max_norm
        x_min_norm, y_min_norm, x_max_norm, y_max_norm = yolo_xyxyn

        # 将归一化坐标转换为实际像素坐标
        # 对于xyxyn格式,我们直接得到边界框的左上角和右下角坐标
        x1 = x_min_norm * orig_w
        y1 = y_min_norm * orig_h
        x2 = x_max_norm * orig_w
        y2 = y_max_norm * orig_h

        # 转换为整数并限制在图像范围内
        x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
        x1 = max(0, x1)
        y1 = max(0, y1)
        x2 = min(orig_w, x2)
        y2 = min(orig_h, y2)

        # 裁剪图像区域
        cropped_image = image_bgr[y1:y2, x1:x2]

        return cropped_image

相关参考文章 #

fast-reid入门教程

Fast-ReID 训练自己的数据集调优记录(一)

Fast-ReID 训练自己的数据集调优记录(二)

YoloV5 + deepsort + Fast-ReID 完整行人重识别系统(三)

ReID专栏(一) 任务与数据集概述

passvitb-image-reid-person 模型介绍

ReID简介

CVPR 2024 | ReID迎来大一统?一个模型拿下多类主流ReID任务新SOTA!开启ReID新纪元!

小白入门系列——ReID(一):什么是ReID?如何做ReID?ReID数据集?ReID评测指标?

行人重识别模型

详解ReID的各部分组成及Trick——基于FastReID