191 lines
6.2 KiB
Python
191 lines
6.2 KiB
Python
import cv2
|
||
import threading
|
||
import sys
|
||
from datetime import datetime
|
||
import os
|
||
import argparse
|
||
import numpy as np
|
||
|
||
# 全局变量
|
||
frame = None
|
||
running = True
|
||
which_camera = 0
|
||
W, H = 1920, 1080 # 相机分辨率
|
||
|
||
def load_camera_params(camera_name):
|
||
"""
|
||
从YAML文件加载相机参数
|
||
"""
|
||
yaml_file = os.path.join("yaml", f"{camera_name}.yaml")
|
||
|
||
if not os.path.exists(yaml_file):
|
||
raise FileNotFoundError(f"YAML file not found: {yaml_file}")
|
||
|
||
# 使用OpenCV读取YAML文件
|
||
fs = cv2.FileStorage(yaml_file, cv2.FILE_STORAGE_READ)
|
||
|
||
# 读取相机内参矩阵
|
||
camera_matrix = fs.getNode("camera_matrix").mat()
|
||
|
||
# 读取畸变系数
|
||
dist_coeffs = fs.getNode("dist_coeffs").mat()
|
||
|
||
# 读取投影矩阵
|
||
project_matrix = fs.getNode("project_matrix").mat()
|
||
|
||
# 读取缩放参数
|
||
scale_xy_node = fs.getNode("scale_xy")
|
||
if scale_xy_node.empty():
|
||
scale_xy = np.array([1.0, 1.0])
|
||
else:
|
||
scale_xy = scale_xy_node.mat().flatten()
|
||
|
||
# 读取偏移参数
|
||
shift_xy_node = fs.getNode("shift_xy")
|
||
if shift_xy_node.empty():
|
||
shift_xy = np.array([0.0, 0.0])
|
||
else:
|
||
shift_xy = shift_xy_node.mat().flatten()
|
||
|
||
fs.release()
|
||
|
||
return camera_matrix, dist_coeffs, project_matrix, scale_xy, shift_xy
|
||
|
||
def video_thread():
|
||
global frame, running
|
||
|
||
# 动态加载当前摄像头的参数
|
||
try:
|
||
K, D, front_proj_matrix, scale_xy, shift_xy = load_camera_params(args.i.lower())
|
||
print(f"[INFO] Loaded parameters for {args.i} camera")
|
||
except Exception as e:
|
||
print(f"[ERROR] Failed to load camera parameters: {e}", file=sys.stderr)
|
||
running = False
|
||
return
|
||
|
||
cap = cv2.VideoCapture(which_camera, cv2.CAP_ANY)
|
||
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
|
||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
|
||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
|
||
|
||
if not cap.isOpened():
|
||
print("[ERROR] Cannot open camera", file=sys.stderr)
|
||
running = False
|
||
return
|
||
|
||
# 创建修改后的相机矩阵(包含缩放和平移)
|
||
modified_camera_matrix = K.copy()
|
||
modified_camera_matrix[0, 0] *= scale_xy[0] # fx *= scale_x
|
||
modified_camera_matrix[1, 1] *= scale_xy[1] # fy *= scale_y
|
||
modified_camera_matrix[0, 2] += shift_xy[0] # cx += shift_x
|
||
modified_camera_matrix[1, 2] += shift_xy[1] # cy += shift_y
|
||
|
||
# 鱼眼相机去畸变 合并缩放系数
|
||
map1, map2 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), modified_camera_matrix, (W, H), cv2.CV_16SC2)
|
||
|
||
# 鱼眼相机仅畸变
|
||
map3, map4 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), K, (W, H), cv2.CV_16SC2)
|
||
|
||
while running:
|
||
ret, f = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
# 图像去畸变
|
||
undistorted = cv2.remap(f, map1, map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
|
||
undistorted2 = cv2.remap(f, map3, map4, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
|
||
|
||
proj_image = cv2.warpPerspective(
|
||
undistorted,
|
||
front_proj_matrix,
|
||
(W, H), # 输出大小 (与原始分辨率一致)
|
||
borderMode=cv2.BORDER_CONSTANT,
|
||
borderValue=(0, 0, 0)
|
||
)
|
||
|
||
frame = f.copy()
|
||
birdseye_small = cv2.resize(f, (W//2, H//2))
|
||
undistorted2_small = cv2.resize(undistorted2, (W//2, H//2))
|
||
|
||
# 拼接原视频和抗畸变后的视频 (左右显示)
|
||
comparison = np.hstack((birdseye_small, undistorted2_small))
|
||
show_video = np.vstack((comparison, proj_image))
|
||
|
||
# 设置视频流全屏显示
|
||
text_info = f"Camera: {args.i.upper()} | Press 'q' to quit, 's' to screenshot"
|
||
cv2.putText(show_video, text_info, (10, 30),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
|
||
|
||
cv2.namedWindow('Video old vs new', cv2.WND_PROP_FULLSCREEN)
|
||
cv2.setWindowProperty('Video old vs new', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
||
cv2.imshow('Video old vs new', show_video)
|
||
|
||
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||
running = False
|
||
break
|
||
|
||
cap.release()
|
||
cv2.destroyAllWindows()
|
||
|
||
def input_thread():
|
||
global running
|
||
print("SSH命令: 's' = 截图, 'q' = 退出")
|
||
while running:
|
||
try:
|
||
cmd = input().strip().lower()
|
||
if cmd == 's':
|
||
if frame is not None:
|
||
filename = f"./images/{args.i.lower()}.png"
|
||
cv2.imwrite(filename, frame)
|
||
print(f"[SSH] Saved: {os.path.abspath(filename)}")
|
||
else:
|
||
print("[SSH] No frame available yet.")
|
||
elif cmd == 'q':
|
||
running = False
|
||
break
|
||
else:
|
||
print("[SSH] Unknown command. Use 's' or 'q'.")
|
||
except EOFError:
|
||
break
|
||
|
||
if __name__ == "__main__":
|
||
# 获取用户参数启动
|
||
parser = argparse.ArgumentParser(description="Camera Parameter Loading Tool")
|
||
|
||
# 获取用户输入的摄像头方位 (front back left right)
|
||
parser.add_argument("--i", type=str, required=True,
|
||
choices=["front", "back", "left", "right"],
|
||
help="Camera direction (front/back/left/right)")
|
||
|
||
args = parser.parse_args()
|
||
|
||
print("相机方位:", args.i)
|
||
|
||
if args.i == "front":
|
||
which_camera = 0
|
||
elif args.i == "back":
|
||
which_camera = 2
|
||
elif args.i == "left":
|
||
which_camera = 1
|
||
elif args.i == "right":
|
||
which_camera = 3
|
||
else:
|
||
print("[ERROR] Invalid camera direction. Use 'front', 'back', 'left', or 'right'.", file=sys.stderr)
|
||
running = False
|
||
exit(1)
|
||
|
||
# 检查YAML目录是否存在
|
||
yaml_dir = "yaml"
|
||
if not os.path.exists(yaml_dir):
|
||
print(f"[ERROR] YAML directory not found: {yaml_dir}", file=sys.stderr)
|
||
print("Please ensure YAML files are in the 'yaml' directory", file=sys.stderr)
|
||
exit(1)
|
||
|
||
# 启动视频线程
|
||
vt = threading.Thread(target=video_thread, daemon=True)
|
||
vt.start()
|
||
|
||
# 主线程监听 SSH 输入
|
||
input_thread()
|
||
|
||
print("[INFO] Exiting...") |