|
|
@ -1,285 +0,0 @@
|
|
|
|
# 假设这是yolo_detector.py文件的一部分
|
|
|
|
|
|
|
|
import torch
|
|
|
|
|
|
|
|
import cv2
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from utils.general import non_max_suppression, scale_coords
|
|
|
|
|
|
|
|
from utils.augmentations import letterbox
|
|
|
|
|
|
|
|
from utils.torch_utils import select_device
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from models.experimental import attempt_load
|
|
|
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
|
|
from typing import Tuple, Union
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 配置日志
|
|
|
|
|
|
|
|
# Configure the root logger once at import time: INFO level and above, with
# each record rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class YOLO_Detector:
    """Bundle of detection settings plus the compute device chosen for them."""

    def __init__(self, weights, img_size, conf_thres, iou_thres, classes=None, agnostic=False):
        """Record the detection settings and resolve the compute device.

        Args:
            weights: Model weights reference; stored as given and logged
                with ``%s``.
            img_size: Inference image size; stored as given and logged with
                ``%d``.
            conf_thres: Confidence threshold; logged with ``%.2f``.
            iou_thres: IoU threshold; logged with ``%.2f``.
            classes: Optional class filter, stored as given.
            agnostic: Class-agnostic flag, stored as given.
        """
        # Plain settings are stored untouched, in signature order.
        self.weights, self.img_size = weights, img_size
        self.conf_thres, self.iou_thres = conf_thres, iou_thres
        self.classes, self.agnostic = classes, agnostic

        # The device comes from select_device with an empty device string.
        self.device = select_device('')

        logging.info(
            "YOLO_Detector initialized with configuration: weights=%s, img_size=%d, conf_thres=%.2f, iou_thres=%.2f",
            weights,
            img_size,
            conf_thres,
            iou_thres,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def select_device(device_id='0'):
    """Translate ``device_id`` into a ``torch.device``.

    Args:
        device_id: Either a value ``int()`` accepts (an integer or a numeric
            string), read as a CUDA device index, or a non-numeric string.
            Among non-numeric strings only ``'gpu'`` (any letter case)
            requests CUDA; every other one, the empty string included,
            means CPU. Defaults to ``'0'``.

    Returns:
        torch.device: ``cuda:<index>`` for a numeric id, or plain ``cuda``
        for ``'gpu'``, when ``torch.cuda.is_available()`` is true; ``cpu``
        in every other case.
    """
    try:
        # A numeric id names one concrete CUDA device.
        device_id = int(device_id)
        if torch.cuda.is_available():
            return torch.device(f'cuda:{device_id}')
        return torch.device('cpu')
    except ValueError:
        # Not a number, so treat it as a keyword. Availability is tested
        # first, which means the string is only inspected when CUDA exists.
        if torch.cuda.is_available() and device_id.lower() == 'gpu':
            return torch.device('cuda')
        return torch.device('cpu')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def attempt_load(weights, map_location=torch.device('cpu')):
    """Load a serialized model or checkpoint, returning ``None`` on failure.

    Args:
        weights: Passed straight to ``torch.load``; normally the path of the
            weights file.
        map_location (torch.device): Forwarded to ``torch.load`` as the
            target device for the loaded storages.

    Returns:
        - ``None`` when ``torch.load`` fails for any reason (the reason is
          logged as an error).
        - When the loaded object is a dict with a ``'model'`` key: that
          entry if it is a ``torch.nn.Module``, otherwise ``None`` (logged).
        - In every other case the loaded object itself, unchanged.
    """
    # Imported locally so the block is self-contained; only needed to name
    # the exception type below.
    import pickle

    # SECURITY: torch.load deserializes with pickle, so a crafted file can
    # run arbitrary code. Only load weight files from trusted sources.
    # NOTE(review): recent PyTorch releases may default torch.load to
    # weights_only=True, which refuses fully pickled modules; such failures
    # end up in the handlers below. Confirm against the installed version.
    try:
        checkpoint = torch.load(weights, map_location=map_location)
    except FileNotFoundError as e:
        logging.error(f"模型文件找不到: {e}")
        return None
    except pickle.UnpicklingError as e:
        # The file exists but its contents could not be deserialized.
        logging.error(f"模型版本不兼容: {e}")
        return None
    except Exception as e:
        # Deliberate catch-all: callers rely on None instead of an exception.
        logging.error(f"加载模型时发生未知错误: {e}")
        return None

    # Anything that is not a {'model': ...} checkpoint is returned untouched.
    if not (isinstance(checkpoint, dict) and 'model' in checkpoint):
        return checkpoint

    # The type check must apply to the stored entry, not to the enclosing
    # dict; testing the dict itself would reject every checkpoint.
    stored_model = checkpoint['model']
    if not isinstance(stored_model, torch.nn.Module):
        logging.error("从字典加载模型时发生错误: 模型字典中的'model'键值不是torch.nn.Module的实例")
        return None
    return stored_model
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def letterbox(img: Image.Image, new_shape: Union[int, Tuple[int, int]] = 640, color: Tuple[int, int, int] = (114, 114, 114), auto: bool = True, scaleFill: bool = False, scaleUp: bool = True) -> Image.Image:
    """Resize an image onto a padded canvas of the requested size.

    Args:
        img: PIL image to resize.
        new_shape: Canvas size. An int means a square canvas; a tuple is
            read as ``(height, width)``.
        color: RGB fill colour for the padding, each component in 0..255.
        auto: When True, scale the image by a single factor so it fits
            inside the canvas with its aspect ratio kept, then centre it.
            When False, stretch the image to fill the whole canvas.
        scaleFill: Accepted for interface compatibility; currently unused.
        scaleUp: Accepted for interface compatibility; currently unused.

    Returns:
        A new RGB image of the canvas size with the resized input pasted
        in the centre.

    Raises:
        ValueError: If ``img`` is not a PIL image, ``new_shape`` is not a
            positive int or a tuple of two positive ints, ``color`` is not
            a tuple of three ints in 0..255, or the image has a zero side.
    """
    # Validate the arguments.
    if not isinstance(img, Image.Image):
        raise ValueError("img 参数必须是 PIL Image 对象。")
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    if not isinstance(new_shape, tuple) or len(new_shape) != 2 or any(not isinstance(x, int) or x <= 0 for x in new_shape):
        raise ValueError("new_shape 参数必须是正整数或包含两个正整数的元组。")
    if not isinstance(color, tuple) or len(color) != 3 or any(not isinstance(x, int) or x < 0 or x > 255 for x in color):
        raise ValueError("color 参数必须是 RGB 颜色的三元组,每个值介于 0 和 255 之间。")

    # PIL reports size as (width, height); unpacking it the other way round
    # would transpose the aspect ratio of every non-square image.
    src_w, src_h = img.size
    if src_w == 0 or src_h == 0:
        raise ValueError("图像的宽度或高度为0。")

    canvas_h, canvas_w = new_shape
    if auto:
        # One uniform factor keeps the aspect ratio; the smaller of the two
        # per-axis factors guarantees the result fits inside the canvas.
        ratio = min(canvas_h / src_h, canvas_w / src_w)
        # Clamp to 1 px so extreme aspect ratios cannot truncate to a
        # zero-sized resize target.
        resized_w = max(1, int(src_w * ratio))
        resized_h = max(1, int(src_h * ratio))
    else:
        # Stretch to exactly the canvas, using the same (height, width)
        # reading of new_shape as the canvas itself.
        resized_w, resized_h = canvas_w, canvas_h

    # Image.resize and Image.new both take (width, height).
    resized = img.resize((resized_w, resized_h), Image.BILINEAR)
    canvas = Image.new('RGB', (canvas_w, canvas_h), color=color)
    offset = ((canvas_w - resized_w) // 2, (canvas_h - resized_h) // 2)
    canvas.paste(resized, offset)

    return canvas
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _iou_one_to_many(box, others):
    """Return the IoU of one ``(4,)`` box against each row of an ``(M, 4)`` tensor."""
    # Boxes are (x1, y1, x2, y2); a negative overlap means no intersection.
    inter_w = (torch.min(others[:, 2], box[2]) - torch.max(others[:, 0], box[0])).clamp(min=0)
    inter_h = (torch.min(others[:, 3], box[3]) - torch.max(others[:, 1], box[1])).clamp(min=0)
    inter = inter_w * inter_h
    box_area = (box[2] - box[0]) * (box[3] - box[1])
    other_areas = (others[:, 2] - others[:, 0]) * (others[:, 3] - others[:, 1])
    union = box_area + other_areas - inter
    # The floor avoids 0/0 for degenerate (zero-area) boxes.
    return inter / union.clamp(min=1e-9)


def non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False):
    """Run greedy non-maximum suppression over a set of scored boxes.

    Args:
        pred: Tensor of shape ``(num_boxes, C)`` with ``C >= 6``, whose
            columns 0-3 are ``x1, y1, x2, y2``, column 4 the confidence and
            column 5 the class id. A leading batch dimension of size 1 is
            also accepted and removed.
        conf_thres: Rows with confidence not strictly above this value are
            dropped before suppression.
        iou_thres: A box survives a higher-scoring box only if their IoU is
            strictly below this value.
        classes: Optional class ids (sequence or tensor). When given, only
            rows whose class id is one of them are considered.
        agnostic: When True, boxes suppress each other regardless of class.
            When False, a box is only suppressed by a box of the same class.

    Returns:
        Tensor of the kept rows, ordered by descending confidence, with all
        columns (class ids included) unchanged. An input with no elements
        yields a ``(0, C)`` tensor.

    Raises:
        ValueError: If ``pred`` is non-empty and not two-dimensional after
            removing a size-1 batch dimension.
    """
    if pred.numel() == 0:
        return pred.new_zeros((0, pred.size(-1)))

    if pred.dim() == 3 and pred.size(0) == 1:
        pred = pred[0]
    if pred.dim() != 2:
        raise ValueError("pred must have shape (num_boxes, C) or (1, num_boxes, C).")

    # Drop low-confidence rows first.
    pred = pred[pred[:, 4] > conf_thres]

    # Optional class filter: compare each row's class id with every wanted id.
    if classes is not None:
        wanted = torch.as_tensor(classes, dtype=pred.dtype, device=pred.device).flatten()
        pred = pred[(pred[:, 5:6] == wanted[None, :]).any(dim=1)]

    # Greedy NMS: take the best remaining box, then discard what it suppresses.
    order = pred[:, 4].argsort(descending=True)
    kept_indices = []
    while order.numel() > 0:
        best = order[0]
        kept_indices.append(best.item())

        rest = order[1:]
        survives = _iou_one_to_many(pred[best, :4], pred[rest, :4]) < iou_thres
        if not agnostic:
            # Boxes of a different class are never suppressed by this one.
            survives = survives | (pred[rest, 5] != pred[best, 5])
        order = rest[survives]

    return pred[torch.tensor(kept_indices, dtype=torch.long, device=pred.device)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def scale_coords(img_shape, coords, new_shape):
    """Shrink ``(x1, y1, x2, y2)`` boxes so they fit a smaller target size.

    Args:
        img_shape: Two positive numbers (list or tuple) read as
            ``(height, width)`` of the source image.
        coords: NumPy array of shape ``(N, 4)`` with one box per row.
        new_shape: Two positive numbers (list or tuple) read as
            ``(width, height)`` of the target. Note the order is the
            reverse of ``img_shape``.

    Returns:
        When the target is smaller than the source along at least one axis
        (scale factor below 1), a new ``(N, 4)`` array of scaled boxes.
        Otherwise ``coords`` itself, unchanged; boxes are never enlarged.

    Raises:
        ValueError: If the shapes are not length-2 lists/tuples of positive
            values, or ``coords`` is not a 2-D NumPy array with 4 columns.
    """
    # Validate the arguments; tuples are accepted as well as lists so that
    # values such as ``array.shape`` can be passed directly.
    for shape in (img_shape, new_shape):
        if not isinstance(shape, (list, tuple)) or len(shape) != 2:
            raise ValueError("img_shape and new_shape must be lists or tuples of length 2.")
    # Check ndim before shape[1] so a 1-D array gives ValueError, not IndexError.
    if not (isinstance(coords, np.ndarray) and coords.ndim == 2 and coords.shape[1] == 4):
        raise ValueError("coords must be a numpy array with 4 columns.")
    if any(i <= 0 for i in img_shape) or any(i <= 0 for i in new_shape):
        raise ValueError("img_shape and new_shape must have positive elements.")

    src_h, src_w = img_shape
    dst_w, dst_h = new_shape

    # A single factor for both axes, chosen so neither target side is exceeded.
    ratio = min(dst_w / src_w, dst_h / src_h)
    if ratio >= 1:
        # No shrinking needed: hand the caller's array back as is.
        return coords

    # Integer size of the shrunken image, used to reposition box centres.
    new_w = int(src_w * ratio)
    new_h = int(src_h * ratio)

    center_x = (coords[:, 0] + coords[:, 2]) / 2
    center_y = (coords[:, 1] + coords[:, 3]) / 2
    center_x = center_x * new_w / src_w
    center_y = center_y * new_h / src_h

    # Box extents scale by the exact (non-truncated) factor.
    w = (coords[:, 2] - coords[:, 0]) * ratio
    h = (coords[:, 3] - coords[:, 1]) * ratio

    return np.stack(
        [center_x - w / 2, center_y - h / 2, center_x + w / 2, center_y + h / 2],
        axis=1,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_image(weights, img_path, img_size=640, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False):
    """Run the full detection pipeline on one image file.

    Steps: select a device, load the model, letterbox the image, run
    inference, apply non-maximum suppression and round the box coordinates.

    Args:
        weights: Forwarded to ``attempt_load``.
        img_path: Path of the image, opened with ``PIL.Image.open``.
        img_size: Letterbox target; an int (square) or a ``(height, width)``
            tuple.
        conf_thres: Confidence threshold forwarded to non-maximum suppression.
        iou_thres: IoU threshold forwarded to non-maximum suppression.
        classes: Optional class filter forwarded to non-maximum suppression.
        agnostic: Class-agnostic flag forwarded to non-maximum suppression.

    Returns:
        The tensor produced by ``non_max_suppression`` with its first four
        columns rounded, or an empty list when model loading, image loading
        or inference fails (the failure is logged as an error).
    """
    # --- model -------------------------------------------------------------
    try:
        device = select_device('')
        logging.info("Selecting device: %s", device)
        model = attempt_load(weights, map_location=device)
        if model is None:
            # attempt_load reports failure by returning None (details are
            # already logged there); stop here with an explicit message.
            logging.error("Error loading the model: %s", "attempt_load returned None")
            return []
        model.eval()
        logging.info("Model loaded successfully")
    except Exception as e:
        logging.error("Error loading the model: %s", e)
        return []

    # --- image -------------------------------------------------------------
    try:
        # The context manager closes the file handle; the letterboxed result
        # is a separate image and stays valid afterwards.
        with Image.open(img_path) as source:
            boxed = letterbox(source, new_shape=img_size)
        if isinstance(boxed, tuple):
            # Tolerate a letterbox variant that returns (image, ...).
            boxed = boxed[0]
        img = np.array(boxed)
        logging.info("Image loaded and preprocessed")
    except Exception as e:
        logging.error("Error loading the image: %s", e)
        return []

    # Reverse the channel axis and move it first (HWC -> CHW).
    # NOTE(review): PIL images are RGB, so this reversal yields BGR; confirm
    # that this is the channel order the model expects.
    img = img[:, :, ::-1].transpose(2, 0, 1)
    img = np.ascontiguousarray(img)

    img_tensor = torch.from_numpy(img).to(device)
    img_tensor = img_tensor.float()
    img_tensor /= 255.0  # uint8 0..255 -> float 0..1

    if img_tensor.ndimension() == 3:
        img_tensor = img_tensor.unsqueeze(0)  # add the batch dimension

    # --- inference -----------------------------------------------------------
    try:
        # Gradients are not needed for inference.
        with torch.no_grad():
            pred = model(img_tensor, augment=False)[0]
        logging.info("Inference completed")
    except Exception as e:
        logging.error("Error during inference: %s", e)
        return []

    # --- post-processing -----------------------------------------------------
    pred = non_max_suppression(pred, conf_thres, iou_thres, classes=classes, agnostic=agnostic)
    if len(pred):
        # scale_coords validates for list shapes and a NumPy array, so
        # convert explicitly. It reads img_shape as (height, width) and
        # new_shape as (width, height).
        net_h, net_w = img.shape[1:]
        if isinstance(img_size, int):
            target = [img_size, img_size]
        else:
            target = [img_size[1], img_size[0]]
        boxes = scale_coords([net_h, net_w], pred[:, :4].detach().cpu().numpy(), target)
        # NOTE(review): the network input already has the requested size, so
        # this call leaves the boxes unchanged; they are not mapped back to
        # the original image resolution here.
        boxes = torch.from_numpy(np.ascontiguousarray(boxes))
        pred[:, :4] = boxes.to(device=pred.device, dtype=pred.dtype).round()
    logging.info("Non-maximum suppression completed")
    return pred
|
|
|
|
|