# main.py import cv2 import threading import sys import os import argparse import numpy as np import time from surround_view import FisheyeCameraModel, BirdView import surround_view.param_settings as settings sys.path.append(os.path.dirname(__file__)) from py_utils.coco_utils import COCO_test_helper from py_utils.rknn_executor import RKNN_model_container from mjpeg_streamer import MJPEGServer # --- 新增 --- from shared_buffer import CameraFrameBuffer, DetectionResultBuffer # 启用 OpenCL(Mali-G610) if cv2.ocl.haveOpenCL(): cv2.ocl.setUseOpenCL(True) print("✅ OpenCL is ON — using Mali-G610 GPU for acceleration") else: print("⚠️ OpenCL not available") # ------ YOLO 配置 ----------- YOLO_MODEL_PATH = './yolov5s-640-640.rknn' OBJ_THRESH = 0.6 NMS_THRESH = 0.6 IMG_SIZE = (640, 640) CLASSES = ("person",) ANCHORS_FILE = './model/anchors_yolov5.txt' with open(ANCHORS_FILE, 'r') as f: values = [float(_v) for _v in f.readlines()] ANCHORS = np.array(values).reshape(3, -1, 2).tolist() # ========== YOLO 后处理函数(保持不变)========== def filter_boxes(boxes, box_confidences, box_class_probs): box_confidences = box_confidences.reshape(-1) class_max_score = np.max(box_class_probs, axis=-1) classes = np.argmax(box_class_probs, axis=-1) _class_pos = np.where(class_max_score * box_confidences >= OBJ_THRESH) scores = (class_max_score * box_confidences)[_class_pos] boxes = boxes[_class_pos] classes = classes[_class_pos] return boxes, classes, scores def nms_boxes(boxes, scores): x = boxes[:, 0]; y = boxes[:, 1]; w = boxes[:, 2] - boxes[:, 0]; h = boxes[:, 3] - boxes[:, 1] areas = w * h; order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0]; keep.append(i) xx1 = np.maximum(x[i], x[order[1:]]); yy1 = np.maximum(y[i], y[order[1:]]) xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]]); yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]]) w1 = np.maximum(0.0, xx2 - xx1 + 0.00001); h1 = np.maximum(0.0, yy2 - yy1 + 0.00001) inter = w1 * h1 ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= NMS_THRESH)[0] order = order[inds + 1] return np.array(keep) def box_process(position, anchors): grid_h, grid_w = position.shape[2:4] col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h)) col = col.reshape(1, 1, grid_h, grid_w); row = row.reshape(1, 1, grid_h, grid_w) grid = np.concatenate((col, row), axis=1) stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1, 2, 1, 1) col = col.repeat(len(anchors), axis=0); row = row.repeat(len(anchors), axis=0) anchors = np.array(anchors).reshape(*anchors.shape, 1, 1) box_xy = position[:, :2, :, :] * 2 - 0.5 box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors box_xy += grid; box_xy *= stride box = np.concatenate((box_xy, box_wh), axis=1) xyxy = np.copy(box) xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2 xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2 xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2 xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2 return xyxy def post_process(input_data, anchors): boxes, scores, classes_conf = [], [], [] input_data = [_in.reshape([len(anchors[0]), -1] + list(_in.shape[-2:])) for _in in input_data] for i in range(len(input_data)): boxes.append(box_process(input_data[i][:, :4, :, :], anchors[i])) scores.append(input_data[i][:, 4:5, :, :]) classes_conf.append(input_data[i][:, 5:, :, :]) def sp_flatten(_in): ch = _in.shape[1]; _in = _in.transpose(0, 2, 3, 1); return _in.reshape(-1, ch) boxes = [sp_flatten(_v) for _v in boxes] classes_conf = [sp_flatten(_v) for _v in classes_conf] scores = [sp_flatten(_v) for _v in scores] boxes = np.concatenate(boxes); classes_conf = np.concatenate(classes_conf); scores = np.concatenate(scores) boxes, classes, scores = filter_boxes(boxes, scores, classes_conf) nboxes, nclasses, nscores = [], [], [] for c in set(classes): inds = np.where(classes == c) b = boxes[inds]; c = classes[inds]; s = scores[inds] keep = nms_boxes(b, s) if len(keep) != 0: nboxes.append(b[keep]); nclasses.append(c[keep]); nscores.append(s[keep]) if not nclasses: return None, None, None boxes = np.concatenate(nboxes); classes = np.concatenate(nclasses); scores = np.concatenate(nscores) return boxes, classes, scores def draw_detections(image, boxes, scores, classes): if boxes is None: return image for box, score, cl in zip(boxes, scores, classes): if CLASSES[cl] != "person": continue top, left, right, bottom = [int(_b) for _b in box] cv2.rectangle(image, (top, left), (right, bottom), (0, 255, 0), 2) label = f'person: {score:.2f}' (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(image, (top, left - h - 5), (top + w, left), (0, 255, 0), -1) cv2.putText(image, label, (top, left - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) return image # ========== 主类 ========== class MultiCameraBirdView: def __init__(self): self.running = True self.names = settings.camera_names self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names] self.camera_models = [ FisheyeCameraModel(camera_file, camera_name) for camera_file, camera_name in zip(self.yamls, self.names) ] self.which_cameras = {"front": 0, "back": 2, "left": 1, "right": 3} self.caps = [] print("[INFO] 初始化摄像头...") for name in self.names: cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_V4L2) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV")) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if not cap.isOpened(): print(f"[ERROR] 无法打开 {name} 摄像头", file=sys.stderr) self.running = False return self.caps.append(cap) self.birdview = BirdView() self._initialize_weights() # 预警状态 self.alerts = {name: False for name in self.names} self.alert_lock = threading.Lock() # YOLO 模型 try: self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588') self.co_helper = COCO_test_helper(enable_letter_box=True) print("[INFO] YOLO 模型加载成功") except Exception as e: print(f"[ERROR] YOLO 模型加载失败: {e}") self.yolo_model = None # 共享缓冲区 self.undistorted_buffer = CameraFrameBuffer(self.names) self.detection_buffer = DetectionResultBuffer(self.names) self.shared_display_buffer = SharedFrameBuffer() # 用于 MJPEG # 当前视图控制 self.current_view = "front" self.current_view_lock = threading.Lock() # 启动摄像头线程 self.camera_threads = [] for cap, model, name in zip(self.caps, self.camera_models, self.names): thread = threading.Thread(target=self.camera_reader_thread, args=(cap, model, name), daemon=True) thread.start() self.camera_threads.append(thread) # 启动 AI 检测线程 if self.yolo_model is not None: self.ai_thread = threading.Thread(target=self.ai_detection_thread, daemon=True) self.ai_thread.start() else: self.ai_thread = None def _initialize_weights(self): try: images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names] static_frames = [] for img_path, cam_model in zip(images, self.camera_models): img = cv2.imread(img_path) if img is None: img = np.zeros((1080, 1920, 3), dtype=np.uint8) img = cam_model.undistort(img) img = cam_model.project(img) img = cam_model.flip(img) static_frames.append(img) if len(static_frames) == 4: self.birdview.get_weights_and_masks(static_frames) print("[INFO] 权重矩阵初始化成功") except Exception as e: print(f"[ERROR] 权重初始化失败: {e}") def camera_reader_thread(self, cap, model, name): """摄像头读取 + 去畸变线程""" while self.running: ret, frame = cap.read() if ret: undistorted = model.undistort(frame) self.undistorted_buffer.update(name, undistorted) else: print(f"[WARN] {name} 摄像头读取失败") break def detect_persons(self, image): if self.yolo_model is None: return image, [], [] try: orig_h, orig_w = image.shape[:2] pad_color = (0, 0, 0) img_preprocessed = self.co_helper.letter_box( im=image.copy(), new_shape=(IMG_SIZE[1], IMG_SIZE[0]), pad_color=pad_color ) outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)]) boxes, classes, scores = post_process(outputs, ANCHORS) if boxes is not None: real_boxes = self.co_helper.get_real_box(boxes) person_boxes, person_scores = [], [] for i in range(len(real_boxes)): if classes[i] < len(CLASSES) and CLASSES[classes[i]] == "person": box = real_boxes[i].copy() box = np.clip(box, [0, 0, 0, 0], [orig_w, orig_h, orig_w, orig_h]) person_boxes.append(box) person_scores.append(scores[i]) if person_boxes: image = draw_detections(image, np.array(person_boxes), np.array(person_scores), np.zeros(len(person_boxes), dtype=int)) return image, person_boxes, person_scores else: return image, [], [] except Exception as e: print(f"[ERROR] YOLO检测失败: {e}") return image, [], [] def ai_detection_thread(self): detection_interval = 3 frame_count = 0 while self.running: with self.current_view_lock: view = self.current_view success, frame = self.undistorted_buffer.get(view) if success: frame_count += 1 if frame_count % detection_interval == 0: img_with_det, boxes, scores = self.detect_persons(frame) self.detection_buffer.update(view, img_with_det, boxes, scores) with self.alert_lock: if boxes: self.alerts[view] = True for v in self.alerts: if v != view: self.alerts[v] = False else: for v in self.alerts: self.alerts[v] = False else: self.detection_buffer.update(view, frame, [], []) time.sleep(0.001) def overlay_alert(self, birdview_img): h, w = birdview_img.shape[:2] overlay = birdview_img.copy() alpha = 0.2 red = (0, 0, 200) margin_f_b = int(min(h, w) * 0.07) margin_l_r = int(min(h, w) * 0.15) with self.alert_lock: alerts = self.alerts.copy() if alerts["front"]: cv2.rectangle(overlay, (0, 0), (w, margin_f_b), red, -1) if alerts["back"]: cv2.rectangle(overlay, (0, h - margin_f_b), (w, h), red, -1) if alerts["left"]: cv2.rectangle(overlay, (0, 0), (margin_l_r, h), red, -1) if alerts["right"]: cv2.rectangle(overlay, (w - margin_l_r, 0), (w, h), red, -1) return cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0) def run(self): h_display, w_display = 720, 1280 w_bird = w_display // 3 w_single = w_display - w_bird while self.running: # 1. 获取四路去畸变帧 → project → 拼接鸟瞰图 processed_frames = [] for name in self.names: success, undist_frame = self.undistorted_buffer.get(name) if success: # 使用 UMat 加速 warpPerspective uimg = cv2.UMat(undist_frame) uresult = cv2.warpPerspective( uimg, self.camera_models[self.names.index(name)].project_matrix, self.camera_models[self.names.index(name)].project_shape, flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT ) p_frame = uresult.get() p_frame = self.camera_models[self.names.index(name)].flip(p_frame) processed_frames.append(p_frame) else: processed_frames.append(np.zeros((600, 800, 3), dtype=np.uint8)) self.birdview.update_frames(processed_frames) self.birdview.stitch_all_parts() self.birdview.make_white_balance() self.birdview.copy_car_image() # 2. 获取当前视图的 AI 检测结果 with self.current_view_lock: view = self.current_view det_img, _, _ = self.detection_buffer.get(view) if det_img is None: det_img = np.zeros((h_display, w_single, 3), dtype=np.uint8) # 3. 拼接显示 bird_resized = cv2.resize(self.overlay_alert(self.birdview.image), (w_bird, h_display)) single_resized = cv2.resize(det_img, (w_single, h_display)) display = np.hstack((bird_resized, single_resized)) self.shared_display_buffer.update(display) cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN) cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) cv2.imshow("Video", display) key = cv2.waitKey(1) & 0xFF if key == ord('q'): self.running = False elif key == ord('1'): with self.current_view_lock: self.current_view = "front" elif key == ord('2'): with self.current_view_lock: self.current_view = "back" elif key == ord('3'): with self.current_view_lock: self.current_view = "left" elif key == ord('4'): with self.current_view_lock: self.current_view = "right" elif key == ord('0'): with self.alert_lock: for k in self.alerts: self.alerts[k] = False for cap in self.caps: cap.release() cv2.destroyAllWindows() # ========== 辅助类 ========== class SharedFrameBuffer: def __init__(self): self._frame = None self._lock = threading.Lock() self._has_frame = False def update(self, frame: np.ndarray): with self._lock: self._frame = frame.copy() self._has_frame = True def get_frame(self): with self._lock: if self._has_frame and self._frame is not None: return True, self._frame.copy() else: return False, None # ========== 主函数 ========== def main(): print("🚀 启动实时四路环视系统...") multi_cam = MultiCameraBirdView() if not multi_cam.running: print("[ERROR] 摄像头初始化失败") return # 启动 MJPEG 流 try: mjpeg_server = MJPEGServer(multi_cam.shared_display_buffer, host="0.0.0.0", port=8080) mjpeg_server.start() print("[INFO] MJPEG 流已启动: http://:8080") except Exception as e: print(f"[WARN] MJPEG 流启动失败: {e}") multi_cam.run() if __name__ == "__main__": main()