增加CPP
This commit is contained in:
190
web.py
190
web.py
@@ -5,9 +5,65 @@ import sys
|
||||
import os
|
||||
import argparse
|
||||
import numpy as np
|
||||
import serial
|
||||
import time
|
||||
from surround_view import FisheyeCameraModel, BirdView
|
||||
import surround_view.param_settings as settings
|
||||
|
||||
# 串口雷达配置
|
||||
RADAR_SERIAL_PORT = '/dev/ttyS0'
|
||||
RADAR_BAUDRATE = 115200 # 根据实际雷达配置调整
|
||||
RADAR_TIMEOUT = 0.1
|
||||
|
||||
# 距离阈值配置(单位:mm)
|
||||
DISTANCE_THRESHOLD = 3000 # 距离过近阈值,可以根据需要调整
|
||||
|
||||
# 雷达ID与方向映射
|
||||
RADAR_ID_MAP = {1: 'front', 2: 'left', 3: 'back', 4: 'right'}
|
||||
|
||||
def parse_radar_data(raw_data):
|
||||
"""
|
||||
解析雷达原始数据
|
||||
数据格式: 0xA5 0x5A [雷达ID-距离高字节-距离低字节]... 0x5A 0xA5
|
||||
"""
|
||||
try:
|
||||
# 查找帧头位置
|
||||
frame_start = raw_data.find(b'\xA5\x5A')
|
||||
if frame_start == -1:
|
||||
return None
|
||||
|
||||
# 查找帧尾位置
|
||||
frame_end = raw_data.find(b'\x5A\xA5', frame_start)
|
||||
if frame_end == -1:
|
||||
return None
|
||||
|
||||
# 提取完整帧数据
|
||||
frame_data = raw_data[frame_start+2:frame_end]
|
||||
|
||||
# 检查数据长度是否正确(每个雷达3字节 × 4雷达 = 12字节)
|
||||
if len(frame_data) != 12:
|
||||
return None
|
||||
|
||||
# 解析每个雷达数据
|
||||
radar_distances = {}
|
||||
for i in range(4):
|
||||
radar_id = frame_data[i*3]
|
||||
if radar_id not in RADAR_ID_MAP:
|
||||
continue
|
||||
|
||||
distance_high = frame_data[i*3+1]
|
||||
distance_low = frame_data[i*3+2]
|
||||
distance = (distance_high << 8) + distance_low
|
||||
|
||||
direction = RADAR_ID_MAP[radar_id]
|
||||
radar_distances[direction] = distance
|
||||
|
||||
return radar_distances
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 雷达数据解析失败: {e}")
|
||||
return None
|
||||
|
||||
sys.path.append(os.path.dirname(__file__)) # 确保能导入 py_utils
|
||||
from py_utils.coco_utils import COCO_test_helper
|
||||
from py_utils.rknn_executor import RKNN_model_container # 假设使用 RKNN
|
||||
@@ -268,6 +324,17 @@ class MultiCameraBirdView:
|
||||
"left": False,
|
||||
"right": False
|
||||
}
|
||||
|
||||
# 雷达数据
|
||||
self.radar_distances = {
|
||||
"front": 0,
|
||||
"back": 0,
|
||||
"left": 0,
|
||||
"right": 0
|
||||
}
|
||||
self.radar_serial = None
|
||||
self.radar_thread = None
|
||||
self.radar_running = False
|
||||
|
||||
# === 新增:YOLO 人体检测模型 ===
|
||||
try:
|
||||
@@ -281,6 +348,9 @@ class MultiCameraBirdView:
|
||||
|
||||
self.shared_buffer = SharedFrameBuffer() # 👈 新增
|
||||
|
||||
# 初始化雷达串口
|
||||
self._init_radar_serial()
|
||||
|
||||
def overlay_alert(self, birdview_img):
|
||||
"""在鸟瞰图上叠加半透明红色预警区域"""
|
||||
h, w = birdview_img.shape[:2]
|
||||
@@ -369,6 +439,63 @@ class MultiCameraBirdView:
|
||||
print(f"[ERROR] YOLO检测失败: {e}")
|
||||
return image, [], []
|
||||
|
||||
def _init_radar_serial(self):
|
||||
"""
|
||||
初始化雷达串口并启动数据读取线程
|
||||
"""
|
||||
try:
|
||||
self.radar_serial = serial.Serial(
|
||||
port=RADAR_SERIAL_PORT,
|
||||
baudrate=RADAR_BAUDRATE,
|
||||
timeout=RADAR_TIMEOUT
|
||||
)
|
||||
print(f"✅ 已打开雷达串口 {RADAR_SERIAL_PORT}")
|
||||
|
||||
# 启动雷达数据读取线程
|
||||
self.radar_running = True
|
||||
self.radar_thread = threading.Thread(target=self._read_radar_data, daemon=True)
|
||||
self.radar_thread.start()
|
||||
print("[INFO] 雷达数据读取线程已启动")
|
||||
|
||||
except serial.SerialException as e:
|
||||
print(f"❌ 无法打开雷达串口 {RADAR_SERIAL_PORT}: {e}")
|
||||
self.radar_serial = None
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 雷达初始化失败: {e}")
|
||||
self.radar_serial = None
|
||||
|
||||
def _read_radar_data(self):
|
||||
"""
|
||||
雷达数据读取线程函数
|
||||
"""
|
||||
buffer = b''
|
||||
while self.radar_running and self.radar_serial and self.radar_serial.is_open:
|
||||
try:
|
||||
# 读取可用数据
|
||||
if self.radar_serial.in_waiting > 0:
|
||||
data = self.radar_serial.read(self.radar_serial.in_waiting)
|
||||
buffer += data
|
||||
|
||||
# 尝试解析数据
|
||||
parsed_data = parse_radar_data(buffer)
|
||||
|
||||
if parsed_data:
|
||||
print(f"[RADAR] 前:{parsed_data.get('front', 0):4d}cm 后:{parsed_data.get('back', 0):4d}cm 左:{parsed_data.get('left', 0):4d}cm 右:{parsed_data.get('right', 0):4d}cm")
|
||||
|
||||
# 更新雷达距离数据
|
||||
self.radar_distances.update(parsed_data)
|
||||
# 清空已解析的缓冲区
|
||||
buffer = buffer[buffer.find(b'\x5A\xA5')+2:]
|
||||
|
||||
time.sleep(0.01) # 降低CPU占用
|
||||
|
||||
except serial.SerialException as e:
|
||||
print(f"❌ 雷达串口异常: {e}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 雷达数据读取失败: {e}")
|
||||
time.sleep(0.1)
|
||||
|
||||
def _initialize_weights(self):
|
||||
try:
|
||||
images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names]
|
||||
@@ -404,6 +531,9 @@ class MultiCameraBirdView:
|
||||
current_view = "front"
|
||||
frame_count = 0
|
||||
detection_interval = 3 # 每5帧进行一次检测,避免性能问题
|
||||
|
||||
# 雷达触发视图切换的方向优先级(距离过近时优先显示哪个方向)
|
||||
radar_priority = ['front', 'left', 'right', 'back']
|
||||
|
||||
while self.running:
|
||||
raw_frames = {}
|
||||
@@ -414,7 +544,7 @@ class MultiCameraBirdView:
|
||||
ret, frame = cap.read()
|
||||
|
||||
raw_frames[name] = frame
|
||||
# self.shared_buffer.update(raw_frames[current_view])
|
||||
|
||||
|
||||
p_frame = self.process_frame_once(frame, model)
|
||||
|
||||
@@ -427,6 +557,18 @@ class MultiCameraBirdView:
|
||||
self.birdview.make_white_balance()
|
||||
self.birdview.copy_car_image()
|
||||
|
||||
# 检查雷达距离,自动切换视图
|
||||
auto_switch_view = None
|
||||
for direction in radar_priority:
|
||||
distance = self.radar_distances.get(direction, 0)
|
||||
if distance > 0 and distance < DISTANCE_THRESHOLD:
|
||||
auto_switch_view = direction
|
||||
break
|
||||
|
||||
if auto_switch_view:
|
||||
current_view = auto_switch_view
|
||||
print(f"[INFO] 雷达检测到{auto_switch_view}方向距离过近 ({self.radar_distances[auto_switch_view]}cm),自动切换视图")
|
||||
|
||||
# 获取单路图像(仅去畸变)
|
||||
single_img = self.process_frame_undistort(
|
||||
raw_frames[current_view],
|
||||
@@ -465,49 +607,23 @@ class MultiCameraBirdView:
|
||||
bird_resized = cv2.resize(birdview_with_alert, (w_bird, h_display))
|
||||
single_resized = cv2.resize(single_img, (w_single, h_display))
|
||||
display = np.hstack((bird_resized, single_resized))
|
||||
|
||||
|
||||
|
||||
|
||||
# 在显示窗口上添加状态信息
|
||||
# info_text = f"View: {current_view} | Persons detected: {len(person_boxes) if 'person_boxes' in locals() else 0}"
|
||||
# cv2.putText(display, info_text, (10, 30),
|
||||
# cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
|
||||
|
||||
self.shared_buffer.update(display)
|
||||
# 全屏显示
|
||||
cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN)
|
||||
cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
||||
cv2.imshow("Video", display)
|
||||
|
||||
key = cv2.waitKey(1) & 0xFF
|
||||
# if key == ord('q'):
|
||||
# self.running = False
|
||||
# break
|
||||
# elif key == ord('1'):
|
||||
# current_view = "front"
|
||||
# elif key == ord('2'):
|
||||
# current_view = "back"
|
||||
# elif key == ord('3'):
|
||||
# current_view = "left"
|
||||
# elif key == ord('4'):
|
||||
# current_view = "right"
|
||||
# # 新增:预警控制
|
||||
# elif key == ord('5'):
|
||||
# self.alerts["front"] = True
|
||||
# elif key == ord('6'):
|
||||
# self.alerts["back"] = True
|
||||
# elif key == ord('7'):
|
||||
# self.alerts["left"] = True
|
||||
# elif key == ord('8'):
|
||||
# self.alerts["right"] = True
|
||||
# elif key == ord('0'):
|
||||
# # 清除所有预警
|
||||
# for k in self.alerts:
|
||||
# self.alerts[k] = False
|
||||
# elif key == ord('d'):
|
||||
# # 手动触发一次检测
|
||||
# single_img, person_boxes, person_scores = self.detect_persons(single_img)
|
||||
|
||||
|
||||
# 清理雷达资源
|
||||
self.radar_running = False
|
||||
if self.radar_thread and self.radar_thread.is_alive():
|
||||
self.radar_thread.join(timeout=1.0)
|
||||
if self.radar_serial and self.radar_serial.is_open:
|
||||
self.radar_serial.close()
|
||||
print("🔌 雷达串口已关闭")
|
||||
|
||||
for cap in self.caps:
|
||||
cap.release()
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
Reference in New Issue
Block a user