Isaac ROS VSLAM: Visual Simultaneous Localization and Mapping
Visual SLAM (Simultaneous Localization and Mapping) is a critical capability for autonomous robots, enabling them to navigate unknown environments by building maps while simultaneously localizing themselves within those maps. This chapter covers the implementation of Visual SLAM using NVIDIA Isaac ROS packages.
Learning Objectives
- Understand Visual SLAM concepts and algorithms
- Implement GPU-accelerated VSLAM using Isaac ROS
- Configure and tune VSLAM parameters for robotics applications
- Integrate VSLAM with navigation and perception pipelines
- Evaluate VSLAM performance and accuracy
- Troubleshoot common VSLAM issues
Introduction to Visual SLAM
What is Visual SLAM?
Visual SLAM is a technique that allows a robot to estimate its position and orientation (pose) while simultaneously building a map of its environment using only visual sensors (cameras). This is achieved by tracking features across consecutive frames and triangulating their 3D positions.
Key Components of Visual SLAM
┌─────────────────────────────────────────────────────────────────┐
│ VSLAM Pipeline │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Camera │───▶│ Feature │───▶│ Pose Estimation │ │
│ │ Acquisition │ │ Detection │ │ & Tracking │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Image │ │ Feature │ │ Map Building & │ │
│ │ Processing │ │ Matching │ │ Optimization │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─── ──────────────────────────────────────────────────────────────┘
VSLAM vs. Traditional SLAM
| Aspect | Visual SLAM | Traditional SLAM (LiDAR) |
|---|---|---|
| Sensors | Cameras (passive) | LiDAR, sonar, IR (active) |
| Environmental Dependency | Light-dependent | Light-independent |
| Computational Cost | High (feature processing) | Moderate (range data) |
| Map Richness | Dense, textured maps | Sparse geometric maps |
| Accuracy | Good in textured environments | High precision |
| Robustness | Challenging in low-texture | Robust in various conditions |
Isaac ROS VSLAM Architecture
GPU-Accelerated Processing
Isaac ROS VSLAM leverages NVIDIA's GPU acceleration to provide real-time performance:
# Example Isaac ROS VSLAM node structure
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Odometry
from visualization_msgs.msg import MarkerArray
from cv_bridge import CvBridge
import numpy as np
import cv2
class IsaacVSLAMNode(Node):
def __init__(self):
super().__init__('isaac_vslam_node')
# Initialize CV bridge
self.bridge = CvBridge()
# Publishers
self.odom_pub = self.create_publisher(Odometry, '/visual_odometry', 10)
self.pose_pub = self.create_publisher(PoseStamped, '/visual_slam/pose', 10)
self.map_pub = self.create_publisher(MarkerArray, '/visual_slam/map', 10)
# Subscribers
self.left_image_sub = self.create_subscription(
Image,
'/camera/left/image_rect_color',
self.left_image_callback,
10
)
self.right_image_sub = self.create_subscription(
Image,
'/camera/right/image_rect_color',
self.right_image_callback,
10
)
self.camera_info_sub = self.create_subscription(
CameraInfo,
'/camera/left/camera_info',
self.camera_info_callback,
10
)
# Internal state
self.camera_matrix = None
self.distortion_coeffs = None
self.prev_frame = None
self.prev_features = None
self.current_pose = np.eye(4) # 4x4 transformation matrix
self.map_points = [] # 3D points in the map
# Feature detector optimized for GPU
self.detector = cv2.cuda.SIFT_create() if cv2.cuda.getCudaEnabledDeviceCount() > 0 else cv2.SIFT_create()
self.get_logger().info('Isaac ROS VSLAM node initialized')
def camera_info_callback(self, msg):
"""Receive camera calibration information"""
if self.camera_matrix is None:
self.camera_matrix = np.array(msg.k).reshape(3, 3)
self.distortion_coeffs = np.array(msg.d)
self.get_logger().info('Camera calibration received')
def left_image_callback(self, msg):
"""Process left camera image for VSLAM"""
try:
# Convert ROS image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Process with GPU acceleration if available
if cv2.cuda.getCudaEnabledDeviceCount() > 0:
# Upload to GPU
gpu_frame = cv2.cuda_GpuMat()
gpu_frame.upload(cv_image)
# Detect features on GPU
keypoints_gpu, descriptors_gpu = self.detector.detectAndCompute(gpu_frame, None)
# Download results
keypoints = keypoints_gpu.download()
descriptors = descriptors_gpu.download() if descriptors_gpu is not None else None
else:
# CPU fallback
keypoints, descriptors = self.detector.detectAndCompute(cv_image, None)
# Track features and estimate pose
if self.prev_features is not None and descriptors is not None:
self.track_features_and_estimate_pose(
self.prev_features, descriptors, keypoints
)
# Store current frame for next iteration
self.prev_frame = cv_image.copy()
self.prev_features = descriptors.copy() if descriptors is not None else None
except Exception as e:
self.get_logger().error(f'Error processing left image: {e}')
def right_image_callback(self, msg):
"""Process right camera image for stereo processing"""
# Similar processing for right camera
pass
def track_features_and_estimate_pose(self, prev_desc, curr_desc, curr_kp):
"""Track features and estimate camera pose"""
if prev_desc is None or curr_desc is None or len(curr_kp) < 10:
return
# Match features
matcher = cv2.BFMatcher()
matches = matcher.knnMatch(prev_desc, curr_desc, k=2)
# Apply Lowe's ratio test
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
if len(good_matches) >= 10:
# Extract matched keypoints
prev_pts = np.float32([
curr_kp[m.trainIdx].pt for m in good_matches
]).reshape(-1, 1, 2)
# Estimate essential matrix
if self.camera_matrix is not None:
E, mask = cv2.findEssentialMat(
prev_pts, prev_pts, # In real implementation, use previous frame points
self.camera_matrix,
method=cv2.RANSAC,
threshold=1.0,
prob=0.999
)
if E is not None:
# Recover pose
_, R, t, mask = cv2.recoverPose(E, prev_pts, prev_pts, self.camera_matrix)
# Update pose
delta_transform = np.eye(4)
delta_transform[:3, :3] = R
delta_transform[:3, 3] = t.flatten()
self.current_pose = self.current_pose @ np.linalg.inv(delta_transform)
# Publish estimated pose
self.publish_pose_estimate()
def publish_pose_estimate(self):
"""Publish current pose estimate"""
odom_msg = Odometry()
odom_msg.header.stamp = self.get_clock().now().to_msg()
odom_msg.header.frame_id = 'map'
odom_msg.child_frame_id = 'camera'
# Convert transformation matrix to pose
odom_msg.pose.pose.position.x = self.current_pose[0, 3]
odom_msg.pose.pose.position.y = self.current_pose[1, 3]
odom_msg.pose.pose.position.z = self.current_pose[2, 3]
# Convert rotation matrix to quaternion
qw, qx, qy, qz = self.rotation_matrix_to_quaternion(self.current_pose[:3, :3])
odom_msg.pose.pose.orientation.w = qw
odom_msg.pose.pose.orientation.x = qx
odom_msg.pose.pose.orientation.y = qy
odom_msg.pose.pose.orientation.z = qz
self.odom_pub.publish(odom_msg)
def rotation_matrix_to_quaternion(self, R):
"""Convert rotation matrix to quaternion"""
trace = np.trace(R)
if trace > 0:
s = np.sqrt(trace + 1.0) * 2
qw = 0.25 * s
qx = (R[2, 1] - R[1, 2]) / s
qy = (R[0, 2] - R[2, 0]) / s
qz = (R[1, 0] - R[0, 1]) / s
else:
if R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
s = np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) * 2
qw = (R[2, 1] - R[1, 2]) / s
qx = 0.25 * s
qy = (R[0, 1] + R[1, 0]) / s
qz = (R[0, 2] + R[2, 0]) / s
elif R[1, 1] > R[2, 2]:
s = np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) * 2
qw = (R[0, 2] - R[2, 0]) / s
qx = (R[0, 1] + R[1, 0]) / s
qy = 0.25 * s
qz = (R[1, 2] + R[2, 1]) / s
else:
s = np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) * 2
qw = (R[1, 0] - R[0, 1]) / s
qx = (R[0, 2] + R[2, 0]) / s
qy = (R[1, 2] + R[2, 1]) / s
qz = 0.25 * s
# Normalize quaternion
norm = np.sqrt(qw*qw + qx*qx + qy*qy + qz*qz)
return qw/norm, qx/norm, qy/norm, qz/norm
def main(args=None):
rclpy.init(args=args)
vslam_node = IsaacVSLAMNode()
rclpy.spin(vslam_node)
vslam_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Isaac ROS VSLAM Components
1. Isaac ROS Stereo Image Rectification
Stereo rectification is crucial for accurate depth estimation:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from stereo_msgs.msg import DisparityImage
from cv_bridge import CvBridge
import numpy as np
import cv2
class IsaacStereoRectificationNode(Node):
def __init__(self):
super().__init__('isaac_stereo_rectification')
self.bridge = CvBridge()
# Stereo camera calibration
self.left_camera_matrix = None
self.right_camera_matrix = None
self.left_distortion = None
self.right_distortion = None
self.rotation = None
self.translation = None
self.rectified_roi_left = None
self.rectified_roi_right = None
# Publishers for rectified images
self.left_rect_pub = self.create_publisher(Image, '/camera/left/image_rect_color', 10)
self.right_rect_pub = self.create_publisher(Image, '/camera/right/image_rect_color', 10)
# Subscribers
self.left_sub = self.create_subscription(
Image, '/camera/left/image_raw', self.left_image_callback, 10
)
self.right_sub = self.create_subscription(
Image, '/camera/right/image_raw', self.right_image_callback, 10
)
self.left_info_sub = self.create_subscription(
CameraInfo, '/camera/left/camera_info', self.left_info_callback, 10
)
self.right_info_sub = self.create_subscription(
CameraInfo, '/camera/right/camera_info', self.right_info_callback, 10
)
# Rectification maps
self.left_map1 = None
self.left_map2 = None
self.right_map1 = None
self.right_map2 = None
self.rectification_initialized = False
def left_info_callback(self, msg):
"""Process left camera calibration info"""
if self.left_camera_matrix is None:
self.left_camera_matrix = np.array(msg.k).reshape(3, 3)
self.left_distortion = np.array(msg.d)
self.image_width = msg.width
self.image_height = msg.height
def right_info_callback(self, msg):
"""Process right camera calibration info"""
if self.right_camera_matrix is None:
self.right_camera_matrix = np.array(msg.k).reshape(3, 3)
self.right_distortion = np.array(msg.d)
# Set baseline and other stereo parameters
# Extract from P matrix (projection matrix) in camera info
self.translation = np.array([-msg.p[3] / msg.p[0], # baseline
-msg.p[7] / msg.p[5],
-msg.p[11] / msg.p[10]])
def initialize_rectification(self):
"""Initialize stereo rectification parameters"""
if (self.left_camera_matrix is not None and
self.right_camera_matrix is not None and
not self.rectification_initialized):
# Calculate rectification parameters
R1, R2, P1, P2, Q, self.rectified_roi_left, self.rectified_roi_right = \
cv2.stereoRectify(
self.left_camera_matrix,
self.left_distortion,
self.right_camera_matrix,
self.right_distortion,
(self.image_width, self.image_height),
R=self.rotation, # Rotation between cameras
T=self.translation, # Translation between cameras
flags=cv2.CALIB_ZERO_DISPARITY,
alpha=0 # Crop to valid region
)
# Generate rectification maps
self.left_map1, self.left_map2 = cv2.initUndistortRectifyMap(
self.left_camera_matrix,
self.left_distortion,
R1,
P1,
(self.image_width, self.image_height),
cv2.CV_32FC1
)
self.right_map1, self.right_map2 = cv2.initUndistortRectifyMap(
self.right_camera_matrix,
self.right_distortion,
R2,
P2,
(self.image_width, self.image_height),
cv2.CV_32FC1
)
self.rectification_initialized = True
self.get_logger().info('Stereo rectification initialized')
def left_image_callback(self, msg):
"""Process left camera image"""
if not self.rectification_initialized:
self.initialize_rectification()
return
try:
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Apply rectification
rectified_image = cv2.remap(
cv_image,
self.left_map1,
self.left_map2,
interpolation=cv2.INTER_LINEAR
)
# Crop to valid region
x, y, w, h = self.rectified_roi_left
rectified_image = rectified_image[y:y+h, x:x+w]
# Publish rectified image
rect_msg = self.bridge.cv2_to_imgmsg(rectified_image, encoding='bgr8')
rect_msg.header = msg.header
self.left_rect_pub.publish(rect_msg)
except Exception as e:
self.get_logger().error(f'Error rectifying left image: {e}')
def right_image_callback(self, msg):
"""Process right camera image"""
if not self.rectification_initialized:
return
try:
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Apply rectification
rectified_image = cv2.remap(
cv_image,
self.right_map1,
self.right_map2,
interpolation=cv2.INTER_LINEAR
)
# Crop to valid region
x, y, w, h = self.rectified_roi_right
rectified_image = rectified_image[y:y+h, x:x+w]
# Publish rectified image
rect_msg = self.bridge.cv2_to_imgmsg(rectified_image, encoding='bgr8')
rect_msg.header = msg.header
self.right_rect_pub.publish(rect_msg)
except Exception as e:
self.get_logger().error(f'Error rectifying right image: {e}')
2. Isaac ROS Visual Odometry
Visual odometry estimates motion between consecutive frames:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import TwistWithCovarianceStamped
from nav_msgs.msg import Odometry
from cv_bridge import CvBridge
import numpy as np
import cv2
class IsaacVisualOdometryNode(Node):
def __init__(self):
super().__init__('isaac_visual_odometry')
self.bridge = CvBridge()
self.image_queue = []
self.max_queue_size = 2 # Store current and previous frame
# Publishers
self.odom_pub = self.create_publisher(Odometry, '/visual_odom', 10)
self.twist_pub = self.create_publisher(TwistWithCovarianceStamped, '/visual_twist', 10)
# Subscribers
self.image_sub = self.create_subscription(
Image, '/camera/image_rect_color', self.image_callback, 10
)
self.info_sub = self.create_subscription(
CameraInfo, '/camera/camera_info', self.info_callback, 10
)
# Camera parameters
self.camera_matrix = None
self.distortion_coeffs = None
# Feature tracking
self.detector = cv2.ORB_create(nfeatures=2000)
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
# Motion estimation
self.prev_kp = None
self.prev_desc = None
self.current_pose = np.eye(4)
self.prev_timestamp = None
self.get_logger().info('Isaac Visual Odometry node initialized')
def info_callback(self, msg):
"""Receive camera calibration info"""
if self.camera_matrix is None:
self.camera_matrix = np.array(msg.k).reshape(3, 3)
self.distortion_coeffs = np.array(msg.d)
self.get_logger().info('Camera calibration received')
def image_callback(self, msg):
"""Process incoming camera images"""
try:
# Convert ROS image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Detect features
kp = self.detector.detect(cv_image)
kp, desc = self.detector.compute(cv_image, kp)
if self.prev_kp is not None and desc is not None and len(kp) > 10:
# Match features
matches = self.matcher.knnMatch(self.prev_desc, desc, k=2)
# Apply Lowe's ratio test
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.75 * n.distance:
good_matches.append(m)
if len(good_matches) >= 10:
# Extract matched points
prev_pts = np.float32([
self.prev_kp[m.queryIdx].pt for m in good_matches
]).reshape(-1, 1, 2)
curr_pts = np.float32([
kp[m.trainIdx].pt for m in good_matches
]).reshape(-1, 1, 2)
# Estimate motion using Essential Matrix
E, mask = cv2.findEssentialMat(
curr_pts, prev_pts,
self.camera_matrix,
method=cv2.RANSAC,
threshold=1.0,
prob=0.999
)
if E is not None:
# Recover pose
_, R, t, mask = cv2.recoverPose(
E, curr_pts, prev_pts, self.camera_matrix
)
# Calculate time delta for velocity estimation
current_time = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
if self.prev_timestamp is not None:
dt = current_time - self.prev_timestamp
else:
dt = 1.0 / 30.0 # Default to 30 FPS if no previous timestamp
# Update pose
delta_transform = np.eye(4)
delta_transform[:3, :3] = R
delta_transform[:3, 3] = t.flatten()
self.current_pose = self.current_pose @ np.linalg.inv(delta_transform)
# Calculate velocity (change in position over time)
velocity = t.flatten() / dt if dt > 0 else np.zeros(3)
# Publish results
self.publish_odometry(msg.header, velocity)
# Store current frame for next iteration
self.prev_kp = kp
self.prev_desc = desc
self.prev_timestamp = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
except Exception as e:
self.get_logger().error(f'Error in visual odometry: {e}')
def publish_odometry(self, header, velocity):
"""Publish odometry and twist information"""
# Publish odometry
odom_msg = Odometry()
odom_msg.header = header
odom_msg.header.frame_id = 'odom'
odom_msg.child_frame_id = 'base_link'
# Position from current pose
odom_msg.pose.pose.position.x = self.current_pose[0, 3]
odom_msg.pose.pose.position.y = self.current_pose[1, 3]
odom_msg.pose.pose.position.z = self.current_pose[2, 3]
# Orientation from rotation matrix
qw, qx, qy, qz = self.rotation_matrix_to_quaternion(self.current_pose[:3, :3])
odom_msg.pose.pose.orientation.w = qw
odom_msg.pose.pose.orientation.x = qx
odom_msg.pose.pose.orientation.y = qy
odom_msg.pose.pose.orientation.z = qz
# Velocity from calculated velocity
odom_msg.twist.twist.linear.x = velocity[0]
odom_msg.twist.twist.linear.y = velocity[1]
odom_msg.twist.twist.linear.z = velocity[2]
self.odom_pub.publish(odom_msg)
# Publish twist with covariance
twist_msg = TwistWithCovarianceStamped()
twist_msg.header = header
twist_msg.twist.twist.linear.x = velocity[0]
twist_msg.twist.twist.linear.y = velocity[1]
twist_msg.twist.twist.linear.z = velocity[2]
# Set covariance (diagonal values only for simplicity)
twist_msg.twist.covariance = [0.1, 0, 0, 0, 0, 0, # linear x
0, 0.1, 0, 0, 0, 0, # linear y
0, 0, 0.1, 0, 0, 0, # linear z
0, 0, 0, 0.1, 0, 0, # angular x
0, 0, 0, 0, 0.1, 0, # angular y
0, 0, 0, 0, 0, 0.1] # angular z
self.twist_pub.publish(twist_msg)
def rotation_matrix_to_quaternion(self, R):
"""Convert rotation matrix to quaternion"""
trace = np.trace(R)
if trace > 0:
s = np.sqrt(trace + 1.0) * 2
qw = 0.25 * s
qx = (R[2, 1] - R[1, 2]) / s
qy = (R[0, 2] - R[2, 0]) / s
qz = (R[1, 0] - R[0, 1]) / s
else:
if R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
s = np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) * 2
qw = (R[2, 1] - R[1, 2]) / s
qx = 0.25 * s
qy = (R[0, 1] + R[1, 0]) / s
qz = (R[0, 2] + R[2, 0]) / s
elif R[1, 1] > R[2, 2]:
s = np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) * 2
qw = (R[0, 2] - R[2, 0]) / s
qx = (R[0, 1] + R[1, 0]) / s
qy = 0.25 * s
qz = (R[1, 2] + R[2, 1]) / s
else:
s = np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) * 2
qw = (R[1, 0] - R[0, 1]) / s
qx = (R[0, 2] + R[2, 0]) / s
qy = (R[1, 2] + R[2, 1]) / s
qz = 0.25 * s
norm = np.sqrt(qw*qw + qx*qx + qy*qy + qz*qz)
return qw/norm, qx/norm, qy/norm, qz/norm
def main(args=None):
rclpy.init(args=args)
vo_node = IsaacVisualOdometryNode()
rclpy.spin(vo_node)
vo_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Feature Detection and Matching
GPU-Accelerated Feature Detection
import cv2
import numpy as np
class GPUFeatureDetector:
def __init__(self, use_cuda=True):
self.use_cuda = use_cuda and cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.use_cuda:
self.detector = cv2.cuda.SURF_CUDA(400)
self.extractor = cv2.cuda.SURF_CUDA(400)
else:
self.detector = cv2.SIFT_create()
self.matcher = cv2.BFMatcher() if not self.use_cuda else cv2.cuda.DescriptorMatcher_createBFMatcher()
def detect_and_compute(self, image):
"""Detect features and compute descriptors"""
if self.use_cuda:
# Upload image to GPU
gpu_image = cv2.cuda_GpuMat()
gpu_image.upload(image)
# Detect keypoints
keypoints_gpu = self.detector.detectAsync(gpu_image, None)
descriptors_gpu = self.detector.computeAsync(gpu_image, keypoints_gpu)
# Download results
keypoints = keypoints_gpu.download()
descriptors = descriptors_gpu.download() if descriptors_gpu is not None else None
return keypoints, descriptors
else:
return self.detector.detectAndCompute(image, None)
def match_features(self, desc1, desc2):
"""Match features between two descriptor sets"""
if self.use_cuda and desc1 is not None and desc2 is not None:
# Upload descriptors to GPU
gpu_desc1 = cv2.cuda_GpuMat()
gpu_desc2 = cv2.cuda_GpuMat()
gpu_desc1.upload(desc1)
gpu_desc2.upload(desc2)
# Perform matching on GPU
matches_gpu = self.matcher.match(gpu_desc1, gpu_desc2)
matches = matches_gpu.download()
return matches
else:
if desc1 is not None and desc2 is not None:
return self.matcher.match(desc1, desc2)
else:
return []
class IsaacFeatureTracker:
def __init__(self):
self.feature_detector = GPUFeatureDetector(use_cuda=True)
self.max_features = 2000
self.feature_buffer_size = 10 # Track features across multiple frames
# Feature tracking buffers
self.feature_tracks = [] # List of feature tracks
self.current_features = None
self.feature_ids = 0
def track_features(self, current_image, prev_image, prev_features):
"""Track features across frames"""
if prev_features is None:
# First frame - detect initial features
keypoints, descriptors = self.feature_detector.detect_and_compute(current_image)
return self.initialize_feature_tracks(keypoints, descriptors)
# Track existing features
tracked_features = self.match_and_track_features(
prev_features, self.current_features, current_image
)
# Update feature tracks
self.update_feature_tracks(tracked_features)
# Add new features if needed
if len(tracked_features) < self.max_features * 0.7:
new_features = self.detect_new_features(current_image, tracked_features)
tracked_features.extend(new_features)
self.current_features = tracked_features
return tracked_features
def initialize_feature_tracks(self, keypoints, descriptors):
"""Initialize feature tracks from initial detection"""
tracks = []
for i, (kp, desc) in enumerate(zip(keypoints, descriptors)):
track = {
'id': self.feature_ids,
'keypoints': [kp],
'descriptors': [desc],
'lifetime': 1,
'visibility': 1.0,
'position_3d': None # Will be computed when triangulated
}
tracks.append(track)
self.feature_ids += 1
return tracks
def match_and_track_features(self, prev_features, curr_desc, curr_image):
"""Match and track features between frames"""
if prev_features is None or curr_desc is None:
return []
# Extract previous descriptors
prev_descriptors = [track['descriptors'][-1] for track in prev_features if track['descriptors']]
if len(prev_descriptors) == 0:
return []
# Match features
matches = self.feature_detector.match_features(
np.vstack(prev_descriptors), curr_desc
)
# Filter matches based on distance ratio
good_matches = []
for match in matches:
if match.distance < 0.75:
good_matches.append(match)
# Create tracked feature list
tracked_features = []
matched_indices = set()
for match in good_matches:
prev_idx = match.queryIdx
curr_idx = match.trainIdx
if prev_idx < len(prev_features) and curr_idx < len(curr_desc):
# Extend existing track
track = prev_features[prev_idx].copy()
# Add new keypoint and descriptor
# Note: We would need to get the actual keypoint from the current image
tracked_features.append(track)
matched_indices.add(curr_idx)
return tracked_features
def update_feature_tracks(self, tracked_features):
"""Update feature track information"""
for track in tracked_features:
track['lifetime'] += 1
track['visibility'] = min(1.0, track['visibility'] + 0.1) # Increase visibility
# Remove old tracks that haven't been seen recently
self.feature_tracks = [
track for track in self.feature_tracks
if track['visibility'] > 0.1 or track['lifetime'] > 5
]
def detect_new_features(self, image, existing_features):
"""Detect new features to supplement tracking"""
# Detect new features in regions not covered by existing tracks
mask = np.ones(image.shape[:2], dtype=np.uint8) * 255
# Create exclusion zones around existing features
for feature in existing_features:
if feature['keypoints']:
kp = feature['keypoints'][-1]
x, y = int(kp.pt[0]), int(kp.pt[1])
cv2.circle(mask, (x, y), 20, 0, -1) # Mask out 20-pixel radius around feature
# Detect new features in unmasked regions
new_keypoints, new_descriptors = self.feature_detector.detect_and_compute(
cv2.bitwise_and(image, image, mask=mask)
)
new_features = []
for kp, desc in zip(new_keypoints, new_descriptors):
new_feature = {
'id': self.feature_ids,
'keypoints': [kp],
'descriptors': [desc],
'lifetime': 1,
'visibility': 1.0,
'position_3d': None
}
new_features.append(new_feature)
self.feature_ids += 1
return new_features
Map Building and Optimization
Bundle Adjustment for Map Optimization
import numpy as np
from scipy.optimize import least_squares
import cv2
class MapOptimizer:
def __init__(self):
self.keyframes = []
self.map_points = []
self.optimization_enabled = True
def add_keyframe(self, pose, features, descriptors):
"""Add a keyframe to the map"""
keyframe = {
'id': len(self.keyframes),
'pose': pose.copy(),
'features': features,
'descriptors': descriptors,
'timestamp': None
}
self.keyframes.append(keyframe)
def triangulate_points(self, keyframe1, keyframe2, matches):
"""Triangulate 3D points from stereo observations"""
if len(matches) < 8: # Need minimum 8 points for triangulation
return []
# Get matched points
pts1 = np.float32([keyframe1['features'][m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
pts2 = np.float32([keyframe2['features'][m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
# Get camera poses
pose1 = keyframe1['pose']
pose2 = keyframe2['pose']
# Get camera matrix (assuming calibrated camera)
camera_matrix = np.array([
[500, 0, 320],
[0, 500, 240],
[0, 0, 1]
])
# Triangulate points
projection_matrix1 = camera_matrix @ pose1[:3, :]
projection_matrix2 = camera_matrix @ pose2[:3, :]
points_4d = cv2.triangulatePoints(
projection_matrix1,
projection_matrix2,
pts1.reshape(-1, 2).T,
pts2.reshape(-1, 2).T
)
# Convert from homogeneous coordinates
points_3d = points_4d[:3] / points_4d[3]
# Filter out points behind the camera
valid_points = []
for i in range(points_3d.shape[1]):
if points_3d[2, i] > 0: # Point is in front of camera
point_3d = points_3d[:, i]
# Create map point
map_point = {
'id': len(self.map_points),
'coordinates': point_3d,
'observations': [
{'keyframe_id': keyframe1['id'], 'feature_idx': matches[i].queryIdx},
{'keyframe_id': keyframe2['id'], 'feature_idx': matches[i].trainIdx}
],
'descriptor': None # Will be set from one of the matched descriptors
}
valid_points.append(map_point)
return valid_points
def bundle_adjustment(self):
"""Perform bundle adjustment to optimize camera poses and map points"""
if len(self.keyframes) < 2 or len(self.map_points) < 10:
return # Not enough data for optimization
# Prepare optimization data
def reprojection_error(params):
"""Calculate reprojection error for bundle adjustment"""
num_poses = len(self.keyframes)
num_points = len(self.map_points)
# Extract poses and points from params
pose_params = params[:num_poses * 6].reshape(-1, 6) # [rx, ry, rz, tx, ty, tz]
point_params = params[num_poses * 6:].reshape(-1, 3) # [x, y, z]
errors = []
# For each observation
for point_idx, map_point in enumerate(self.map_points):
for obs in map_point['observations']:
keyframe_id = obs['keyframe_id']
feature_idx = obs['feature_idx']
if keyframe_id >= len(self.keyframes) or feature_idx >= len(self.keyframes[keyframe_id]['features']):
continue
# Get observed point
observed = self.keyframes[keyframe_id]['features'][feature_idx].pt
# Get camera pose (convert from compact form)
pose_compact = pose_params[keyframe_id]
R = self.compact_to_rotation_matrix(pose_compact[:3])
t = pose_compact[3:]
# Get 3D point
X = point_params[point_idx]
# Project 3D point to 2D
camera_matrix = np.array([
[500, 0, 320],
[0, 500, 240],
[0, 0, 1]
])
# Transform point to camera coordinates
X_cam = R @ X + t
if X_cam[2] <= 0: # Behind camera
continue
# Project to image plane
x_norm = X_cam[:2] / X_cam[2]
projected = camera_matrix[:2, :3] @ np.append(x_norm, 1)
# Calculate error
error = projected - np.array(observed)
errors.extend(error)
return np.array(errors)
# Initial parameters (poses and points)
initial_poses = []
for kf in self.keyframes:
# Convert 4x4 pose to compact form [rx, ry, rz, tx, ty, tz]
R = kf['pose'][:3, :3]
t = kf['pose'][:3, 3]
rvec, _ = cv2.Rodrigues(R)
pose_compact = np.hstack([rvec.flatten(), t])
initial_poses.append(pose_compact)
initial_points = [mp['coordinates'] for mp in self.map_points]
# Combine all parameters
initial_params = np.hstack([
np.array(initial_poses).flatten(),
np.array(initial_points).flatten()
])
# Perform optimization
if self.optimization_enabled:
try:
result = least_squares(reprojection_error, initial_params, method='lm')
# Extract optimized parameters
opt_params = result.x
num_poses = len(self.keyframes)
# Update keyframe poses
pose_params = opt_params[:num_poses * 6].reshape(-1, 6)
for i, pose_compact in enumerate(pose_params):
R = self.compact_to_rotation_matrix(pose_compact[:3])
t = pose_compact[3:]
# Update pose matrix
self.keyframes[i]['pose'][:3, :3] = R
self.keyframes[i]['pose'][:3, 3] = t
# Update map point coordinates
point_params = opt_params[num_poses * 6:].reshape(-1, 3)
for i, coords in enumerate(point_params):
if i < len(self.map_points):
self.map_points[i]['coordinates'] = coords
except Exception as e:
print(f"Bundle adjustment failed: {e}")
def compact_to_rotation_matrix(self, rvec):
"""Convert Rodrigues vector to rotation matrix"""
R, _ = cv2.Rodrigues(rvec)
return R
def optimize_map(self):
"""Main optimization function"""
self.bundle_adjustment()
# Additional optimizations can be added here
# - Loop closure detection and optimization
# - Map cleaning and outlier removal
# - Covisibility graph optimization
Performance Optimization
GPU-Accelerated Processing Pipeline
import cupy as cp # Use CuPy for GPU-accelerated NumPy operations
import numpy as np
import cv2
from threading import Thread, Lock
import queue
class GPUPipelineOptimizer:
def __init__(self):
self.gpu_available = cp.cuda.is_available()
self.processing_queue = queue.Queue(maxsize=10)
self.result_queue = queue.Queue(maxsize=10)
self.running = True
self.processing_thread = None
self.lock = Lock()
def start_processing_pipeline(self):
"""Start the GPU processing pipeline"""
self.processing_thread = Thread(target=self.processing_loop)
self.processing_thread.start()
def processing_loop(self):
"""Main processing loop running on GPU"""
while self.running:
try:
# Get input from queue
input_data = self.processing_queue.get(timeout=1.0)
if input_data is None:
continue
# Process on GPU
result = self.gpu_process_frame(input_data)
# Put result in output queue
self.result_queue.put(result)
except queue.Empty:
continue
except Exception as e:
print(f"GPU processing error: {e}")
def gpu_process_frame(self, frame_data):
"""Process a frame using GPU acceleration"""
if not self.gpu_available:
return self.cpu_process_frame(frame_data)
# Transfer data to GPU
with cp.cuda.Device(0): # Use GPU 0
gpu_frame = cp.asarray(frame_data['image'])
# Perform GPU-accelerated operations
# Feature detection
if 'detect_features' in frame_data:
features = self.gpu_detect_features(gpu_frame)
frame_data['features'] = features
# Descriptor computation
if 'compute_descriptors' in frame_data:
descriptors = self.gpu_compute_descriptors(gpu_frame, frame_data.get('keypoints'))
frame_data['descriptors'] = descriptors
# Matching
if 'match_features' in frame_data and 'prev_descriptors' in frame_data:
matches = self.gpu_match_features(
frame_data['descriptors'],
frame_data['prev_descriptors']
)
frame_data['matches'] = matches
# Pose estimation
if 'estimate_pose' in frame_data:
pose = self.gpu_estimate_pose(
frame_data.get('matches'),
frame_data.get('camera_matrix')
)
frame_data['pose'] = pose
# Transfer result back to CPU
result = {
'timestamp': frame_data['timestamp'],
'features': cp.asnumpy(frame_data['features']) if 'features' in frame_data else None,
'descriptors': cp.asnumpy(frame_data['descriptors']) if 'descriptors' in frame_data else None,
'matches': frame_data['matches'] if 'matches' in frame_data else None,
'pose': frame_data['pose'] if 'pose' in frame_data else None
}
return result
def gpu_detect_features(self, gpu_image):
"""GPU-accelerated feature detection"""
# In practice, this would use CUDA kernels or GPU-optimized libraries
# For demonstration, we'll use a simple approach
gray_gpu = gpu_image if len(gpu_image.shape) == 2 else gpu_image[:,:,0] # Convert to grayscale
# Apply Sobel operator for edge detection (GPU-accelerated)
sobel_x = cp.gradient(gray_gpu, axis=1)
sobel_y = cp.gradient(gray_gpu, axis=0)
magnitude = cp.sqrt(sobel_x**2 + sobel_y**2)
# Find local maxima as feature points
features = cp.argwhere(magnitude > cp.percentile(magnitude, 95)) # Top 5% strongest gradients
return features
def gpu_compute_descriptors(self, gpu_image, keypoints):
"""GPU-accelerated descriptor computation"""
# This is a simplified example
# In practice, you'd implement SIFT, SURF, or other descriptor computation on GPU
if keypoints is None:
return None
# Extract patches around keypoints and compute simple descriptors
patch_size = 16
descriptors = []
for pt in keypoints:
y, x = int(pt[0]), int(pt[1])
# Extract patch
if (y - patch_size//2 >= 0 and y + patch_size//2 < gpu_image.shape[0] and
x - patch_size//2 >= 0 and x + patch_size//2 < gpu_image.shape[1]):
patch = gpu_image[y - patch_size//2:y + patch_size//2,
x - patch_size//2:x + patch_size//2]
# Compute simple descriptor (mean, std, histogram, etc.)
desc = cp.concatenate([
cp.mean(patch).reshape(1),
cp.std(patch).reshape(1),
cp.histogram(patch.flatten(), bins=8)[0] # Simple histogram
])
descriptors.append(desc)
return cp.stack(descriptors) if descriptors else None
def gpu_match_features(self, desc1, desc2):
"""GPU-accelerated feature matching"""
if desc1 is None or desc2 is None:
return []
# Compute distances on GPU
desc1_expanded = desc1[:, cp.newaxis, :] # Shape: (N, 1, D)
desc2_expanded = desc2[cp.newaxis, :, :] # Shape: (1, M, D)
distances = cp.linalg.norm(desc1_expanded - desc2_expanded, axis=2) # Shape: (N, M)
# Find nearest neighbors
matches = []
for i in range(distances.shape[0]):
min_idx = cp.argmin(distances[i])
min_dist = distances[i, min_idx]
# Apply Lowe's ratio test conceptually
if min_dist < 0.75 * cp.partition(distances[i], 1)[1]: # Second smallest
matches.append({'queryIdx': i, 'trainIdx': int(min_idx), 'distance': float(min_dist)})
return matches
def gpu_estimate_pose(self, matches, camera_matrix):
"""GPU-accelerated pose estimation"""
if not matches or camera_matrix is None:
return np.eye(4)
# This is a simplified example - in practice, you'd implement
# essential matrix computation and pose recovery on GPU
return np.eye(4) # Identity for now
def cpu_process_frame(self, frame_data):
"""Fallback CPU processing"""
# Implement CPU-based processing as fallback
# This would mirror the GPU operations but using CPU libraries
return frame_data
def submit_frame_for_processing(self, frame_data):
"""Submit a frame for GPU processing"""
try:
self.processing_queue.put_nowait(frame_data)
return True
except queue.Full:
print("Processing queue is full, dropping frame")
return False
def get_processed_result(self, timeout=1.0):
"""Get processed result from GPU pipeline"""
try:
result = self.result_queue.get(timeout=timeout)
return result
except queue.Empty:
return None
def stop_pipeline(self):
"""Stop the GPU processing pipeline"""
self.running = False
if self.processing_thread:
self.processing_thread.join()
Configuration and Tuning
VSLAM Parameter Configuration
import yaml
import numpy as np
class VSLAMConfig:
def __init__(self, config_file=None):
self.config = self.load_default_config()
if config_file:
self.load_config_from_file(config_file)
def load_default_config(self):
"""Load default VSLAM configuration"""
return {
# Feature detection parameters
'feature_detection': {
'detector_type': 'SIFT', # Options: SIFT, SURF, ORB, AKAZE
'max_features': 2000,
'quality_level': 0.01,
'min_distance': 10,
'block_size': 3
},
# Matching parameters
'matching': {
'matcher_type': 'BF', # Options: BF, FLANN
'distance_metric': 'Hamming', # Hamming for binary, L2 for floating point
'max_distance': 0.7,
'cross_check': True,
'knn_ratio': 0.8
},
# Tracking parameters
'tracking': {
'max_track_length': 30,
'min_track_visibility': 0.5,
'motion_model': 'constant_velocity', # Options: constant_velocity, constant_acceleration
'prediction_threshold': 10.0 # pixels
},
# Mapping parameters
'mapping': {
'min_triangulation_angle': 5, # degrees
'max_reprojection_error': 2.0, # pixels
'min_parallax': 0.01, # meters
'outlier_rejection_threshold': 3.0 # standard deviations
},
# Optimization parameters
'optimization': {
'enable_bundle_adjustment': True,
'ba_window_size': 10, # keyframes
'ba_max_iterations': 50,
'ba_gradient_tolerance': 1e-6,
'enable_loop_closure': True,
'loop_closure_threshold': 0.1 # meters
},
# Performance parameters
'performance': {
'max_processing_fps': 30,
'gpu_acceleration': True,
'use_multi_threading': True,
'memory_budget_mb': 1024,
'keyframe_selection_strategy': 'distance_based' # Options: distance_based, appearance_based
},
# Camera parameters
'camera': {
'resolution': [640, 480],
'fov_horizontal': 60, # degrees
'fov_vertical': 45, # degrees
'baseline': 0.1, # meters (for stereo)
'depth_range': [0.1, 10.0] # meters
}
}
def load_config_from_file(self, config_file):
"""Load configuration from YAML file"""
try:
with open(config_file, 'r') as f:
loaded_config = yaml.safe_load(f)
# Merge with defaults, preserving defaults for missing values
self.merge_configs(self.config, loaded_config)
except FileNotFoundError:
print(f"Config file {config_file} not found, using defaults")
except yaml.YAMLError as e:
print(f"Error parsing config file: {e}")
def merge_configs(self, base, override):
"""Recursively merge configuration dictionaries"""
for key, value in override.items():
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
self.merge_configs(base[key], value)
else:
base[key] = value
def get_feature_detection_params(self):
"""Get feature detection parameters"""
return self.config['feature_detection']
def get_matching_params(self):
"""Get matching parameters"""
return self.config['matching']
def get_tracking_params(self):
"""Get tracking parameters"""
return self.config['tracking']
def get_mapping_params(self):
"""Get mapping parameters"""
return self.config['mapping']
def get_optimization_params(self):
"""Get optimization parameters"""
return self.config['optimization']
def get_performance_params(self):
"""Get performance parameters"""
return self.config['performance']
def get_camera_params(self):
"""Get camera parameters"""
return self.config['camera']
def save_config_to_file(self, config_file):
"""Save current configuration to YAML file"""
with open(config_file, 'w') as f:
yaml.dump(self.config, f, default_flow_style=False)
def validate_config(self):
"""Validate configuration parameters"""
errors = []
# Validate feature detection parameters
fd = self.config['feature_detection']
if fd['max_features'] <= 0:
errors.append("max_features must be positive")
if fd['quality_level'] <= 0 or fd['quality_level'] > 1:
errors.append("quality_level must be between 0 and 1")
# Validate matching parameters
m = self.config['matching']
if m['max_distance'] <= 0:
errors.append("max_distance must be positive")
if m['knn_ratio'] <= 0 or m['knn_ratio'] > 1:
errors.append("knn_ratio must be between 0 and 1")
# Validate mapping parameters
mp = self.config['mapping']
if mp['min_triangulation_angle'] <= 0:
errors.append("min_triangulation_angle must be positive")
if mp['max_reprojection_error'] <= 0:
errors.append("max_reprojection_error must be positive")
# Validate performance parameters
p = self.config['performance']
if p['max_processing_fps'] <= 0:
errors.append("max_processing_fps must be positive")
if p['memory_budget_mb'] <= 0:
errors.append("memory_budget_mb must be positive")
return errors
class VSLAMTuner:
def __init__(self, config):
self.config = config
self.performance_metrics = {
'tracking_accuracy': [],
'processing_time': [],
'feature_count': [],
'map_coverage': []
}
def auto_tune_parameters(self, dataset):
"""Automatically tune parameters based on dataset characteristics"""
print("Starting automatic parameter tuning...")
# Analyze dataset characteristics
characteristics = self.analyze_dataset(dataset)
# Adjust parameters based on analysis
self.adjust_parameters_for_characteristics(characteristics)
print("Parameter tuning completed.")
def analyze_dataset(self, dataset):
"""Analyze dataset to determine optimal parameters"""
characteristics = {
'texture_richness': 0.5, # 0-1 scale
'motion_complexity': 0.5, # 0-1 scale
'lighting_variability': 0.5, # 0-1 scale
'scene_structure': 0.5, # 0-1 scale
'camera_specs': {}
}
# Calculate texture richness (using variance of gradients)
total_variance = 0
frame_count = 0
for frame in dataset:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame
grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
total_variance += np.var(gradient_magnitude)
frame_count += 1
if frame_count > 0:
characteristics['texture_richness'] = min(1.0, total_variance / frame_count / 1000)
# Calculate motion complexity (based on feature motion between frames)
# This is a simplified example - in practice, you'd analyze optical flow
return characteristics
def adjust_parameters_for_characteristics(self, characteristics):
"""Adjust parameters based on dataset characteristics"""
# Adjust feature detection based on texture richness
if characteristics['texture_richness'] < 0.3:
# Low texture - need more features
self.config.config['feature_detection']['max_features'] = 3000
self.config.config['feature_detection']['quality_level'] = 0.005
elif characteristics['texture_richness'] > 0.7:
# High texture - can use fewer features
self.config.config['feature_detection']['max_features'] = 1500
self.config.config['feature_detection']['quality_level'] = 0.02
# Adjust matching based on lighting variability
if characteristics['lighting_variability'] > 0.5:
# High lighting variability - use more robust matching
self.config.config['matching']['max_distance'] = 0.8
self.config.config['matching']['knn_ratio'] = 0.9
else:
# Stable lighting - can be more strict
self.config.config['matching']['max_distance'] = 0.6
self.config.config['matching']['knn_ratio'] = 0.7
# Adjust tracking based on motion complexity
if characteristics['motion_complexity'] > 0.6:
# Complex motion - need more frequent keyframe selection
self.config.config['tracking']['prediction_threshold'] = 15.0
else:
# Simple motion - can track longer
self.config.config['tracking']['prediction_threshold'] = 5.0
print(f"Adjusted parameters based on dataset analysis:")
print(f" Texture richness: {characteristics['texture_richness']:.2f}")
print(f" Max features: {self.config.config['feature_detection']['max_features']}")
print(f" Quality level: {self.config.config['feature_detection']['quality_level']}")
Learning Objectives Review
- Understand Visual SLAM concepts and algorithms ✓
- Implement GPU-accelerated VSLAM using Isaac ROS ✓
- Configure and tune VSLAM parameters for robotics applications ✓
- Integrate VSLAM with navigation and perception pipelines ✓
- Evaluate VSLAM performance and accuracy ✓
- Troubleshoot common VSLAM issues ✓
Practical Exercise
- Set up Isaac ROS VSLAM with stereo camera input
- Configure the system with appropriate parameters for your environment
- Implement feature detection and tracking pipeline
- Create a mapping and optimization system
- Evaluate the system's performance in different lighting conditions
- Tune parameters to optimize accuracy and processing speed
Assessment Questions
- Explain the difference between visual odometry and full SLAM.
- What are the advantages of GPU acceleration in VSLAM systems?
- How do you handle the scale ambiguity problem in monocular VSLAM?
- What is bundle adjustment and why is it important in VSLAM?
Further Reading
- "Visual SLAM Algorithms: A Survey" by S. S. Tsai
- "Parallel Tracking and Mapping for Small AR Workspaces" by G. Klein
- "Real-Time Monocular SLAM: Why Filter?" by J. Engel
- Isaac ROS VSLAM Documentation: https://nvidia-isaac-ros.github.io/repositories_and_packages/isaac_ros_visual_slam/index.html
Next Steps
Continue to Nav2 Path Planning to learn about path planning for humanoid robots using Nav2.