diff --git a/py_utils/__init__.py b/py_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/py_utils/__pycache__/__init__.cpython-313.pyc b/py_utils/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..754433c Binary files /dev/null and b/py_utils/__pycache__/__init__.cpython-313.pyc differ diff --git a/py_utils/__pycache__/__init__.cpython-38.pyc b/py_utils/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..4f0b24d Binary files /dev/null and b/py_utils/__pycache__/__init__.cpython-38.pyc differ diff --git a/py_utils/__pycache__/coco_utils.cpython-313.pyc b/py_utils/__pycache__/coco_utils.cpython-313.pyc new file mode 100644 index 0000000..ee73b59 Binary files /dev/null and b/py_utils/__pycache__/coco_utils.cpython-313.pyc differ diff --git a/py_utils/__pycache__/coco_utils.cpython-38.pyc b/py_utils/__pycache__/coco_utils.cpython-38.pyc new file mode 100644 index 0000000..570109b Binary files /dev/null and b/py_utils/__pycache__/coco_utils.cpython-38.pyc differ diff --git a/py_utils/__pycache__/rknn_executor.cpython-313.pyc b/py_utils/__pycache__/rknn_executor.cpython-313.pyc new file mode 100644 index 0000000..8ce0163 Binary files /dev/null and b/py_utils/__pycache__/rknn_executor.cpython-313.pyc differ diff --git a/py_utils/__pycache__/rknn_executor.cpython-38.pyc b/py_utils/__pycache__/rknn_executor.cpython-38.pyc new file mode 100644 index 0000000..f6c2547 Binary files /dev/null and b/py_utils/__pycache__/rknn_executor.cpython-38.pyc differ diff --git a/py_utils/coco_utils.py b/py_utils/coco_utils.py new file mode 100644 index 0000000..713257c --- /dev/null +++ b/py_utils/coco_utils.py @@ -0,0 +1,176 @@ +from copy import copy +import os +import cv2 +import numpy as np +import json + +class Letter_Box_Info(): + def __init__(self, shape, new_shape, w_ratio, h_ratio, dw, dh, pad_color) -> None: + self.origin_shape = shape + self.new_shape = new_shape + self.w_ratio = w_ratio + self.h_ratio = h_ratio + self.dw = dw + self.dh = dh + self.pad_color = pad_color + + +def coco_eval_with_json(anno_json, pred_json): + from pycocotools.coco import COCO + from pycocotools.cocoeval import COCOeval + anno = COCO(anno_json) + pred = anno.loadRes(pred_json) + eval = COCOeval(anno, pred, 'bbox') + # eval.params.useCats = 0 + # eval.params.maxDets = list((100, 300, 1000)) + # a = np.array(list(range(50, 96, 1)))/100 + # eval.params.iouThrs = a + eval.evaluate() + eval.accumulate() + eval.summarize() + map, map50 = eval.stats[:2] # update results (mAP@0.5:0.95, mAP@0.5) + + print('map --> ', map) + print('map50--> ', map50) + print('map75--> ', eval.stats[2]) + print('map85--> ', eval.stats[-2]) + print('map95--> ', eval.stats[-1]) + +class COCO_test_helper(): + def __init__(self, enable_letter_box = False) -> None: + self.record_list = [] + self.enable_ltter_box = enable_letter_box + if self.enable_ltter_box is True: + self.letter_box_info_list = [] + else: + self.letter_box_info_list = None + + def letter_box(self, im, new_shape, pad_color=(0,0,0), info_need=False): + # Resize and pad image while meeting stride-multiple constraints + shape = im.shape[:2] # current shape [height, width] + if isinstance(new_shape, int): + new_shape = (new_shape, new_shape) + + # Scale ratio + r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) + + # Compute padding + ratio = r # width, height ratios + new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) + dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding + + dw /= 2 
# divide padding into 2 sides + dh /= 2 + + if shape[::-1] != new_unpad: # resize + im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR) + top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) + left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) + im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=pad_color) # add border + + if self.enable_ltter_box is True: + self.letter_box_info_list.append(Letter_Box_Info(shape, new_shape, ratio, ratio, dw, dh, pad_color)) + if info_need is True: + return im, ratio, (dw, dh) + else: + return im + + def direct_resize(self, im, new_shape, info_need=False): + shape = im.shape[:2] + h_ratio = new_shape[0]/ shape[0] + w_ratio = new_shape[1]/ shape[1] + if self.enable_ltter_box is True: + self.letter_box_info_list.append(Letter_Box_Info(shape, new_shape, w_ratio, h_ratio, 0, 0, (0,0,0))) + im = cv2.resize(im, (new_shape[1], new_shape[0])) + return im + + def get_real_box(self, box, in_format='xyxy'): + bbox = copy(box) + if self.enable_ltter_box == True: + # unletter_box result + if in_format=='xyxy': + bbox[:,0] -= self.letter_box_info_list[-1].dw + bbox[:,0] /= self.letter_box_info_list[-1].w_ratio + bbox[:,0] = np.clip(bbox[:,0], 0, self.letter_box_info_list[-1].origin_shape[1]) + + bbox[:,1] -= self.letter_box_info_list[-1].dh + bbox[:,1] /= self.letter_box_info_list[-1].h_ratio + bbox[:,1] = np.clip(bbox[:,1], 0, self.letter_box_info_list[-1].origin_shape[0]) + + bbox[:,2] -= self.letter_box_info_list[-1].dw + bbox[:,2] /= self.letter_box_info_list[-1].w_ratio + bbox[:,2] = np.clip(bbox[:,2], 0, self.letter_box_info_list[-1].origin_shape[1]) + + bbox[:,3] -= self.letter_box_info_list[-1].dh + bbox[:,3] /= self.letter_box_info_list[-1].h_ratio + bbox[:,3] = np.clip(bbox[:,3], 0, self.letter_box_info_list[-1].origin_shape[0]) + return bbox + + def get_real_seg(self, seg): + #! 
fix side effect + dh = int(self.letter_box_info_list[-1].dh) + dw = int(self.letter_box_info_list[-1].dw) + origin_shape = self.letter_box_info_list[-1].origin_shape + new_shape = self.letter_box_info_list[-1].new_shape + if (dh == 0) and (dw == 0) and origin_shape == new_shape: + return seg + elif dh == 0 and dw != 0: + seg = seg[:, :, dw:-dw] # a[0:-0] = [] + elif dw == 0 and dh != 0 : + seg = seg[:, dh:-dh, :] + seg = np.where(seg, 1, 0).astype(np.uint8).transpose(1,2,0) + seg = cv2.resize(seg, (origin_shape[1], origin_shape[0]), interpolation=cv2.INTER_LINEAR) + if len(seg.shape) < 3: + return seg[None,:,:] + else: + return seg.transpose(2,0,1) + + def add_single_record(self, image_id, category_id, bbox, score, in_format='xyxy', pred_masks = None): + if self.enable_ltter_box == True: + # unletter_box result + if in_format=='xyxy': + bbox[0] -= self.letter_box_info_list[-1].dw + bbox[0] /= self.letter_box_info_list[-1].w_ratio + + bbox[1] -= self.letter_box_info_list[-1].dh + bbox[1] /= self.letter_box_info_list[-1].h_ratio + + bbox[2] -= self.letter_box_info_list[-1].dw + bbox[2] /= self.letter_box_info_list[-1].w_ratio + + bbox[3] -= self.letter_box_info_list[-1].dh + bbox[3] /= self.letter_box_info_list[-1].h_ratio + # bbox = [value/self.letter_box_info_list[-1].ratio for value in bbox] + + if in_format=='xyxy': + # change xyxy to xywh + bbox[2] = bbox[2] - bbox[0] + bbox[3] = bbox[3] - bbox[1] + else: + assert False, "now only support xyxy format, please add code to support others format" + + def single_encode(x): + from pycocotools.mask import encode + rle = encode(np.asarray(x[:, :, None], order="F", dtype="uint8"))[0] + rle["counts"] = rle["counts"].decode("utf-8") + return rle + + if pred_masks is None: + self.record_list.append({"image_id": image_id, + "category_id": category_id, + "bbox":[round(x, 3) for x in bbox], + 'score': round(score, 5), + }) + else: + rles = single_encode(pred_masks) + self.record_list.append({"image_id": image_id, + "category_id": category_id, + "bbox":[round(x, 3) for x in bbox], + 'score': round(score, 5), + 'segmentation': rles, + }) + + def export_to_json(self, path): + with open(path, 'w') as f: + json.dump(self.record_list, f) + diff --git a/py_utils/onnx_executor.py b/py_utils/onnx_executor.py new file mode 100644 index 0000000..63ddc83 --- /dev/null +++ b/py_utils/onnx_executor.py @@ -0,0 +1,103 @@ +import os +import numpy as np +import onnxruntime as rt + +type_map = { + 'tensor(int32)' : np.int32, + 'tensor(int64)' : np.int64, + 'tensor(float32)' : np.float32, + 'tensor(float64)' : np.float64, + 'tensor(float)' : np.float32, +} +if getattr(np, 'bool', False): + type_map['tensor(bool)'] = np.bool +else: + type_map['tensor(bool)'] = bool + +def ignore_dim_with_zero(_shape, _shape_target): + _shape = list(_shape) + _shape_target = list(_shape_target) + for i in range(_shape.count(1)): + _shape.remove(1) + for j in range(_shape_target.count(1)): + _shape_target.remove(1) + if _shape == _shape_target: + return True + else: + return False + + +class ONNX_model_container_py: + def __init__(self, model_path) -> None: + # sess_options= + sp_options = rt.SessionOptions() + sp_options.log_severity_level = 3 + # [1 for info, 2 for warning, 3 for error, 4 for fatal] + self.sess = rt.InferenceSession(model_path, sess_options=sp_options, providers=['CPUExecutionProvider']) + self.model_path = model_path + + def run(self, input_datas): + if len(input_datas) < len(self.sess.get_inputs()): + assert False,'inputs_datas number not match onnx model{} 
input'.format(self.model_path) + elif len(input_datas) > len(self.sess.get_inputs()): + print('WARNING: input datas number large than onnx input node') + + input_dict = {} + for i, _input in enumerate(self.sess.get_inputs()): + # convert type + if _input.type in type_map and \ + type_map[_input.type] != input_datas[i].dtype: + print('WARNING: force data-{} from {} to {}'.format(i, input_datas[i].dtype, type_map[_input.type])) + input_datas[i] = input_datas[i].astype(type_map[_input.type]) + + # reshape if need + if _input.shape != list(input_datas[i].shape): + if ignore_dim_with_zero(input_datas[i].shape,_input.shape): + input_datas[i] = input_datas[i].reshape(_input.shape) + print("WARNING: reshape inputdata-{}: from {} to {}".format(i, input_datas[i].shape, _input.shape)) + else: + assert False, 'input shape{} not match real data shape{}'.format(_input.shape, input_datas[i].shape) + input_dict[_input.name] = input_datas[i] + + output_list = [] + for i in range(len(self.sess.get_outputs())): + output_list.append(self.sess.get_outputs()[i].name) + + #forward model + res = self.sess.run(output_list, input_dict) + return res + + +class ONNX_model_container_cpp: + def __init__(self, model_path) -> None: + pass + + def run(self, input_datas): + pass + + +def ONNX_model_container(model_path, backend='py'): + if backend == 'py': + return ONNX_model_container_py(model_path) + elif backend == 'cpp': + return ONNX_model_container_cpp(model_path) + + +def reset_onnx_shape(onnx_model_path, output_path, input_shapes): + if isinstance(input_shapes[0], int): + command = "python -m onnxsim {} {} --input-shape {}".format(onnx_model_path, output_path, ','.join([str(v) for v in input_shapes])) + else: + if len(input_shapes)!= 1: + print("RESET ONNX SHAPE with more than one input, try to match input name") + sess = rt.InferenceSession(onnx_model_path) + input_names = [input.name for input in sess.get_inputs()] + command = "python -m onnxsim {} {} --input-shape ".format(onnx_model_path, output_path) + for i, input_name in enumerate(input_names): + command += "{}:{} ".format(input_name, ','.join([str(v) for v in input_shapes[i]])) + else: + command = "python -m onnxsim {} {} --input-shape {}".format(onnx_model_path, output_path, ','.join([str(v) for v in input_shapes[0]])) + + print(command) + os.system(command) + return output_path + \ No newline at end of file diff --git a/py_utils/pytorch_executor.py b/py_utils/pytorch_executor.py new file mode 100644 index 0000000..c145422 --- /dev/null +++ b/py_utils/pytorch_executor.py @@ -0,0 +1,52 @@ +import torch +torch.backends.quantized.engine = 'qnnpack' + +def multi_list_unfold(tl): + def unfold(_inl, target): + if not isinstance(_inl, list) and not isinstance(_inl, tuple): + target.append(_inl) + else: + unfold(_inl) + +def flatten_list(in_list): + flatten = lambda x: [subitem for item in x for subitem in flatten(item)] if type(x) is list else [x] + return flatten(in_list) + +class Torch_model_container: + def __init__(self, model_path, qnnpack=False) -> None: + if qnnpack is True: + torch.backends.quantized.engine = 'qnnpack' + + #! Backends must be set before load model. 
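+        # Load the TorchScript archive and switch it to inference mode.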
+        self.pt_model = torch.jit.load(model_path)
+        self.pt_model.eval()
+        holdon = 1
+
+    def run(self, input_datas):
+        assert isinstance(input_datas, list), "input_datas should be a list, like [np.ndarray, np.ndarray]"
+
+        input_datas_torch_type = []
+        for _data in input_datas:
+            input_datas_torch_type.append(torch.tensor(_data))
+
+        for i, val in enumerate(input_datas_torch_type):
+            if val.dtype == torch.float64:
+                input_datas_torch_type[i] = input_datas_torch_type[i].float()
+
+        result = self.pt_model(*input_datas_torch_type)
+
+        if isinstance(result, tuple):
+            result = list(result)
+        if not isinstance(result, list):
+            result = [result]
+
+        result = flatten_list(result)
+
+        for i in range(len(result)):
+            result[i] = torch.dequantize(result[i])
+
+        for i in range(len(result)):
+            # TODO support quantized_output
+            result[i] = result[i].cpu().detach().numpy()
+
+        return result
\ No newline at end of file
diff --git a/py_utils/rknn_executor (copy 1).py b/py_utils/rknn_executor (copy 1).py
new file mode 100644
index 0000000..db463d7
--- /dev/null
+++ b/py_utils/rknn_executor (copy 1).py
@@ -0,0 +1,31 @@
+from rknn.api import RKNN
+
+
+class RKNN_model_container():
+    def __init__(self, model_path, target=None, device_id=None) -> None:
+        rknn = RKNN()
+
+        # Direct Load RKNN Model
+        rknn.load_rknn(model_path)
+
+        print('--> Init runtime environment')
+        if target==None:
+            ret = rknn.init_runtime()
+        else:
+            ret = rknn.init_runtime(target=target, device_id=device_id)
+        if ret != 0:
+            print('Init runtime environment failed')
+            exit(ret)
+        print('done')
+
+        self.rknn = rknn
+
+    def run(self, inputs):
+        if isinstance(inputs, list) or isinstance(inputs, tuple):
+            pass
+        else:
+            inputs = [inputs]
+
+        result = self.rknn.inference(inputs=inputs)
+
+        return result
\ No newline at end of file
diff --git a/py_utils/rknn_executor.py b/py_utils/rknn_executor.py
new file mode 100644
index 0000000..038d460
--- /dev/null
+++ b/py_utils/rknn_executor.py
@@ -0,0 +1,26 @@
+from rknnlite.api import RKNNLite as RKNN
+
+class RKNN_model_container():
+    def __init__(self, model_path, target=None, device_id=None) -> None:
+        rknn = RKNN()
+        rknn.load_rknn(model_path)
+        ret = rknn.init_runtime()
+        self.rknn = rknn
+
+    def run(self, inputs):
+        if self.rknn is None:
+            print("ERROR: rknn has been released")
+            return []
+
+        if isinstance(inputs, list) or isinstance(inputs, tuple):
+            pass
+        else:
+            inputs = [inputs]
+
+        result = self.rknn.inference(inputs=inputs)
+
+        return result
+
+    def release(self):
+        self.rknn.release()
+        self.rknn = None
diff --git a/web.py b/web.py
index 3991678..23b3bcc 100644
--- a/web.py
+++ b/web.py
@@ -8,7 +8,188 @@ import numpy as np
 from surround_view import FisheyeCameraModel, BirdView
 import surround_view.param_settings as settings
 
-right_frame = None
+sys.path.append(os.path.dirname(__file__))  # make sure py_utils can be imported
+from py_utils.coco_utils import COCO_test_helper
+from py_utils.rknn_executor import RKNN_model_container  # assumes an RKNN model is used
+
+
+
+# ------ YOLO configuration -----------
+
+# YOLO configuration
+YOLO_MODEL_PATH = './yolov5s-640-640.rknn'
+OBJ_THRESH = 0.6
+NMS_THRESH = 0.6
+IMG_SIZE = (640, 640)  # (w, h)
+CLASSES = ("person",)  # only the person class is of interest
+
+# load the anchors
+ANCHORS_FILE = './model/anchors_yolov5.txt'
+with open(ANCHORS_FILE, 'r') as f:
+    values = [float(_v) for _v in f.readlines()]
+    ANCHORS = np.array(values).reshape(3, -1, 2).tolist()
+
+
+# ---------- YOLO processing functions ----------
+def filter_boxes(boxes, box_confidences, box_class_probs):
+    box_confidences = box_confidences.reshape(-1)
+    class_max_score = np.max(box_class_probs, axis=-1)
+    classes = np.argmax(box_class_probs, axis=-1)
+
+    _class_pos = np.where(class_max_score * box_confidences >= OBJ_THRESH)
+    scores = (class_max_score * box_confidences)[_class_pos]
+
+    boxes = boxes[_class_pos]
+    classes = classes[_class_pos]
+
+    return boxes, classes, scores
+
+def nms_boxes(boxes, scores):
+    x = boxes[:, 0]
+    y = boxes[:, 1]
+    w = boxes[:, 2] - boxes[:, 0]
+    h = boxes[:, 3] - boxes[:, 1]
+
+    areas = w * h
+    order = scores.argsort()[::-1]
+
+    keep = []
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+
+        xx1 = np.maximum(x[i], x[order[1:]])
+        yy1 = np.maximum(y[i], y[order[1:]])
+        xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
+        yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])
+
+        w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
+        h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
+        inter = w1 * h1
+
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
+        inds = np.where(ovr <= NMS_THRESH)[0]
+        order = order[inds + 1]
+    keep = np.array(keep)
+    return keep
+
+def box_process(position, anchors):
+    grid_h, grid_w = position.shape[2:4]
+    col, row = np.meshgrid(np.arange(0, grid_w), np.arange(0, grid_h))
+    col = col.reshape(1, 1, grid_h, grid_w)
+    row = row.reshape(1, 1, grid_h, grid_w)
+    grid = np.concatenate((col, row), axis=1)
+    stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1, 2, 1, 1)
+
+    col = col.repeat(len(anchors), axis=0)
+    row = row.repeat(len(anchors), axis=0)
+    anchors = np.array(anchors)
+    anchors = anchors.reshape(*anchors.shape, 1, 1)
+
+    box_xy = position[:, :2, :, :] * 2 - 0.5
+    box_wh = pow(position[:, 2:4, :, :] * 2, 2) * anchors
+
+    box_xy += grid
+    box_xy *= stride
+    box = np.concatenate((box_xy, box_wh), axis=1)
+
+    xyxy = np.copy(box)
+    xyxy[:, 0, :, :] = box[:, 0, :, :] - box[:, 2, :, :] / 2
+    xyxy[:, 1, :, :] = box[:, 1, :, :] - box[:, 3, :, :] / 2
+    xyxy[:, 2, :, :] = box[:, 0, :, :] + box[:, 2, :, :] / 2
+    xyxy[:, 3, :, :] = box[:, 1, :, :] + box[:, 3, :, :] / 2
+
+    return xyxy
+
+def post_process(input_data, anchors):
+    boxes, scores, classes_conf = [], [], []
+    input_data = [_in.reshape([len(anchors[0]), -1] + list(_in.shape[-2:])) for _in in input_data]
+    for i in range(len(input_data)):
+        boxes.append(box_process(input_data[i][:, :4, :, :], anchors[i]))
+        scores.append(input_data[i][:, 4:5, :, :])
+        classes_conf.append(input_data[i][:, 5:, :, :])
+
+    def sp_flatten(_in):
+        ch = _in.shape[1]
+        _in = _in.transpose(0, 2, 3, 1)
+        return _in.reshape(-1, ch)
+
+    boxes = [sp_flatten(_v) for _v in boxes]
+    classes_conf = [sp_flatten(_v) for _v in classes_conf]
+    scores = [sp_flatten(_v) for _v in scores]
+
+    boxes = np.concatenate(boxes)
+    classes_conf = np.concatenate(classes_conf)
+    scores = np.concatenate(scores)
+
+    boxes, classes, scores = filter_boxes(boxes, scores, classes_conf)
+
+    nboxes, nclasses, nscores = [], [], []
+    for c in set(classes):
+        inds = np.where(classes == c)
+        b = boxes[inds]
+        c = classes[inds]
+        s = scores[inds]
+        keep = nms_boxes(b, s)
+
+        if len(keep) != 0:
+            nboxes.append(b[keep])
+            nclasses.append(c[keep])
+            nscores.append(s[keep])
+
+    if not nclasses and not nscores:
+        return None, None, None
+
+    boxes = np.concatenate(nboxes)
+    classes = np.concatenate(nclasses)
+    scores = np.concatenate(nscores)
+
+    return boxes, classes, scores
+
+def draw_detections(image, boxes, scores, classes):
+    """Draw detection boxes on the image."""
+    if boxes is None:
+        return image
+
+    for box, score, cl in zip(boxes, scores, classes):
+        # only draw boxes for the person class
+        if CLASSES[cl] != "person":
+            continue
+
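+        # Note: box is in (x1, y1, x2, y2) order, so despite their names the
+        # variables unpacked below actually hold x1, y1, x2 and y2.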
+        top, left, right, bottom = [int(_b) for _b in box]
+
+        # draw the bounding box
+        cv2.rectangle(image, (top, left), (right, bottom), (0, 255, 0), 20)
+
+        # draw the label background
+        label = f'person: {score:.2f}'
+        (label_width, label_height), baseline = cv2.getTextSize(
+            label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 15
+        )
+
+        # draw the label rectangle
+        cv2.rectangle(
+            image,
+            (top, left - label_height - 5),
+            (top + label_width, left),
+            (0, 255, 0),
+            -1
+        )
+
+        # # draw the label text
+        cv2.putText(
+            image,
+            label,
+            (top, left - 5),
+            cv2.FONT_HERSHEY_SIMPLEX,
+            0.5,
+            (0, 0, 0),
+            2
+        )
+
+    return image
+
+# ------------------------
 
 class MultiCameraBirdView:
     def __init__(self):
@@ -36,10 +217,116 @@ class MultiCameraBirdView:
             self.running = False
             return
         self.caps.append(cap)
+
         self.birdview = BirdView()
         self._initialize_weights()
 
+        # new: alert states
+        self.alerts = {
+            "front": False,
+            "back": False,
+            "left": False,
+            "right": False
+        }
+
+        # === new: YOLO person-detection model ===
+        try:
+            self.yolo_model = RKNN_model_container(YOLO_MODEL_PATH, target='rk3588')
+            print("[INFO] YOLO model loaded successfully")
+            # initialise the COCO helper used for image preprocessing
+            self.co_helper = COCO_test_helper(enable_letter_box=True)
+        except Exception as e:
+            print(f"[ERROR] failed to load YOLO model: {e}")
+            self.yolo_model = None
+
+    def overlay_alert(self, birdview_img):
+        """Overlay translucent red alert regions on the bird's-eye view."""
+        h, w = birdview_img.shape[:2]
+        overlay = birdview_img.copy()
+
+        alpha = 0.2  # transparency
+        red = (0, 0, 200)
+
+        margin_f_b = int(min(h, w) * 0.07)  # alert band width (about 7%), front/back
+        margin_l_r = int(min(h, w) * 0.15)  # alert band width (about 15%), left/right
+
+        if self.alerts["front"]:
+            cv2.rectangle(overlay, (0, 0), (w, margin_f_b), red, -1)
+        if self.alerts["back"]:
+            cv2.rectangle(overlay, (0, h - margin_f_b), (w, h), red, -1)
+        if self.alerts["left"]:
+            cv2.rectangle(overlay, (0, 0), (margin_l_r, h), red, -1)
+        if self.alerts["right"]:
+            cv2.rectangle(overlay, (w - margin_l_r, 0), (w, h), red, -1)
+
+        # blend the original image with the overlay
+        blended = cv2.addWeighted(birdview_img, 1 - alpha, overlay, alpha, 0)
+        return blended
+
+    def detect_persons(self, image):
+        """Detect persons in the image with the YOLO model."""
+        if self.yolo_model is None:
+            return image, [], []
+
+        try:
+            # keep the original image size
+            orig_h, orig_w = image.shape[:2]
+
+            # preprocess the image
+            pad_color = (0, 0, 0)
+            img_preprocessed = self.co_helper.letter_box(
+                im=image.copy(),
+                new_shape=(IMG_SIZE[1], IMG_SIZE[0]),
+                pad_color=pad_color
+            )
+            img_preprocessed = cv2.cvtColor(img_preprocessed, cv2.COLOR_BGR2RGB)
+
+            # inference
+            outputs = self.yolo_model.run([np.expand_dims(img_preprocessed, 0)])
+
+            # post-processing
+            boxes, classes, scores = post_process(outputs, ANCHORS)
+
+            if boxes is not None:
+                # map the detection boxes back to original image coordinates
+                real_boxes = self.co_helper.get_real_box(boxes)
+
+                # keep only the person boxes
+                person_boxes = []
+                person_scores = []
+
+                for i in range(len(real_boxes)):
+                    if classes[i] < len(CLASSES) and CLASSES[classes[i]] == "person":
+                        # clamp the coordinates to the image
+                        box = real_boxes[i].copy()
+                        box[0] = max(0, min(box[0], orig_w))
+                        box[1] = max(0, min(box[1], orig_h))
+                        box[2] = max(0, min(box[2], orig_w))
+                        box[3] = max(0, min(box[3], orig_h))
+
+                        person_boxes.append(box)
+                        person_scores.append(scores[i])
+
+                # draw the detection boxes on the image
+                if person_boxes:
+                    image = draw_detections(image, np.array(person_boxes),
+                                            np.array(person_scores),
+                                            np.zeros(len(person_boxes), dtype=int))
+
+                # print detection info
+                # print(f"[YOLO] detected {len(person_boxes)} person(s)")
+                # for box, score in zip(person_boxes, person_scores):
+                #     print(f"  position: ({int(box[0])}, {int(box[1])}, {int(box[2])}, {int(box[3])}), confidence: {score:.2f}")
+
+                return image, person_boxes, person_scores
+            else:
+                return image, [], []
+
+        except Exception as e:
+            print(f"[ERROR] YOLO detection failed: {e}")
+            return image, [], []
+
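+    # Note: person detection runs only on the currently selected single-camera
+    # view; the stitched bird's-eye image is never passed through the detector,
+    # alerts are only drawn onto it by overlay_alert().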
     def _initialize_weights(self):
         try:
             images = [os.path.join(os.getcwd(), "images", name + ".png") for name in self.names]
@@ -69,28 +356,25 @@ class MultiCameraBirdView:
     def process_frame_undistort(self, frame, model):
         """Process only once: undistort + project + flip."""
         frame = model.undistort(frame)
-        # frame = model.project(frame)
-        # frame = model.flip(frame)
         return frame
 
     def run(self):
-        current_view = "front"  # show the front view by default
+        current_view = "front"
+        frame_count = 0
+        detection_interval = 3  # only run detection every few frames to limit the performance cost
 
         while self.running:
             raw_frames = {}
             processed_frames = []
             valid = True
 
-            # read and process all camera frames
             for i, (cap, model, name) in enumerate(zip(self.caps, self.camera_models, self.names)):
                 ret, frame = cap.read()
                 if not ret or frame is None:
                     print(f"[WARN] skipping {name} frame")
                     valid = False
                     break
-                # keep the raw frame (for the right-hand display)
                 raw_frames[name] = frame.copy()
-                # process the frame used for the bird's-eye view
                 p_frame = self.process_frame_once(frame, model)
                 processed_frames.append(p_frame)
 
@@ -103,21 +387,51 @@ class MultiCameraBirdView:
             self.birdview.make_white_balance()
             self.birdview.copy_car_image()
 
-            # get the currently selected single view (already rectified)
-            single_img = self.process_frame_undistort(raw_frames[current_view], self.camera_models[self.names.index(current_view)])
-            birdview_img = self.birdview.image
+            # get the single-camera view (undistortion only)
+            single_img = self.process_frame_undistort(
+                raw_frames[current_view],
+                self.camera_models[self.names.index(current_view)]
+            )
+
+            # run person detection on the single-camera view
+            frame_count += 1
+            if frame_count % detection_interval == 0 and self.yolo_model is not None:
+                single_img, person_boxes, person_scores = self.detect_persons(single_img)
+
+                # automatically trigger alerts based on the detections
+                if person_boxes:
+                    # alerts could be triggered based on the position and number of persons;
+                    # simple example here: any detected person triggers the alert for the current view
+                    self.alerts[current_view] = True
+                    # reset the alerts of the other views
+                    for view in self.alerts:
+                        if view != current_view:
+                            self.alerts[view] = False
+                else:
+                    # no person detected, clear all alerts
+                    for view in self.alerts:
+                        self.alerts[view] = False
+
+            birdview_img = self.birdview.image.copy()
+
+            # overlay the alert regions
+            birdview_with_alert = self.overlay_alert(birdview_img)
 
             # side-by-side display: bird's-eye view on the left (1/3), single view on the right (2/3)
             h_display, w_display = 720, 1280
             w_bird = w_display // 3
             w_single = w_display - w_bird
 
-            bird_resized = cv2.resize(birdview_img, (w_bird, h_display))
+            bird_resized = cv2.resize(birdview_with_alert, (w_bird, h_display))
             single_resized = cv2.resize(single_img, (w_single, h_display))
             display = np.hstack((bird_resized, single_resized))
 
+            # add status information to the display window
+            info_text = f"View: {current_view} | Persons detected: {len(person_boxes) if 'person_boxes' in locals() else 0}"
+            cv2.putText(display, info_text, (10, 30),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
-
-
+            # full-screen display
             cv2.namedWindow('Video', cv2.WND_PROP_FULLSCREEN)
             cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
             cv2.imshow("Video", display)
@@ -134,8 +448,23 @@
                 current_view = "left"
             elif key == ord('4'):
                 current_view = "right"
+            # new: manual alert control
+            elif key == ord('5'):
+                self.alerts["front"] = True
+            elif key == ord('6'):
+                self.alerts["back"] = True
+            elif key == ord('7'):
+                self.alerts["left"] = True
+            elif key == ord('8'):
+                self.alerts["right"] = True
+            elif key == ord('0'):
+                # clear all alerts
+                for k in self.alerts:
+                    self.alerts[k] = False
+            elif key == ord('d'):
+                # manually trigger one detection pass
+                single_img, person_boxes, person_scores = self.detect_persons(single_img)
 
-        # release resources
        for cap in self.caps:
            cap.release()
        cv2.destroyAllWindows()
@@ -144,13 +473,17 @@
 
 def main():
     print("🚀 Starting the real-time four-camera surround-view system...")
     print("Controls:")
-    print("  1 - front view | 2 - back view | 3 - left view | 4 - right view")
-    print("  q - quit")
+    print("  1-4: switch single view (front/back/left/right)")
+    print("  5-8: trigger front/back/left/right proximity alert")
+    print("  0  : clear all alerts")
+    print("  d  : manually trigger person detection")
+    print("  q  : quit")
     multi_cam = MultiCameraBirdView()
     if multi_cam.running:
         multi_cam.run()
     else:
         print("[ERROR] camera initialisation failed")
+
 if __name__ == "__main__":
     main()
\ No newline at end of file
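
Usage note (not part of the diff): a minimal sketch of how the pieces added above fit together outside of web.py, assuming the same yolov5s-640-640.rknn model and anchors file, and that web.py's module-level helpers (post_process, ANCHORS, IMG_SIZE, CLASSES) are importable from the repository root; the script name and input image are placeholders.

# standalone_person_detect.py -- illustrative sketch only
import cv2
import numpy as np

from py_utils.coco_utils import COCO_test_helper
from py_utils.rknn_executor import RKNN_model_container
from web import post_process, ANCHORS, IMG_SIZE, CLASSES   # helpers defined in web.py above

helper = COCO_test_helper(enable_letter_box=True)
model = RKNN_model_container('./yolov5s-640-640.rknn', target='rk3588')

img = cv2.imread('test.jpg')                                # placeholder input image
# Letter-box to the model input size and convert BGR -> RGB, as detect_persons() does.
inp = helper.letter_box(im=img.copy(), new_shape=(IMG_SIZE[1], IMG_SIZE[0]), pad_color=(0, 0, 0))
inp = cv2.cvtColor(inp, cv2.COLOR_BGR2RGB)

# NHWC batch of one; the RKNN runtime returns the three YOLOv5 output scales.
outputs = model.run([np.expand_dims(inp, 0)])
boxes, classes, scores = post_process(outputs, ANCHORS)

if boxes is not None:
    # Map boxes from the 640x640 letter-boxed frame back to original image coordinates.
    for box, cls, score in zip(helper.get_real_box(boxes), classes, scores):
        if cls < len(CLASSES) and CLASSES[cls] == "person":
            print("person", [int(v) for v in box], round(float(score), 3))

model.release()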