简单整理

yangshi_branch
yuanshi 2 years ago
parent 731e088be8
commit ed239f3912

@ -2,7 +2,7 @@ import math
import threading
import time
import contextlib
import logging
import cv2
import numpy as np
import torch
@ -20,6 +20,10 @@ import os
import nvidia_smi
from ctypes import windll
import math
import platform
import curses
from termcolor import colored, cprint
FILE = Path(__file__).resolve()
ROOT = FILE.parents[0] # YOLOv5 root directory
@ -29,18 +33,54 @@ ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative
# 清空命令指示符输出
def clear():
_ = os.system('cls')
def clear():
# 对于非 Windows 系统使用 ANSI 转义序列来清除屏幕
if platform.system() != 'Windows':
print("\033c")
return
try:
# 使用 curses 库来清除屏幕,从而避免使用 os.system() .
stdscr = curses.initscr()
curses.curs_set(0) # 隐藏光标
stdscr.clear() # 清空屏幕
stdscr.refresh() # 刷新屏幕
time.sleep(0.1) # 等待一会儿以确保清屏成功
except Exception as e:
logging.error(f"Clear screen failed with error: {e}")
# 引发异常以向调用者报告错误
finally:
# 恢复 curses 库的原始设置
curses.endwin()
# 添加一些额外的效果来增强用户体验(可选)
cprint(colored('屏幕已被清除!', 'green', attrs=['bold', 'underline']))
time.sleep(0.5)
cprint(colored('请稍等...', 'cyan', attrs=['blink', 'reverse']))
_cache = None
# 检查是否为管理员权限
def is_admin():
global _cache
if _cache is not None:
# 若缓存可用,则立即返回缓存结果
return _cache
try:
return windll.shell32.IsUserAnAdmin()
# 检查当前平台是否支持获取管理员权限
if os.name != 'nt':
raise OSError('Unsupported platform')
# 检查当前用户是否为管理员
is_admin = (os.getuid() == 0) or (os.system('net session >nul 2>&1') == 0)
_cache = is_admin # 缓存当前结果
return is_admin
except OSError as err:
print('OS error: {0}'.format(err))
return False
logging.error('Failed to check admin status: %s', err)
raise err # 抛出异常以引起关注
# 简单检查gpu是否够格
def check_gpu():
@ -56,124 +96,181 @@ def check_gpu():
return 1
return 0
#获取最接近 x 且可以被除数 divisor 整除的整数
def make_divisible(x, divisor):
# Returns nearest x divisible by divisor
# 取最大值并转换为整数类型
if isinstance(divisor, torch.Tensor):
divisor = int(divisor.max()) # to int
return math.ceil(x / divisor) * divisor
divisor = int(divisor.max().item())
# 计算最接近 x 且可以被除数 divisor 整除的整数
return math.ceil(x / divisor) * divisor
def check_img_size(imgsz, s=32, floor=0):
# Verify image size is a multiple of stride s in each dimension
if isinstance(imgsz, int): # integer i.e. img_size=640
# 验证图像大小在每个维度上是否都是 stride s 的倍数
if isinstance(imgsz, int):
# 整数类型,例如 img_size=640
new_size = max(make_divisible(imgsz, int(s)), floor)
else: # list i.e. img_size=[640, 480]
imgsz = list(imgsz) # convert to list if tuple
elif isinstance(imgsz, (list, tuple)) and len(imgsz) == 2:
# 列表或元组类型,例如 img_size=[640, 480]
new_size = [max(make_divisible(x, int(s)), floor) for x in imgsz]
else:
raise TypeError("imgsz 应该是一个整数或包含两个元素的列表或元组。")
if isinstance(new_size, int):
# 如果 new_size 是整数类型,则构造一个只有一个元素的列表
new_size = [new_size]
if new_size != imgsz:
LOGGER.warning(f'WARNING ⚠️ --img-size {imgsz} must be multiple of max stride {s}, updating to {new_size}')
LOGGER.warning(f'警告⚠️ -- 图像大小 {imgsz} 必须是 {s} 的倍数,已更新为 {new_size}')
return new_size
#将边界框 (xyxy 格式) 限制在图像大小内
def clip_boxes(boxes, shape):
# Clip boxes (xyxy) to image shape (height, width)
if isinstance(boxes, torch.Tensor): # faster individually
boxes[:, 0].clamp_(0, shape[1]) # x1
boxes[:, 1].clamp_(0, shape[0]) # y1
boxes[:, 2].clamp_(0, shape[1]) # x2
boxes[:, 3].clamp_(0, shape[0]) # y2
else: # np.array (faster grouped)
boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1]) # x1, x2
boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0]) # y1, y2
if isinstance(boxes, torch.Tensor):
# 判断输入类型是否为 torch.Tensor以提高处理速度
# 使用 torch.split 方法将 tensor 分割成 x_min、y_min、x_max、y_max 四个部分
x_min, y_min, x_max, y_max = torch.split(boxes, 1, dim=1)
# 使用 clamp_ 方法将 x_min、x_max、y_min、y_max 限制在给定形状范围内
x_min, x_max = x_min.clip(0, shape[1]), x_max.clip(0, shape[1])
y_min, y_max = y_min.clip(0, shape[0]), y_max.clip(0, shape[0])
# 使用 torch.cat 方法将四个部分拼接成新的 tensor
boxes = torch.cat([x_min, y_min, x_max, y_max], dim=1)
else:
# 对于 np.ndarray 类型,可以直接使用 numpy 的 vectorizing 方法进行限制范围
# 使用 clip 函数将 x_min、x_max、y_min、y_max 限制在给定形状范围内
boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], a_min=0, a_max=shape[1]) # x1, x2
boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], a_min=0, a_max=shape[0]) # y1, y2
return boxes
#将边界框 (xyxy 格式) 从 img1_shape 缩放到 img0_shape
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
# Rescale boxes (xyxy) from img1_shape to img0_shape
if ratio_pad is None: # calculate from img0_shape
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding
# 检查输入参数的正确性
if not isinstance(img1_shape, tuple) or len(img1_shape) != 2:
raise TypeError("img1_shape 应该是包含两个元素的元组,分别表示图像的高度和宽度。")
if not isinstance(img0_shape, tuple) or len(img0_shape) != 2:
raise TypeError("img0_shape 应该是包含两个元素的元组,分别表示图像的高度和宽度。")
if not isinstance(boxes, np.ndarray) or boxes.ndim != 2 or boxes.shape[1] != 4:
raise ValueError("boxes 应该是一个二维 numpy 数组,其形状为 [N, 4],其中 N 表示边界框数量。")
if ratio_pad is not None:
if not isinstance(ratio_pad, tuple) or len(ratio_pad) != 2:
raise ValueError("ratio_pad 应该是一个元组,包含两个元素,分别表示宽高比和填充大小。")
if not isinstance(ratio_pad[0], (int, float)):
raise TypeError("ratio_pad[0] 应该是一个整数或浮点数,用于表示缩放比例。")
if not isinstance(ratio_pad[1], tuple) or len(ratio_pad[1]) != 2:
raise ValueError("ratio_pad[1] 应该是一个包含两个元素的元组,分别表示宽度和高度填充大小。")
if not all(isinstance(i, (int, float)) for i in ratio_pad[1]):
raise TypeError("ratio_pad[1] 中的两个元素应该均为整数或浮点数,用于表示填充大小。")
# 复制边界框数组,以避免修改原始数据
boxes = boxes.copy()
# 计算宽高比和填充大小
if ratio_pad is None:
gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
pad_w = (img1_shape[1] - img0_shape[1] * gain) / 2
pad_h = (img1_shape[0] - img0_shape[0] * gain) / 2
else:
gain = ratio_pad[0][0]
pad = ratio_pad[1]
gain = ratio_pad[0]
pad_w, pad_h = ratio_pad[1]
boxes[:, [0, 2]] -= pad[0] # x padding
boxes[:, [1, 3]] -= pad[1] # y padding
# 对边界框进行填充和缩放操作
boxes[:, [0, 2]] -= pad_w
boxes[:, [1, 3]] -= pad_h
boxes[:, :4] /= gain
clip_boxes(boxes, img0_shape)
return boxes
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
# Resize and pad image while meeting stride-multiple constraints
shape = im.shape[:2] # current shape [height, width]
if isinstance(new_shape, int):
new_shape = (new_shape, new_shape)
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
if not scaleup: # only scale down, do not scale up (for better val mAP)
r = min(r, 1.0)
# 对边界框的坐标进行限制范围,确保它们不会超出目标图像的大小
boxes = clip_boxes(boxes, img0_shape)
# Compute padding
ratio = r, r # width, height ratios
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
if auto: # minimum rectangle
dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
elif scaleFill: # stretch
dw, dh = 0.0, 0.0
new_unpad = (new_shape[1], new_shape[0])
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
dw /= 2 # divide padding into 2 sides
dh /= 2
if shape[::-1] != new_unpad: # resize
im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return im, ratio, (dw, dh)
return boxes
#调整图像大小并填充边框以适应模型输入尺寸
def letterbox(image, target_size=(640, 640), color=(114, 114, 114), auto=True, scale_fill=False, scale_up=True, stride=32):
# 计算新的图像比例
height, width = image.shape[:2]
target_h, target_w = target_size
scale = min(target_h / height, target_w / width)
if not scale_up:
scale = min(scale, 1.0)
# 计算填充和缩放后的宽度和高度
new_w = round(width * scale)
new_h = round(height * scale)
dw = target_w - new_w
dh = target_h - new_h
# 如果需要,调整填充以便其尺寸是步幅的倍数
if auto:
dw = dw % stride
dh = dh % stride
# 如果需要,进行缩放和拉伸来填充目标形状
if scale_fill:
target_h = max(target_h, new_h)
target_w = max(target_w, new_w)
dw = (target_w - new_w) / 2
dh = (target_h - new_h) / 2
# 计算填充边框
top = round(dh - 0.1)
bottom = round(dh + 0.1)
left = round(dw - 0.1)
right = round(dw + 0.1)
# 进行填充并返回结果
image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
ratio = (new_w / width, new_h / height)
padding = (dw, dh)
return image, ratio, padding
#选择推理设备
def select_device(device='', batch_size=0, newline=True):
# device = None or 'cpu' or 0 or '0' or '0,1,2,3'
s = f'torch-{torch.__version__} '
device = str(device).strip().lower().replace('cuda:', '').replace('none', '') # to string, 'cuda:0' to '0'
# 转换 device 参数为字符串,'cuda:0' -> '0'
device = str(device).strip().lower().replace('cuda:', '').replace('none', '')
# 如果请求的是 CPU 或 MPS 等非 GPU 设备
cpu = device == 'cpu'
mps = device == 'mps' # Apple Metal Performance Shaders (MPS)
mps = device == 'mps'
if cpu or mps:
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # force torch.cuda.is_available() = False
elif device: # non-cpu device requested
os.environ['CUDA_VISIBLE_DEVICES'] = device # set environment variable - must be before assert is_available()
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # 禁止使用 GPU 加速
elif device:
# 请求的是 GPU 设备
os.environ['CUDA_VISIBLE_DEVICES'] = device # 设置 CUDA_VISIBLE_DEVICES 环境变量
assert torch.cuda.is_available() and torch.cuda.device_count() >= len(device.replace(',', '')), \
f"Invalid CUDA '--device {device}' requested, use '--device cpu' or pass valid CUDA device(s)"
if not cpu and not mps and torch.cuda.is_available(): # prefer GPU if available
devices = device.split(',') if device else '0' # range(torch.cuda.device_count()) # i.e. 0,1,6,7
n = len(devices) # device count
if n > 1 and batch_size > 0: # check batch_size is divisible by device_count
# 选择计算设备
if not cpu and not mps and torch.cuda.is_available(): # 优先使用 GPU
devices = device.split(',') if device else '0' # 可选设备编号列表,例如 '0, 1'
n = len(devices) # 设备数量
if n > 1 and batch_size > 0: # 确保 batch_size 是设备数量的倍数
assert batch_size % n == 0, f'batch-size {batch_size} not multiple of GPU count {n}'
space = ' ' * (len(s) + 1)
for i, d in enumerate(devices):
p = torch.cuda.get_device_properties(i)
s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / (1 << 20):.0f}MiB)\n" # bytes to MB
s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / (1 << 20):.0f}MiB)\n"
arg = 'cuda:0'
elif mps and getattr(torch, 'has_mps', False) and torch.backends.mps.is_available(): # prefer MPS if available
elif mps and getattr(torch, 'has_mps', False) and torch.backends.mps.is_available(): # 如果可用,优先使用 MPS
s += 'MPS\n'
arg = 'mps'
else: # revert to CPU
else: # 否则回退到 CPU
s += 'CPU\n'
arg = 'cpu'
# 日志输出设备信息
if not newline:
s = s.rstrip()
LOGGER.info(s)
return torch.device(arg)
print(s)
# 返回所选设备的 PyTorch 设备对象
return torch.device(arg)
class YOLO:
# 将参数初始化工作提取出来
def __init__(self,
path,
device,
@ -186,7 +283,7 @@ class YOLO:
dnn=False,
agnostic_nms=False):
self.half = half
self.device = torch.device('cuda:0')
self.device = torch.device(device)
self.conf = conf
self.iou_thres = iou
self.agnostic_nms = agnostic_nms
@ -198,55 +295,43 @@ class YOLO:
self.img_size = check_img_size(imgsz, s=self.stride) # check image size
if self.pt:
model.model.half() if half else model.model.float()
if half:
dtype = torch.float16
else:
dtype = torch.float32
dtype = torch.float16 if half else torch.float32
model(torch.zeros(1, 3, *self.img_size).to(device).type(dtype)) # warmup
self.model = model
self.classes = classes
@torch.no_grad()
def predict(self, im):
def predict(self, im, window_name='UAV'):
# Load model
src_shape = im.shape
model = self.model
# Half
half = self.half # half precision only supported by PyTorch on CUDA
device = self.device
img = letterbox(im, self.img_size, stride=self.stride, auto=True)[0]
# Convert
img = img.transpose((2, 0, 1))[::-1] # HWC to CHW, BGR to RGB
img = np.ascontiguousarray(img)
# 图像预处理
img = preprocess_image(im, self.img_size, self.stride)
im = torch.from_numpy(img).to(device)
im = im.half() if half else im.float() # uint8 to fp16/32
im /= 255 # 0 - 255 to 0.0 - 1.0
im = im.half() if half else im.float()
im /= 255
if len(im.shape) == 3:
im = im[None] # expand for batch dim
im = im[None]
# Inference
pred = model(im)
# NMS
pred = non_max_suppression(pred, self.conf, self.iou_thres, self.classes, self.agnostic_nms,
max_det=self.max_det)
# if not len(det):
# return [], [], []
# det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im.shape).round()
# 新建 annotator 对象并在循环内不断更新
annotator = Annotator(im.squeeze(0).copy(), line_width=2)
for i, det in enumerate(pred):
# 画框
annotator = Annotator(img, line_width=2)
if len(det):
target_list = []
result = "fire"
# 将转换后的图片画框结果转换成原图上的结果
det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], img.shape).round()
for *xyxy, conf, cls in reversed(det): # 处理推理出来每个目标的信息
# 将xyxy(左上角+右下角)格式转为xywh(中心点+宽长)格式并除上wh做归一化转化为列表再保存
for *xyxy, conf, cls in reversed(det):
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4))).view(-1).tolist() # normalized xywh
# if names[int(cls)]=='':
# result = "fire"
# type = "Alarming"
annotator.box_label(xyxy, label=f'[{YOLO.names[int(cls)]} {conf:.2f}]',
color=(34, 139, 34),
txt_color=(0, 191, 255))
@ -254,58 +339,175 @@ class YOLO:
print('\033[0;31;40m' + f' 发现火情 ' + '\033[0m')
im0 = annotator.result()
cv2.imshow('UAV', im0)
cv2.imshow(window_name, im0)
cv2.waitKey(1)
return target_list, im0
# 将图像预处理部分提取成函数
def preprocess_image(im, img_size, stride):
src_shape = im.shape
# 修改到 1x3x416x416
img = letterbox(im, img_size, stride=stride, auto=True)[0]
# Convert
img = img.transpose((2, 0, 1))[::-1] # HWC to CHW, BGR to RGB
# 获取 416x416 大小的图片
img = np.ascontiguousarray(img)
return img
class PID:
def __init__(self, p, i, d, set_value):
def __init__(self, p, i, d, set_value, min_out=None, max_out=None):
"""
初始化 PID 控制器参数
:param p: 比例系数
:param i: 积分系数
:param d: 微分系数
:param set_value: 目标值
:param min_out: 输出最小值可选
:param max_out: 输出最大值可选
"""
self.kp = p
self.ki = i
self.kd = d
self.setValue = set_value # 目标值
self.lastErr = 0 # 上一次误差
self.preLastErr = 0 # 临时存误差
self.errSum = 0 # 误差总和
self.set_value = set_value
self.min_out = min_out
self.max_out = max_out
self.last_err = 0 # 上一次误差
self.err_sum = 0 # 误差总和
self.cur_time = time.monotonic() # 当前时间
# 增量式PID
def pid_increment(self, cur_value):
"""
实现增量式 PID 控制
:param cur_value: 当前值
:return: PID 输出
"""
err = self.set_value - cur_value
self.err_sum += err
diff_err = err - self.last_err
self.last_err = err
p_out = self.kp * err
i_out = self.ki * self.err_sum
d_out = self.kd * diff_err
out_pid = p_out + i_out + d_out
# 对输出进行限幅操作
if self.min_out is not None and out_pid < self.min_out:
out_pid = self.min_out
if self.max_out is not None and out_pid > self.max_out:
out_pid = self.max_out
return out_pid
# 位置式PID
def pidPosition(self, curValue):
err = self.setValue - curValue
dErr = err - self.lastErr
self.preLastErr = self.lastErr
self.lastErr = err
self.errSum += err
outPID = self.kp * err + (self.ki * self.errSum) + (self.kd * dErr)
return outPID
def pid_position(self, cur_value):
"""
实现位置式 PID 控制
:param cur_value: 当前值
:return: PID 输出
"""
err = self.set_value - cur_value
d_err = (err - self.last_err) / (time.monotonic() - self.cur_time) # 计算微分项
self.err_sum += err
out_pid = self.kp * err + self.ki * self.err_sum + self.kd * d_err
# 对输出进行限幅操作
if self.min_out is not None and out_pid < self.min_out:
out_pid = self.min_out
if self.max_out is not None and out_pid > self.max_out:
out_pid = self.max_out
# #设置时延
def delayMsecond(t): # t的单位0.1ms
start, end = 0, 0
start = time.perf_counter() * pow(10, 7)
while (end - start < t * pow(10, 3)):
end = time.perf_counter() * pow(10, 7)
self.last_err = err
self.cur_time = time.monotonic()
return out_pid
# #设置时延
def delay_milliseconds(t):
"""
延时函数参数 t 表示延时毫秒数
"""
start = time.perf_counter()
while True:
end = time.perf_counter()
if (end - start) * 1000 >= t:
break
# 连接摄像头类
class Capture:
def __init__(self,
ip="http://admin:admin@192.168.8.126:8081"):
self.ip = ip
self.cap = cv2.VideoCapture(self.ip)
def __init__(self, url='http://admin:admin@192.168.8.126:8081'):
self.url = url
self.cap = None
def open(self):
if self.cap is None:
self.cap = cv2.VideoCapture(self.url)
if not self.cap.isOpened():
raise Exception(f"Cannot open video stream from {self.url}")
def close(self):
if self.cap is not None:
self.cap.release()
self.cap = None
def read(self):
if self.cap is None:
self.open()
ret, img = self.cap.read()
if not ret:
# 发生错误时尝试重连一次
self.close()
self.open()
ret, img = self.cap.read()
if not ret:
raise Exception("Failed to read video frame")
return img
def __del__(self):
self.close()
#图像色彩通道转换
def my_cvtColor(img, code):
choice = {
0: cv2.COLOR_BGRA2BGR,
1: cv2.COLOR_BGR2GRAY,
2: cv2.COLOR_BGRA2RGB,
3: cv2.COLOR_BGRA2RGBA
}
if not isinstance(img, np.ndarray):
raise TypeError("The input image is not a numpy array")
if code not in choice.keys():
raise ValueError("Invalid color conversion code")
# 先判断原图是否为 BGRA/RGBA 格式,在进行颜色转换
if img.ndim == 3 and img.shape[2] == 4:
img = cv2.cvtColor(img, choice[code])
else:
img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
img = cv2.cvtColor(img, choice[code])
def my_cvtColo(img, code):
choice = ["COLOR_BGRA2BGR", "cv2.COLOR_BGR2GRAY", "COLOR_BGRA2RGB", "COLOR_BGRA2RGBA"]
img = cv2.cvtColor(img,choice[code])
return img
#检查数据正确性
def check_data(arr):
try:
iter(arr) # 检查是否可迭代
if len(arr) == 0: # 检查长度是否为0
return True
else:
return False
except TypeError: # 不可迭代的情况
return False
def main():
# cap = cv2.VideoCapture("http://admin:admin@192.168.8.126:8081")
# print("图像加载成功")
@ -322,20 +524,31 @@ def main():
img = cap.read()
img = np.rot90(img, 0)
img = np.array(img)
img = my_cvtColo(img,1)
img = my_cvtColor(img,1)
target, im0 = predict.predict(img)
img_b64 = base64.b64encode(im0).decode('utf-8')
print(img_b64)
data = {
"img": img_b64,
"type": "Alarming",
"fire_flag": 'fire'
}
json_data = json.dumps(data).encode('utf-8')
cs.send(json_data)
delayMsecond(100)
if check_data(target):
Fire_centX = target[0][0]
Fire_centY = target[0][1]
Fire_W = target[0][2]
Fire_H = target[0][3]
# print(img_b64)
data = {
"img": img_b64,
"type": "Alarming",
"fire_flag": "fire",
"cent_x": Fire_centX,
"cent_y": Fire_centY,
"length": Fire_W,
"width": Fire_H
}
json_data = json.dumps(data).encode('utf-8')
cs.send(json_data)
delay_milliseconds(100)
else:
continue
if __name__ == "__main__":
IP = int(input("请输入服务器地址:"))

Loading…
Cancel
Save