import cv2 import threading import sys import os import argparse import numpy as np from surround_view import FisheyeCameraModel, BirdView import surround_view.param_settings as settings class MultiCameraBirdView: def __init__(self): self.running = True self.names = settings.camera_names self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names] self.camera_models = [FisheyeCameraModel(camera_file, camera_name) for camera_file, camera_name in zip(self.yamls, self.names)] # 初始化摄像头 self.caps = [] self.which_cameras = { "front": 0, "back": 2, "left": 1, "right": 3 } # 初始化摄像头捕获 for name in self.names: cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_ANY) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV")) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) if not cap.isOpened(): print(f"[ERROR] Cannot open {name} camera", file=sys.stderr) self.running = False return self.caps.append(cap) # 初始化鸟瞰图 self.birdview = BirdView() # 预先加载一次静态图像来初始化权重矩阵 self._initialize_weights() def _initialize_weights(self): """使用静态图像初始化权重矩阵""" try: # 加载静态图像用于初始化权重 images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names] static_frames = [] for image_file, camera_model in zip(images, self.camera_models): img = cv2.imread(image_file) if img is None: print(f"[ERROR] Cannot load static image: {image_file}") continue img = camera_model.undistort(img) img = camera_model.project(img) img = camera_model.flip(img) static_frames.append(img) if len(static_frames) == 4: # 初始化权重和掩码矩阵 Gmat, Mmat = self.birdview.get_weights_and_masks(static_frames) print("[INFO] Weights and masks initialized successfully") else: print("[WARNING] Could not load all static images for weight initialization") except Exception as e: print(f"[ERROR] Failed to initialize weights: {e}", file=sys.stderr) def process_frame(self, img, camera_model): """处理单帧图像:去畸变 -> 投影 -> 翻转""" img = camera_model.undistort(img) img = camera_model.project(img) img = camera_model.flip(img) return img def run(self): while self.running: frames = [] for i, (cap, camera_model) in enumerate(zip(self.caps, self.camera_models)): ret, frame = cap.read() if not ret or frame is None: print(f"[ERROR] Failed to read frame from camera {self.names[i]}") continue processed_frame = self.process_frame(frame, camera_model) frames.append(processed_frame) if len(frames) == 4: # 确保所有摄像头都正常工作 try: # 更新鸟瞰图(权重矩阵已预先初始化) self.birdview.update_frames(frames) self.birdview.stitch_all_parts() self.birdview.make_white_balance() self.birdview.copy_car_image() # 显示鸟瞰图 birdview_display = cv2.resize(self.birdview.image, (750, 1000)) cv2.imshow("Real-time Bird's Eye View", birdview_display) if cv2.waitKey(1) & 0xFF == ord('q'): self.running = False break except Exception as e: print(f"[ERROR] Processing error: {e}", file=sys.stderr) import traceback traceback.print_exc() continue # 释放资源 for cap in self.caps: cap.release() cv2.destroyAllWindows() def main(): parser = argparse.ArgumentParser(description="Real-time 4-Camera Bird's Eye View") parser.add_argument("--mode", type=str, required=True, choices=["realtime", "static"], help="Mode: 'realtime' for live video, 'static' for static images") args = parser.parse_args() if args.mode == "static": # 原有的静态图像处理代码 names = settings.camera_names images = [os.path.join(os.getcwd(), "images", name + ".png") for name in names] yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in names] camera_models = [FisheyeCameraModel(camera_file, camera_name) for camera_file, camera_name in zip(yamls, names)] projected = [] for image_file, camera in zip(images, camera_models): img = cv2.imread(image_file) img = camera.undistort(img) img = camera.project(img) img = camera.flip(img) projected.append(img) birdview = BirdView() Gmat, Mmat = birdview.get_weights_and_masks(projected) # 初始化权重矩阵 birdview.update_frames(projected) birdview.stitch_all_parts() birdview.make_white_balance() birdview.copy_car_image() img_small = cv2.resize(birdview.image, (300, 600)) cv2.imshow("BirdView Result", img_small) while True: key = cv2.waitKey(1) & 0xFF if key == ord("q"): cv2.destroyAllWindows() return -1 elif args.mode == "realtime": # 实时视频处理 print("Starting real-time 4-camera bird's eye view...") print("Press 'q' to quit") multi_cam = MultiCameraBirdView() if multi_cam.running: multi_cam.run() else: print("[ERROR] Failed to initialize cameras") if __name__ == "__main__": main()