Calibration run

commit c51757f66b (parent 9257824716), 2025-12-12 09:49:13 +08:00
103 changed files with 2485 additions and 1 deletion

surround_view/__init__.py Normal file

@@ -0,0 +1,6 @@
from .fisheye_camera import FisheyeCameraModel
from .imagebuffer import MultiBufferManager
from .capture_thread import CaptureThread
from .process_thread import CameraProcessingThread
from .simple_gui import display_image, PointSelector
from .birdview import BirdView, ProjectedImageBuffer
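
These exports compose into a capture → undistort/project/flip → stitch pipeline. A minimal sketch of wiring them together (the yaml/png file names are hypothetical and error handling is omitted):

    from surround_view import (FisheyeCameraModel, MultiBufferManager,
                               CaptureThread, CameraProcessingThread,
                               ProjectedImageBuffer, BirdView)

    names = ["front", "back", "left", "right"]
    cap_manager = MultiBufferManager()
    proc_manager = ProjectedImageBuffer()
    for device_id, name in enumerate(names):
        model = FisheyeCameraModel("yaml/{}.yaml".format(name), name)
        cap_thread = CaptureThread(device_id, resolution=(640, 480))
        cap_manager.bind_thread(cap_thread, buffer_size=8)
        if cap_thread.connect_camera():
            cap_thread.start()
        proc_thread = CameraProcessingThread(cap_manager, device_id, model)
        proc_manager.bind_thread(proc_thread)
        proc_thread.start()

    birdview = BirdView(proc_manager)
    birdview.load_weights_and_masks("weights.png", "masks.png")
    birdview.start()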

7 binary files not shown.

surround_view/base_thread.py Normal file

@@ -0,0 +1,52 @@
from queue import Queue
import cv2
from PyQt5.QtCore import (QThread, QTime, QMutex, pyqtSignal, QMutexLocker)
from .structures import ThreadStatisticsData
class BaseThread(QThread):
"""
    Base class for all thread types (capture, processing, stitching, etc.).
    Mainly responsible for collecting runtime statistics of the threads.
"""
FPS_STAT_QUEUE_LENGTH = 32
update_statistics_gui = pyqtSignal(ThreadStatisticsData)
def __init__(self, parent=None):
super(BaseThread, self).__init__(parent)
self.init_commons()
def init_commons(self):
self.stopped = False
self.stop_mutex = QMutex()
self.clock = QTime()
self.fps = Queue()
self.processing_time = 0
self.processing_mutex = QMutex()
self.fps_sum = 0
self.stat_data = ThreadStatisticsData()
def stop(self):
with QMutexLocker(self.stop_mutex):
self.stopped = True
def update_fps(self, dt):
# add instantaneous fps value to queue
if dt > 0:
self.fps.put(1000 / dt)
# discard redundant items in the fps queue
if self.fps.qsize() > self.FPS_STAT_QUEUE_LENGTH:
self.fps.get()
# update statistics
if self.fps.qsize() == self.FPS_STAT_QUEUE_LENGTH:
while not self.fps.empty():
self.fps_sum += self.fps.get()
self.stat_data.average_fps = round(self.fps_sum / self.FPS_STAT_QUEUE_LENGTH, 2)
self.fps_sum = 0
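
update_fps collects one instantaneous sample (1000 / dt, with dt in milliseconds) per frame; once 32 samples have accumulated it publishes their mean as average_fps and restarts the window, so a steady 40 ms frame time reports 1000 / 40 = 25 fps. A GUI would typically subscribe to the statistics signal, roughly (the slot name is hypothetical):

    thread.update_statistics_gui.connect(on_statistics_updated)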

surround_view/birdview.py Normal file

@@ -0,0 +1,339 @@
import os
import numpy as np
import cv2
from PIL import Image
from PyQt5.QtCore import QMutex, QWaitCondition, QMutexLocker
from .base_thread import BaseThread
from .imagebuffer import Buffer
from . import param_settings as settings
from .param_settings import xl, xr, yt, yb
from . import utils
class ProjectedImageBuffer(object):
"""
Class for synchronizing processing threads from different cameras.
"""
def __init__(self, drop_if_full=True, buffer_size=8):
self.drop_if_full = drop_if_full
self.buffer = Buffer(buffer_size)
self.sync_devices = set()
self.wc = QWaitCondition()
self.mutex = QMutex()
self.arrived = 0
self.current_frames = dict()
def bind_thread(self, thread):
with QMutexLocker(self.mutex):
self.sync_devices.add(thread.device_id)
name = thread.camera_model.camera_name
shape = settings.project_shapes[name]
self.current_frames[thread.device_id] = np.zeros(shape[::-1] + (3,), np.uint8)
thread.proc_buffer_manager = self
def get(self):
return self.buffer.get()
def set_frame_for_device(self, device_id, frame):
if device_id not in self.sync_devices:
raise ValueError("Device not held by the buffer: {}".format(device_id))
self.current_frames[device_id] = frame
def sync(self, device_id):
# only perform sync if enabled for specified device/stream
self.mutex.lock()
if device_id in self.sync_devices:
# increment arrived count
self.arrived += 1
# we are the last to arrive: wake all waiting threads
if self.arrived == len(self.sync_devices):
self.buffer.add(self.current_frames, self.drop_if_full)
self.wc.wakeAll()
# still waiting for other streams to arrive: wait
else:
self.wc.wait(self.mutex)
# decrement arrived count
self.arrived -= 1
self.mutex.unlock()
def wake_all(self):
with QMutexLocker(self.mutex):
self.wc.wakeAll()
def __contains__(self, device_id):
return device_id in self.sync_devices
def __str__(self):
return (self.__class__.__name__ + ":\n" + \
"devices: {}\n".format(self.sync_devices))
def FI(front_image):
return front_image[:, :xl]
def FII(front_image):
return front_image[:, xr:]
def FM(front_image):
return front_image[:, xl:xr]
def BIII(back_image):
return back_image[:, :xl]
def BIV(back_image):
return back_image[:, xr:]
def BM(back_image):
return back_image[:, xl:xr]
def LI(left_image):
return left_image[:yt, :]
def LIII(left_image):
return left_image[yb:, :]
def LM(left_image):
return left_image[yt:yb, :]
def RII(right_image):
return right_image[:yt, :]
def RIV(right_image):
return right_image[yb:, :]
def RM(right_image):
return right_image[yt:yb, :]
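# The stitched canvas is split by the columns xl, xr and the rows yt, yb into
# a 3x3 grid (see the BirdView properties below):
#
#          xl        xr
#      ----+---------+----  yt
#      FL  |    F    |  FR
#      ----+---------+----
#       L  |  car C  |  R
#      ----+---------+----  yb
#      BL  |    B    |  BR
#
# The helpers above slice one projected camera image into its middle strip
# (FM, BM, LM, RM) and the two corners it shares with its neighbours; the
# Roman numerals I..IV label the four overlap corners front-left,
# front-right, back-left and back-right.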
class BirdView(BaseThread):
def __init__(self,
proc_buffer_manager=None,
drop_if_full=True,
buffer_size=8,
parent=None):
super(BirdView, self).__init__(parent)
self.proc_buffer_manager = proc_buffer_manager
self.drop_if_full = drop_if_full
self.buffer = Buffer(buffer_size)
self.image = np.zeros((settings.total_h, settings.total_w, 3), np.uint8)
self.weights = None
self.masks = None
self.car_image = settings.car_image
self.frames = None
def get(self):
return self.buffer.get()
def update_frames(self, images):
self.frames = images
def load_weights_and_masks(self, weights_image, masks_image):
        GMat = np.asarray(Image.open(weights_image).convert("RGBA"), dtype=np.float64) / 255.0
self.weights = [np.stack((GMat[:, :, k],
GMat[:, :, k],
GMat[:, :, k]), axis=2)
for k in range(4)]
        Mmat = np.asarray(Image.open(masks_image).convert("RGBA"), dtype=np.float64)
Mmat = utils.convert_binary_to_bool(Mmat)
self.masks = [Mmat[:, :, k] for k in range(4)]
def merge(self, imA, imB, k):
G = self.weights[k]
return (imA * G + imB * (1 - G)).astype(np.uint8)
@property
def FL(self):
return self.image[:yt, :xl]
@property
def F(self):
return self.image[:yt, xl:xr]
@property
def FR(self):
return self.image[:yt, xr:]
@property
def BL(self):
return self.image[yb:, :xl]
@property
def B(self):
return self.image[yb:, xl:xr]
@property
def BR(self):
return self.image[yb:, xr:]
@property
def L(self):
return self.image[yt:yb, :xl]
@property
def R(self):
return self.image[yt:yb, xr:]
@property
def C(self):
return self.image[yt:yb, xl:xr]
def stitch_all_parts(self):
front, back, left, right = self.frames
np.copyto(self.F, FM(front))
np.copyto(self.B, BM(back))
np.copyto(self.L, LM(left))
np.copyto(self.R, RM(right))
np.copyto(self.FL, self.merge(FI(front), LI(left), 0))
np.copyto(self.FR, self.merge(FII(front), RII(right), 1))
np.copyto(self.BL, self.merge(BIII(back), LIII(left), 2))
np.copyto(self.BR, self.merge(BIV(back), RIV(right), 3))
def copy_car_image(self):
np.copyto(self.C, self.car_image)
def make_luminance_balance(self):
def tune(x):
if x >= 1:
return x * np.exp((1 - x) * 0.5)
else:
return x * np.exp((1 - x) * 0.8)
front, back, left, right = self.frames
m1, m2, m3, m4 = self.masks
Fb, Fg, Fr = cv2.split(front)
Bb, Bg, Br = cv2.split(back)
Lb, Lg, Lr = cv2.split(left)
Rb, Rg, Rr = cv2.split(right)
a1 = utils.mean_luminance_ratio(RII(Rb), FII(Fb), m2)
a2 = utils.mean_luminance_ratio(RII(Rg), FII(Fg), m2)
a3 = utils.mean_luminance_ratio(RII(Rr), FII(Fr), m2)
b1 = utils.mean_luminance_ratio(BIV(Bb), RIV(Rb), m4)
b2 = utils.mean_luminance_ratio(BIV(Bg), RIV(Rg), m4)
b3 = utils.mean_luminance_ratio(BIV(Br), RIV(Rr), m4)
c1 = utils.mean_luminance_ratio(LIII(Lb), BIII(Bb), m3)
c2 = utils.mean_luminance_ratio(LIII(Lg), BIII(Bg), m3)
c3 = utils.mean_luminance_ratio(LIII(Lr), BIII(Br), m3)
d1 = utils.mean_luminance_ratio(FI(Fb), LI(Lb), m1)
d2 = utils.mean_luminance_ratio(FI(Fg), LI(Lg), m1)
d3 = utils.mean_luminance_ratio(FI(Fr), LI(Lr), m1)
t1 = (a1 * b1 * c1 * d1)**0.25
t2 = (a2 * b2 * c2 * d2)**0.25
t3 = (a3 * b3 * c3 * d3)**0.25
x1 = t1 / (d1 / a1)**0.5
x2 = t2 / (d2 / a2)**0.5
x3 = t3 / (d3 / a3)**0.5
x1 = tune(x1)
x2 = tune(x2)
x3 = tune(x3)
Fb = utils.adjust_luminance(Fb, x1)
Fg = utils.adjust_luminance(Fg, x2)
Fr = utils.adjust_luminance(Fr, x3)
y1 = t1 / (b1 / c1)**0.5
y2 = t2 / (b2 / c2)**0.5
y3 = t3 / (b3 / c3)**0.5
y1 = tune(y1)
y2 = tune(y2)
y3 = tune(y3)
Bb = utils.adjust_luminance(Bb, y1)
Bg = utils.adjust_luminance(Bg, y2)
Br = utils.adjust_luminance(Br, y3)
z1 = t1 / (c1 / d1)**0.5
z2 = t2 / (c2 / d2)**0.5
z3 = t3 / (c3 / d3)**0.5
z1 = tune(z1)
z2 = tune(z2)
z3 = tune(z3)
Lb = utils.adjust_luminance(Lb, z1)
Lg = utils.adjust_luminance(Lg, z2)
Lr = utils.adjust_luminance(Lr, z3)
w1 = t1 / (a1 / b1)**0.5
w2 = t2 / (a2 / b2)**0.5
w3 = t3 / (a3 / b3)**0.5
w1 = tune(w1)
w2 = tune(w2)
w3 = tune(w3)
Rb = utils.adjust_luminance(Rb, w1)
Rg = utils.adjust_luminance(Rg, w2)
Rr = utils.adjust_luminance(Rr, w3)
self.frames = [cv2.merge((Fb, Fg, Fr)),
cv2.merge((Bb, Bg, Br)),
cv2.merge((Lb, Lg, Lr)),
cv2.merge((Rb, Rg, Rr))]
return self
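    # How the balance works, roughly: per channel, a..d are the luminance
    # ratios of adjacent cameras measured on the four overlaps (a: right vs
    # front, b: back vs right, c: left vs back, d: front vs left). Their
    # geometric mean t = (a*b*c*d)**0.25 acts as a common target, and each
    # camera receives a gain (x: front, y: back, z: left, w: right) that
    # pulls its two overlap ratios toward t; tune() damps gains far from 1
    # so one bad overlap cannot over-brighten or over-darken a camera.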
def get_weights_and_masks(self, images):
front, back, left, right = images
G0, M0 = utils.get_weight_mask_matrix(FI(front), LI(left))
G1, M1 = utils.get_weight_mask_matrix(FII(front), RII(right))
G2, M2 = utils.get_weight_mask_matrix(BIII(back), LIII(left))
G3, M3 = utils.get_weight_mask_matrix(BIV(back), RIV(right))
self.weights = [np.stack((G, G, G), axis=2) for G in (G0, G1, G2, G3)]
self.masks = [(M / 255.0).astype(int) for M in (M0, M1, M2, M3)]
return np.stack((G0, G1, G2, G3), axis=2), np.stack((M0, M1, M2, M3), axis=2)
def make_white_balance(self):
self.image = utils.make_white_balance(self.image)
def run(self):
if self.proc_buffer_manager is None:
raise ValueError("This thread requires a buffer of projected images to run")
while True:
self.stop_mutex.lock()
if self.stopped:
self.stopped = False
self.stop_mutex.unlock()
break
self.stop_mutex.unlock()
self.processing_time = self.clock.elapsed()
self.clock.start()
self.processing_mutex.lock()
self.update_frames(self.proc_buffer_manager.get().values())
self.make_luminance_balance().stitch_all_parts()
self.make_white_balance()
self.copy_car_image()
self.buffer.add(self.image.copy(), self.drop_if_full)
self.processing_mutex.unlock()
# update statistics
self.update_fps(self.processing_time)
self.stat_data.frames_processed_count += 1
# inform GUI of updated statistics
self.update_statistics_gui.emit(self.stat_data)
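
During calibration the blend weights and overlap masks can be computed once from four projected frames and written to disk for load_weights_and_masks to reuse. A sketch, reusing the birdview instance from the earlier example (the png names are hypothetical):

    from PIL import Image
    import numpy as np

    Gmat, Mmat = birdview.get_weights_and_masks((front, back, left, right))
    Image.fromarray((Gmat * 255).astype(np.uint8)).save("weights.png")
    Image.fromarray(Mmat.astype(np.uint8)).save("masks.png")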

surround_view/capture_thread.py Normal file

@@ -0,0 +1,108 @@
import cv2
from PyQt5.QtCore import qDebug
from .base_thread import BaseThread
from .structures import ImageFrame
from .utils import gstreamer_pipeline
class CaptureThread(BaseThread):
def __init__(self,
device_id,
# flip_method=2,
drop_if_full=True,
api_preference=cv2.CAP_ANY,
resolution=None,
# use_gst=None,
parent=None):
"""
device_id: device number of the camera.
        flip_method: 0 for identity, 2 for 180-degree rotation (if the camera is
            mounted upside-down); this parameter is currently commented out above.
        drop_if_full: drop the frame if the buffer is full.
        api_preference: cv2.CAP_GSTREAMER for csi cameras; usually cv2.CAP_ANY suffices.
resolution: camera resolution (width, height).
"""
super(CaptureThread, self).__init__(parent)
self.device_id = device_id
# self.flip_method = flip_method
# self.use_gst = None
self.drop_if_full = drop_if_full
self.api_preference = api_preference
self.resolution = resolution
self.cap = cv2.VideoCapture()
# an instance of the MultiBufferManager object,
# for synchronizing this thread with other cameras.
self.buffer_manager = None
def run(self):
if self.buffer_manager is None:
raise ValueError("This thread has not been binded to any buffer manager yet")
while True:
self.stop_mutex.lock()
if self.stopped:
self.stopped = False
self.stop_mutex.unlock()
break
self.stop_mutex.unlock()
# save capture time
self.processing_time = self.clock.elapsed()
# start timer (used to calculate capture rate)
self.clock.start()
# synchronize with other streams (if enabled for this stream)
self.buffer_manager.sync(self.device_id)
if not self.cap.grab():
continue
# retrieve frame and add it to buffer
_, frame = self.cap.retrieve()
img_frame = ImageFrame(self.clock.msecsSinceStartOfDay(), frame)
self.buffer_manager.get_device(self.device_id).add(img_frame, self.drop_if_full)
# update statistics
self.update_fps(self.processing_time)
self.stat_data.frames_processed_count += 1
# inform GUI of updated statistics
self.update_statistics_gui.emit(self.stat_data)
qDebug("Stopping capture thread...")
def connect_camera(self):
self.cap.open(self.device_id)
# return false if failed to open camera
if not self.cap.isOpened():
qDebug("Cannot open camera {}".format(self.device_id))
return False
else:
# try to set camera resolution
if self.resolution is not None:
width, height = self.resolution
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
            # some cameras close themselves if the requested resolution is not supported
if not self.cap.isOpened():
qDebug("Resolution not supported by camera device: {}".format(self.resolution))
return False
return True
def disconnect_camera(self):
# disconnect camera if it's already opened.
if self.cap.isOpened():
self.cap.release()
return True
# else do nothing and return
else:
return False
def is_camera_connected(self):
return self.cap.isOpened()

surround_view/fisheye_camera.py Normal file

@@ -0,0 +1,105 @@
import os
import numpy as np
import cv2
from . import param_settings as settings
class FisheyeCameraModel(object):
"""
Fisheye camera model, for undistorting, projecting and flipping camera frames.
"""
def __init__(self, camera_param_file, camera_name):
if not os.path.isfile(camera_param_file):
raise ValueError("Cannot find camera param file")
if camera_name not in settings.camera_names:
raise ValueError("Unknown camera name: {}".format(camera_name))
self.camera_file = camera_param_file
self.camera_name = camera_name
self.scale_xy = (1.0, 1.0)
self.shift_xy = (0, 0)
self.undistort_maps = None
self.project_matrix = None
self.project_shape = settings.project_shapes[self.camera_name]
self.load_camera_params()
def load_camera_params(self):
fs = cv2.FileStorage(self.camera_file, cv2.FILE_STORAGE_READ)
self.camera_matrix = fs.getNode("camera_matrix").mat()
self.dist_coeffs = fs.getNode("dist_coeffs").mat()
self.resolution = fs.getNode("resolution").mat().flatten()
scale_xy = fs.getNode("scale_xy").mat()
if scale_xy is not None:
self.scale_xy = scale_xy
shift_xy = fs.getNode("shift_xy").mat()
if shift_xy is not None:
self.shift_xy = shift_xy
project_matrix = fs.getNode("project_matrix").mat()
if project_matrix is not None:
self.project_matrix = project_matrix
fs.release()
self.update_undistort_maps()
def update_undistort_maps(self):
new_matrix = self.camera_matrix.copy()
new_matrix[0, 0] *= self.scale_xy[0]
new_matrix[1, 1] *= self.scale_xy[1]
new_matrix[0, 2] += self.shift_xy[0]
new_matrix[1, 2] += self.shift_xy[1]
width, height = self.resolution
self.undistort_maps = cv2.fisheye.initUndistortRectifyMap(
self.camera_matrix,
self.dist_coeffs,
np.eye(3),
new_matrix,
(width, height),
cv2.CV_16SC2
)
return self
def set_scale_and_shift(self, scale_xy=(1.0, 1.0), shift_xy=(0, 0)):
self.scale_xy = scale_xy
self.shift_xy = shift_xy
self.update_undistort_maps()
return self
def undistort(self, image):
result = cv2.remap(image, *self.undistort_maps, interpolation=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT)
return result
def project(self, image):
result = cv2.warpPerspective(image, self.project_matrix, self.project_shape)
return result
def flip(self, image):
if self.camera_name == "front":
return image.copy()
elif self.camera_name == "back":
return image.copy()[::-1, ::-1, :]
elif self.camera_name == "left":
return cv2.transpose(image)[::-1]
else:
return np.flip(cv2.transpose(image), 1)
def save_data(self):
fs = cv2.FileStorage(self.camera_file, cv2.FILE_STORAGE_WRITE)
fs.write("camera_matrix", self.camera_matrix)
fs.write("dist_coeffs", self.dist_coeffs)
fs.write("resolution", self.resolution)
fs.write("project_matrix", self.project_matrix)
fs.write("scale_xy", np.float32(self.scale_xy))
fs.write("shift_xy", np.float32(self.shift_xy))
fs.release()
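
A typical per-frame correction chain mirrors what CameraProcessingThread does (the yaml path is hypothetical):

    model = FisheyeCameraModel("yaml/front.yaml", "front")
    corrected = model.flip(model.project(model.undistort(frame)))

Note that project() requires project_matrix to be present in the parameter file (or computed by a calibration step); undistort() only needs the intrinsics.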

surround_view/imagebuffer.py Normal file

@@ -0,0 +1,161 @@
from PyQt5.QtCore import QSemaphore, QMutex
from PyQt5.QtCore import QMutexLocker, QWaitCondition
from queue import Queue
class Buffer(object):
def __init__(self, buffer_size=5):
self.buffer_size = buffer_size
self.free_slots = QSemaphore(self.buffer_size)
self.used_slots = QSemaphore(0)
self.clear_buffer_add = QSemaphore(1)
self.clear_buffer_get = QSemaphore(1)
self.queue_mutex = QMutex()
self.queue = Queue(self.buffer_size)
def add(self, data, drop_if_full=False):
self.clear_buffer_add.acquire()
if drop_if_full:
if self.free_slots.tryAcquire():
self.queue_mutex.lock()
self.queue.put(data)
self.queue_mutex.unlock()
self.used_slots.release()
else:
self.free_slots.acquire()
self.queue_mutex.lock()
self.queue.put(data)
self.queue_mutex.unlock()
self.used_slots.release()
self.clear_buffer_add.release()
def get(self):
# acquire semaphores
self.clear_buffer_get.acquire()
self.used_slots.acquire()
self.queue_mutex.lock()
data = self.queue.get()
self.queue_mutex.unlock()
# release semaphores
self.free_slots.release()
self.clear_buffer_get.release()
# return item to caller
return data
def clear(self):
# check if buffer contains items
if self.queue.qsize() > 0:
# stop adding items to buffer (will return false if an item is currently being added to the buffer)
if self.clear_buffer_add.tryAcquire():
# stop taking items from buffer (will return false if an item is currently being taken from the buffer)
if self.clear_buffer_get.tryAcquire():
# release all remaining slots in queue
self.free_slots.release(self.queue.qsize())
# acquire all queue slots
self.free_slots.acquire(self.buffer_size)
# reset used_slots to zero
self.used_slots.acquire(self.queue.qsize())
# clear buffer
for _ in range(self.queue.qsize()):
self.queue.get()
# release all slots
self.free_slots.release(self.buffer_size)
# allow get method to resume
self.clear_buffer_get.release()
else:
return False
# allow add method to resume
self.clear_buffer_add.release()
return True
else:
return False
else:
return False
def size(self):
return self.queue.qsize()
def maxsize(self):
return self.buffer_size
def isfull(self):
return self.queue.qsize() == self.buffer_size
def isempty(self):
return self.queue.qsize() == 0
class MultiBufferManager(object):
"""
Class for synchronizing capture threads from different cameras.
"""
def __init__(self, do_sync=True):
self.sync_devices = set()
self.do_sync = do_sync
self.wc = QWaitCondition()
self.mutex = QMutex()
self.arrived = 0
self.buffer_maps = dict()
def bind_thread(self, thread, buffer_size, sync=True):
self.create_buffer_for_device(thread.device_id, buffer_size, sync)
thread.buffer_manager = self
def create_buffer_for_device(self, device_id, buffer_size, sync=True):
if sync:
with QMutexLocker(self.mutex):
self.sync_devices.add(device_id)
self.buffer_maps[device_id] = Buffer(buffer_size)
def get_device(self, device_id):
return self.buffer_maps[device_id]
def remove_device(self, device_id):
self.buffer_maps.pop(device_id)
with QMutexLocker(self.mutex):
if device_id in self.sync_devices:
self.sync_devices.remove(device_id)
self.wc.wakeAll()
def sync(self, device_id):
# only perform sync if enabled for specified device/stream
self.mutex.lock()
if device_id in self.sync_devices:
# increment arrived count
self.arrived += 1
# we are the last to arrive: wake all waiting threads
if self.do_sync and self.arrived == len(self.sync_devices):
self.wc.wakeAll()
# still waiting for other streams to arrive: wait
else:
self.wc.wait(self.mutex)
# decrement arrived count
self.arrived -= 1
self.mutex.unlock()
def wake_all(self):
with QMutexLocker(self.mutex):
self.wc.wakeAll()
def set_sync(self, enable):
self.do_sync = enable
def sync_enabled(self):
return self.do_sync
def sync_enabled_for_device(self, device_id):
return device_id in self.sync_devices
def __contains__(self, device_id):
return device_id in self.buffer_maps
def __str__(self):
return (self.__class__.__name__ + ":\n" + \
"sync: {}\n".format(self.do_sync) + \
"devices: {}\n".format(tuple(self.buffer_maps.keys())) + \
"sync enabled devices: {}".format(self.sync_devices))

surround_view/param_settings.py Normal file

@@ -0,0 +1,63 @@
import os
import cv2
camera_names = ["front", "back", "left", "right"]
# --------------------------------------------------------------------
# (shift_width, shift_height): how far away the birdview looks outside
# of the calibration pattern in horizontal and vertical directions
shift_w = 300
shift_h = 300
# size of the gap between the calibration pattern and the car
# in horizontal and vertical directions
inn_shift_w = 20
inn_shift_h = 50
# total width/height of the stitched image
total_w = 600 + 2 * shift_w
total_h = 1000 + 2 * shift_h
# four corners of the rectangular region occupied by the car
# top-left (x_left, y_top), bottom-right (x_right, y_bottom)
xl = shift_w + 180 + inn_shift_w
xr = total_w - xl
yt = shift_h + 200 + inn_shift_h
yb = total_h - yt
# --------------------------------------------------------------------
project_shapes = {
"front": (total_w, yt),
"back": (total_w, yt),
"left": (total_h, xl),
"right": (total_h, xl)
}
# pixel locations of the four points to be chosen.
# you must click these pixels in the same order when running
# the get_projection_map.py script
project_keypoints = {
"front": [(shift_w + 120, shift_h),
(shift_w + 480, shift_h),
(shift_w + 120, shift_h + 160),
(shift_w + 480, shift_h + 160)],
"back": [(shift_w + 120, shift_h),
(shift_w + 480, shift_h),
(shift_w + 120, shift_h + 160),
(shift_w + 480, shift_h + 160)],
"left": [(shift_h + 280, shift_w),
(shift_h + 840, shift_w),
(shift_h + 280, shift_w + 160),
(shift_h + 840, shift_w + 160)],
"right": [(shift_h + 160, shift_w),
(shift_h + 720, shift_w),
(shift_h + 160, shift_w + 160),
(shift_h + 720, shift_w + 160)]
}
car_image = cv2.imread(os.path.join(os.getcwd(), "images", "car.png"))
car_image = cv2.resize(car_image, (xr - xl, yb - yt))
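
With these defaults the geometry works out to total_w = 600 + 2*300 = 1200 and total_h = 1000 + 2*300 = 1600, with xl = 300 + 180 + 20 = 500, xr = 1200 - 500 = 700, yt = 300 + 200 + 50 = 550 and yb = 1600 - 550 = 1050; the car therefore occupies the 200 x 500 pixel rectangle from (500, 550) to (700, 1050), the front/back projections are 1200 x 550, and the left/right ones are 1600 x 500.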

surround_view/process_thread.py Normal file

@@ -0,0 +1,62 @@
import cv2
from PyQt5.QtCore import qDebug, QMutex
from .base_thread import BaseThread
class CameraProcessingThread(BaseThread):
"""
Thread for processing individual camera images, i.e. undistort, project and flip.
"""
def __init__(self,
capture_buffer_manager,
device_id,
camera_model,
drop_if_full=True,
parent=None):
"""
capture_buffer_manager: an instance of the `MultiBufferManager` object.
device_id: device number of the camera to be processed.
camera_model: an instance of the `FisheyeCameraModel` object.
drop_if_full: drop if the buffer is full.
"""
super(CameraProcessingThread, self).__init__(parent)
self.capture_buffer_manager = capture_buffer_manager
self.device_id = device_id
self.camera_model = camera_model
self.drop_if_full = drop_if_full
# an instance of the `ProjectedImageBuffer` object
self.proc_buffer_manager = None
def run(self):
if self.proc_buffer_manager is None:
raise ValueError("This thread has not been binded to any processing thread yet")
while True:
self.stop_mutex.lock()
if self.stopped:
self.stopped = False
self.stop_mutex.unlock()
break
self.stop_mutex.unlock()
self.processing_time = self.clock.elapsed()
self.clock.start()
self.processing_mutex.lock()
raw_frame = self.capture_buffer_manager.get_device(self.device_id).get()
und_frame = self.camera_model.undistort(raw_frame.image)
pro_frame = self.camera_model.project(und_frame)
flip_frame = self.camera_model.flip(pro_frame)
self.processing_mutex.unlock()
            # publish this camera's frame first, then wait at the barrier; the
            # last thread to arrive snapshots current_frames into the buffer
            self.proc_buffer_manager.set_frame_for_device(self.device_id, flip_frame)
            self.proc_buffer_manager.sync(self.device_id)
# update statistics
self.update_fps(self.processing_time)
self.stat_data.frames_processed_count += 1
# inform GUI of updated statistics
self.update_statistics_gui.emit(self.stat_data)

surround_view/simple_gui.py Normal file

@@ -0,0 +1,131 @@
import cv2
import numpy as np
# Return -1 if the user presses 'q' or closes the window, 1 if the user presses 'Enter'.
def display_image(window_title, image):
cv2.imshow(window_title, image)
while True:
        # getWindowProperty turns negative once the user closes the window
        if cv2.getWindowProperty(window_title, cv2.WND_PROP_AUTOSIZE) < 0:
return -1
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
return -1
# 'Enter' key is detected!
if key == 13:
return 1
class PointSelector(object):
"""
---------------------------------------------------
| A simple gui point selector. |
| Usage: |
| |
| 1. call the `loop` method to show the image. |
| 2. click on the image to select key points, |
| press `d` to delete the last points. |
| 3. press `q` to quit, press `Enter` to confirm. |
---------------------------------------------------
"""
POINT_COLOR = (0, 0, 255)
FILL_COLOR = (0, 255, 255)
def __init__(self, image, title="PointSelector"):
self.image = image
self.title = title
self.keypoints = []
def draw_image(self):
"""
Display the selected keypoints and draw the convex hull.
"""
# the trick: draw on another new image
new_image = self.image.copy()
# draw the selected keypoints
for i, pt in enumerate(self.keypoints):
cv2.circle(new_image, pt, 6, self.POINT_COLOR, -1)
cv2.putText(new_image, str(i), (pt[0], pt[1] - 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.POINT_COLOR, 2)
# draw a line if there are two points
if len(self.keypoints) == 2:
p1, p2 = self.keypoints
cv2.line(new_image, p1, p2, self.POINT_COLOR, 2)
# draw the convex hull if there are more than two points
if len(self.keypoints) > 2:
mask = self.create_mask_from_pixels(self.keypoints,
self.image.shape)
new_image = self.draw_mask_on_image(new_image, mask)
cv2.imshow(self.title, new_image)
def onclick(self, event, x, y, flags, param):
"""
        Clicking a point (x, y) appends it to the keypoint list
        and redraws the image.
"""
if event == cv2.EVENT_LBUTTONDOWN:
print("click ({}, {})".format(x, y))
self.keypoints.append((x, y))
self.draw_image()
def loop(self):
"""
Press "q" will exist the gui and return False
press "d" will delete the last selected point.
Press "Enter" will exist the gui and return True.
"""
cv2.namedWindow(self.title)
cv2.setMouseCallback(self.title, self.onclick, param=())
cv2.imshow(self.title, self.image)
while True:
            # getWindowProperty turns negative once the user closes the window
            if cv2.getWindowProperty(self.title, cv2.WND_PROP_AUTOSIZE) < 0:
return False
key = cv2.waitKey(1) & 0xFF
# press q to return False
if key == ord("q"):
return False
# press d to delete the last point
if key == ord("d"):
if len(self.keypoints) > 0:
x, y = self.keypoints.pop()
print("Delete ({}, {})".format(x, y))
self.draw_image()
# press Enter to confirm
if key == 13:
return True
def create_mask_from_pixels(self, pixels, image_shape):
"""
Create mask from the convex hull of a list of pixels.
"""
pixels = np.int32(pixels).reshape(-1, 2)
hull = cv2.convexHull(pixels)
mask = np.zeros(image_shape[:2], np.int8)
cv2.fillConvexPoly(mask, hull, 1, lineType=8, shift=0)
mask = mask.astype(bool)
return mask
def draw_mask_on_image(self, image, mask):
"""
Paint the region defined by a given mask on an image.
"""
new_image = np.zeros_like(image)
new_image[:, :] = self.FILL_COLOR
mask = np.array(mask, dtype=np.uint8)
new_mask = cv2.bitwise_and(new_image, new_image, mask=mask)
cv2.addWeighted(image, 1.0, new_mask, 0.5, 0.0, image)
return image
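
A sketch of how the selector might feed the projection calibration referenced in param_settings (this commit does not include get_projection_map.py, so treat the flow as an assumption):

    import cv2
    import numpy as np
    from surround_view import PointSelector
    from surround_view import param_settings as settings

    selector = PointSelector(undistorted_image, title="front")
    if selector.loop():  # user confirmed exactly four clicks with Enter
        src = np.float32(selector.keypoints)
        dst = np.float32(settings.project_keypoints["front"])
        project_matrix = cv2.getPerspectiveTransform(src, dst)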

surround_view/structures.py Normal file

@@ -0,0 +1,12 @@
class ImageFrame(object):
def __init__(self, timestamp, image):
self.timestamp = timestamp
self.image = image
class ThreadStatisticsData(object):
def __init__(self):
self.average_fps = 0
self.frames_processed_count = 0

surround_view/utils.py Normal file

@@ -0,0 +1,137 @@
import cv2
import numpy as np
def gstreamer_pipeline(cam_id=0,
capture_width=1920,
capture_height=1080,
framerate=30,
format="YUYV",
):
"""
    Build a GStreamer pipeline string for opening a V4L2 camera with OpenCV.
"""
return (
f"v4l2src device=/dev/video{cam_id} ! "
f"video/x-raw,format={format},width={capture_width},height={capture_height},framerate={framerate}/1 ! "
"videoconvert ! "
"video/x-raw,format=YUYV ! " # 转为 OpenCV 能直接用的 BGR 格式
"appsink"
)
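# Typical use with OpenCV's GStreamer backend (assumes OpenCV was built with
# GStreamer support):
#
#     cap = cv2.VideoCapture(gstreamer_pipeline(cam_id=0), cv2.CAP_GSTREAMER)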
def convert_binary_to_bool(mask):
"""
    Convert a binary image (single channel, pixels 0 or 255) to
    a 0/1 mask (all pixels are 0 or 1).
"""
    return (mask.astype(np.float64) / 255.0).astype(int)
def adjust_luminance(gray, factor):
"""
Adjust the luminance of a grayscale image by a factor.
"""
return np.minimum((gray * factor), 255).astype(np.uint8)
def get_mean_statistisc(gray, mask):
"""
    Get the sum of values of a gray image over a region defined by a mask matrix.
    The mask matrix must contain only 0s and 1s.
"""
return np.sum(gray * mask)
def mean_luminance_ratio(grayA, grayB, mask):
return get_mean_statistisc(grayA, mask) / get_mean_statistisc(grayB, mask)
def get_mask(img):
"""
Convert an image to a mask array.
"""
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
ret, mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY)
return mask
def get_overlap_region_mask(imA, imB):
"""
    Given two images of the same size, get their overlapping region and
convert this region to a mask array.
"""
overlap = cv2.bitwise_and(imA, imB)
mask = get_mask(overlap)
mask = cv2.dilate(mask, np.ones((2, 2), np.uint8), iterations=2)
return mask
def get_outmost_polygon_boundary(img):
"""
    Given a mask image describing the overlapping region of
    two images, get the outermost contour of this region.
"""
mask = get_mask(img)
mask = cv2.dilate(mask, np.ones((2, 2), np.uint8), iterations=2)
cnts, hierarchy = cv2.findContours(
mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)[-2:]
    # get the contour with the largest area
C = sorted(cnts, key=lambda x: cv2.contourArea(x), reverse=True)[0]
# polygon approximation
polygon = cv2.approxPolyDP(C, 0.009 * cv2.arcLength(C, True), True)
return polygon
def get_weight_mask_matrix(imA, imB, dist_threshold=5):
"""
Get the weight matrix G that combines two images imA, imB smoothly.
"""
overlapMask = get_overlap_region_mask(imA, imB)
overlapMaskInv = cv2.bitwise_not(overlapMask)
indices = np.where(overlapMask == 255)
imA_diff = cv2.bitwise_and(imA, imA, mask=overlapMaskInv)
imB_diff = cv2.bitwise_and(imB, imB, mask=overlapMaskInv)
G = get_mask(imA).astype(np.float32) / 255.0
polyA = get_outmost_polygon_boundary(imA_diff)
polyB = get_outmost_polygon_boundary(imB_diff)
for y, x in zip(*indices):
# opencv requires a tuple of ints
xy_tuple = tuple([int(x), int(y)])
distToB = cv2.pointPolygonTest(polyB, xy_tuple, True)
if distToB < dist_threshold:
distToA = cv2.pointPolygonTest(polyA, xy_tuple, True)
distToB *= distToB
distToA *= distToA
G[y, x] = distToB / (distToA + distToB)
return G, overlapMask
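# Intuition for the weights, roughly: G starts as 1 wherever imA has content.
# For a pixel inside the overlap, distToA and distToB measure its distance to
# the regions covered only by imA and only by imB, and the squared-distance
# ratio distToB**2 / (distToA**2 + distToB**2) fades imA's weight from 1 near
# its own exclusive region to 0 near imB's, giving a smooth seam.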
def make_white_balance(image):
"""
    Adjust the white balance of an image based on the means of its channels (gray-world assumption).
"""
B, G, R = cv2.split(image)
m1 = np.mean(B)
m2 = np.mean(G)
m3 = np.mean(R)
K = (m1 + m2 + m3) / 3
c1 = K / m1
c2 = K / m2
c3 = K / m3
B = adjust_luminance(B, c1)
G = adjust_luminance(G, c2)
R = adjust_luminance(R, c3)
return cv2.merge((B, G, R))