Added new topics to record rosbag2 dataset#82
Conversation
…message handling in service_bag_recorder Signed-off-by: Dongyun Kim <kdy@robotis.com>
… packages Signed-off-by: Dongyun Kim <kdy@robotis.com>
Signed-off-by: Dongyun Kim <kdy@robotis.com>
Summary of ChangesHello @DongyunRobotis, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the rosbag2 recording capabilities by extending the list of automatically recorded topics to include essential robot state and sensor data like transforms, robot descriptions, and camera information. Crucially, it introduces a robust solution for handling latched topics, which are often missed in standard recordings due to their publish-once nature. By buffering these messages before recording begins and flushing them at the start, the system ensures that complete and accurate initial state information is always captured, leading to more reliable and comprehensive datasets for analysis and simulation. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Pull request overview
This PR implements support for recording latched topics in rosbag2 by introducing a buffering mechanism that caches latched messages received during the prepare stage and flushes them when recording starts. This ensures critical system state information (like /tf, /robot_description, and camera info topics) is reliably captured.
Changes:
- Implemented latched topic detection and buffering mechanism in the rosbag recorder
- Added configuration entries for additional topics (
/tf,/tf_static,/robot_description, camera info) across multiple robot configurations - Version bumped to 0.7.3 across all packages
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| rosbag_recorder/src/service_bag_recorder.cpp | Core implementation of latched message buffering, QoS detection, and flush logic |
| rosbag_recorder/include/rosbag_recorder/service_bag_recorder.hpp | Added new member variables and method declarations for latched message handling |
| rosbag_recorder/package.xml | Version bump to 0.7.3 |
| rosbag_recorder/CHANGELOG.rst | Changelog entry for version 0.7.3 |
| physical_ai_server/config/*.yaml | Added rosbag_extra_topic_list configuration for various robot platforms |
| physical_ai_server/package.xml | Version bump to 0.7.3 |
| physical_ai_server/setup.py | Version bump to 0.7.3 |
| physical_ai_server/CHANGELOG.rst | Changelog entry for version 0.7.3 |
| physical_ai_tools/package.xml | Version bump to 0.7.3 |
| physical_ai_tools/CHANGELOG.rst | Changelog entry for version 0.7.3 |
| physical_ai_manager/package.json | Version bump to 0.7.3 |
| physical_ai_manager/CHANGELOG.rst | Changelog entry for version 0.7.3 |
| physical_ai_interfaces/package.xml | Version bump to 0.7.3 |
| physical_ai_interfaces/CHANGELOG.rst | Changelog entry for version 0.7.3 |
| Isaac-GR00T | Added subproject commit reference |
Files not reviewed (1)
- physical_ai_manager/package-lock.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return rclcpp::QoS(rclcpp::KeepLast(10)) | ||
| .reliable() | ||
| .transient_local(); |
There was a problem hiding this comment.
The magic number 10 for KeepLast queue size should be extracted as a named constant or explained with a comment. It's unclear why latched topics need a queue size of 10 specifically when other topics use 100.
| // If not recording yet but this is a latched topic, buffer it | ||
| if (!is_recording_ && is_latched_topic(topic)) { | ||
| latched_message_buffer_[topic] = {serialized_msg, this->now()}; | ||
| RCLCPP_DEBUG(this->get_logger(), "Buffered latched message from topic: %s", topic.c_str()); |
There was a problem hiding this comment.
The is_latched_topic function performs publisher info lookup on every message received before recording starts. Consider caching the latched topic determination during subscription creation to avoid repeated lookups.
| // If not recording yet but this is a latched topic, buffer it | |
| if (!is_recording_ && is_latched_topic(topic)) { | |
| latched_message_buffer_[topic] = {serialized_msg, this->now()}; | |
| RCLCPP_DEBUG(this->get_logger(), "Buffered latched message from topic: %s", topic.c_str()); | |
| // If not recording yet but this is a latched topic, buffer it. | |
| // Cache the latched status per topic to avoid repeated publisher info lookups. | |
| if (!is_recording_) { | |
| static std::unordered_map<std::string, bool> latched_topic_cache; | |
| bool is_latched = false; | |
| auto cache_it = latched_topic_cache.find(topic); | |
| if (cache_it != latched_topic_cache.end()) { | |
| is_latched = cache_it->second; | |
| } else { | |
| is_latched = is_latched_topic(topic); | |
| latched_topic_cache[topic] = is_latched; | |
| } | |
| if (is_latched) { | |
| latched_message_buffer_[topic] = {serialized_msg, this->now()}; | |
| RCLCPP_DEBUG(this->get_logger(), "Buffered latched message from topic: %s", topic.c_str()); | |
| } |
|
|
||
| // If not recording yet but this is a latched topic, buffer it | ||
| if (!is_recording_ && is_latched_topic(topic)) { | ||
| latched_message_buffer_[topic] = {serialized_msg, this->now()}; |
There was a problem hiding this comment.
Only the most recent latched message is stored per topic. If a latched topic publishes multiple updates before recording starts, earlier messages will be overwritten. Consider whether this is the intended behavior or if all messages should be buffered.
There was a problem hiding this comment.
Code Review
This pull request correctly implements the recording of additional topics, including latched topics like /tf and /robot_description, by creating subscriptions early and buffering messages. The approach is sound. My main feedback is on a performance issue in the implementation of message handling for latched topics, which could be problematic in systems with high message rates. I've provided suggestions to optimize this by caching the latched status of topics. I also have a minor suggestion to reduce log verbosity.
| void delete_bag_directory(const std::string & bag_uri); | ||
| void create_subscriptions(); | ||
| rclcpp::QoS get_qos_for_topic(const std::string & topic); | ||
| bool is_latched_topic(const std::string & topic); |
There was a problem hiding this comment.
This method is called for every message before recording starts, which causes a performance issue due to repeated discovery of publishers. It's better to determine if a topic is latched once at subscription time. This method can be removed after refactoring create_subscriptions and handle_serialized_message.
void flush_latched_messages();|
|
||
| std::vector<rclcpp::GenericSubscription::SharedPtr> subscriptions_; | ||
| std::unique_ptr<rosbag2_cpp::Writer> writer_; | ||
| std::unordered_map<std::string, std::string> type_for_topic_; |
There was a problem hiding this comment.
To improve performance by avoiding repeated checks for latched topics, I suggest adding a member to cache the names of latched topics. This cache will be populated once in create_subscriptions.
std::unordered_map<std::string, std::string> type_for_topic_;
std::unordered_set<std::string> latched_topics_;| auto qos = get_qos_for_topic(topic); | ||
| auto sub = this->create_generic_subscription( | ||
| topic, | ||
| type, | ||
| rclcpp::QoS(100), | ||
| qos, |
There was a problem hiding this comment.
To improve performance, let's cache whether a topic is latched here. We can inspect the QoS returned by get_qos_for_topic. This avoids repeatedly checking in handle_serialized_message.
auto qos = get_qos_for_topic(topic);
if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
latched_topics_.insert(topic);
}| if (!is_recording_ && is_latched_topic(topic)) { | ||
| latched_message_buffer_[topic] = {serialized_msg, this->now()}; | ||
| RCLCPP_DEBUG(this->get_logger(), "Buffered latched message from topic: %s", topic.c_str()); | ||
| } |
There was a problem hiding this comment.
With the latched topic status now cached, this check can be simplified to use the cache, which significantly improves performance by avoiding repeated publisher discovery.
if (!is_recording_ && latched_topics_.count(topic)) {
latched_message_buffer_[topic] = {serialized_msg, this->now()};
RCLCPP_DEBUG(this->get_logger(), "Buffered latched message from topic: %s", topic.c_str());
}| bool ServiceBagRecorder::is_latched_topic(const std::string & topic) | ||
| { | ||
| auto publishers_info = this->get_publishers_info_by_topic(topic); | ||
|
|
||
| for (const auto & pub_info : publishers_info) { | ||
| if (pub_info.qos_profile().durability() == rclcpp::DurabilityPolicy::TransientLocal) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } |
| const auto it = type_for_topic_.find(topic); | ||
| if (it != type_for_topic_.end()) { | ||
| writer_->write(buffered.msg, topic, it->second, buffered.timestamp); | ||
| RCLCPP_INFO(this->get_logger(), "Flushed latched message from: %s", topic.c_str()); |
There was a problem hiding this comment.
This log message is inside a loop and could be very noisy if there are many latched topics. Consider changing it to RCLCPP_DEBUG to avoid flooding the logs. The summary log at the beginning of the function is sufficient at the INFO level.
RCLCPP_DEBUG(this->get_logger(), "Flushed latched message from: %s", topic.c_str());Signed-off-by: Dongyun Kim <kdy@robotis.com>
- Add latched_topics_ cache to avoid repeated publisher info lookups - Remove is_latched_topic() function as it's no longer needed - Cache latched status during subscription creation - Use O(1) lookup instead of O(n) publisher discovery per message - Change individual flush logs to DEBUG level to reduce noise - Add comment explaining KeepLast(10) queue size for latched topics - Clear latched_topics_ cache when recreating subscriptions Addresses feedback from Gemini Code Assist and GitHub Copilot reviews. Signed-off-by: Dongyun Kim <kdy@robotis.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 17 out of 18 changed files in this pull request and generated 15 comments.
Files not reviewed (1)
- physical_ai_manager/package-lock.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| rosbag_extra_topic_list: | ||
| - /tf | ||
| - /tf_static |
There was a problem hiding this comment.
The PR description mentions adding /tf_status to the recorded topics, but this topic is not present in any of the configuration files. Only /tf and /tf_static are added. If /tf_status was intended to be recorded, it should be added to the rosbag_extra_topic_list in the configuration files.
| - /tf_static | |
| - /tf_static | |
| - /tf_status |
| rosbag_extra_topic_list: | ||
| - /tf | ||
| - /tf_static | ||
| - /robot_description | ||
| - /zed/zed_node/rgb/camera_info | ||
| - /zed/zed_node/left/camera_info | ||
| - /zed/zed_node/right/camera_info | ||
| - /camera_left/camera_left/color/camera_info | ||
| - /camera_right/camera_right/color/camera_info | ||
|
|
There was a problem hiding this comment.
The PR description mentions adding /tf_status to the recorded topics, but this topic is not present in any of the configuration files. Only /tf and /tf_static are added. If /tf_status was intended to be recorded, it should be added to the rosbag_extra_topic_list in the configuration files.
|
|
||
| 0.7.3 (2026-01-19) | ||
| ------------------ | ||
| * Modified to record /tf, /robot_description, /camera_info topics in rosbag2 |
There was a problem hiding this comment.
The changelog entry is incomplete. It mentions recording /tf, /robot_description, and /camera_info topics but omits /tf_static which is actually added in all configuration files. The changelog should accurately list all topics being added, including /tf_static.
| * Modified to record /tf, /robot_description, /camera_info topics in rosbag2 | |
| * Modified to record /tf, /tf_static, /robot_description, /camera_info topics in rosbag2 |
| std::unordered_map<std::string, std::string> type_for_topic_; | ||
|
|
||
| // Cache of latched topic names to avoid repeated publisher info lookups | ||
| std::unordered_set<std::string> latched_topics_; |
There was a problem hiding this comment.
Missing include for std::unordered_set. The header file uses std::unordered_set but does not include the required header <unordered_set>. This will cause compilation errors.
| - /zed/zed_node/rgb/camera_info | ||
| - /zed/zed_node/left/camera_info | ||
| - /zed/zed_node/right/camera_info | ||
| - /camera_left/camera_left/color/camera_info | ||
| - /camera_right/camera_right/color/camera_info |
There was a problem hiding this comment.
The PR description mentions recording /*/camera_info topics (wildcard pattern), but the implementation adds specific camera_info topics like /zed/zed_node/rgb/camera_info. The PR description should accurately reflect what was implemented, or the implementation should support the wildcard pattern as described.
| - /zed/zed_node/rgb/camera_info | |
| - /zed/zed_node/left/camera_info | |
| - /zed/zed_node/right/camera_info | |
| - /camera_left/camera_left/color/camera_info | |
| - /camera_right/camera_right/color/camera_info | |
| - /*/camera_info |
| is_recording_ = true; | ||
| flush_latched_messages(); |
There was a problem hiding this comment.
The flush_latched_messages function is called without mutex protection. This function is invoked in handle_start (line 187) after is_recording_ is set to true, but outside the scope of the service handler's mutex lock. Meanwhile, handle_serialized_message (which runs in subscription callbacks) accesses the same latched_message_buffer_ under mutex protection. This creates a race condition where flush_latched_messages could be iterating over latched_message_buffer_ while handle_serialized_message might be modifying it, leading to undefined behavior. The flush_latched_messages call should be moved inside the try block before line 183, or the entire operation including setting is_recording_ and flushing should be protected by the mutex.
| rclcpp::QoS ServiceBagRecorder::get_qos_for_topic( | ||
| const std::string & topic) | ||
| { | ||
| std::scoped_lock<std::mutex> lock(mutex_); | ||
| // Get publisher info to determine QoS settings | ||
| auto publishers_info = this->get_publishers_info_by_topic(topic); | ||
|
|
||
| if (!publishers_info.empty()) { | ||
| // Check if any publisher uses TRANSIENT_LOCAL durability | ||
| for (const auto & pub_info : publishers_info) { | ||
| if (pub_info.qos_profile().durability() == rclcpp::DurabilityPolicy::TransientLocal) { | ||
| RCLCPP_INFO( | ||
| this->get_logger(), | ||
| "Topic '%s' uses TRANSIENT_LOCAL durability, using matching QoS", | ||
| topic.c_str()); | ||
| // Use smaller queue size (10) for latched topics as they typically publish once | ||
| // and rely on durability rather than queue depth | ||
| return rclcpp::QoS(rclcpp::KeepLast(10)) | ||
| .reliable() | ||
| .transient_local(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Use default QoS for other topics | ||
| return rclcpp::QoS(100); |
There was a problem hiding this comment.
The get_qos_for_topic function calls get_publishers_info_by_topic which can be an expensive operation as it queries the ROS graph. This function is called for every topic during subscription creation. While the latched status is cached afterward, the initial call during create_subscriptions could be slow if there are many topics or if publishers haven't fully initialized yet. Consider adding error handling for the case where publisher info is not yet available, or implement a retry mechanism with appropriate logging.
| if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { | ||
| latched_topics_.insert(topic); | ||
| } |
There was a problem hiding this comment.
Missing mutex protection for latched_topics_ access. The latched_topics_ set is populated in create_subscriptions (line 322) and read in handle_serialized_message (line 412). While handle_serialized_message is protected by mutex, create_subscriptions is called from handle_prepare which is also protected by mutex in the service handler. However, if create_subscriptions is ever called from a different context or if the locking strategy changes, this could lead to race conditions. Consider documenting that create_subscriptions must always be called under mutex protection, or add assertions to verify this precondition.
|
|
||
| rosbag_extra_topic_list: | ||
| - /tf | ||
| - /tf_static |
There was a problem hiding this comment.
The PR description mentions adding /tf_status to the recorded topics, but this topic is not present in any of the configuration files. Only /tf and /tf_static are added. If /tf_status was intended to be recorded, it should be added to the rosbag_extra_topic_list in the configuration files.
| - /tf_static | |
| - /tf_static | |
| - /tf_status |
| // Buffer for latched messages received before recording starts | ||
| struct BufferedMessage | ||
| { | ||
| std::shared_ptr<rclcpp::SerializedMessage> msg; | ||
| rclcpp::Time timestamp; | ||
| }; | ||
| std::unordered_map<std::string, BufferedMessage> latched_message_buffer_; |
There was a problem hiding this comment.
The BufferedMessage struct stores only one message per topic. For latched topics, this is generally correct since they publish once. However, if a latched topic republishes before recording starts (e.g., due to parameter updates or re-initialization), only the latest message will be buffered and previous messages will be lost. Consider whether this behavior is acceptable, or add a comment explaining that only the most recent latched message is preserved, which is the intended behavior.
…ods and timestamp extraction Signed-off-by: Dongyun Kim <kdy@robotis.com>
- Use source_timestamp from rmw_message_info instead of this->now() - source_timestamp is closer to header.stamp (publisher's original time) - Improves data synchronization accuracy in rosbag recordings - Latched messages still use this->now() as they only need timestamp after start Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add simplified recording mode using rosbag2 MCAP format directly - Remove LeRobot dependency for data collection - Add visualization module for rosbag data quality validation - Add test_utils package with mock topic publisher for testing - Add URDF and metadata saving for Scale AI compatibility - Remove tf_static from recording (included in URDF) - Fix multi-episode recording state management - Fix joint topic recording rate (100Hz) with MultiThreadedExecutor - Apply lint fixes (flake8, cpplint, uncrustify) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Modified rosbag2 recording to include /tf, /tf_status, /robot_description, and /*/camera_info topics in the saved data.
Solution for Latched Topics
Implemented a caching mechanism to handle latched topics that are only published once upon initial subscription:
Cache latched topic data during the prepare stage when subscribers are created
Republish cached data when rosbag recording starts to ensure proper storage
This ensures that critical system state information is reliably captured in rosbag recordings.