# from turtle import right import cv2 import threading import sys import os import argparse import numpy as np from surround_view import FisheyeCameraModel, BirdView import surround_view.param_settings as settings sys.path.append(os.path.dirname(__file__)) # 确保能导入 py_utils from py_utils.coco_utils import COCO_test_helper from py_utils.rknn_executor import RKNN_model_container # 假设使用 RKNN from mjpeg_streamer import MJPEGServer from multiprocessing import Pool, Manager import cv2 # 启用 OpenCL(Mali-G610) if cv2.ocl.haveOpenCL(): cv2.ocl.setUseOpenCL(True) print("✅ OpenCL is ON — using Mali-G610 GPU for acceleration") else: print("⚠️ OpenCL not available") # ------YOLO 配置----------- # YOLO 配置 YOLO_MODEL_PATH = './yolov5s-640-640.rknn' OBJ_THRESH = 0.6 NMS_THRESH = 0.6 IMG_SIZE = (640, 640) # (w, h) CLASSES = ("person",) # 只关心 person # 加载 anchors ANCHORS_FILE = './model/anchors_yolov5.txt' with open(ANCHORS_FILE, 'r') as f: values = [float(_v) for _v in f.readlines()] ANCHORS = np.array(values).reshape(3, -1, 2).tolist() import threading import numpy as np class SharedFrameBuffer: def __init__(self): self._frame = None self._lock = threading.Lock() self._has_frame = False def update(self, frame: np.ndarray): """主线程调用:更新最新帧""" with self._lock: self._frame = frame.copy() self._has_frame = True def get_frame(self): """YUYV 线程调用:获取最新帧""" with self._lock: if self._has_frame and self._frame is not None: return True, self._frame.copy() else: return False, None # ---------- YOLO 处理函数 ---------- def filter_boxes(boxes, box_confidences, box_class_probs): box_confidences = box_confidences.reshape(-1) class_max_score = np.max(box_class_probs, axis=-1) classes = np.argmax(box_class_probs, axis=-1) _class_pos = np.where(class_max_score * box_confidences >= OBJ_THRESH) scores = (class_max_score * box_confidences)[_class_pos] boxes = boxes[_class_pos] classes = classes[_class_pos] return boxes, classes, scores def nms_boxes(boxes, scores): x = boxes[:, 0] y = boxes[:, 1] w = boxes[:, 2] - boxes[:, 0] h = boxes[:, 3] - boxes[:, 1] areas = w * h order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x[i], x[order[1:]]) yy1 = np.maximum(y[i], y[order[1:]]) xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]]) yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]]) w1 = np.maximum(0.0, xx2 - xx1 + 0.00001) h1 = np.maximum(0.0, yy2 - yy1 + 0.00001) inter = w1 * h1 ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= NMS_THRESH)[0] order = order[inds + 1] keep = np.array(keep) return keep def box_process(position, anchors): grid_h, grid_w = position.shape[2:4] col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h)) col = col.reshape(1, 1, grid_h, grid_w) row = row.reshape(1, 1, grid_h, grid_w) grid = np.concatenate((col, row), axis=1) stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1, 2, 1, 1) col = col.repeat(len(anchors), axis=0) row = row.repeat(len(anchors), axis=0) anchors = np.array(anchors) anchors = anchors.reshape(*anchors.shape, 1, 1) box_xy = position[:, :2, :, :] * 2 - 0.5 box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors box_xy += grid box_xy *= stride box = np.concatenate((box_xy, box_wh), axis=1) xyxy = np.copy(box) xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2 xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2 xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2 xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2 return xyxy def post_process(input_data, anchors): boxes, scores, classes_conf = [], [], [] input_data = [_in.reshape([len(anchors[0]), -1] + list(_in.shape[-2:])) for _in in input_data] for i in range(len(input_data)): boxes.append(box_process(input_data[i][:, :4, :, :], anchors[i])) scores.append(input_data[i][:, 4:5, :, :]) classes_conf.append(input_data[i][:, 5:, :, :]) def sp_flatten(_in): ch = _in.shape[1] _in = _in.transpose(0, 2, 3, 1) return _in.reshape(-1, ch) boxes = [sp_flatten(_v) for _v in boxes] classes_conf = [sp_flatten(_v) for _v in classes_conf] scores = [sp_flatten(_v) for _v in scores] boxes = np.concatenate(boxes) classes_conf = np.concatenate(classes_conf) scores = np.concatenate(scores) boxes, classes, scores = filter_boxes(boxes, scores, classes_conf) nboxes, nclasses, nscores = [], [], [] for c in set(classes): inds = np.where(classes == c) b = boxes[inds] c = classes[inds] s = scores[inds] keep = nms_boxes(b, s) if len(keep) != 0: nboxes.append(b[keep]) nclasses.append(c[keep]) nscores.append(s[keep]) if not nclasses and not nscores: return None, None, None boxes = np.concatenate(nboxes) classes = np.concatenate(nclasses) scores = np.concatenate(nscores) return boxes, classes, scores def draw_detections(image, boxes, scores, classes): """在图像上绘制检测框""" if boxes is None: return image for box, score, cl in zip(boxes, scores, classes): # 只绘制人的检测框 if CLASSES[cl] != "person": continue top, left, right, bottom = [int(_b) for _b in box] # 绘制矩形框 cv2.rectangle(image, (top, left), (right, bottom), (0, 255, 0), 20) # 绘制标签背景 label = f'person: {score:.2f}' (label_width, label_height), baseline = cv2.getTextSize( label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 15 ) # 绘制标签矩形 cv2.rectangle( image, (top, left - label_height - 5), (top + label_width, left), (0, 255, 0), -1 ) # # 绘制标签文字 cv2.putText( image, label, (top, left - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2 ) return image # ------------------------ class MultiCameraBirdView: def __init__(self): self.running = True self.names = settings.camera_names # e.g., ['front', 'back', 'left', 'right'] self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names] self.camera_models = [ FisheyeCameraModel(camera_file, camera_name) for camera_file, camera_name in zip(self.yamls, self.names) ] # 摄像头设备映射 self.which_cameras = {"front": 0, "back": 2, "left": 1, "right": 3} self.caps = [] print("[INFO] 初始化摄像头...") for name in self.names: cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_V4L2) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV")) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 最小缓冲,降低延迟 # cap.set(cv2.CAP_PROP_FPS, 30) # 设置帧率为 30 FPS if not cap.isOpened(): print(f"[ERROR] 无法打开 {name} 摄像头 (设备 {self.which_cameras[name]})", file=sys.stderr) self.running = False return self.caps.append(cap) self.birdview = BirdView() self._initialize_weights() # 新增:预警状态 self.alerts = { "front": False, "back": False, "left": False, "right": False } # === 新增:YOLO 人体检测模型 === try: self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588') print("[INFO] YOLO 模型加载成功") # 初始化COCO助手用于图像预处理 self.co_helper = COCO_test_helper(enable_letter_box=True) except Exception as e: print(f"[ERROR] YOLO 模型加载失败: {e}") self.yolo_model = None self.shared_buffer = SharedFrameBuffer() # 👈 新增 def overlay_alert(self, birdview_img): """在鸟瞰图上叠加半透明红色预警区域""" h, w = birdview_img.shape[:2] overlay = birdview_img.copy() alpha = 0.2 # 透明度 red = (0, 0, 200) margin_f_b = int(min(h, w) * 0.07) # 预警区域宽度(约7%)前后 margin_l_r = int(min(h, w) * 0.15) # 预警区域宽度(约15%)左右 if self.alerts["front"]: cv2.rectangle(overlay, (0, 0), (w, margin_f_b), red, -1) if self.alerts["back"]: cv2.rectangle(overlay, (0, h - margin_f_b), (w, h), red, -1) if self.alerts["left"]: cv2.rectangle(overlay, (0, 0), (margin_l_r, h), red, -1) if self.alerts["right"]: cv2.rectangle(overlay, (w - margin_l_r, 0), (w, h), red, -1) # 混合原图与覆盖层 blended = cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0) return blended def detect_persons(self, image): """使用YOLO模型检测图像中的人体""" if self.yolo_model is None: return image, [], [] try: # 保存原始图像尺寸 orig_h, orig_w = image.shape[:2] # 预处理图像 pad_color = (0, 0, 0) img_preprocessed = self.co_helper.letter_box( im=image.copy(), new_shape=(IMG_SIZE[1], IMG_SIZE[0]), pad_color=pad_color ) # img_preprocessed = cv2.cvtColor(img_preprocessed, cv2.COLOR_BGR2RGB) # 推理 outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)]) # 后处理 boxes, classes, scores = post_process(outputs, ANCHORS) if boxes is not None: # 将检测框转换回原始图像坐标 real_boxes = self.co_helper.get_real_box(boxes) # 筛选出人的检测框 person_boxes = [] person_scores = [] for i in range(len(real_boxes)): if classes[i] < len(CLASSES) and CLASSES[classes[i]] == "person": # 确保坐标在图像范围内 box = real_boxes[i].copy() box[0] = max(0, min(box[0], orig_w)) box[1] = max(0, min(box[1], orig_h)) box[2] = max(0, min(box[2], orig_w)) box[3] = max(0, min(box[3], orig_h)) person_boxes.append(box) person_scores.append(scores[i]) # 在图像上绘制检测框 if person_boxes: image = draw_detections(image, np.array(person_boxes), np.array(person_scores), np.zeros(len(person_boxes), dtype=int)) # 打印检测信息 # print(f"[YOLO] 检测到 {len(person_boxes)} 个人体") # for box, score in zip(person_boxes, person_scores): # print(f" 位置: ({int(box[0])}, {int(box[1])}, {int(box[2])}, {int(box[3])}), 置信度: {score:.2f}") return image, person_boxes, person_scores else: return image, [], [] except Exception as e: print(f"[ERROR] YOLO检测失败: {e}") return image, [], [] def _initialize_weights(self): try: images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names] static_frames = [] for img_path, cam_model in zip(images, self.camera_models): img = cv2.imread(img_path) if img is None: print(f"[WARN] 静态图缺失: {img_path},用黑图代替") img = np.zeros((1080, 1920, 3), dtype=np.uint8) img = cam_model.undistort(img) img = cam_model.project(img) img = cam_model.flip(img) static_frames.append(img) if len(static_frames) == 4: self.birdview.get_weights_and_masks(static_frames) print("[INFO] 权重矩阵初始化成功") except Exception as e: print(f"[ERROR] 权重初始化失败: {e}", file=sys.stderr) def process_frame_once(self, frame, model): """只处理一次:去畸变 + 投影 + 翻转""" frame = model.undistort(frame) frame = model.project(frame) frame = model.flip(frame) return frame def process_frame_undistort(self, frame, model): """只处理一次:去畸变""" frame = model.undistort(frame) return frame def run(self): current_view = "front" frame_count = 0 detection_interval = 3 # 每5帧进行一次检测,避免性能问题 while self.running: raw_frames = {} processed_frames = [] for i, (cap, model, name) in enumerate(zip(self.caps, self.camera_models, self.names)): ret, frame = cap.read() raw_frames[name] = frame # self.shared_buffer.update(raw_frames[current_view]) p_frame = self.process_frame_once(frame, model) processed_frames.append(p_frame) # 更新鸟瞰图 self.birdview.update_frames(processed_frames) self.birdview.stitch_all_parts() self.birdview.make_white_balance() self.birdview.copy_car_image() # 获取单路图像(仅去畸变) single_img = self.process_frame_undistort( raw_frames[current_view], self.camera_models[self.names.index(current_view)] ) # 在单路图像上进行人体检测 frame_count += 1 if frame_count % detection_interval == 0 and self.yolo_model is not None: single_img, person_boxes, person_scores = self.detect_persons(single_img) # 根据检测结果自动触发预警 if person_boxes: # 可以根据人体的位置和数量来触发预警 self.alerts[current_view] = True # 重置其他视图的预警 for view in self.alerts: if view != current_view: self.alerts[view] = False else: # 没有检测到人,清除所有预警 for view in self.alerts: self.alerts[view] = False # 叠加预警区域 birdview_with_alert = self.overlay_alert(self.birdview.image) # 拼接显示:左侧鸟瞰图(1/3),右侧单路(2/3) h_display, w_display = 720, 1280 w_bird = w_display // 3 w_single = w_display - w_bird bird_resized = cv2.resize(birdview_with_alert, (w_bird, h_display)) single_resized = cv2.resize(single_img, (w_single, h_display)) display = np.hstack((bird_resized, single_resized)) # 在显示窗口上添加状态信息 # info_text = f"View: {current_view} | Persons detected: {len(person_boxes) if 'person_boxes' in locals() else 0}" # cv2.putText(display, info_text, (10, 30), # cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) # 全屏显示 cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN) cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) cv2.imshow("Video", display) key = cv2.waitKey(1) & 0xFF # if key == ord('q'): # self.running = False # break # elif key == ord('1'): # current_view = "front" # elif key == ord('2'): # current_view = "back" # elif key == ord('3'): # current_view = "left" # elif key == ord('4'): # current_view = "right" # # 新增:预警控制 # elif key == ord('5'): # self.alerts["front"] = True # elif key == ord('6'): # self.alerts["back"] = True # elif key == ord('7'): # self.alerts["left"] = True # elif key == ord('8'): # self.alerts["right"] = True # elif key == ord('0'): # # 清除所有预警 # for k in self.alerts: # self.alerts[k] = False # elif key == ord('d'): # # 手动触发一次检测 # single_img, person_boxes, person_scores = self.detect_persons(single_img) for cap in self.caps: cap.release() cv2.destroyAllWindows() def main(): print("🚀 启动实时四路环视系统...") print("操作说明:") print(" 1-4: 切换单路视图(前/后/左/右)") print(" 5-8: 触发前/后/左/右 接近预警") print(" 0 : 清除所有预警") print(" d : 手动触发人体检测") print(" q : 退出程序") multi_cam = MultiCameraBirdView() # ===== 启动视频流 ===== try: from mjpeg_streamer import MJPEGServer mjpeg_server = MJPEGServer(multi_cam.shared_buffer, host="0.0.0.0", port=8080) mjpeg_server.start() except Exception as e: print(f"[WARN] YUYV 流启动失败: {e}") if multi_cam.running: multi_cam.run() else: print("[ERROR] 摄像头初始化失败") if __name__ == "__main__": main()