# main.py
import cv2
import threading
import sys
import os
import argparse
import numpy as np
import time

from surround_view import FisheyeCameraModel, BirdView
import surround_view.param_settings as settings

sys.path.append(os.path.dirname(__file__))
from py_utils.coco_utils import COCO_test_helper
from py_utils.rknn_executor import RKNN_model_container
from mjpeg_streamer import MJPEGServer
# --- New ---
from shared_buffer import CameraFrameBuffer, DetectionResultBuffer

# Enable OpenCL (Mali-G610)
if cv2.ocl.haveOpenCL():
    cv2.ocl.setUseOpenCL(True)
    print("✅ OpenCL is ON — using Mali-G610 GPU for acceleration")
else:
    print("⚠️ OpenCL not available")
# ---------- YOLO configuration ----------
YOLO_MODEL_PATH = './yolov5s-640-640.rknn'
OBJ_THRESH = 0.6
NMS_THRESH = 0.6
IMG_SIZE = (640, 640)
CLASSES = ("person",)

ANCHORS_FILE = './model/anchors_yolov5.txt'
with open(ANCHORS_FILE, 'r') as f:
    values = [float(_v) for _v in f.readlines()]
ANCHORS = np.array(values).reshape(3, -1, 2).tolist()
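# The anchor file holds one float per line; the reshape above yields a
# 3 x N x 2 nested list: one row of N (width, height) anchor pairs per
# detection head. These are presumably the standard YOLOv5 heads at strides
# 8, 16 and 32 for a 640x640 input, but the values come from the external file.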
# ========== YOLO post-processing (unchanged) ==========
def filter_boxes(boxes, box_confidences, box_class_probs):
    """Keep boxes whose objectness * best class score clears OBJ_THRESH."""
    box_confidences = box_confidences.reshape(-1)
    class_max_score = np.max(box_class_probs, axis=-1)
    classes = np.argmax(box_class_probs, axis=-1)
    _class_pos = np.where(class_max_score * box_confidences >= OBJ_THRESH)
    scores = (class_max_score * box_confidences)[_class_pos]
    boxes = boxes[_class_pos]
    classes = classes[_class_pos]
    return boxes, classes, scores
def nms_boxes(boxes, scores):
    x = boxes[:, 0]
    y = boxes[:, 1]
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    areas = w * h
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x[i], x[order[1:]])
        yy1 = np.maximum(y[i], y[order[1:]])
        xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
        yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])
        w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
        h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
        inter = w1 * h1
        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= NMS_THRESH)[0]
        order = order[inds + 1]
    return np.array(keep)
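# nms_boxes() is standard greedy NMS on xyxy boxes: visit boxes in descending
# score order and suppress any remaining box whose IoU with the current winner
# exceeds NMS_THRESH; the 0.00001 term is a small numerical epsilon on the
# intersection size.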
def box_process(position, anchors):
    grid_h, grid_w = position.shape[2:4]
    col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h))
    col = col.reshape(1, 1, grid_h, grid_w)
    row = row.reshape(1, 1, grid_h, grid_w)
    grid = np.concatenate((col, row), axis=1)
    stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1, 2, 1, 1)
    col = col.repeat(len(anchors), axis=0)
    row = row.repeat(len(anchors), axis=0)
    anchors = np.array(anchors)  # convert first: reshape(*anchors.shape, ...) fails on a plain list
    anchors = anchors.reshape(*anchors.shape, 1, 1)

    # YOLOv5 decode: xy = (2*p - 0.5 + grid) * stride, wh = (2*p)^2 * anchor
    box_xy = position[:, :2, :, :] * 2 - 0.5
    box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors
    box_xy += grid
    box_xy *= stride
    box = np.concatenate((box_xy, box_wh), axis=1)

    # center/size -> corner (xyxy) coordinates
    xyxy = np.copy(box)
    xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2  # x1
    xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2  # y1
    xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2  # x2
    xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2  # y2
    return xyxy
def post_process(input_data, anchors):
    boxes, scores, classes_conf = [], [], []
    input_data = [_in.reshape([len(anchors[0]), -1] + list(_in.shape[-2:])) for _in in input_data]
    for i in range(len(input_data)):
        boxes.append(box_process(input_data[i][:, :4, :, :], anchors[i]))
        scores.append(input_data[i][:, 4:5, :, :])
        classes_conf.append(input_data[i][:, 5:, :, :])

    def sp_flatten(_in):
        ch = _in.shape[1]
        _in = _in.transpose(0, 2, 3, 1)
        return _in.reshape(-1, ch)

    boxes = [sp_flatten(_v) for _v in boxes]
    classes_conf = [sp_flatten(_v) for _v in classes_conf]
    scores = [sp_flatten(_v) for _v in scores]

    boxes = np.concatenate(boxes)
    classes_conf = np.concatenate(classes_conf)
    scores = np.concatenate(scores)

    boxes, classes, scores = filter_boxes(boxes, scores, classes_conf)

    nboxes, nclasses, nscores = [], [], []
    for c in set(classes):
        inds = np.where(classes == c)
        b = boxes[inds]
        c = classes[inds]
        s = scores[inds]
        keep = nms_boxes(b, s)
        if len(keep) != 0:
            nboxes.append(b[keep])
            nclasses.append(c[keep])
            nscores.append(s[keep])

    if not nclasses:
        return None, None, None

    boxes = np.concatenate(nboxes)
    classes = np.concatenate(nclasses)
    scores = np.concatenate(nscores)
    return boxes, classes, scores
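# post_process() flattens all three detection heads into one box table,
# applies the confidence filter, then runs NMS independently per class before
# concatenating the survivors back together.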
def draw_detections(image, boxes, scores, classes):
    if boxes is None:
        return image
    for box, score, cl in zip(boxes, scores, classes):
        if CLASSES[cl] != "person":
            continue
        x1, y1, x2, y2 = [int(_b) for _b in box]  # boxes are xyxy
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        label = f'person: {score:.2f}'
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        cv2.rectangle(image, (x1, y1 - h - 5), (x1 + w, y1), (0, 255, 0), -1)
        cv2.putText(image, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
    return image
# ========== Main class ==========
class MultiCameraBirdView:
    def __init__(self):
        self.running = True
        self.names = settings.camera_names
        self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names]
        self.camera_models = [
            FisheyeCameraModel(camera_file, camera_name)
            for camera_file, camera_name in zip(self.yamls, self.names)
        ]
        self.which_cameras = {"front": 0, "back": 2, "left": 1, "right": 3}

        self.caps = []
        print("[INFO] Initializing cameras...")
        for name in self.names:
            cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_V4L2)
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            if not cap.isOpened():
                print(f"[ERROR] Cannot open {name} camera", file=sys.stderr)
                self.running = False
                return
            self.caps.append(cap)

        self.birdview = BirdView()
        self._initialize_weights()

        # Alert state
        self.alerts = {name: False for name in self.names}
        self.alert_lock = threading.Lock()

        # YOLO model
        try:
            self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588')
            self.co_helper = COCO_test_helper(enable_letter_box=True)
            print("[INFO] YOLO model loaded successfully")
        except Exception as e:
            print(f"[ERROR] Failed to load YOLO model: {e}")
            self.yolo_model = None

        # Shared buffers
        self.undistorted_buffer = CameraFrameBuffer(self.names)
        self.detection_buffer = DetectionResultBuffer(self.names)
        self.shared_display_buffer = SharedFrameBuffer()  # for the MJPEG stream

        # Current-view selection
        self.current_view = "front"
        self.current_view_lock = threading.Lock()

        # Start camera threads
        self.camera_threads = []
        for cap, model, name in zip(self.caps, self.camera_models, self.names):
            thread = threading.Thread(target=self.camera_reader_thread, args=(cap, model, name), daemon=True)
            thread.start()
            self.camera_threads.append(thread)

        # Start the AI detection thread
        if self.yolo_model is not None:
            self.ai_thread = threading.Thread(target=self.ai_detection_thread, daemon=True)
            self.ai_thread.start()
        else:
            self.ai_thread = None
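    # Threading model: one daemon thread per camera grabs and undistorts frames
    # into undistorted_buffer; a single AI thread pulls the currently selected
    # view from that buffer and publishes annotated frames to detection_buffer;
    # run() on the main thread does projection, stitching and display.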
    def _initialize_weights(self):
        try:
            images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names]
            static_frames = []
            for img_path, cam_model in zip(images, self.camera_models):
                img = cv2.imread(img_path)
                if img is None:
                    img = np.zeros((1080, 1920, 3), dtype=np.uint8)
                img = cam_model.undistort(img)
                img = cam_model.project(img)
                img = cam_model.flip(img)
                static_frames.append(img)
            if len(static_frames) == 4:
                self.birdview.get_weights_and_masks(static_frames)
                print("[INFO] Blend weight matrices initialized")
        except Exception as e:
            print(f"[ERROR] Weight initialization failed: {e}")
    def camera_reader_thread(self, cap, model, name):
        """Camera capture + undistortion thread."""
        while self.running:
            ret, frame = cap.read()
            if ret:
                undistorted = model.undistort(frame)
                self.undistorted_buffer.update(name, undistorted)
            else:
                print(f"[WARN] Failed to read from {name} camera")
                break
    def detect_persons(self, image):
        if self.yolo_model is None:
            return image, [], []
        try:
            orig_h, orig_w = image.shape[:2]
            pad_color = (0, 0, 0)
            img_preprocessed = self.co_helper.letter_box(
                im=image.copy(),
                new_shape=(IMG_SIZE[1], IMG_SIZE[0]),
                pad_color=pad_color
            )
            outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)])
            boxes, classes, scores = post_process(outputs, ANCHORS)
            if boxes is not None:
                # Map letterboxed 640x640 coordinates back to the original frame
                real_boxes = self.co_helper.get_real_box(boxes)
                person_boxes, person_scores = [], []
                for i in range(len(real_boxes)):
                    if classes[i] < len(CLASSES) and CLASSES[classes[i]] == "person":
                        box = real_boxes[i].copy()
                        box = np.clip(box, [0, 0, 0, 0], [orig_w, orig_h, orig_w, orig_h])
                        person_boxes.append(box)
                        person_scores.append(scores[i])
                if person_boxes:
                    image = draw_detections(
                        image,
                        np.array(person_boxes),
                        np.array(person_scores),
                        np.zeros(len(person_boxes), dtype=int),
                    )
                return image, person_boxes, person_scores
            else:
                return image, [], []
        except Exception as e:
            print(f"[ERROR] YOLO detection failed: {e}")
            return image, [], []
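    # Assumption about the helpers: co_helper.letter_box() resizes with the
    # aspect ratio preserved and pads to 640x640, and get_real_box() inverts
    # that mapping, so the np.clip above only trims boxes that spill past the
    # original frame edges.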
    def ai_detection_thread(self):
        detection_interval = 3  # run the detector on every 3rd frame
        frame_count = 0
        while self.running:
            with self.current_view_lock:
                view = self.current_view
            success, frame = self.undistorted_buffer.get(view)
            if success:
                frame_count += 1
                if frame_count % detection_interval == 0:
                    img_with_det, boxes, scores = self.detect_persons(frame)
                    self.detection_buffer.update(view, img_with_det, boxes, scores)
                    with self.alert_lock:
                        if boxes:
                            self.alerts[view] = True
                            for v in self.alerts:
                                if v != view:
                                    self.alerts[v] = False
                        else:
                            for v in self.alerts:
                                self.alerts[v] = False
                else:
                    # Non-detection frames are forwarded unannotated so the
                    # side view keeps updating at full rate
                    self.detection_buffer.update(view, frame, [], [])
            time.sleep(0.001)
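    # Note: with a 1 ms poll this loop can pull the same buffered frame more
    # than once (depending on CameraFrameBuffer semantics), so detection_interval
    # counts retrieved frames, not necessarily distinct camera frames.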
    def overlay_alert(self, birdview_img):
        h, w = birdview_img.shape[:2]
        overlay = birdview_img.copy()
        alpha = 0.2
        red = (0, 0, 200)
        margin_f_b = int(min(h, w) * 0.07)  # front/back band height
        margin_l_r = int(min(h, w) * 0.15)  # left/right band width
        with self.alert_lock:
            alerts = self.alerts.copy()
        if alerts["front"]:
            cv2.rectangle(overlay, (0, 0), (w, margin_f_b), red, -1)
        if alerts["back"]:
            cv2.rectangle(overlay, (0, h - margin_f_b), (w, h), red, -1)
        if alerts["left"]:
            cv2.rectangle(overlay, (0, 0), (margin_l_r, h), red, -1)
        if alerts["right"]:
            cv2.rectangle(overlay, (w - margin_l_r, 0), (w, h), red, -1)
        return cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0)
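    # The alert bands are drawn opaque on a copy and then blended back with
    # alpha = 0.2, so an alert shows as a translucent red strip along the
    # matching edge of the bird's-eye view instead of hiding the image.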
    def run(self):
        h_display, w_display = 720, 1280
        w_bird = w_display // 3
        w_single = w_display - w_bird

        # Create the fullscreen window once, outside the loop
        cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN)
        cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

        while self.running:
            # 1. Fetch the four undistorted frames -> project -> stitch the bird's-eye view
            processed_frames = []
            for name in self.names:
                success, undist_frame = self.undistorted_buffer.get(name)
                if success:
                    cam_model = self.camera_models[self.names.index(name)]
                    # Use UMat so warpPerspective can run through OpenCL
                    uimg = cv2.UMat(undist_frame)
                    uresult = cv2.warpPerspective(
                        uimg,
                        cam_model.project_matrix,
                        cam_model.project_shape,
                        flags=cv2.INTER_LINEAR,
                        borderMode=cv2.BORDER_CONSTANT
                    )
                    p_frame = uresult.get()
                    p_frame = cam_model.flip(p_frame)
                    processed_frames.append(p_frame)
                else:
                    processed_frames.append(np.zeros((600, 800, 3), dtype=np.uint8))

            self.birdview.update_frames(processed_frames)
            self.birdview.stitch_all_parts()
            self.birdview.make_white_balance()
            self.birdview.copy_car_image()

            # 2. Fetch the AI detection result for the current view
            with self.current_view_lock:
                view = self.current_view
            det_img, _, _ = self.detection_buffer.get(view)
            if det_img is None:
                det_img = np.zeros((h_display, w_single, 3), dtype=np.uint8)

            # 3. Compose the display: bird's-eye view left, single view right
            bird_resized = cv2.resize(self.overlay_alert(self.birdview.image), (w_bird, h_display))
            single_resized = cv2.resize(det_img, (w_single, h_display))
            display = np.hstack((bird_resized, single_resized))
            self.shared_display_buffer.update(display)

            cv2.imshow("Video", display)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                self.running = False
            elif key == ord('1'):
                with self.current_view_lock:
                    self.current_view = "front"
            elif key == ord('2'):
                with self.current_view_lock:
                    self.current_view = "back"
            elif key == ord('3'):
                with self.current_view_lock:
                    self.current_view = "left"
            elif key == ord('4'):
                with self.current_view_lock:
                    self.current_view = "right"
            elif key == ord('0'):
                with self.alert_lock:
                    for k in self.alerts:
                        self.alerts[k] = False

        for cap in self.caps:
            cap.release()
        cv2.destroyAllWindows()
# ========== Helper class ==========
class SharedFrameBuffer:
    def __init__(self):
        self._frame = None
        self._lock = threading.Lock()
        self._has_frame = False

    def update(self, frame: np.ndarray):
        with self._lock:
            self._frame = frame.copy()
            self._has_frame = True

    def get_frame(self):
        with self._lock:
            if self._has_frame and self._frame is not None:
                return True, self._frame.copy()
            else:
                return False, None
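# SharedFrameBuffer copies the frame on both update() and get_frame(), so the
# producer (the run() loop) and the consumer (the MJPEG server) never share a
# mutable ndarray; the cost is one frame-sized copy per call.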
# ========== Entry point ==========
def main():
    print("🚀 Starting real-time four-camera surround-view system...")
    multi_cam = MultiCameraBirdView()
    if not multi_cam.running:
        print("[ERROR] Camera initialization failed")
        return

    # Start the MJPEG stream
    try:
        mjpeg_server = MJPEGServer(multi_cam.shared_display_buffer, host="0.0.0.0", port=8080)
        mjpeg_server.start()
        print("[INFO] MJPEG stream started: http://<IP>:8080")
    except Exception as e:
        print(f"[WARN] Failed to start MJPEG stream: {e}")

    multi_cam.run()


if __name__ == "__main__":
    main()