Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 30 additions & 29 deletions flexiv_test_nodes/README.md
Original file line number Diff line number Diff line change
@@ -1,60 +1,61 @@
# Flexiv Test Nodes

Demo nodes for the Flexiv ROS2 driver.
This package contains the example demo nodes to use with/without Flexiv ROS 2 driver.

## Nodes
## Robot States Publisher and Monitor

These nodes demonstrate how to publish and monitor Flexiv robot states directly from the Flexiv RDK using the Python `flexivrdk` package, without relying on the main ROS 2 driver stack.

### Requirements

- ROS 2 Humble or Jazzy
- `flexivrdk` Python package (install via `pip install flexivrdk`)
- `flexiv_msgs` package (built with `flexiv_ros2`)

### 1. Robot States Publisher

Publishes robot states directly from Flexiv RDK to ROS2 topics, bypassing the main ROS2 driver.
Publishes robot states directly from Flexiv RDK to the ROS 2 topic, bypassing the main ROS 2 driver.

**Use case:** When you need direct robot state monitoring alongside or instead of the main flexiv_ros2 driver.
**Features:**

**Installation:**
```bash
pip install flexivrdk
```
- Direct RDK integration using Python `flexivrdk` package
- Publishes Flexiv robot states at 100 Hz
- Monitors robot status (busy, operational, fault, reduced)
- Compatible with ROS 2 Humble and Jazzy

**Use case:** When you need direct robot state monitoring instead of the `flexiv_robot_states_broadcaster` node from the main driver stack.

**Launch:**

```bash
ros2 launch flexiv_test_nodes robot_states_publisher.launch.py robot_sn:=[robot_sn]
```

**Published topic:**
- `/<robot_sn_with_underscores>/flexiv_robot_states` ([flexiv_msgs/RobotStates])

- `/${robot_sn}/flexiv_robot_states` ([`flexiv_msgs/msg/RobotStates.msg`](../flexiv_msgs/msg/RobotStates.msg))

**Parameters:**

- `robot_sn`: Robot serial number (required)
- `network_interface`: Network interface name (optional, auto-detect if empty)
- `publish_rate`: Publishing rate in Hz (default: 100)

**Features:**
- Direct RDK integration using Python flexivrdk package
- Publishes complete robot states at 100 Hz
- Monitors robot status (busy, operational, fault, reduced)
- Compatible with ROS2 Humble and Jazzy
- `publish_rate`: Publish rate in Hz (default: 100)

### 2. Robot States Monitor

Example subscriber node demonstrating how to receive and process robot states.

**Run:**

```bash
ros2 run flexiv_test_nodes robot_states_monitor --ros-args -p robot_sn:=[robot_sn]
```

### 3. Publisher Joint Trajectory Controller

Existing test node for joint trajectory control.

## Requirements

- ROS2 Humble or Jazzy
- flexivrdk Python package (install via `pip install flexivrdk`)
- flexiv_msgs package (built with flexiv_ros2)
> [!NOTE]
>
> - The ROS/RDK version must match the Flexiv robot software version.
> - Topic names are automatically sanitized for robot serial numbers (dash becomes underscore).

## Notes
## Publisher Joint Trajectory Controller

- The RDK version must match your robot firmware version
- Topic names automatically sanitize robot serial numbers (dashes become underscores)
- This node provides state monitoring without requiring the full flexiv_ros2 driver stack
Example node to send joint position commands to the joint trajectory controller.
80 changes: 37 additions & 43 deletions flexiv_test_nodes/flexiv_test_nodes/robot_states_monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""
Example subscriber node demonstrating how to receive and process robot states
published by the robot_states_publisher node.
"""Example subscriber node demonstrating how to receive and process robot
states published by the robot_states_publisher node.

Author: Flexiv Robotics
License: Apache-2.0
Expand All @@ -14,78 +13,73 @@


class StateMonitor(Node):
"""
Example ROS2 node that subscribes to robot states and displays key information.
"""

"""Example ROS2 node that subscribes to robot states and displays key
information."""

def __init__(self):
super().__init__('state_monitor')
super().__init__("state_monitor")

# Declare a parameter for the robot serial number
self.declare_parameter('robot_sn', 'Rizon4-000000')
robot_sn = self.get_parameter('robot_sn').get_parameter_value().string_value
self.declare_parameter("robot_sn", "Rizon4-000000")
robot_sn = self.get_parameter("robot_sn").get_parameter_value().string_value

# Construct topic name dynamically based on robot SN
# ROS topic names can't contain dashes, so replace with underscores
topic_robot_sn = robot_sn.replace('-', '_')
topic_name = f'/{topic_robot_sn}/flexiv_robot_states'
self.get_logger().info(f'State Monitor started')
self.get_logger().info(f'Subscribing to topic: {topic_name}')
topic_robot_sn = robot_sn.replace("-", "_")
topic_name = f"/{topic_robot_sn}/flexiv_robot_states"

self.get_logger().info(f"State Monitor started")
self.get_logger().info(f"Subscribing to topic: {topic_name}")

# Create subscription
self.subscription = self.create_subscription(
RobotStates,
topic_name,
self.callback,
10
RobotStates, topic_name, self.callback, 10
)

def callback(self, msg):
"""
Callback function that processes received robot states.

"""Callback function that processes received robot states.

Args:
msg: RobotStates message containing complete robot state information
"""
# Access joint positions (7 joints)
self.get_logger().info(f'Joint positions: {[f"{j:.3f}" for j in msg.q]}')

# Access TCP position
tcp = msg.tcp_pose.pose.position
self.get_logger().info(f'TCP Position: x={tcp.x:.3f}, y={tcp.y:.3f}, z={tcp.z:.3f}')

self.get_logger().info(
f"TCP Position: x={tcp.x:.3f}, y={tcp.y:.3f}, z={tcp.z:.3f}"
)

# Access TCP orientation (quaternion)
tcp_ori = msg.tcp_pose.pose.orientation
self.get_logger().info(
f'TCP Orientation: w={tcp_ori.w:.3f}, x={tcp_ori.x:.3f}, '
f'y={tcp_ori.y:.3f}, z={tcp_ori.z:.3f}'
f"TCP Orientation: w={tcp_ori.w:.3f}, x={tcp_ori.x:.3f}, "
f"y={tcp_ori.y:.3f}, z={tcp_ori.z:.3f}"
)

# Access external force magnitude
force = msg.ext_wrench_in_tcp.wrench.force
force_mag = math.sqrt(force.x**2 + force.y**2 + force.z**2)
self.get_logger().info(f'External Force Magnitude: {force_mag:.3f} N')
self.get_logger().info('---')
self.get_logger().info(f"External Force Magnitude: {force_mag:.3f} N")

self.get_logger().info("---")


def main(args=None):
"""
Main entry point for the State Monitor example node.
"""
"""Main entry point for the State Monitor example node."""
rclpy.init(args=args)

try:
node = StateMonitor()
rclpy.spin(node)
except KeyboardInterrupt:
print('\nKeyboard interrupt, shutting down...')
print("\nKeyboard interrupt, shutting down...")
finally:
if 'node' in locals():
if "node" in locals():
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
if __name__ == "__main__":
main()
Loading