# 假设这是yolo_detector.py文件的一部分
import torch
import cv2
import logging
import numpy as np
from utils.general import non_max_suppression, scale_coords
from utils.augmentations import letterbox
from utils.torch_utils import select_device
from models.experimental import attempt_load
from PIL import Image
from typing import Tuple, Union
# Configure the root logger: INFO level, with timestamp, level name and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class YOLO_Detector:
    """Hold the settings for a YOLO detection run.

    The constructor only stores its arguments and selects a compute device;
    it does not load any weights.
    """

    def __init__(self, weights, img_size, conf_thres, iou_thres, classes=None, agnostic=False):
        """Store the detector settings and select a device.

        Args:
            weights: Model weights reference, stored as given.
            img_size: Inference image size, stored as given.
            conf_thres: Confidence threshold, stored as given.
            iou_thres: IoU threshold, stored as given.
            classes: Optional class filter, stored as given.
            agnostic: Class-agnostic flag, stored as given.
        """
        self.weights = weights
        self.img_size = img_size
        self.conf_thres = conf_thres
        self.iou_thres = iou_thres
        self.classes = classes
        self.agnostic = agnostic
        self.device = select_device('')
        # img_size is logged with %s rather than %d so that a non-integer
        # value (for example a size pair) does not break log formatting.
        logging.info(
            "YOLO_Detector initialized with configuration: weights=%s, img_size=%s, conf_thres=%.2f, iou_thres=%.2f",
            weights,
            img_size,
            conf_thres,
            iou_thres,
        )
def select_device(device_id='0'):
    """Return the torch device described by ``device_id``.

    Args:
        device_id: Either a CUDA device index (an int, or a numeric string
            such as ``'0'``) or the string ``'gpu'`` (case-insensitive).

    Returns:
        ``torch.device('cuda:<index>')`` for a numeric id, or
        ``torch.device('cuda')`` for ``'gpu'``, when CUDA is available.
        ``torch.device('cpu')`` in every other case, including an empty
        string, ``None`` or any unrecognised value.
    """
    cuda_available = torch.cuda.is_available()
    try:
        index = int(device_id)
    except (TypeError, ValueError):
        # Not a number: only the literal keyword 'gpu' requests CUDA.
        # TypeError covers None and other non-numeric, non-string inputs.
        wants_gpu = isinstance(device_id, str) and device_id.lower() == 'gpu'
        return torch.device('cuda' if cuda_available and wants_gpu else 'cpu')
    return torch.device(f'cuda:{index}' if cuda_available else 'cpu')
def attempt_load(weights, map_location=torch.device('cpu')):
    """Load a model from a checkpoint, returning ``None`` on any failure.

    Args:
        weights: Checkpoint reference passed straight to ``torch.load``.
        map_location: Device onto which stored tensors are mapped.

    Returns:
        * The ``torch.nn.Module`` stored under the ``'model'`` key when the
          checkpoint is a dict containing that key.
        * The loaded object itself when it is not such a dict.
        * ``None`` when loading fails, or when the ``'model'`` entry is not a
          ``torch.nn.Module``. Failures are logged, never raised.
    """
    import pickle  # local import: only needed to recognise unpickling failures

    # SECURITY: torch.load unpickles the file, which can execute arbitrary
    # code. Only load checkpoints that come from a trusted source.
    # NOTE(review): recent PyTorch releases default torch.load to
    # weights_only=True, which rejects pickled nn.Module objects - confirm
    # against the PyTorch version this project targets.
    try:
        checkpoint = torch.load(weights, map_location=map_location)
    except FileNotFoundError as e:
        logging.error("模型文件找不到: %s", e)
        return None
    except (pickle.UnpicklingError, RuntimeError) as e:
        # Corrupt or incompatible serialized data.
        logging.error("模型版本不兼容: %s", e)
        return None
    except Exception as e:
        logging.error("加载模型时发生未知错误: %s", e)
        return None

    # Anything that is not a {'model': ...} checkpoint is returned untouched.
    if not (isinstance(checkpoint, dict) and 'model' in checkpoint):
        return checkpoint

    stored_model = checkpoint['model']
    if not isinstance(stored_model, torch.nn.Module):
        logging.error("从字典加载模型时发生错误: 模型字典中的'model'键值不是torch.nn.Module的实例")
        return None
    return stored_model
def letterbox(img: Image.Image, new_shape: Union[int, Tuple[int, int]] = 640, color: Tuple[int, int, int] = (114, 114, 114), auto: bool = True, scaleFill: bool = False, scaleUp: bool = True) -> Image.Image:
    """Resize a PIL image and centre it on a solid-colour RGB canvas.

    Args:
        img: Source PIL image.
        new_shape: Canvas size. An int gives a square canvas; a tuple is read
            as ``(height, width)``, matching how the canvas is built.
        color: RGB fill colour for the border, each component in 0..255.
        auto: When true the image is scaled to fit inside the canvas with its
            aspect ratio preserved; when false it is stretched to the full
            canvas size.
        scaleFill: Accepted for interface compatibility; currently unused.
        scaleUp: Accepted for interface compatibility; currently unused.

    Returns:
        A new RGB image of the canvas size containing the resized source.

    Raises:
        ValueError: If ``img`` is not a PIL image, ``new_shape`` or ``color``
            is malformed, or the image has a zero dimension.
    """
    if not isinstance(img, Image.Image):
        raise ValueError("img 参数必须是 PIL Image 对象。")
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    if (
        not isinstance(new_shape, tuple)
        or len(new_shape) != 2
        or any(not isinstance(x, int) or x <= 0 for x in new_shape)
    ):
        raise ValueError("new_shape 参数必须是正整数或包含两个正整数的元组。")
    if (
        not isinstance(color, tuple)
        or len(color) != 3
        or any(not isinstance(x, int) or x < 0 or x > 255 for x in color)
    ):
        raise ValueError("color 参数必须是 RGB 颜色的三元组,每个值介于 0 和 255 之间。")

    # PIL reports size as (width, height), not (height, width).
    src_w, src_h = img.size
    if src_w == 0 or src_h == 0:
        raise ValueError("图像的宽度或高度为0。")

    target_h, target_w = new_shape
    if auto:
        # One ratio for both axes keeps the aspect ratio; the smaller of the
        # two candidates guarantees the result fits inside the canvas.
        ratio = min(target_h / src_h, target_w / src_w)
        # max(1, ...) keeps extreme aspect ratios from truncating to 0 px.
        new_w = max(1, int(src_w * ratio))
        new_h = max(1, int(src_h * ratio))
    else:
        new_w, new_h = target_w, target_h

    resized = img.resize((new_w, new_h), Image.BILINEAR)
    canvas = Image.new('RGB', (target_w, target_h), color=color)
    # Centre the resized image; the remaining border keeps the fill colour.
    offset = ((target_w - new_w) // 2, (target_h - new_h) // 2)
    canvas.paste(resized, offset)
    return canvas
def non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False):
    """Apply greedy non-maximum suppression to a set of detections.

    Args:
        pred: Tensor of shape ``(num_boxes, C)`` with ``C >= 6``. Columns 0-3
            are read as ``x1, y1, x2, y2``, column 4 as the confidence and
            column 5 as the class id.
        conf_thres: Rows whose confidence is not above this value are dropped.
        iou_thres: A box is suppressed when its IoU with an already kept,
            higher-confidence box is at least this value.
        classes: Optional class ids (scalar, list or tensor); when given,
            only rows whose class id matches one of them are considered.
        agnostic: When true, boxes suppress each other regardless of class;
            when false, only boxes with the same class id compete.

    Returns:
        The kept rows, ordered by descending confidence, with all columns
        preserved. An empty ``(0, C)`` tensor when nothing survives.

    Raises:
        ValueError: If a non-empty ``pred`` is not 2-D with at least 6 columns.
    """
    if pred.numel() == 0:
        return pred.new_zeros((0, pred.size(-1)))
    if pred.dim() != 2 or pred.size(1) < 6:
        raise ValueError("pred must have shape (num_boxes, C) with C >= 6")

    pred = pred[pred[:, 4] > conf_thres]
    if classes is not None:
        wanted = torch.as_tensor(classes, dtype=pred.dtype, device=pred.device).reshape(-1)
        # (N, 1) == (1, K) -> (N, K): True where a row's class is requested.
        pred = pred[(pred[:, 5:6] == wanted.unsqueeze(0)).any(dim=1)]
    if pred.size(0) == 0:
        return pred

    # Work on rows sorted by confidence so a plain forward scan is greedy NMS.
    pred = pred[pred[:, 4].argsort(descending=True)]
    boxes = pred[:, :4]
    labels = pred[:, 5]
    widths = (boxes[:, 2] - boxes[:, 0]).clamp(min=0)
    heights = (boxes[:, 3] - boxes[:, 1]).clamp(min=0)
    areas = widths * heights

    total = pred.size(0)
    alive = torch.ones(total, dtype=torch.bool, device=pred.device)
    kept = []
    for i in range(total):
        if not alive[i]:
            continue
        kept.append(i)
        if i + 1 == total:
            break
        rest = slice(i + 1, None)
        top_left = torch.max(boxes[i, :2], boxes[rest, :2])
        bottom_right = torch.min(boxes[i, 2:], boxes[rest, 2:])
        inter = (bottom_right - top_left).clamp(min=0).prod(dim=1)
        union = areas[i] + areas[rest] - inter
        # Guard against 0/0 for degenerate (zero-area) boxes.
        iou = inter / union.clamp(min=1e-9)
        suppressed = iou >= iou_thres
        if not agnostic:
            suppressed &= labels[rest] == labels[i]
        alive[rest] &= ~suppressed
    return pred[kept]
def scale_coords(img_shape, coords, new_shape):
    """Shrink boxes so an image of ``img_shape`` fits inside ``new_shape``.

    Args:
        img_shape: ``(height, width)`` of the image the boxes refer to, as a
            list or tuple.
        coords: NumPy array of shape ``(N, 4)`` holding ``x1, y1, x2, y2``.
        new_shape: ``(width, height)`` of the target area as a list or tuple,
            or a single int for a square target.

    Returns:
        A new array with every coordinate multiplied by
        ``min(new_w / img_w, new_h / img_h)`` when that ratio is below 1.
        Otherwise ``coords`` itself is returned unchanged (boxes are never
        enlarged).

    Raises:
        ValueError: If the shapes are not length-2 sequences, contain a
            non-positive value, or ``coords`` is not an ``(N, 4)`` array.
    """
    if isinstance(new_shape, (int, np.integer)):
        new_shape = [new_shape, new_shape]
    shapes_ok = all(
        isinstance(shape, (list, tuple)) and len(shape) == 2
        for shape in (img_shape, new_shape)
    )
    if not shapes_ok:
        raise ValueError("img_shape and new_shape must be lists of length 2.")
    if not (isinstance(coords, np.ndarray) and coords.ndim == 2 and coords.shape[1] == 4):
        raise ValueError("coords must be a numpy array with 4 columns.")
    if any(i <= 0 for i in img_shape) or any(i <= 0 for i in new_shape):
        raise ValueError("img_shape and new_shape must have positive elements.")

    img_h, img_w = img_shape
    new_w, new_h = new_shape
    # The smaller ratio guarantees the scaled image fits on both axes.
    ratio = min(new_w / img_w, new_h / img_h)
    if ratio >= 1:
        return coords
    # Scaling every corner by the same factor scales centres and extents
    # consistently, so a box inside the image stays inside the scaled image.
    return coords * ratio
def detect_image(weights, img_path, img_size=640, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False):
    """Run detection on one image file and return the filtered boxes.

    Steps: select a device, load the model, letterbox the image, run the
    model, apply non-maximum suppression and round the box coordinates.

    Args:
        weights: Checkpoint reference forwarded to ``attempt_load``.
        img_path: Path of the image to open with PIL.
        img_size: Letterbox target size forwarded to ``letterbox``.
        conf_thres: Confidence threshold forwarded to NMS.
        iou_thres: IoU threshold forwarded to NMS.
        classes: Optional class filter forwarded to NMS.
        agnostic: Class-agnostic flag forwarded to NMS.

    Returns:
        The tensor returned by ``non_max_suppression`` with its first four
        columns rounded, or an empty list when model loading, image loading
        or inference fails (the failure is logged).
    """
    try:
        device = select_device('')
        logging.info("Selecting device: %s", device)
        model = attempt_load(weights, map_location=device)
    except Exception as e:
        logging.error("Error loading the model: %s", e)
        return []
    if model is None:
        # attempt_load signals failure by returning None rather than raising.
        logging.error("Error loading the model: attempt_load returned None")
        return []
    logging.info("Model loaded successfully")

    try:
        # The context manager closes the file; convert() yields a detached
        # three-channel copy that stays valid afterwards.
        with Image.open(img_path) as source:
            boxed = letterbox(source.convert('RGB'), new_shape=img_size)
        img = np.array(boxed)
        logging.info("Image loaded and preprocessed")
    except Exception as e:
        logging.error("Error loading the image: %s", e)
        return []

    # HWC -> CHW. The array comes from a PIL RGB image, so the channels are
    # already in RGB order and are not reversed.
    # NOTE(review): assumes the model expects RGB input - confirm.
    img = np.ascontiguousarray(img.transpose(2, 0, 1))
    img_tensor = torch.from_numpy(img).to(device).float() / 255.0
    if img_tensor.ndimension() == 3:
        img_tensor = img_tensor.unsqueeze(0)

    try:
        with torch.no_grad():  # inference only: no autograd graph needed
            pred = model(img_tensor, augment=False)[0]
        logging.info("Inference completed")
    except Exception as e:
        logging.error("Error during inference: %s", e)
        return []

    # non_max_suppression works on a 2-D (num_boxes, C) tensor; drop the
    # batch axis of the single image if the model output still carries it.
    # NOTE(review): assumes rows are [x1, y1, x2, y2, conf, class_id]; a raw
    # detection head may need decoding first - confirm against the model.
    if pred.dim() == 3:
        pred = pred[0]

    pred = non_max_suppression(pred, conf_thres, iou_thres, classes=classes, agnostic=agnostic)
    if len(pred):
        # scale_coords expects list shapes and a NumPy array of boxes.
        target_shape = [img_size, img_size] if isinstance(img_size, int) else list(img_size)
        boxes = pred[:, :4].detach().cpu().numpy()
        scaled = scale_coords(list(img.shape[1:]), boxes, target_shape)
        pred[:, :4] = torch.as_tensor(scaled, dtype=pred.dtype, device=pred.device).round()
    logging.info("Non-maximum suppression completed")
    return pred