# shared_buffer.py
import threading

import numpy as np


class CameraFrameBuffer:
    """Thread-safe buffer holding the latest frame per camera (after undistortion).

    Each camera name gets its own lock, so access to one camera's slot does
    not block access to another's.
    """

    def __init__(self, cam_names):
        """Create an empty slot, a lock and an "updated" flag for each camera.

        Args:
            cam_names: Iterable of hashable camera identifiers. It is
                iterated three times, so pass a reusable collection rather
                than a one-shot iterator.
        """
        self.frames = {name: None for name in cam_names}
        self.locks = {name: threading.Lock() for name in cam_names}
        self.updated = {name: False for name in cam_names}

    def update(self, name, frame):
        """Store a copy of ``frame`` as the latest frame for camera ``name``.

        Args:
            name: Camera identifier given at construction time.
            frame: Object exposing ``.copy()`` (presumably a NumPy image
                array -- confirm against the caller).

        Raises:
            KeyError: If ``name`` was not passed to the constructor.
        """
        with self.locks[name]:
            # Copy so that later in-place changes by the producer cannot
            # alter the buffered frame.
            self.frames[name] = frame.copy()
            self.updated[name] = True

    def get(self, name):
        """Return ``(True, frame_copy)`` if a frame is stored, else ``(False, None)``.

        NOTE: the "updated" flag is never cleared here, so once a frame has
        been stored every subsequent call returns it again until ``update``
        replaces it.

        Raises:
            KeyError: If ``name`` was not passed to the constructor.
        """
        with self.locks[name]:
            if self.updated[name] and self.frames[name] is not None:
                # Hand out a copy so the consumer cannot mutate the buffer.
                return True, self.frames[name].copy()
            return False, None


class DetectionResultBuffer:
    """Thread-safe buffer holding the latest detection result per camera.

    A single lock guards the results of all cameras.
    """

    def __init__(self, cam_names):
        """Initialise every camera's result to ``(None, [], [])``.

        Args:
            cam_names: Iterable of hashable camera identifiers.
        """
        # Each value is an (image, boxes, scores) tuple.
        self.results = {name: (None, [], []) for name in cam_names}
        self.lock = threading.Lock()

    def update(self, name, image, boxes, scores):
        """Store the detection result for camera ``name``.

        Args:
            name: Camera identifier.
            image: Object exposing ``.copy()``, or ``None``.
            boxes: Iterable of boxes; stored as a new (shallow-copied) list.
            scores: Iterable of scores; stored as a new (shallow-copied) list.
        """
        stored_image = image.copy() if image is not None else None
        with self.lock:
            self.results[name] = (stored_image, list(boxes), list(scores))

    def get(self, name):
        """Return ``(image_copy, boxes, scores)`` for camera ``name``.

        When the stored image is ``None`` the result is ``(None, [], [])``,
        regardless of any boxes or scores stored alongside it.

        Raises:
            KeyError: If ``name`` has no stored entry.
        """
        with self.lock:
            img, boxes, scores = self.results[name]
            if img is not None:
                # Fresh copies keep callers from mutating the buffered data.
                return img.copy(), list(boxes), list(scores)
            return None, [], []