489 lines
17 KiB
Python
489 lines
17 KiB
Python
# from turtle import right
|
||
import cv2
|
||
import threading
|
||
import sys
|
||
import os
|
||
import argparse
|
||
import numpy as np
|
||
from surround_view import FisheyeCameraModel, BirdView
|
||
import surround_view.param_settings as settings
|
||
|
||
sys.path.append(os.path.dirname(__file__)) # 确保能导入 py_utils
|
||
from py_utils.coco_utils import COCO_test_helper
|
||
from py_utils.rknn_executor import RKNN_model_container # 假设使用 RKNN
|
||
|
||
|
||
|
||
# ------ YOLO configuration ------

YOLO_MODEL_PATH = './yolov5s-640-640.rknn'
OBJ_THRESH = 0.6  # minimum (objectness * class score) for a candidate to be kept
NMS_THRESH = 0.6  # candidates overlapping a kept box by more than this IoU are dropped
IMG_SIZE = (640, 640)  # (w, h)
CLASSES = ("person",)  # only the person class is of interest

# Load the anchors: one float per line, regrouped into 3 sets of 2-value pairs.
ANCHORS_FILE = './model/anchors_yolov5.txt'
with open(ANCHORS_FILE, 'r') as f:
    # Skip blank lines (e.g. a trailing newline) instead of crashing in float().
    values = [float(_v) for _v in f if _v.strip()]
ANCHORS = np.array(values).reshape(3, -1, 2).tolist()
|
||
|
||
|
||
# ---------- YOLO 处理函数 ----------
|
||
def filter_boxes(boxes, box_confidences, box_class_probs, obj_thresh=None):
    """Keep only the candidates whose combined score reaches the threshold.

    The combined score of a candidate is its objectness confidence multiplied
    by its highest class probability.

    Args:
        boxes: Array of candidate boxes, one candidate per row.
        box_confidences: Objectness confidence per candidate; flattened to 1-D
            before use.
        box_class_probs: Per-class probabilities with the classes on the last
            axis.
        obj_thresh: Minimum combined score for a candidate to be kept. When
            ``None`` the module-level ``OBJ_THRESH`` is used.

    Returns:
        Tuple ``(boxes, classes, scores)`` restricted to the kept candidates.
        ``classes`` holds the index of the best class and ``scores`` the
        combined score.
    """
    if obj_thresh is None:
        obj_thresh = OBJ_THRESH

    box_confidences = box_confidences.reshape(-1)
    class_max_score = np.max(box_class_probs, axis=-1)
    classes = np.argmax(box_class_probs, axis=-1)

    # Compute the combined score once; it serves as both the mask source and
    # the returned score.
    combined = class_max_score * box_confidences
    selected = np.where(combined >= obj_thresh)

    return boxes[selected], classes[selected], combined[selected]
|
||
|
||
def nms_boxes(boxes, scores, nms_thresh=None):
    """Greedy non-maximum suppression over corner-format boxes.

    Candidates are visited from highest to lowest score. Each visited box is
    kept, and every remaining box whose IoU with it exceeds the threshold is
    discarded.

    Args:
        boxes: Array of shape (N, 4) holding (x1, y1, x2, y2) per row.
        scores: Array of N scores, one per box.
        nms_thresh: IoU above which a lower-scored box is suppressed. When
            ``None`` the module-level ``NMS_THRESH`` is used.

    Returns:
        Integer array with the indices (into ``boxes``) of the kept boxes,
        ordered by descending score. It is an empty integer array when there
        are no boxes, so it can always be used directly as an index.
    """
    if nms_thresh is None:
        nms_thresh = NMS_THRESH

    x = boxes[:, 0]
    y = boxes[:, 1]
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]

    areas = w * h
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        rest = order[1:]
        keep.append(i)

        # Intersection rectangle between the current box and all remaining ones.
        xx1 = np.maximum(x[i], x[rest])
        yy1 = np.maximum(y[i], y[rest])
        xx2 = np.minimum(x[i] + w[i], x[rest] + w[rest])
        yy2 = np.minimum(y[i] + h[i], y[rest] + h[rest])

        # The small epsilon is kept from the original implementation.
        w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
        h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
        inter = w1 * h1

        ovr = inter / (areas[i] + areas[rest] - inter)
        # Only boxes that do not overlap too much stay in the queue.
        order = rest[ovr <= nms_thresh]

    # An explicit integer dtype keeps the empty result usable as an index array.
    return np.array(keep, dtype=np.intp)
|
||
|
||
def box_process(position, anchors, img_size=None):
    """Decode one detection head's box outputs into corner-format boxes.

    Args:
        position: Array of shape (num_anchors, 4, grid_h, grid_w) whose four
            channels are (x, y, w, h) predictions per grid cell. The decoding
            formula presumably expects already-activated values in [0, 1] -
            confirm against the model export.
        anchors: ``num_anchors`` pairs of anchor sizes for this head.
        img_size: Network input size as (w, h). When ``None`` the
            module-level ``IMG_SIZE`` is used.

    Returns:
        Array of shape (num_anchors, 4, grid_h, grid_w) whose channels are
        (x1, y1, x2, y2) in input-image pixels.
    """
    if img_size is None:
        img_size = IMG_SIZE
    img_w, img_h = img_size

    grid_h, grid_w = position.shape[2:4]
    col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h))
    col = col.reshape(1, 1, grid_h, grid_w)
    row = row.reshape(1, 1, grid_h, grid_w)
    grid = np.concatenate((col, row), axis=1)

    # Channel 0 is x (column index) and channel 1 is y (row index), so the
    # stride vector must be ordered (x, y) as well. The two values coincide
    # whenever the head uses the same stride in both directions.
    stride = np.array([img_w // grid_w, img_h // grid_h]).reshape(1, 2, 1, 1)

    anchors = np.array(anchors)
    anchors = anchors.reshape(*anchors.shape, 1, 1)

    box_xy = position[:, :2, :, :] * 2 - 0.5
    box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors

    box_xy += grid
    box_xy *= stride

    # Convert centre/size to corner coordinates.
    half_wh = box_wh / 2
    return np.concatenate((box_xy - half_wh, box_xy + half_wh), axis=1)
|
||
|
||
def post_process(input_data, anchors):
    """Turn the raw outputs of all detection heads into final detections.

    Each head is reshaped to (anchors_per_head, channels, grid_h, grid_w),
    split into box / objectness / class channels, decoded and flattened. The
    candidates of all heads are then score-filtered and put through
    per-class non-maximum suppression.

    Args:
        input_data: Sequence of raw head outputs, one array per head.
        anchors: Anchor pairs per head, indexed like ``input_data``.

    Returns:
        Tuple ``(boxes, classes, scores)`` of arrays, or
        ``(None, None, None)`` when nothing survives filtering and NMS.
    """
    anchors_per_head = len(anchors[0])
    heads = [
        head.reshape([anchors_per_head, -1] + list(head.shape[-2:]))
        for head in input_data
    ]

    def sp_flatten(_in):
        # (A, C, H, W) -> (A * H * W, C): one row per candidate.
        ch = _in.shape[1]
        return _in.transpose(0, 2, 3, 1).reshape(-1, ch)

    box_parts, conf_parts, prob_parts = [], [], []
    for idx, head in enumerate(heads):
        box_parts.append(sp_flatten(box_process(head[:, :4, :, :], anchors[idx])))
        conf_parts.append(sp_flatten(head[:, 4:5, :, :]))
        prob_parts.append(sp_flatten(head[:, 5:, :, :]))

    all_boxes = np.concatenate(box_parts)
    all_probs = np.concatenate(prob_parts)
    all_confs = np.concatenate(conf_parts)

    boxes, classes, scores = filter_boxes(all_boxes, all_confs, all_probs)

    # Suppress overlapping boxes separately for every class that is present.
    nboxes, nclasses, nscores = [], [], []
    for class_id in set(classes):
        inds = np.where(classes == class_id)
        cls_boxes = boxes[inds]
        cls_ids = classes[inds]
        cls_scores = scores[inds]
        keep = nms_boxes(cls_boxes, cls_scores)

        if len(keep) == 0:
            continue
        nboxes.append(cls_boxes[keep])
        nclasses.append(cls_ids[keep])
        nscores.append(cls_scores[keep])

    if not (nclasses or nscores):
        return None, None, None

    return np.concatenate(nboxes), np.concatenate(nclasses), np.concatenate(nscores)
|
||
|
||
def draw_detections(image, boxes, scores, classes):
    """Draw person detection boxes and score labels on the image.

    Args:
        image: Image to draw on; it is modified in place.
        boxes: Iterable of (x1, y1, x2, y2) boxes, or ``None`` for "no
            detections".
        scores: Score per box, shown in the label with two decimals.
        classes: Class index per box, looked up in ``CLASSES``.

    Returns:
        The same ``image`` object that was passed in.
    """
    if boxes is None:
        return image

    for box, score, cl in zip(boxes, scores, classes):
        # Only person boxes are drawn. CLASSES lists just the classes of
        # interest, so a class index may lie outside it; skip such boxes
        # instead of raising IndexError.
        if not 0 <= cl < len(CLASSES) or CLASSES[cl] != "person":
            continue

        x1, y1, x2, y2 = [int(_b) for _b in box]

        # Bounding rectangle.
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 20)

        # Label background, placed directly above the box.
        # NOTE(review): the text size is measured with thickness 15 while the
        # text itself is drawn with thickness 2 - confirm the oversized
        # background is intended.
        label = f'person: {score:.2f}'
        (label_width, label_height), _baseline = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 15
        )
        cv2.rectangle(
            image,
            (x1, y1 - label_height - 5),
            (x1 + label_width, y1),
            (0, 255, 0),
            -1,
        )

        # Label text.
        cv2.putText(
            image,
            label,
            (x1, y1 - 5),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 0, 0),
            2,
        )

    return image
|
||
|
||
# ------------------------
|
||
|
||
class MultiCameraBirdView:
    """Four-camera surround view with a single-camera pane and person alerts.

    The constructor opens one capture device per camera name, prepares the
    bird's-eye stitching weights from static images and tries to load the
    YOLO model. ``running`` is ``False`` after construction when a camera
    could not be opened; callers are expected to check it before ``run()``.
    """

    # Keys handled by run(): '1'-'4' select the single-camera view and
    # '5'-'8' raise a manual alert for one side.
    _VIEW_KEYS = {ord('1'): "front", ord('2'): "back", ord('3'): "left", ord('4'): "right"}
    _ALERT_KEYS = {ord('5'): "front", ord('6'): "back", ord('7'): "left", ord('8'): "right"}

    def __init__(self):
        self.running = True
        self.names = settings.camera_names  # e.g., ['front', 'back', 'left', 'right']
        self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names]
        self.camera_models = [
            FisheyeCameraModel(camera_file, camera_name)
            for camera_file, camera_name in zip(self.yamls, self.names)
        ]

        # Set every attribute up front so the object stays consistent even
        # when camera initialisation fails and __init__ returns early.
        self.alerts = {
            "front": False,
            "back": False,
            "left": False,
            "right": False
        }
        self.yolo_model = None
        self.co_helper = None
        self.birdview = None

        # Camera name -> capture device index.
        self.which_cameras = {"front": 0, "back": 2, "left": 1, "right": 3}
        self.caps = []

        print("[INFO] 初始化摄像头...")
        for name in self.names:
            cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_ANY)
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # minimal buffering to reduce latency
            if not cap.isOpened():
                print(f"[ERROR] 无法打开 {name} 摄像头 (设备 {self.which_cameras[name]})", file=sys.stderr)
                # Do not leak this handle or the devices opened before it.
                cap.release()
                self._release_cameras()
                self.running = False
                return
            self.caps.append(cap)

        self.birdview = BirdView()
        self._initialize_weights()

        # YOLO person detection is optional: when loading fails the viewer
        # keeps working without detection (yolo_model stays None).
        try:
            self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588')
            print("[INFO] YOLO 模型加载成功")
            # Helper used for letter-box preprocessing and box re-mapping.
            self.co_helper = COCO_test_helper(enable_letter_box=True)
        except Exception as e:
            print(f"[ERROR] YOLO 模型加载失败: {e}")
            self.yolo_model = None

    def _release_cameras(self):
        """Release every opened capture device and forget the handles."""
        for cap in self.caps:
            cap.release()
        self.caps.clear()

    def overlay_alert(self, birdview_img):
        """Overlay translucent red warning bands on the bird's-eye view.

        A band is drawn along each side whose entry in ``self.alerts`` is
        true. The input image is never modified.

        Args:
            birdview_img: Bird's-eye image to decorate.

        Returns:
            A new image with the bands blended in; an unchanged copy when no
            alert is active.
        """
        # Blending an image with an identical copy leaves it unchanged, so
        # skip the drawing and blending work when nothing is flagged.
        if not any(self.alerts.values()):
            return birdview_img.copy()

        h, w = birdview_img.shape[:2]
        overlay = birdview_img.copy()

        alpha = 0.2  # opacity of the warning bands
        red = (0, 0, 200)

        margin_f_b = int(min(h, w) * 0.07)  # band thickness for front/back (~7%)
        margin_l_r = int(min(h, w) * 0.15)  # band thickness for left/right (~15%)

        bands = {
            "front": ((0, 0), (w, margin_f_b)),
            "back": ((0, h - margin_f_b), (w, h)),
            "left": ((0, 0), (margin_l_r, h)),
            "right": ((w - margin_l_r, 0), (w, h)),
        }
        for side, (top_left, bottom_right) in bands.items():
            if self.alerts[side]:
                cv2.rectangle(overlay, top_left, bottom_right, red, -1)

        # Blend the original image with the painted overlay.
        return cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0)

    def detect_persons(self, image):
        """Detect persons in the image with the YOLO model and draw them.

        The image is letter-boxed to the network input size and converted
        with ``cv2.COLOR_BGR2RGB`` before inference. Resulting boxes are
        passed through ``co_helper.get_real_box`` (presumably mapping them
        back to the original image) and clamped to the image bounds.

        Args:
            image: Frame to analyse; detections are drawn onto it.

        Returns:
            Tuple ``(image, person_boxes, person_scores)``. Both lists are
            empty when no model is loaded, nothing is found, or detection
            fails.
        """
        if self.yolo_model is None:
            return image, [], []

        # A failed detection must not stop the live view, so any error is
        # reported and treated as "nothing detected".
        try:
            orig_h, orig_w = image.shape[:2]

            # Preprocess: letter-box to the network input size, then BGR -> RGB.
            img_preprocessed = self.co_helper.letter_box(
                im=image.copy(),
                new_shape=(IMG_SIZE[1], IMG_SIZE[0]),
                pad_color=(0, 0, 0)
            )
            img_preprocessed = cv2.cvtColor(img_preprocessed, cv2.COLOR_BGR2RGB)

            # Inference and post-processing.
            outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)])
            boxes, classes, scores = post_process(outputs, ANCHORS)
            if boxes is None:
                return image, [], []

            real_boxes = self.co_helper.get_real_box(boxes)

            person_boxes = []
            person_scores = []
            for real_box, cls_id, score in zip(real_boxes, classes, scores):
                if cls_id < len(CLASSES) and CLASSES[cls_id] == "person":
                    # Clamp the corners to the image bounds.
                    box = real_box.copy()
                    box[0] = max(0, min(box[0], orig_w))
                    box[1] = max(0, min(box[1], orig_h))
                    box[2] = max(0, min(box[2], orig_w))
                    box[3] = max(0, min(box[3], orig_h))

                    person_boxes.append(box)
                    person_scores.append(score)

            if person_boxes:
                # Every kept box is a person, hence class index 0 throughout.
                image = draw_detections(image, np.array(person_boxes),
                                        np.array(person_scores),
                                        np.zeros(len(person_boxes), dtype=int))

            return image, person_boxes, person_scores

        except Exception as e:
            print(f"[ERROR] YOLO检测失败: {e}")
            return image, [], []

    def _initialize_weights(self):
        """Compute the stitching weights and masks from static images.

        One PNG per camera is read from ``<cwd>/images``; a missing image is
        replaced by a black 1920x1080 frame. Errors are reported on stderr
        and otherwise ignored.
        """
        try:
            images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names]
            static_frames = []
            for img_path, cam_model in zip(images, self.camera_models):
                img = cv2.imread(img_path)
                if img is None:
                    print(f"[WARN] 静态图缺失: {img_path},用黑图代替")
                    img = np.zeros((1080, 1920, 3), dtype=np.uint8)
                img = cam_model.undistort(img)
                img = cam_model.project(img)
                img = cam_model.flip(img)
                static_frames.append(img)
            if len(static_frames) == 4:
                self.birdview.get_weights_and_masks(static_frames)
                print("[INFO] 权重矩阵初始化成功")
        except Exception as e:
            print(f"[ERROR] 权重初始化失败: {e}", file=sys.stderr)

    def process_frame_once(self, frame, model):
        """Undistort, project and flip a frame with the given camera model."""
        frame = model.undistort(frame)
        frame = model.project(frame)
        frame = model.flip(frame)
        return frame

    def process_frame_undistort(self, frame, model):
        """Undistort a frame with the given camera model (no projection or flip)."""
        return model.undistort(frame)

    def run(self):
        """Run the capture / stitch / detect / display loop until 'q' is pressed.

        Keys: '1'-'4' select the single-camera view, '5'-'8' raise a manual
        alert, '0' clears all alerts, 'd' runs one detection immediately and
        'q' quits. Cameras and windows are released when the loop ends, also
        when it ends with an exception.
        """
        current_view = "front"
        frame_count = 0
        detection_interval = 3  # run detection on every 3rd frame to limit the load
        person_boxes = []  # most recent detection result, shown in the status line

        # Layout: bird's-eye view on the left third, single camera on the rest.
        h_display, w_display = 720, 1280
        w_bird = w_display // 3
        w_single = w_display - w_bird

        # Create the fullscreen window once instead of on every frame.
        cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN)
        cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

        try:
            while self.running:
                raw_frame = None
                processed_frames = []
                valid = True

                for cap, model, name in zip(self.caps, self.camera_models, self.names):
                    ret, frame = cap.read()
                    if not ret or frame is None:
                        print(f"[WARN] 跳过 {name} 帧")
                        valid = False
                        break
                    # Only the selected camera's raw frame is needed later.
                    if name == current_view:
                        raw_frame = frame.copy()
                    processed_frames.append(self.process_frame_once(frame, model))

                if not valid or len(processed_frames) != 4:
                    # Keep handling window events so that 'q' still quits and
                    # the loop does not spin at full speed while frames drop.
                    if (cv2.waitKey(1) & 0xFF) == ord('q'):
                        self.running = False
                    continue

                # Update the bird's-eye view.
                self.birdview.update_frames(processed_frames)
                self.birdview.stitch_all_parts()
                self.birdview.make_white_balance()
                self.birdview.copy_car_image()

                # Single-camera pane: undistorted only.
                single_img = self.process_frame_undistort(
                    raw_frame,
                    self.camera_models[self.names.index(current_view)]
                )

                # Person detection on the single-camera image.
                frame_count += 1
                if frame_count % detection_interval == 0 and self.yolo_model is not None:
                    single_img, person_boxes, _ = self.detect_persons(single_img)

                    # Simple policy: any detected person raises the alert of
                    # the current view; every other alert is cleared.
                    detected = bool(person_boxes)
                    for view in self.alerts:
                        self.alerts[view] = detected and view == current_view

                # overlay_alert never modifies its input, so no copy is needed.
                birdview_with_alert = self.overlay_alert(self.birdview.image)

                bird_resized = cv2.resize(birdview_with_alert, (w_bird, h_display))
                single_resized = cv2.resize(single_img, (w_single, h_display))
                display = np.hstack((bird_resized, single_resized))

                # Status line.
                info_text = f"View: {current_view} | Persons detected: {len(person_boxes)}"
                cv2.putText(display, info_text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

                cv2.imshow("Video", display)

                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    self.running = False
                    break
                elif key in self._VIEW_KEYS:
                    current_view = self._VIEW_KEYS[key]
                elif key in self._ALERT_KEYS:
                    self.alerts[self._ALERT_KEYS[key]] = True
                elif key == ord('0'):
                    # Clear all alerts.
                    for k in self.alerts:
                        self.alerts[k] = False
                elif key == ord('d'):
                    # Trigger one detection manually.
                    single_img, person_boxes, _ = self.detect_persons(single_img)
        finally:
            self._release_cameras()
            cv2.destroyAllWindows()
|
||
|
||
|
||
def main():
    """Print the keyboard help and start the surround view if the cameras opened."""
    help_lines = (
        "🚀 启动实时四路环视系统...",
        "操作说明:",
        " 1-4: 切换单路视图(前/后/左/右)",
        " 5-8: 触发前/后/左/右 接近预警",
        " 0 : 清除所有预警",
        " d : 手动触发人体检测",
        " q : 退出程序",
    )
    for line in help_lines:
        print(line)

    multi_cam = MultiCameraBirdView()
    if not multi_cam.running:
        # The constructor clears `running` when a camera cannot be opened.
        print("[ERROR] 摄像头初始化失败")
        return
    multi_cam.run()
|
||
|
||
|
||
# Script entry point: run the viewer only when executed directly.
if __name__ == "__main__":
    main()