Files
LJ360/web.py
2025-12-26 10:02:00 +08:00

657 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# from turtle import right
import cv2
import threading
import sys
import os
import argparse
import numpy as np
import serial
import time
from surround_view import FisheyeCameraModel, BirdView
import surround_view.param_settings as settings
# 串口雷达配置
RADAR_SERIAL_PORT = '/dev/ttyS0'
RADAR_BAUDRATE = 115200 # 根据实际雷达配置调整
RADAR_TIMEOUT = 0.1
# 距离阈值配置单位mm
DISTANCE_THRESHOLD = 3000 # 距离过近阈值,可以根据需要调整
# 雷达ID与方向映射
RADAR_ID_MAP = {1: 'front', 2: 'left', 3: 'back', 4: 'right'}
def parse_radar_data(raw_data):
"""
解析雷达原始数据
数据格式: 0xA5 0x5A [雷达ID-距离高字节-距离低字节]... 0x5A 0xA5
"""
try:
# 查找帧头位置
frame_start = raw_data.find(b'\xA5\x5A')
if frame_start == -1:
return None
# 查找帧尾位置
frame_end = raw_data.find(b'\x5A\xA5', frame_start)
if frame_end == -1:
return None
# 提取完整帧数据
frame_data = raw_data[frame_start+2:frame_end]
# 检查数据长度是否正确每个雷达3字节 × 4雷达 = 12字节
if len(frame_data) != 12:
return None
# 解析每个雷达数据
radar_distances = {}
for i in range(4):
radar_id = frame_data[i*3]
if radar_id not in RADAR_ID_MAP:
continue
distance_high = frame_data[i*3+1]
distance_low = frame_data[i*3+2]
distance = (distance_high << 8) + distance_low
direction = RADAR_ID_MAP[radar_id]
radar_distances[direction] = distance
return radar_distances
except Exception as e:
print(f"[ERROR] 雷达数据解析失败: {e}")
return None
sys.path.append(os.path.dirname(__file__)) # 确保能导入 py_utils
from py_utils.coco_utils import COCO_test_helper
from py_utils.rknn_executor import RKNN_model_container # 假设使用 RKNN
from mjpeg_streamer import MJPEGServer
from multiprocessing import Pool, Manager
import cv2
# 启用 OpenCLMali-G610
if cv2.ocl.haveOpenCL():
cv2.ocl.setUseOpenCL(True)
print("✅ OpenCL is ON — using Mali-G610 GPU for acceleration")
else:
print("⚠️ OpenCL not available")
# ------YOLO 配置-----------
# YOLO 配置
YOLO_MODEL_PATH = './yolov5s-640-640.rknn'
OBJ_THRESH = 0.6
NMS_THRESH = 0.6
IMG_SIZE = (640, 640) # (w, h)
CLASSES = ("person",) # 只关心 person
# 加载 anchors
ANCHORS_FILE = './model/anchors_yolov5.txt'
with open(ANCHORS_FILE, 'r') as f:
values = [float(_v) for _v in f.readlines()]
ANCHORS = np.array(values).reshape(3, -1, 2).tolist()
import threading
import numpy as np
class SharedFrameBuffer:
def __init__(self):
self._frame = None
self._lock = threading.Lock()
self._has_frame = False
def update(self, frame: np.ndarray):
"""主线程调用:更新最新帧"""
with self._lock:
self._frame = frame.copy()
self._has_frame = True
def get_frame(self):
"""YUYV 线程调用:获取最新帧"""
with self._lock:
if self._has_frame and self._frame is not None:
return True, self._frame.copy()
else:
return False, None
# ---------- YOLO 处理函数 ----------
def filter_boxes(boxes, box_confidences, box_class_probs):
box_confidences = box_confidences.reshape(-1)
class_max_score = np.max(box_class_probs, axis=-1)
classes = np.argmax(box_class_probs, axis=-1)
_class_pos = np.where(class_max_score * box_confidences >= OBJ_THRESH)
scores = (class_max_score * box_confidences)[_class_pos]
boxes = boxes[_class_pos]
classes = classes[_class_pos]
return boxes, classes, scores
def nms_boxes(boxes, scores):
x = boxes[:, 0]
y = boxes[:, 1]
w = boxes[:, 2] - boxes[:, 0]
h = boxes[:, 3] - boxes[:, 1]
areas = w * h
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x[i], x[order[1:]])
yy1 = np.maximum(y[i], y[order[1:]])
xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])
w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
inter = w1 * h1
ovr = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(ovr <= NMS_THRESH)[0]
order = order[inds + 1]
keep = np.array(keep)
return keep
def box_process(position, anchors):
grid_h, grid_w = position.shape[2:4]
col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h))
col = col.reshape(1, 1, grid_h, grid_w)
row = row.reshape(1, 1, grid_h, grid_w)
grid = np.concatenate((col, row), axis=1)
stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1, 2, 1, 1)
col = col.repeat(len(anchors), axis=0)
row = row.repeat(len(anchors), axis=0)
anchors = np.array(anchors)
anchors = anchors.reshape(*anchors.shape, 1, 1)
box_xy = position[:, :2, :, :] * 2 - 0.5
box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors
box_xy += grid
box_xy *= stride
box = np.concatenate((box_xy, box_wh), axis=1)
xyxy = np.copy(box)
xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2
xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2
xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2
xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2
return xyxy
def post_process(input_data, anchors):
boxes, scores, classes_conf = [], [], []
input_data = [_in.reshape([len(anchors[0]), -1] + list(_in.shape[-2:])) for _in in input_data]
for i in range(len(input_data)):
boxes.append(box_process(input_data[i][:, :4, :, :], anchors[i]))
scores.append(input_data[i][:, 4:5, :, :])
classes_conf.append(input_data[i][:, 5:, :, :])
def sp_flatten(_in):
ch = _in.shape[1]
_in = _in.transpose(0, 2, 3, 1)
return _in.reshape(-1, ch)
boxes = [sp_flatten(_v) for _v in boxes]
classes_conf = [sp_flatten(_v) for _v in classes_conf]
scores = [sp_flatten(_v) for _v in scores]
boxes = np.concatenate(boxes)
classes_conf = np.concatenate(classes_conf)
scores = np.concatenate(scores)
boxes, classes, scores = filter_boxes(boxes, scores, classes_conf)
nboxes, nclasses, nscores = [], [], []
for c in set(classes):
inds = np.where(classes == c)
b = boxes[inds]
c = classes[inds]
s = scores[inds]
keep = nms_boxes(b, s)
if len(keep) != 0:
nboxes.append(b[keep])
nclasses.append(c[keep])
nscores.append(s[keep])
if not nclasses and not nscores:
return None, None, None
boxes = np.concatenate(nboxes)
classes = np.concatenate(nclasses)
scores = np.concatenate(nscores)
return boxes, classes, scores
def draw_detections(image, boxes, scores, classes):
"""在图像上绘制检测框"""
if boxes is None:
return image
for box, score, cl in zip(boxes, scores, classes):
# 只绘制人的检测框
if CLASSES[cl] != "person":
continue
top, left, right, bottom = [int(_b) for _b in box]
# 绘制矩形框
cv2.rectangle(image, (top, left), (right, bottom), (0, 255, 0), 20)
# 绘制标签背景
label = f'person: {score:.2f}'
(label_width, label_height), baseline = cv2.getTextSize(
label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 15
)
# 绘制标签矩形
cv2.rectangle(
image,
(top, left - label_height - 5),
(top + label_width, left),
(0, 255, 0),
-1
)
# # 绘制标签文字
cv2.putText(
image,
label,
(top, left - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
2
)
return image
# ------------------------
class MultiCameraBirdView:
def __init__(self):
self.running = True
self.names = settings.camera_names # e.g., ['front', 'back', 'left', 'right']
self.yamls = [os.path.join(os.getcwd(), "yaml", name + ".yaml") for name in self.names]
self.camera_models = [
FisheyeCameraModel(camera_file, camera_name)
for camera_file, camera_name in zip(self.yamls, self.names)
]
# 摄像头设备映射
self.which_cameras = {"front": 0, "back": 2, "left": 1, "right": 3}
self.caps = []
print("[INFO] 初始化摄像头...")
for name in self.names:
cap = cv2.VideoCapture(self.which_cameras[name], cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 最小缓冲,降低延迟
# cap.set(cv2.CAP_PROP_FPS, 30) # 设置帧率为 30 FPS
if not cap.isOpened():
print(f"[ERROR] 无法打开 {name} 摄像头 (设备 {self.which_cameras[name]})", file=sys.stderr)
self.running = False
return
self.caps.append(cap)
self.birdview = BirdView()
self._initialize_weights()
# 新增:预警状态
self.alerts = {
"front": False,
"back": False,
"left": False,
"right": False
}
# 雷达数据
self.radar_distances = {
"front": 0,
"back": 0,
"left": 0,
"right": 0
}
self.radar_serial = None
self.radar_thread = None
self.radar_running = False
# === 新增YOLO 人体检测模型 ===
try:
self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588')
print("[INFO] YOLO 模型加载成功")
# 初始化COCO助手用于图像预处理
self.co_helper = COCO_test_helper(enable_letter_box=True)
except Exception as e:
print(f"[ERROR] YOLO 模型加载失败: {e}")
self.yolo_model = None
self.shared_buffer = SharedFrameBuffer() # 👈 新增
# 初始化雷达串口
self._init_radar_serial()
def overlay_alert(self, birdview_img):
"""在鸟瞰图上叠加半透明红色预警区域"""
h, w = birdview_img.shape[:2]
overlay = birdview_img.copy()
alpha = 0.2 # 透明度
red = (0, 0, 200)
margin_f_b = int(min(h, w) * 0.07) # 预警区域宽度约7%)前后
margin_l_r = int(min(h, w) * 0.15) # 预警区域宽度约15%)左右
if self.alerts["front"]:
cv2.rectangle(overlay, (0, 0), (w, margin_f_b), red, -1)
if self.alerts["back"]:
cv2.rectangle(overlay, (0, h - margin_f_b), (w, h), red, -1)
if self.alerts["left"]:
cv2.rectangle(overlay, (0, 0), (margin_l_r, h), red, -1)
if self.alerts["right"]:
cv2.rectangle(overlay, (w - margin_l_r, 0), (w, h), red, -1)
# 混合原图与覆盖层
blended = cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0)
return blended
def detect_persons(self, image):
"""使用YOLO模型检测图像中的人体"""
if self.yolo_model is None:
return image, [], []
try:
# 保存原始图像尺寸
orig_h, orig_w = image.shape[:2]
# 预处理图像
pad_color = (0, 0, 0)
img_preprocessed = self.co_helper.letter_box(
im=image.copy(),
new_shape=(IMG_SIZE[1], IMG_SIZE[0]),
pad_color=pad_color
)
# img_preprocessed = cv2.cvtColor(img_preprocessed, cv2.COLOR_BGR2RGB)
# 推理
outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)])
# 后处理
boxes, classes, scores = post_process(outputs, ANCHORS)
if boxes is not None:
# 将检测框转换回原始图像坐标
real_boxes = self.co_helper.get_real_box(boxes)
# 筛选出人的检测框
person_boxes = []
person_scores = []
for i in range(len(real_boxes)):
if classes[i] < len(CLASSES) and CLASSES[classes[i]] == "person":
# 确保坐标在图像范围内
box = real_boxes[i].copy()
box[0] = max(0, min(box[0], orig_w))
box[1] = max(0, min(box[1], orig_h))
box[2] = max(0, min(box[2], orig_w))
box[3] = max(0, min(box[3], orig_h))
person_boxes.append(box)
person_scores.append(scores[i])
# 在图像上绘制检测框
if person_boxes:
image = draw_detections(image, np.array(person_boxes),
np.array(person_scores),
np.zeros(len(person_boxes), dtype=int))
# 打印检测信息
# print(f"[YOLO] 检测到 {len(person_boxes)} 个人体")
# for box, score in zip(person_boxes, person_scores):
# print(f" 位置: ({int(box[0])}, {int(box[1])}, {int(box[2])}, {int(box[3])}), 置信度: {score:.2f}")
return image, person_boxes, person_scores
else:
return image, [], []
except Exception as e:
print(f"[ERROR] YOLO检测失败: {e}")
return image, [], []
def _init_radar_serial(self):
"""
初始化雷达串口并启动数据读取线程
"""
try:
self.radar_serial = serial.Serial(
port=RADAR_SERIAL_PORT,
baudrate=RADAR_BAUDRATE,
timeout=RADAR_TIMEOUT
)
print(f"✅ 已打开雷达串口 {RADAR_SERIAL_PORT}")
# 启动雷达数据读取线程
self.radar_running = True
self.radar_thread = threading.Thread(target=self._read_radar_data, daemon=True)
self.radar_thread.start()
print("[INFO] 雷达数据读取线程已启动")
except serial.SerialException as e:
print(f"❌ 无法打开雷达串口 {RADAR_SERIAL_PORT}: {e}")
self.radar_serial = None
except Exception as e:
print(f"[ERROR] 雷达初始化失败: {e}")
self.radar_serial = None
def _read_radar_data(self):
"""
雷达数据读取线程函数
"""
buffer = b''
while self.radar_running and self.radar_serial and self.radar_serial.is_open:
try:
# 读取可用数据
if self.radar_serial.in_waiting > 0:
data = self.radar_serial.read(self.radar_serial.in_waiting)
buffer += data
# 尝试解析数据
parsed_data = parse_radar_data(buffer)
if parsed_data:
print(f"[RADAR] 前:{parsed_data.get('front', 0):4d}cm 后:{parsed_data.get('back', 0):4d}cm 左:{parsed_data.get('left', 0):4d}cm 右:{parsed_data.get('right', 0):4d}cm")
# 更新雷达距离数据
self.radar_distances.update(parsed_data)
# 清空已解析的缓冲区
buffer = buffer[buffer.find(b'\x5A\xA5')+2:]
time.sleep(0.01) # 降低CPU占用
except serial.SerialException as e:
print(f"❌ 雷达串口异常: {e}")
break
except Exception as e:
print(f"[ERROR] 雷达数据读取失败: {e}")
time.sleep(0.1)
def _initialize_weights(self):
try:
images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names]
static_frames = []
for img_path, cam_model in zip(images, self.camera_models):
img = cv2.imread(img_path)
if img is None:
print(f"[WARN] 静态图缺失: {img_path},用黑图代替")
img = np.zeros((1080, 1920, 3), dtype=np.uint8)
img = cam_model.undistort(img)
img = cam_model.project(img)
img = cam_model.flip(img)
static_frames.append(img)
if len(static_frames) == 4:
self.birdview.get_weights_and_masks(static_frames)
print("[INFO] 权重矩阵初始化成功")
except Exception as e:
print(f"[ERROR] 权重初始化失败: {e}", file=sys.stderr)
def process_frame_once(self, frame, model):
"""只处理一次:去畸变 + 投影 + 翻转"""
frame = model.undistort(frame)
frame = model.project(frame)
frame = model.flip(frame)
return frame
def process_frame_undistort(self, frame, model):
"""只处理一次:去畸变"""
frame = model.undistort(frame)
return frame
def run(self):
current_view = "front"
frame_count = 0
detection_interval = 3 # 每5帧进行一次检测避免性能问题
# 雷达触发视图切换的方向优先级(距离过近时优先显示哪个方向)
radar_priority = ['front', 'left', 'right', 'back']
while self.running:
raw_frames = {}
processed_frames = []
for i, (cap, model, name) in enumerate(zip(self.caps, self.camera_models, self.names)):
ret, frame = cap.read()
raw_frames[name] = frame
p_frame = self.process_frame_once(frame, model)
processed_frames.append(p_frame)
# 更新鸟瞰图
self.birdview.update_frames(processed_frames)
self.birdview.stitch_all_parts()
self.birdview.make_white_balance()
self.birdview.copy_car_image()
# 检查雷达距离,自动切换视图
auto_switch_view = None
for direction in radar_priority:
distance = self.radar_distances.get(direction, 0)
if distance > 0 and distance < DISTANCE_THRESHOLD:
auto_switch_view = direction
break
if auto_switch_view:
current_view = auto_switch_view
print(f"[INFO] 雷达检测到{auto_switch_view}方向距离过近 ({self.radar_distances[auto_switch_view]}cm),自动切换视图")
# 获取单路图像(仅去畸变)
single_img = self.process_frame_undistort(
raw_frames[current_view],
self.camera_models[self.names.index(current_view)]
)
# 在单路图像上进行人体检测
frame_count += 1
if frame_count % detection_interval == 0 and self.yolo_model is not None:
single_img, person_boxes, person_scores = self.detect_persons(single_img)
# 根据检测结果自动触发预警
if person_boxes:
# 可以根据人体的位置和数量来触发预警
self.alerts[current_view] = True
# 重置其他视图的预警
for view in self.alerts:
if view != current_view:
self.alerts[view] = False
else:
# 没有检测到人,清除所有预警
for view in self.alerts:
self.alerts[view] = False
# 叠加预警区域
birdview_with_alert = self.overlay_alert(self.birdview.image)
# 拼接显示左侧鸟瞰图1/3右侧单路2/3
h_display, w_display = 720, 1280
w_bird = w_display // 3
w_single = w_display - w_bird
bird_resized = cv2.resize(birdview_with_alert, (w_bird, h_display))
single_resized = cv2.resize(single_img, (w_single, h_display))
display = np.hstack((bird_resized, single_resized))
self.shared_buffer.update(display)
# 全屏显示
cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
cv2.imshow("Video", display)
key = cv2.waitKey(1) & 0xFF
# 清理雷达资源
self.radar_running = False
if self.radar_thread and self.radar_thread.is_alive():
self.radar_thread.join(timeout=1.0)
if self.radar_serial and self.radar_serial.is_open:
self.radar_serial.close()
print("🔌 雷达串口已关闭")
for cap in self.caps:
cap.release()
cv2.destroyAllWindows()
def main():
print("🚀 启动实时四路环视系统...")
print("操作说明:")
print(" 1-4: 切换单路视图(前/后/左/右)")
print(" 5-8: 触发前/后/左/右 接近预警")
print(" 0 : 清除所有预警")
print(" d : 手动触发人体检测")
print(" q : 退出程序")
multi_cam = MultiCameraBirdView()
# ===== 启动视频流 =====
try:
from mjpeg_streamer import MJPEGServer
mjpeg_server = MJPEGServer(multi_cam.shared_buffer, host="0.0.0.0", port=8080)
mjpeg_server.start()
except Exception as e:
print(f"[WARN] YUYV 流启动失败: {e}")
if multi_cam.running:
multi_cam.run()
else:
print("[ERROR] 摄像头初始化失败")
if __name__ == "__main__":
main()