import math
import threading
import time
import contextlib
import cv2
import numpy as np
import torch
import torchvision
from models.common import DetectMultiBackend
from utils.plots import Annotator
import json
import base64
import socket
from pathlib import Path
from utils.general import (LOGGER, Profile, check_file, check_imshow, check_requirements, colorstr, cv2,
increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer, xyxy2xywh)
import sys
import os
import nvidia_smi
from ctypes import windll
import math
FILE = Path(__file__).resolve()
ROOT = FILE.parents[0] # YOLOv5 root directory
if str(ROOT) not in sys.path:
sys.path.append(str(ROOT)) # add ROOT to PATH
ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative
# 清空命令指示符输出
def clear():
_ = os.system('cls')
# 检查是否为管理员权限
def is_admin():
return windll.shell32.IsUserAnAdmin()
except OSError as err:
print('OS error: {0}'.format(err))
return False
# 简单检查gpu是否够格
def check_gpu():
gpu_handle = nvidia_smi.nvmlDeviceGetHandleByIndex(0) # 默认卡1
gpu_name = nvidia_smi.nvmlDeviceGetName(gpu_handle)
memory_info = nvidia_smi.nvmlDeviceGetMemoryInfo(gpu_handle)
if b'RTX' in gpu_name:
return 2
memory_total = memory_info.total / 1024 / 1024
if memory_total > 3000:
return 1
return 0
def make_divisible(x, divisor):
# Returns nearest x divisible by divisor
if isinstance(divisor, torch.Tensor):
divisor = int(divisor.max()) # to int
return math.ceil(x / divisor) * divisor
def check_img_size(imgsz, s=32, floor=0):
# Verify image size is a multiple of stride s in each dimension
if isinstance(imgsz, int): # integer i.e. img_size=640
new_size = max(make_divisible(imgsz, int(s)), floor)
else: # list i.e. img_size=[640, 480]
imgsz = list(imgsz) # convert to list if tuple
new_size = [max(make_divisible(x, int(s)), floor) for x in imgsz]
if new_size != imgsz:
LOGGER.warning(f'WARNING ⚠️ --img-size {imgsz} must be multiple of max stride {s}, updating to {new_size}')
return new_size
def clip_boxes(boxes, shape):
# Clip boxes (xyxy) to image shape (height, width)
if isinstance(boxes, torch.Tensor): # faster individually
boxes[:, 0].clamp_(0, shape[1]) # x1
boxes[:, 1].clamp_(0, shape[0]) # y1
boxes[:, 2].clamp_(0, shape[1]) # x2
boxes[:, 3].clamp_(0, shape[0]) # y2
else: # np.array (faster grouped)
boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1]) # x1, x2
boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0]) # y1, y2
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
# Rescale boxes (xyxy) from img1_shape to img0_shape
if ratio_pad is None: # calculate from img0_shape
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding
gain = ratio_pad[0][0]
pad = ratio_pad[1]
boxes[:, [0, 2]] -= pad[0] # x padding
boxes[:, [1, 3]] -= pad[1] # y padding
boxes[:, :4] /= gain
clip_boxes(boxes, img0_shape)
return boxes
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
# Resize and pad image while meeting stride-multiple constraints
shape = im.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better val mAP)
r = min(r, 1.0)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return im, ratio, (dw, dh)
def select_device(device='', batch_size=0, newline=True):
# device = None or 'cpu' or 0 or '0' or '0,1,2,3'
s = f'torch-{torch.__version__} '
device = str(device).strip().lower().replace('cuda:', '').replace('none', '') # to string, 'cuda:0' to '0'
cpu = device == 'cpu'
mps = device == 'mps' # Apple Metal Performance Shaders (MPS)
if cpu or mps:
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # force torch.cuda.is_available() = False
elif device: # non-cpu device requested
os.environ['CUDA_VISIBLE_DEVICES'] = device # set environment variable - must be before assert is_available()
assert torch.cuda.is_available() and torch.cuda.device_count() >= len(device.replace(',', '')), \
f"Invalid CUDA '--device {device}' requested, use '--device cpu' or pass valid CUDA device(s)"
if not cpu and not mps and torch.cuda.is_available(): # prefer GPU if available
devices = device.split(',') if device else '0' # range(torch.cuda.device_count()) # i.e. 0,1,6,7
n = len(devices) # device count
if n > 1 and batch_size > 0: # check batch_size is divisible by device_count
assert batch_size % n == 0, f'batch-size {batch_size} not multiple of GPU count {n}'
space = ' ' * (len(s) + 1)
for i, d in enumerate(devices):
p = torch.cuda.get_device_properties(i)
s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / (1 << 20):.0f}MiB)\n" # bytes to MB
arg = 'cuda:0'
elif mps and getattr(torch, 'has_mps', False) and torch.backends.mps.is_available(): # prefer MPS if available
s += 'MPS\n'
arg = 'mps'
else: # revert to CPU
s += 'CPU\n'
arg = 'cpu'
if not newline:
s = s.rstrip()
return torch.device(arg)
class YOLO:
def __init__(self,
self.half = half
self.device = torch.device('cuda:0')
self.conf = conf
self.iou_thres = iou
self.agnostic_nms = agnostic_nms
self.max_det = max_det
model = DetectMultiBackend(path, device=self.device, dnn=dnn)
self.stride, self.names, self.pt, self.jit, self.onnx = model.stride, model.names, model.pt, model.jit, model.onnx
imgsz = (imgsz, imgsz) if isinstance(imgsz, int) else imgsz
self.img_size = check_img_size(imgsz, s=self.stride) # check image size
if self.pt:
model.model.half() if half else model.model.float()
if half:
dtype = torch.float16
dtype = torch.float32
model(torch.zeros(1, 3, *self.img_size).to(device).type(dtype)) # warmup
self.model = model
self.classes = classes
def predict(self, im):
# Load model
src_shape = im.shape
model = self.model
# Half
half = self.half # half precision only supported by PyTorch on CUDA
device = self.device
img = letterbox(im, self.img_size, stride=self.stride, auto=True)[0]
# Convert
img = img.transpose((2, 0, 1))[::-1] # HWC to CHW, BGR to RGB
img = np.ascontiguousarray(img)
im = torch.from_numpy(img).to(device)
im = im.half() if half else im.float() # uint8 to fp16/32
im /= 255 # 0 - 255 to 0.0 - 1.0
if len(im.shape) == 3:
im = im[None] # expand for batch dim
# Inference
pred = model(im)
pred = non_max_suppression(pred, self.conf, self.iou_thres, self.classes, self.agnostic_nms,
# if not len(det):
# return [], [], []
# det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im.shape).round()
for i, det in enumerate(pred):
# 画框
annotator = Annotator(img, line_width=2)
if len(det):
target_list = []
result = "fire"
# 将转换后的图片画框结果转换成原图上的结果
det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], img.shape).round()
for *xyxy, conf, cls in reversed(det): # 处理推理出来每个目标的信息
# 将xyxy(左上角+右下角)格式转为xywh(中心点+宽长)格式,并除上w,h做归一化,转化为列表再保存
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4))).view(-1).tolist() # normalized xywh
# if names[int(cls)]=='':
# result = "fire"
# type = "Alarming"
annotator.box_label(xyxy, label=f'[{YOLO.names[int(cls)]} {conf:.2f}]',
color=(34, 139, 34),
txt_color=(0, 191, 255))
print('\033[0;31;40m' + f' 发现火情 ' + '\033[0m')
im0 = annotator.result()
cv2.imshow('UAV', im0)
return target_list, im0
class PID:
def __init__(self, p, i, d, set_value):
self.kp = p
self.ki = i
self.kd = d
self.setValue = set_value # 目标值
self.lastErr = 0 # 上一次误差
self.preLastErr = 0 # 临时存误差
self.errSum = 0 # 误差总和
# 位置式PID
def pidPosition(self, curValue):
err = self.setValue - curValue
dErr = err - self.lastErr
self.preLastErr = self.lastErr
self.lastErr = err
self.errSum += err
outPID = self.kp * err + (self.ki * self.errSum) + (self.kd * dErr)
return outPID
# #设置时延
def delayMsecond(t): # t的单位0.1ms
start, end = 0, 0
start = time.perf_counter() * pow(10, 7)
while (end - start < t * pow(10, 3)):
end = time.perf_counter() * pow(10, 7)
# 连接摄像头类
class Capture:
def __init__(self,
self.ip = ip
self.cap = cv2.VideoCapture(self.ip)
def read(self):
ret, img = self.cap.read()
return img
def my_cvtColo(img, code):
img = cv2.cvtColor(img,choice[code])
return img
def main():
# cap = cv2.VideoCapture("http://admin:admin@")
# print("图像加载成功")
# 模型路径
path = 'fire.pt'
# 尺寸大小
width, height = 640, 640
ip = input("输入摄像头地址:")
cap = Capture(ip)
conf = float(input("输入置信度:"))
predict = YOLO(path, "cuda:0", imgsz=(width, height), conf=conf, classes=None)
while True:
img = cap.read()
img = np.rot90(img, 0)
img = np.array(img)
img = my_cvtColo(img,1)
target, im0 = predict.predict(img)
img_b64 = base64.b64encode(im0).decode('utf-8')
data = {
"img": img_b64,
"type": "Alarming",
"fire_flag": 'fire'
json_data = json.dumps(data).encode('utf-8')
if __name__ == "__main__":
IP = int(input("请输入服务器地址:"))
port = input("请输入服务器端口号:")
cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cs.connect((IP, port))