import contextlib
import logging
import logging.config
import os
import platform
from pathlib import Path

import cv2
import numpy as np
import pkg_resources as pkg
import torch
import torchvision
from PIL import Image, ImageDraw, ImageFont

from models.common import DetectMultiBackend
from utils.plots import Annotator


def is_ascii(s=''):
    # Is string composed of all ASCII (no UTF) characters? (note str().isascii() introduced in python 3.7)
    s = str(s)  # convert list, tuple, None, etc. to str
    return len(s.encode().decode('ascii', 'ignore')) == len(s)


LOGGING_NAME = "yolov5"


def set_logging(name=LOGGING_NAME, verbose=True):
    # sets up logging for the given name
    rank = int(os.getenv('RANK', -1))  # rank in world for Multi-GPU trainings
    level = logging.INFO if verbose and rank in {-1, 0} else logging.ERROR
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            name: {
                "format": "%(message)s"}},
        "handlers": {
            name: {
                "class": "logging.StreamHandler",
                "formatter": name,
                "level": level}},
        "loggers": {
            name: {
                "level": level,
                "handlers": [name],
                "propagate": False}}})


set_logging(LOGGING_NAME)  # run before defining LOGGER
LOGGER = logging.getLogger(LOGGING_NAME)  # define globally (used in train.py, val.py, detect.py, etc.)


def emojis(s=''):
    # Return platform-dependent emoji-safe version of string
    return s.encode().decode('ascii', 'ignore') if platform.system() == 'Windows' else s


def check_version(current='0.0.0', minimum='0.0.0', name='version ', pinned=False, hard=False, verbose=False):
    # Check version vs. required version
    current, minimum = (pkg.parse_version(x) for x in (current, minimum))
    result = (current == minimum) if pinned else (current >= minimum)  # bool
    s = f'WARNING ⚠️ {name}{minimum} is required by YOLOv5, but {name}{current} is currently installed'  # string
    if hard:
        assert result, emojis(s)  # assert min requirements met
    if verbose and not result:
        LOGGER.warning(s)
    return result


def smart_inference_mode(torch_1_9=check_version(torch.__version__, '1.9.0')):
    # Applies torch.inference_mode() decorator if torch>=1.9.0 else torch.no_grad() decorator
    def decorate(fn):
        return (torch.inference_mode if torch_1_9 else torch.no_grad)()(fn)

    return decorate
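# Usage sketch (illustrative, not part of the original pipeline): the decorator
# returned by smart_inference_mode() wraps a function in torch.inference_mode()
# on torch>=1.9, falling back to torch.no_grad() on older versions.
#
#   @smart_inference_mode()
#   def forward_once(model, x):
#       return model(x)  # gradients are never tracked inside this call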
def box_iou(box1, box2, eps=1e-7):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Arguments:
        box1 (Tensor[N, 4])
        box2 (Tensor[M, 4])
    Returns:
        iou (Tensor[N, M]): the NxM matrix containing the pairwise IoU values
        for every element in boxes1 and boxes2
    """
    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp(0).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps)


def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y


def xyxy2xywh(x):
    # Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = (x[:, 0] + x[:, 2]) / 2  # x center
    y[:, 1] = (x[:, 1] + x[:, 3]) / 2  # y center
    y[:, 2] = x[:, 2] - x[:, 0]  # width
    y[:, 3] = x[:, 3] - x[:, 1]  # height
    return y


def clip_boxes(boxes, shape):
    # Clip boxes (xyxy) to image shape (height, width)
    if isinstance(boxes, torch.Tensor):  # faster individually
        boxes[:, 0].clamp_(0, shape[1])  # x1
        boxes[:, 1].clamp_(0, shape[0])  # y1
        boxes[:, 2].clamp_(0, shape[1])  # x2
        boxes[:, 3].clamp_(0, shape[0])  # y2
    else:  # np.array (faster grouped)
        boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2
        boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2


def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    # Rescale boxes (xyxy) from img1_shape to img0_shape
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    boxes[:, [0, 2]] -= pad[0]  # x padding
    boxes[:, [1, 3]] -= pad[1]  # y padding
    boxes[:, :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes


def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)
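# Worked example (assumed input size): letterboxing a 720x1280 BGR frame to a
# stride-32 "minimum rectangle" scales by r = 0.5 and pads only the height.
#
#   frame = np.zeros((720, 1280, 3), dtype=np.uint8)  # hypothetical frame
#   out, ratio, (dw, dh) = letterbox(frame, (640, 640), stride=32, auto=True)
#   # out.shape == (384, 640, 3): 720 * 0.5 = 360, padded to the next multiple of 32
#   # ratio == (0.5, 0.5), (dw, dh) == (0.0, 12.0)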
def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,  # kept for API compatibility (unused in this trimmed version)
        labels=(),
        max_det=300,
        nm=0,  # number of masks
):
    """Non-Maximum Suppression (NMS) on inference results to reject overlapping detections

    Returns:
         list of detections, one (n,6) tensor per image [xyxy, conf, cls]
    """
    # YOLOv5 model in validation mode, output = (inference_out, loss_out)
    prediction = prediction[0]  # select only inference output

    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Settings
    mi = 5 + nc  # mask start index
    max_wh = 7680  # (pixels) maximum box width and height, used as per-class box offset
    merge = False  # use merge-NMS (boxes merged using weighted mean)

    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(lb)), lb[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box/Mask
        box = xywh2xyxy(x[:, :4])  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        mask = x[:, mi:]  # zero columns if no masks

        # Detections matrix nx6 (xyxy, conf, cls)
        conf, j = x[:, 5:mi].max(1, keepdim=True)
        x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        x = x[x[:, 4].argsort(descending=True)]  # sort by confidence

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
    return output
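# Output format note: non_max_suppression returns one (n, 6 + nm) tensor per
# image whose columns are x1, y1, x2, y2, confidence, class (plus mask
# coefficients when nm > 0). A minimal consumption sketch (names assumed):
#
#   for det in non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45):
#       for *xyxy, conf, cls in det:
#           print(int(cls), float(conf), [float(v) for v in xyxy])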
def is_writeable(dir, test=False):
    # Return True if directory has write permissions, test opening a file with write permissions if test=True
    if not test:
        return os.access(dir, os.W_OK)  # possible issues on Windows
    file = Path(dir) / 'tmp.txt'
    try:
        with open(file, 'w'):  # open file with write permissions
            pass
        file.unlink()  # remove file
        return True
    except OSError:
        return False


def user_config_dir(dir='Ultralytics', env_var='YOLOV5_CONFIG_DIR'):
    # Return path of user configuration directory. Prefer environment variable if exists. Make dir if required.
    env = os.getenv(env_var)
    if env:
        path = Path(env)  # use environment variable
    else:
        cfg = {'Windows': 'AppData/Roaming', 'Linux': '.config', 'Darwin': 'Library/Application Support'}  # 3 OS dirs
        path = Path.home() / cfg.get(platform.system(), '')  # OS-specific config dir
        path = (path if is_writeable(path) else Path('/tmp')) / dir  # GCP and AWS lambda fix, only /tmp is writeable
    path.mkdir(exist_ok=True)  # make if required
    return path


CONFIG_DIR = user_config_dir()  # Ultralytics settings dir
FONT = 'Arial.ttf'  # https://ultralytics.com/assets/Arial.ttf


def check_font(font=FONT, progress=False):
    # Download font to CONFIG_DIR if necessary
    font = Path(font)
    file = CONFIG_DIR / font.name
    if not font.exists() and not file.exists():
        url = f'https://ultralytics.com/assets/{font.name}'
        LOGGER.info(f'Downloading {url} to {file}...')
        torch.hub.download_url_to_file(url, str(file), progress=progress)


def check_python(minimum='3.7.0'):
    # Check current python version vs. required python version
    check_version(platform.python_version(), minimum, name='Python ', hard=True)


class TryExcept(contextlib.ContextDecorator):
    # YOLOv5 TryExcept class. Usage: @TryExcept() decorator or 'with TryExcept():' context manager
    def __init__(self, msg=''):
        self.msg = msg

    def __enter__(self):
        pass

    def __exit__(self, exc_type, value, traceback):
        if value:
            print(emojis(f"{self.msg}{': ' if self.msg else ''}{value}"))
        return True


def colorstr(*input):
    # Colors a string https://en.wikipedia.org/wiki/ANSI_escape_code, i.e. colorstr('blue', 'hello world')
    *args, string = input if len(input) > 1 else ('blue', 'bold', input[0])  # color arguments, string
    colors = {
        'black': '\033[30m',  # basic colors
        'red': '\033[31m',
        'green': '\033[32m',
        'yellow': '\033[33m',
        'blue': '\033[34m',
        'magenta': '\033[35m',
        'cyan': '\033[36m',
        'white': '\033[37m',
        'bright_black': '\033[90m',  # bright colors
        'bright_red': '\033[91m',
        'bright_green': '\033[92m',
        'bright_yellow': '\033[93m',
        'bright_blue': '\033[94m',
        'bright_magenta': '\033[95m',
        'bright_cyan': '\033[96m',
        'bright_white': '\033[97m',
        'end': '\033[0m',  # misc
        'bold': '\033[1m',
        'underline': '\033[4m'}
    return ''.join(colors[x] for x in args) + f'{string}' + colors['end']
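# Example: colorstr() wraps a string in ANSI escape codes; with a single
# argument it defaults to bold blue.
#
#   LOGGER.info(colorstr('red', 'bold', 'requirements:') + ' all good')
#   LOGGER.info(colorstr('hello world'))  # bold blue 'hello world'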
# @TryExcept()
# def check_requirements(requirements='requirements.txt', exclude=(), install=True, cmds=''):
#     # Check installed dependencies meet YOLOv5 requirements (pass *.txt file or list of packages or single package str)
#     prefix = colorstr('red', 'bold', 'requirements:')
#     check_python()  # check python version
#     if isinstance(requirements, Path):  # requirements.txt file
#         file = requirements.resolve()
#         assert file.exists(), f"{prefix} {file} not found, check failed."
#         with file.open() as f:
#             requirements = [f'{x.name}{x.specifier}' for x in pkg.parse_requirements(f) if x.name not in exclude]
#     elif isinstance(requirements, str):
#         requirements = [requirements]
#
#     s = ''
#     n = 0
#     for r in requirements:
#         try:
#             pkg.require(r)
#         except (pkg.VersionConflict, pkg.DistributionNotFound):  # exception if requirements not met
#             s += f'"{r}" '
#             n += 1
#
#     if s and install and AUTOINSTALL:  # check environment variable
#         LOGGER.info(f"{prefix} YOLOv5 requirement{'s' * (n > 1)} {s}not found, attempting AutoUpdate...")
#         try:
#             # assert check_online(), "AutoUpdate skipped (offline)"
#             LOGGER.info(check_output(f'pip install {s} {cmds}', shell=True).decode())
#             source = file if 'file' in locals() else requirements
#             s = f"{prefix} {n} package{'s' * (n > 1)} updated per {source}\n" \
#                 f"{prefix} ⚠️ {colorstr('bold', 'Restart runtime or rerun command for updates to take effect')}\n"
#             LOGGER.info(s)
#         except Exception as e:
#             LOGGER.warning(f'{prefix} ❌ {e}')


# def check_pil_font(font=FONT, size=10):
#     # Return a PIL TrueType Font, downloading to CONFIG_DIR if necessary
#     font = Path(font)
#     font = font if font.exists() else (CONFIG_DIR / font.name)
#     try:
#         return ImageFont.truetype(str(font) if font.exists() else font.name, size)
#     except Exception:  # download if missing
#         try:
#             check_font(font)
#             return ImageFont.truetype(str(font), size)
#         except TypeError:
#             check_requirements('Pillow>=8.4.0')  # known issue https://github.com/ultralytics/yolov5/issues/5374
#         except URLError:  # not online
#             return ImageFont.load_default()


def scale_image(im1_shape, masks, im0_shape, ratio_pad=None):
    """
    im1_shape: model input shape, [h, w]
    im0_shape: origin pic shape, [h, w, 3]
    masks: [h, w, num]
    """
    # Rescale masks from im1_shape to im0_shape
    if ratio_pad is None:  # calculate from im0_shape
        gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])  # gain = old / new
        pad = (im1_shape[1] - im0_shape[1] * gain) / 2, (im1_shape[0] - im0_shape[0] * gain) / 2  # wh padding
    else:
        pad = ratio_pad[1]
    top, left = int(pad[1]), int(pad[0])  # y, x
    bottom, right = int(im1_shape[0] - pad[1]), int(im1_shape[1] - pad[0])

    if len(masks.shape) < 2:
        raise ValueError(f'"len of masks shape" should be 2 or 3, but got {len(masks.shape)}')
    masks = masks[top:bottom, left:right]
    # masks = masks.permute(2, 0, 1).contiguous()
    # masks = F.interpolate(masks[None], im0_shape[:2], mode='bilinear', align_corners=False)[0]
    # masks = masks.permute(1, 2, 0).contiguous()
    masks = cv2.resize(masks, (im0_shape[1], im0_shape[0]))

    if len(masks.shape) == 2:
        masks = masks[:, :, None]
    return masks
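# Shape sketch (values assumed, matching the letterbox example above): for a
# 384x640 letterboxed input over a 720x1280 frame, the 12-pixel padding bands
# at the top and bottom are cropped off and the result is resized back:
#
#   scale_image((384, 640), masks, (720, 1280, 3))  # (384, 640, n) -> (720, 1280, n)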
# class Annotator:
#     # YOLOv5 Annotator for train/val mosaics and jpgs and detect/hub inference annotations
#     def __init__(self, im, line_width=None, font_size=None, font='Arial.ttf', pil=False, example='abc'):
#         assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images.'
#         non_ascii = not is_ascii(example)  # non-latin labels, i.e. asian, arabic, cyrillic
#         self.pil = pil or non_ascii
#         if self.pil:  # use PIL
#             self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
#             self.draw = ImageDraw.Draw(self.im)
#             self.font = check_pil_font(font='Arial.Unicode.ttf' if non_ascii else font,
#                                        size=font_size or max(round(sum(self.im.size) / 2 * 0.035), 12))
#         else:  # use cv2
#             self.im = im
#         self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2)  # line width
#
#     def box_label(self, box, label='', color=(128, 128, 128), txt_color=(255, 255, 255)):
#         # Add one xyxy box to image with label
#         if self.pil or not is_ascii(label):
#             self.draw.rectangle(box, width=self.lw, outline=color)  # box
#             if label:
#                 w, h = self.font.getsize(label)  # text width, height
#                 outside = box[1] - h >= 0  # label fits outside box
#                 self.draw.rectangle(
#                     (box[0], box[1] - h if outside else box[1], box[0] + w + 1,
#                      box[1] + 1 if outside else box[1] + h + 1),
#                     fill=color,
#                 )
#                 # self.draw.text((box[0], box[1]), label, fill=txt_color, font=self.font, anchor='ls')  # for PIL>8.0
#                 self.draw.text((box[0], box[1] - h if outside else box[1]), label, fill=txt_color, font=self.font)
#         else:  # cv2
#             p1, p2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3]))
#             cv2.rectangle(self.im, p1, p2, color, thickness=self.lw, lineType=cv2.LINE_AA)
#             if label:
#                 tf = max(self.lw - 1, 1)  # font thickness
#                 w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[0]  # text width, height
#                 outside = p1[1] - h >= 3
#                 p2 = p1[0] + w, p1[1] - h - 3 if outside else p1[1] + h + 3
#                 cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA)  # filled
#                 cv2.putText(self.im,
#                             label, (p1[0], p1[1] - 2 if outside else p1[1] + h + 2),
#                             0,
#                             self.lw / 3,
#                             txt_color,
#                             thickness=tf,
#                             lineType=cv2.LINE_AA)
#
#     def masks(self, masks, colors, im_gpu=None, alpha=0.5):
#         """Plot masks at once.
#         Args:
#             masks (tensor): predicted masks on cuda, shape: [n, h, w]
#             colors (List[List[Int]]): colors for predicted masks, [[r, g, b] * n]
#             im_gpu (tensor): img is in cuda, shape: [3, h, w], range: [0, 1]
#             alpha (float): mask transparency: 0.0 fully transparent, 1.0 opaque
#         """
#         if self.pil:
#             # convert to numpy first
#             self.im = np.asarray(self.im).copy()
#         if im_gpu is None:
#             # Add multiple masks of shape(h,w,n) with colors list([r,g,b], [r,g,b], ...)
#             if len(masks) == 0:
#                 return
#             if isinstance(masks, torch.Tensor):
#                 masks = torch.as_tensor(masks, dtype=torch.uint8)
#                 masks = masks.permute(1, 2, 0).contiguous()
#                 masks = masks.cpu().numpy()
#                 # masks = np.ascontiguousarray(masks.transpose(1, 2, 0))
#             masks = scale_image(masks.shape[:2], masks, self.im.shape)
#             masks = np.asarray(masks, dtype=np.float32)
#             colors = np.asarray(colors, dtype=np.float32)  # shape(n,3)
#             s = masks.sum(2, keepdims=True).clip(0, 1)  # add all masks together
#             masks = (masks @ colors).clip(0, 255)  # (h,w,n) @ (n,3) = (h,w,3)
#             self.im[:] = masks * alpha + self.im * (1 - s * alpha)
#         else:
#             if len(masks) == 0:
#                 self.im[:] = im_gpu.permute(1, 2, 0).contiguous().cpu().numpy() * 255
#             colors = torch.tensor(colors, device=im_gpu.device, dtype=torch.float32) / 255.0
#             colors = colors[:, None, None]  # shape(n,1,1,3)
#             masks = masks.unsqueeze(3)  # shape(n,h,w,1)
#             masks_color = masks * (colors * alpha)  # shape(n,h,w,3)
#
#             inv_alph_masks = (1 - masks * alpha).cumprod(0)  # shape(n,h,w,1)
#             mcs = (masks_color * inv_alph_masks).sum(0) * 2  # mask color summand shape(n,h,w,3)
#
#             im_gpu = im_gpu.flip(dims=[0])  # flip channel
#             im_gpu = im_gpu.permute(1, 2, 0).contiguous()  # shape(h,w,3)
#             im_gpu = im_gpu * inv_alph_masks[-1] + mcs
#             im_mask = (im_gpu * 255).byte().cpu().numpy()
#             self.im[:] = scale_image(im_gpu.shape, im_mask, self.im.shape)
#         if self.pil:
#             # convert im back to PIL and update draw
#             self.fromarray(self.im)
#
#     def rectangle(self, xy, fill=None, outline=None, width=1):
#         # Add rectangle to image (PIL-only)
#         self.draw.rectangle(xy, fill, outline, width)
#
#     def text(self, xy, text, txt_color=(255, 255, 255), anchor='top'):
#         # Add text to image (PIL-only)
#         if anchor == 'bottom':  # start y from font bottom
#             w, h = self.font.getsize(text)  # text width, height
#             xy[1] += 1 - h
#         self.draw.text(xy, text, fill=txt_color, font=self.font)
#
#     def fromarray(self, im):
#         # Update self.im from a numpy array
#         self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
#         self.draw = ImageDraw.Draw(self.im)
#
#     def result(self):
#         # Return annotated image as array
#         return np.asarray(self.im)
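# The run() loop below follows the standard single-image YOLOv5 detect pipeline:
# capture -> letterbox -> BGR-HWC to RGB-CHW -> normalize -> forward -> NMS ->
# rescale boxes to the original frame -> annotate. A condensed sketch of the
# same steps for one still image (file name hypothetical):
#
#   img0 = cv2.imread('example.jpg')
#   im = letterbox(img0, (640, 640), stride=32, auto=True)[0]
#   im = np.ascontiguousarray(im.transpose((2, 0, 1))[::-1])
#   im = torch.from_numpy(im).to(device).float()[None] / 255
#   det = non_max_suppression(model(im), conf_thres=0.4, iou_thres=0.05)[0]
#   det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], img0.shape).round()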
@smart_inference_mode()
def run():
    # Load model
    device = torch.device('cuda:0')
    model = DetectMultiBackend(weights='fire.pt', device=device, dnn=False, data=False, fp16=True)
    stride, names, pt = model.stride, model.names, model.pt

    # IP camera
    cap = cv2.VideoCapture("http://admin:admin@10.129.50.72:8081")  # URL of the IP camera

    # Read frames
    while True:
        ret, img0 = cap.read()
        if not ret:  # stream dropped or ended
            break

        # Rotate the frame 270 degrees
        img0 = np.rot90(img0, 3)
        img0 = np.ascontiguousarray(img0)  # rot90 returns a non-contiguous view
        if img0.shape[2] == 4:  # drop alpha channel if the stream delivers BGRA
            img0 = cv2.cvtColor(img0, cv2.COLOR_BGRA2BGR)

        # Preprocess
        im = letterbox(img0, (640, 640), stride=32, auto=True)[0]  # padded resize
        im = im.transpose((2, 0, 1))[::-1]  # HWC to CHW, BGR to RGB
        im = np.ascontiguousarray(im)  # contiguous
        im = torch.from_numpy(im).to(model.device)
        im = im.half() if model.fp16 else im.float()  # uint8 to fp16/32
        im /= 255  # 0 - 255 to 0.0 - 1.0
        if len(im.shape) == 3:
            im = im[None]  # expand for batch dim

        # Inference
        pred = model(im, augment=False, visualize=False)

        # Non-maximum suppression
        pred = non_max_suppression(pred, conf_thres=0.4, iou_thres=0.05, classes=None, max_det=1000)

        # Process detections
        for i, det in enumerate(pred):
            annotator = Annotator(img0, line_width=2)  # draw on the original frame
            if len(det):
                target_list = []  # collected for any downstream use (not consumed here)
                # Rescale boxes from the letterboxed image back to the original frame
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], img0.shape).round()
                for *xyxy, conf, cls in reversed(det):
                    # Convert xyxy (top-left, bottom-right) to xywh (center, width, height) and store as a list
                    xywh = xyxy2xywh(torch.tensor(xyxy).view(1, 4)).view(-1).tolist()  # pixel xywh
                    annotator.box_label(xyxy,
                                        label=f'[{names[int(cls)]} {conf:.2f}]',
                                        color=(34, 139, 34),
                                        txt_color=(0, 191, 255))
                    target_list.append(xywh)
            im0 = annotator.result()
            cv2.imshow('window', im0)
            cv2.waitKey(1)


if __name__ == "__main__":
    run()
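# Note: the capture source in run() is a hard-coded IP camera URL; for a quick
# local test, cv2.VideoCapture also accepts a webcam index or a video file
# (paths below are hypothetical):
#   cap = cv2.VideoCapture(0)            # default webcam
#   cap = cv2.VideoCapture('fire.mp4')   # local video clip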