Files
ADAS360/run_calibrate_camera.py
2025-10-28 18:46:04 +08:00

355 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import os
import numpy as np
import cv2
from surround_view import CaptureThread, MultiBufferManager
import surround_view.utils as utils
import re
# 保存相机参数文件的目录
TARGET_DIR = os.path.join(os.getcwd(), "yaml")
# 默认参数文件路径
DEFAULT_PARAM_FILE = os.path.join(TARGET_DIR, "camera_params.yaml")
def is_rtsp_url(input_str):
"""检查输入是否为RTSP URL"""
return input_str.startswith('rtsp://')
def main():
parser = argparse.ArgumentParser()
# 输入视频流可以是相机设备索引或RTSP URL
parser.add_argument("-i", "--input", type=str, default="0",
help="输入相机设备索引或RTSP URL")
# 棋盘格图案尺寸
parser.add_argument("-grid", "--grid", default="9x6",
help="标定棋盘格的尺寸")
parser.add_argument("-r", "--resolution", default="640x480",
help="相机图像的分辨率")
parser.add_argument("-framestep", type=int, default=20,
help="视频中每隔n帧使用一次")
parser.add_argument("-o", "--output", default=DEFAULT_PARAM_FILE,
help="输出yaml文件的路径")
parser.add_argument("-fisheye", "--fisheye", action="store_true",
help="如果是鱼眼相机则设置为true")
parser.add_argument("-flip", "--flip", default=0, type=int,
help="相机的翻转方式")
parser.add_argument("--no_gst", action="store_true",
help="如果不使用gstreamer捕获相机则设置为true")
args = parser.parse_args()
if not os.path.exists(TARGET_DIR):
os.mkdir(TARGET_DIR)
text1 = "按c键进行标定"
text2 = "按q键退出"
text3 = "设备: {}".format(args.input)
font = cv2.FONT_HERSHEY_SIMPLEX
fontscale = 0.6
resolution_str = args.resolution.split("x")
target_W = int(resolution_str[0])
target_H = int(resolution_str[1])
grid_size = tuple(int(x) for x in args.grid.split("x"))
grid_points = np.zeros((1, np.prod(grid_size), 3), np.float32)
grid_points[0, :, :2] = np.indices(grid_size).T.reshape(-1, 2)
objpoints = [] # 真实世界中的3D点
imgpoints = [] # 图像平面中的2D点
# 检查输入是RTSP URL还是设备索引
if is_rtsp_url(args.input):
rtsp_url = args.input
# 初始化VideoCapture强制使用FFmpeg后端
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
# 配置H.265解码参数(关键修改)
try:
# 基础参数使用TCP传输避免丢包
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "rtsp_transport", "tcp")
# 明确指定H.265编码hevc是H.265的标准编码名)
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "vcodec", "hevc")
# 低延迟配置
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "flags", "low_delay")
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "fflags", "nobuffer")
# 设置缓冲区大小为1减少延迟
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# 设置超时参数,避免无限等待
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "stimeout", "5000000") # 5秒超时
except Exception as e:
print(f"警告: 设置FFmpeg参数时出错: {e}")
# 第一次连接失败:尝试简化参数(仅保留必要参数)
if not cap.isOpened():
print("首次RTSP连接失败尝试简化参数...")
cap.release()
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
try:
# 仅保留最关键的TCP传输参数
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "rtsp_transport", "tcp")
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "stimeout", "5000000")
except Exception as e:
print(f"简化参数设置失败: {e}")
# 最终检查连接状态
if not cap.isOpened():
print(f"无法打开RTSP流: {rtsp_url}")
print("可能的原因: 1. FFmpeg不支持H.265; 2. 相机认证失败; 3. 网络连接问题")
return
# 获取流的实际分辨率
actual_W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
W, H = actual_W, actual_H
print(f"检测到RTSP流分辨率: {W}x{H}")
# 尝试设置目标分辨率RTSP流可能不支持修改分辨率
if actual_W != target_W or actual_H != target_H:
print(f"尝试将分辨率设置为 {target_W}x{target_H}...")
cap.set(cv2.CAP_PROP_FRAME_WIDTH, target_W)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, target_H)
# 验证是否设置成功
new_W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
new_H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if new_W != target_W or new_H != target_H:
print(f"无法设置分辨率RTSP流可能为固定分辨率使用 {new_W}x{new_H}")
W, H = new_W, new_H
use_rtsp = True
else:
# 处理本地相机设备
device = int(args.input)
cap_thread = CaptureThread(device_id=device,
flip_method=args.flip,
resolution=(target_W, target_H),
use_gst=not args.no_gst,
)
buffer_manager = MultiBufferManager()
buffer_manager.bind_thread(cap_thread, buffer_size=8)
if cap_thread.connect_camera():
cap_thread.start()
use_rtsp = False
W, H = target_W, target_H
else:
print("无法打开本地相机设备")
return
quit = False
do_calib = False
i = -1
print("\n开始标定过程...")
print("'c'键开始标定需要≥12个有效的棋盘格帧")
print("'q'键退出")
# 创建可调整大小的窗口
cv2.namedWindow("corners", cv2.WINDOW_NORMAL)
while True:
i += 1
# 根据输入类型读取帧
if use_rtsp:
ret, img = cap.read()
if not ret:
print("读取RTSP帧失败重试...")
# 帧读取失败时重试连接
cap.release()
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
# 重新设置关键参数
try:
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "rtsp_transport", "tcp")
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "vcodec", "hevc")
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except:
pass
continue
else:
img = buffer_manager.get_device(device).get().image
if img is None:
print("读取本地相机帧失败")
break
# 每隔N帧处理一次
if i % args.framestep != 0:
continue
print(f"在第{i}帧中搜索棋盘格角点...")
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 检测棋盘格角点(增强鲁棒性参数)
found, corners = cv2.findChessboardCorners(
gray,
grid_size,
cv2.CALIB_CB_ADAPTIVE_THRESH +
cv2.CALIB_CB_NORMALIZE_IMAGE +
cv2.CALIB_CB_FILTER_QUADS +
cv2.CALIB_CB_FAST_CHECK # 快速校验,减少误检
)
if found:
# 亚像素级角点优化
term = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_COUNT, 30, 0.01)
cv2.cornerSubPix(gray, corners, (5, 5), (-1, -1), term)
print("✓ 检测到角点")
imgpoints.append(corners)
objpoints.append(grid_points)
# 绘制角点到图像
cv2.drawChessboardCorners(img, grid_size, corners, found)
else:
print("✗ 未找到角点")
# 绘制状态文本
cv2.putText(img, text1, (20, 70), font, fontscale, (255, 200, 0), 2)
cv2.putText(img, text2, (20, 110), font, fontscale, (255, 200, 0), 2)
cv2.putText(img, text3, (20, 30), font, fontscale, (255, 200, 0), 2)
cv2.putText(img, f"分辨率: {img.shape[1]}x{img.shape[0]} | 有效帧数: {len(objpoints)}",
(20, 150), font, fontscale, (255, 200, 0), 2)
# 自动缩放过大图像
max_display_width = 1280
max_display_height = 720
scale = min(max_display_width / img.shape[1], max_display_height / img.shape[0])
if scale < 1:
img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale)))
cv2.imshow("corners", img)
key = cv2.waitKey(1) & 0xFF
if key == ord("c"):
print("\n=== 开始标定 ===")
N_OK = len(objpoints)
if N_OK < 12:
print(f"错误: 只有{N_OK}个有效帧需要≥12个。继续采集...")
else:
do_calib = True
break
elif key == ord("q"):
quit = True
break
# 资源清理
if use_rtsp:
cap.release()
else:
cap_thread.stop()
cap_thread.disconnect_camera()
cv2.destroyAllWindows()
if quit:
print("用户退出程序")
return
if do_calib:
N_OK = len(objpoints)
K = np.zeros((3, 3)) # 相机内参矩阵
D = np.zeros((4, 1)) # 畸变系数
rvecs = [np.zeros((1, 1, 3), dtype=np.float64) for _ in range(N_OK)] # 旋转向量
tvecs = [np.zeros((1, 1, 3), dtype=np.float64) for _ in range(N_OK)] # 平移向量
# 鱼眼标定 flags
calibration_flags = (cv2.fisheye.CALIB_RECOMPUTE_EXTRINSIC +
# cv2.fisheye.CALIB_CHECK_COND +
cv2.fisheye.CALIB_FIX_SKEW)
if args.fisheye:
# 鱼眼相机标定
ret, mtx, dist, rvecs, tvecs = cv2.fisheye.calibrate(
objpoints,
imgpoints,
(W, H),
K,
D,
rvecs,
tvecs,
calibration_flags,
(cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 1e-6)
)
else:
# 普通相机标定
ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(
objpoints,
imgpoints,
(W, H),
None,
None)
if ret:
# 保存标定结果到YAML文件
fs = cv2.FileStorage(args.output, cv2.FILE_STORAGE_WRITE)
fs.write("resolution", np.int32([W, H]))
fs.write("camera_matrix", K)
fs.write("dist_coeffs", D)
fs.release()
print(f"✓ 标定成功!")
print(f" 保存至: {args.output}")
print(f" 使用的有效帧数: {N_OK}")
# 显示标定结果
print("\n=== 标定结果 ===")
print("相机内参矩阵 (K):")
print(np.round(K, 4))
print("\n畸变系数 (D):")
print(np.round(D.T, 6))
# 展示畸变校正前后对比
print("\n显示原始图像与校正后图像5秒...")
if use_rtsp:
# 重新打开RTSP流获取测试帧
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
try:
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "rtsp_transport", "tcp")
cap.set(cv2.CAP_PROP_FFMPEG_OPTION, "vcodec", "hevc")
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
except:
pass
ret, test_img = cap.read()
cap.release()
else:
# 从本地相机获取测试帧
test_img = buffer_manager.get_device(device).get().image
if test_img is not None:
if args.fisheye:
# 鱼眼图像校正
map1, map2 = cv2.fisheye.initUndistortRectifyMap(
K, D, np.eye(3), K, (W, H), cv2.CV_16SC2)
undistorted = cv2.remap(test_img, map1, map2,
interpolation=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT)
else:
# 普通图像校正
undistorted = cv2.undistort(test_img, K, D)
# 拼接对比图并显示
comparison = np.hstack((test_img, undistorted))
cv2.putText(comparison, "原始图像(左) vs 校正后图像(右)",
(10, 30), font, 0.7, (0, 255, 0), 2)
# 缩放对比图适配屏幕
scale = min(max_display_width / comparison.shape[1], max_display_height / comparison.shape[0])
if scale < 1:
comparison = cv2.resize(comparison,
(int(comparison.shape[1] * scale),
int(comparison.shape[0] * scale)))
cv2.imshow("标定结果: 原始图像 vs 校正后图像", comparison)
cv2.waitKey(5000) # 显示5秒
cv2.destroyAllWindows()
else:
print("✗ 标定失败! (检查棋盘格检测或帧质量)")
if __name__ == "__main__":
main()