Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Publishers and Subscribers

ros-z implements ROS 2's publish-subscribe pattern with type-safe, zero-copy messaging over Zenoh. This enables efficient, decoupled communication between nodes with minimal overhead.

Note

The pub-sub pattern forms the foundation of ROS 2 communication, allowing nodes to exchange data without direct coupling. ros-z leverages Zenoh's efficient transport layer for optimal performance.

Visual Flow

graph TD
    A[ZContextBuilder] -->|configure| B[ZContext]
    B -->|create| C[Node]
    C -->|publisher| D[Publisher]
    C -->|subscriber| E[Subscriber]
    D -->|publish| F[Topic]
    F -->|deliver| E
    E -->|callback| G[Message Handler]

Key Features

FeatureDescriptionBenefit
Type SafetyStrongly-typed messages using Rust structsCompile-time error detection
Zero-CopyEfficient message passing via ZenohReduced latency and CPU usage
QoS ProfilesConfigurable reliability, durability, historyFine-grained delivery control
Async/BlockingDual API for both paradigmsFlexible integration patterns

Publisher Example

This example demonstrates publishing "Hello World" messages to a topic. The publisher sends messages periodically, showcasing the fundamental publishing pattern.

/// Talker node that publishes "Hello World" messages to a topic
///
/// # Arguments
/// * `ctx` - The ROS-Z context
/// * `topic` - The topic name to publish to
/// * `period` - Duration between messages
/// * `max_count` - Optional maximum number of messages to publish. If None, publishes indefinitely.
pub async fn run_talker(
    ctx: ZContext,
    topic: &str,
    period: Duration,
    max_count: Option<usize>,
) -> Result<()> {
    // Create a node named "talker"
    let node = ctx.create_node("talker").build()?;

    // Create a publisher with a custom Quality of Service profile
    let qos = QosProfile {
        history: QosHistory::KeepLast(NonZeroUsize::new(7).unwrap()),
        ..Default::default()
    };
    let publisher = node.create_pub::<RosString>(topic).with_qos(qos).build()?;

    let mut count = 1;

    loop {
        // Create the message
        let msg = RosString {
            data: format!("Hello World: {}", count),
        };

        // Log the message being published
        println!("Publishing: '{}'", msg.data);

        // Publish the message (non-blocking)
        publisher.async_publish(&msg).await?;

        // Check if we've reached the max count
        if let Some(max) = max_count
            && count >= max
        {
            break;
        }

        // Wait for the next publish cycle
        tokio::time::sleep(period).await;

        count += 1;
    }

    Ok(())
}

Key points:

  • QoS Configuration: Uses KeepLast(7) to buffer the last 7 messages
  • Async Publishing: Non-blocking async_publish() for efficient I/O. For blocking (non-async) contexts, use publisher.publish(&msg)? instead.
  • Rate Control: Uses tokio::time::sleep() to control publishing frequency
  • Bounded Operation: Optional max_count for testing scenarios

Running the publisher:

# Basic usage
cargo run --example demo_nodes_talker

# Custom topic and rate
cargo run --example demo_nodes_talker -- --topic /my_topic --period 0.5

Subscriber Example

This example demonstrates subscribing to messages from a topic. The subscriber receives and displays messages, showing both timeout-based and async reception patterns.

/// Listener node that subscribes to a topic
///
/// # Arguments
/// * `ctx` - The ROS-Z context
/// * `topic` - The topic name to subscribe to
/// * `max_count` - Optional maximum number of messages to receive. If None, listens indefinitely.
/// * `timeout` - Optional timeout duration. If None, waits indefinitely.
///
/// # Returns
/// A vector of received messages
pub async fn run_listener(
    ctx: ZContext,
    topic: &str,
    max_count: Option<usize>,
    timeout: Option<Duration>,
) -> Result<Vec<String>> {
    // Create a node named "listener"
    let node = ctx.create_node("listener").build()?;

    // Create a subscription to the "chatter" topic
    let qos = QosProfile {
        history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
        ..Default::default()
    };
    let subscriber = node.create_sub::<RosString>(topic).with_qos(qos).build()?;

    let mut received_messages = Vec::new();
    let start = std::time::Instant::now();

    // Receive messages in a loop
    loop {
        // Check timeout
        if let Some(t) = timeout
            && start.elapsed() > t
        {
            break;
        }

        // Try to receive with a small timeout to allow checking other conditions
        let recv_result = if timeout.is_some() || max_count.is_some() {
            subscriber.recv_timeout(Duration::from_millis(100))
        } else {
            // If no limits, use async_recv
            subscriber.async_recv().await
        };

        match recv_result {
            Ok(msg) => {
                // Log the received message
                println!("I heard: [{}]", msg.data);
                received_messages.push(msg.data.clone());

                // Check if we've received enough messages
                if let Some(max) = max_count
                    && received_messages.len() >= max
                {
                    break;
                }
            }
            Err(_) => {
                // Continue if timeout on recv_timeout
                if timeout.is_some() || max_count.is_some() {
                    continue;
                } else {
                    break;
                }
            }
        }
    }

    Ok(received_messages)
}

Key points:

  • Flexible Reception: Supports timeout-based and indefinite blocking
  • Testable Design: Returns received messages for verification
  • Bounded Operation: Optional max_count and timeout parameters
  • QoS Configuration: Uses KeepLast(10) for message buffering

Running the subscriber:

# Basic usage
cargo run --example demo_nodes_listener

# Custom topic
cargo run --example demo_nodes_listener -- --topic /my_topic

Complete Pub-Sub Workflow

To see publishers and subscribers in action together, you'll need to start a Zenoh router first:

Terminal 1 - Start Zenoh Router:

cargo run --example zenoh_router

Terminal 2 - Start Subscriber:

cargo run --example demo_nodes_listener

Terminal 3 - Start Publisher:

cargo run --example demo_nodes_talker

Subscriber Patterns

ros-z provides three patterns for receiving messages, each suited for different use cases:

Tip

use ros_z::Builder; must be in scope to call .build() on any ros-z builder type. Add it alongside your other ros-z imports.

Pattern 1: Blocking Receive (Pull Model)

Best for: Simple sequential processing, scripting

use ros_z::Builder; // required to call .build()

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build()?;

while let Ok(msg) = subscriber.recv() {
    println!("Received: {}", msg.data);
}

Pattern 2: Async Receive (Pull Model)

Best for: Integration with async codebases, handling multiple streams

use ros_z::Builder; // required to call .build()

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build()?;

while let Ok(msg) = subscriber.async_recv().await {
    println!("Received: {}", msg.data);
}

Pattern 3: Callback (Push Model)

Best for: Event-driven architectures, low-latency response

use ros_z::Builder; // required to call .build_with_callback()

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build_with_callback(|msg| {
        println!("Received: {}", msg.data);
    })?;

// No need to call recv() - callback handles messages automatically
// Your code continues while messages are processed in the background

Tip

Use callbacks for low-latency event-driven processing. Use blocking/async receive when you need explicit control over when messages are processed.

Pattern Comparison

AspectBlocking ReceiveAsync ReceiveCallback
Control FlowSequentialSequentialEvent-driven
LatencyMedium (poll-based)Medium (poll-based)Low (immediate)
MemoryQueue size × messageQueue size × messageNo queue
BackpressureBuilt-in (queue full)Built-in (queue full)None (drops if slow)
Use CaseSimple scriptsAsync applicationsReal-time response

Quality of Service (QoS)

QoS profiles control message delivery behavior. Both publishers and subscribers accept a QoS profile:

Publisher QoS:

use std::num::NonZeroUsize;
use ros_z::Builder;
use ros_z::qos::{QosProfile, QosHistory, QosReliability};

let qos = QosProfile {
    history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
    reliability: QosReliability::Reliable,
    ..Default::default()
};

let publisher = node
    .create_pub::<RosString>("topic")
    .with_qos(qos)
    .build()?;

Subscriber QoS:

use std::num::NonZeroUsize;
use ros_z::Builder;
use ros_z::qos::{QosProfile, QosHistory, QosReliability};

let qos = QosProfile {
    history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
    reliability: QosReliability::Reliable,
    ..Default::default()
};

let subscriber = node
    .create_sub::<RosString>("topic")
    .with_qos(qos)
    .build()?;

Tip

Use QosHistory::KeepLast(NonZeroUsize::new(1).unwrap()) for sensor data and QosReliability::Reliable for critical commands. Match QoS profiles between publishers and subscribers for optimal message delivery.

Name Remapping

ros-z supports ROS 2-style topic remapping via ZContextBuilder::with_remap_rule(). Remapping rules apply to all nodes created from the same context and redirect topic/service names at the context level.

fn main() -> zenoh::Result<()> {
use ros_z::context::ZContextBuilder;
use ros_z::Builder;

let ctx = ZContextBuilder::default()
    .with_remap_rule("/chatter:=/my_chatter")?  // redirect /chatter to /my_chatter
    .with_remap_rule("__node:=renamed_node")?   // rename the node
    .build()?;
Ok(())
}

Multiple rules can be added with .with_remap_rules():

fn main() -> zenoh::Result<()> {
use ros_z::context::ZContextBuilder;
use ros_z::Builder;

let ctx = ZContextBuilder::default()
    .with_remap_rules(["/input:=/sensor/data", "/output:=/processed/data"])?
    .build()?;
Ok(())
}

The rule format follows the ROS 2 convention: from:=to.

ROS 2 Interoperability

ros-z publishers and subscribers interoperate with ROS 2 C++ and Python nodes when both sides share the same Zenoh transport:

Requirements:

  • ROS 2 nodes must use rmw_zenoh_cpp (export RMW_IMPLEMENTATION=rmw_zenoh_cpp) or be bridged via zenoh-bridge-ros2dds
  • Both sides must use matching message types with identical RIHS01 type hashes
  • All nodes must connect to the same Zenoh router
# List active topics
ros2 topic list

# Echo messages from ros-z publisher
ros2 topic echo /chatter

# Publish to ros-z subscriber from ROS 2
ros2 topic pub /chatter std_msgs/msg/String "data: 'Hello from ROS 2'"

# Check topic info
ros2 topic info /chatter

Warning

If type hashes differ (e.g. mismatched message definitions), nodes won't exchange messages. Enable RUST_LOG=ros_z=debug to see the hash in the key expression and compare it with the ROS 2 side. See Troubleshooting for diagnosis steps.

Resources

Start with the examples above to understand the basic pub-sub workflow, then explore custom messages for domain-specific communication.