Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Publishers and Subscribers

ros-z implements ROS 2's publish-subscribe pattern with type-safe, zero-copy messaging over Zenoh. This enables efficient, decoupled communication between nodes with minimal overhead.

Note

The pub-sub pattern forms the foundation of ROS 2 communication, allowing nodes to exchange data without direct coupling. ros-z leverages Zenoh's efficient transport layer for optimal performance.

Visual Flow

graph TD
    A[ZContextBuilder] -->|configure| B[ZContext]
    B -->|create| C[Node]
    C -->|publisher| D[Publisher]
    C -->|subscriber| E[Subscriber]
    D -->|publish| F[Topic]
    F -->|deliver| E
    E -->|callback| G[Message Handler]

Key Features

FeatureDescriptionBenefit
Type SafetyStrongly-typed messages using Rust structsCompile-time error detection
Zero-CopyEfficient message passing via ZenohReduced latency and CPU usage
QoS ProfilesConfigurable reliability, durability, historyFine-grained delivery control
Async/BlockingDual API for both paradigmsFlexible integration patterns

Publisher Example

This example demonstrates publishing "Hello World" messages to a topic. The publisher sends messages periodically, showcasing the fundamental publishing pattern.

use std::time::Duration;

use ros_z::{
    Builder, Result,
    context::ZContext,
    qos::{QosHistory, QosProfile},
};
use ros_z_msgs::std_msgs::String as RosString;

/// Talker node that publishes "Hello World" messages to a topic
///
/// # Arguments
/// * `ctx` - The ROS-Z context
/// * `topic` - The topic name to publish to
/// * `period` - Duration between messages
/// * `max_count` - Optional maximum number of messages to publish. If None, publishes indefinitely.
pub async fn run_talker(
    ctx: ZContext,
    topic: &str,
    period: Duration,
    max_count: Option<usize>,
) -> Result<()> {
    // Create a node named "talker"
    let node = ctx.create_node("talker").build()?;

    // Create a publisher with a custom Quality of Service profile
    let qos = QosProfile {
        history: QosHistory::KeepLast(7),
        ..Default::default()
    };
    let publisher = node.create_pub::<RosString>(topic).with_qos(qos).build()?;

    let mut count = 1;

    loop {
        // Create the message
        let msg = RosString {
            data: format!("Hello World: {}", count),
        };

        // Log the message being published
        println!("Publishing: '{}'", msg.data);

        // Publish the message (non-blocking)
        publisher.async_publish(&msg).await?;

        // Check if we've reached the max count
        if let Some(max) = max_count
            && count >= max
        {
            break;
        }

        // Wait for the next publish cycle
        tokio::time::sleep(period).await;

        count += 1;
    }

    Ok(())
}

Key points:

  • QoS Configuration: Uses KeepLast(7) to buffer the last 7 messages
  • Async Publishing: Non-blocking async_publish() for efficient I/O
  • Rate Control: Uses tokio::time::sleep() to control publishing frequency
  • Bounded Operation: Optional max_count for testing scenarios

Running the publisher:

# Basic usage
cargo run --example demo_nodes_talker

# Custom topic and rate
cargo run --example demo_nodes_talker -- --topic /my_topic --period 0.5

# Publish 10 messages then exit
cargo run --example demo_nodes_talker -- --max-count 10

Subscriber Example

This example demonstrates subscribing to messages from a topic. The subscriber receives and displays messages, showing both timeout-based and async reception patterns.

use std::time::Duration;

use ros_z::{
    Builder, Result,
    context::ZContext,
    qos::{QosHistory, QosProfile},
};
use ros_z_msgs::std_msgs::String as RosString;

/// Listener node that subscribes to a topic
///
/// # Arguments
/// * `ctx` - The ROS-Z context
/// * `topic` - The topic name to subscribe to
/// * `max_count` - Optional maximum number of messages to receive. If None, listens indefinitely.
/// * `timeout` - Optional timeout duration. If None, waits indefinitely.
///
/// # Returns
/// A vector of received messages
pub async fn run_listener(
    ctx: ZContext,
    topic: &str,
    max_count: Option<usize>,
    timeout: Option<Duration>,
) -> Result<Vec<String>> {
    // Create a node named "listener"
    let node = ctx.create_node("listener").build()?;

    // Create a subscription to the "chatter" topic
    let qos = QosProfile {
        history: QosHistory::KeepLast(10),
        ..Default::default()
    };
    let subscriber = node.create_sub::<RosString>(topic).with_qos(qos).build()?;

    let mut received_messages = Vec::new();
    let start = std::time::Instant::now();

    // Receive messages in a loop
    loop {
        // Check timeout
        if let Some(t) = timeout
            && start.elapsed() > t
        {
            break;
        }

        // Try to receive with a small timeout to allow checking other conditions
        let recv_result = if timeout.is_some() || max_count.is_some() {
            subscriber.recv_timeout(Duration::from_millis(100))
        } else {
            // If no limits, use async_recv
            subscriber.async_recv().await
        };

        match recv_result {
            Ok(msg) => {
                // Log the received message
                println!("I heard: [{}]", msg.data);
                received_messages.push(msg.data.clone());

                // Check if we've received enough messages
                if let Some(max) = max_count
                    && received_messages.len() >= max
                {
                    break;
                }
            }
            Err(_) => {
                // Continue if timeout on recv_timeout
                if timeout.is_some() || max_count.is_some() {
                    continue;
                } else {
                    break;
                }
            }
        }
    }

    Ok(received_messages)
}

Key points:

  • Flexible Reception: Supports timeout-based and indefinite blocking
  • Testable Design: Returns received messages for verification
  • Bounded Operation: Optional max_count and timeout parameters
  • QoS Configuration: Uses KeepLast(10) for message buffering

Running the subscriber:

# Basic usage
cargo run --example demo_nodes_listener

# Custom topic
cargo run --example demo_nodes_listener -- --topic /my_topic

# Receive 5 messages then exit
cargo run --example demo_nodes_listener -- --max-count 5

Complete Pub-Sub Workflow

To see publishers and subscribers in action together, you'll need to start a Zenoh router first:

Terminal 1 - Start Zenoh Router:

cargo run --example zenoh_router

Terminal 2 - Start Subscriber:

cargo run --example demo_nodes_listener

Terminal 3 - Start Publisher:

cargo run --example demo_nodes_talker

Subscriber Patterns

ros-z provides three patterns for receiving messages, each suited for different use cases:

Pattern 1: Blocking Receive (Pull Model)

Best for: Simple sequential processing, scripting

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build()?;

while let Ok(msg) = subscriber.recv() {
    println!("Received: {}", msg.data);
}

Pattern 2: Async Receive (Pull Model)

Best for: Integration with async codebases, handling multiple streams

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build()?;

while let Ok(msg) = subscriber.async_recv().await {
    println!("Received: {}", msg.data);
}

Pattern 3: Callback (Push Model)

Best for: Event-driven architectures, low-latency response

let subscriber = node
    .create_sub::<RosString>("topic_name")
    .build_with_callback(|msg| {
        println!("Received: {}", msg.data);
    })?;

// No need to call recv() - callback handles messages automatically
// Your code continues while messages are processed in the background

Tip

Use callbacks for low-latency event-driven processing. Use blocking/async receive when you need explicit control over when messages are processed.

Pattern Comparison

AspectBlocking ReceiveAsync ReceiveCallback
Control FlowSequentialSequentialEvent-driven
LatencyMedium (poll-based)Medium (poll-based)Low (immediate)
MemoryQueue size × messageQueue size × messageNo queue
BackpressureBuilt-in (queue full)Built-in (queue full)None (drops if slow)
Use CaseSimple scriptsAsync applicationsReal-time response

Quality of Service (QoS)

QoS profiles control message delivery behavior:

use ros_z::qos::{QosProfile, QosHistory, Reliability};

let qos = QosProfile {
    history: QosHistory::KeepLast(10),
    reliability: Reliability::Reliable,
    ..Default::default()
};

let publisher = node
    .create_pub::<RosString>("topic")
    .with_qos(qos)
    .build()?;

Tip

Use QosHistory::KeepLast(1) for sensor data and Reliability::Reliable for critical commands. Match QoS profiles between publishers and subscribers for optimal message delivery.

ROS 2 Interoperability

ros-z publishers and subscribers work seamlessly with ROS 2 C++ and Python nodes:

# List active topics
ros2 topic list

# Echo messages from ros-z publisher
ros2 topic echo /chatter

# Publish to ros-z subscriber from ROS 2
ros2 topic pub /chatter std_msgs/msg/String "data: 'Hello from ROS 2'"

# Check topic info
ros2 topic info /chatter

Success

ros-z provides full ROS 2 compatibility via Zenoh bridge or rmw_zenoh, enabling cross-language communication.

Resources

Start with the examples above to understand the basic pub-sub workflow, then explore custom messages for domain-specific communication.