Publishers and Subscribers¶
Go users
The code examples in this chapter are Rust. For Go pub/sub patterns, QoS presets, and the typed subscriber API, see the Go Bindings chapter.
ros-z implements ROS 2's publish-subscribe pattern with type-safe, zero-copy messaging over Eclipse Zenoh. This enables efficient, decoupled communication between nodes with minimal overhead.
Note
The pub-sub pattern forms the foundation of ROS 2 communication, allowing nodes to exchange data without direct coupling. ros-z leverages Zenoh's efficient transport layer for optimal performance.
What is Publish-Subscribe?¶
graph LR
P1([Publisher A]) --> T{Topic}
P2([Publisher B]) --> T
T --> S1([Subscriber 1])
T --> S2([Subscriber 2])
T --> S3([Subscriber 3])
style T fill:#3f51b5,color:#fff,stroke:#3f51b5
One topic. Any number of senders and receivers. Neither side knows the other exists.
- Publishers write messages to a named topic
- All current subscribers receive every message
- Adding a new subscriber (logger, visualizer) requires no code changes anywhere
When to use topics¶
| Situation | Use | Why |
|---|---|---|
| Camera feed, lidar scan, IMU | Topic | High-frequency, many consumers |
| Robot position, joint states | Topic | Continuous stream, multiple observers |
| "Add these two numbers" | Service | One-shot, need a result |
| "Drive to (3, 4)" | Action | Long task, need progress + cancel |
| "Set max speed to 2.5" | Parameter | Runtime config, not data stream |
Message flow¶
sequenceDiagram
participant P as Publisher
participant Z as Eclipse Zenoh
participant S1 as Subscriber 1
participant S2 as Subscriber 2
P->>Z: publish("Hello #1")
Z->>S1: deliver
Z->>S2: deliver
P->>Z: publish("Hello #2")
Z->>S1: deliver
Z->>S2: deliver
Note over P,S2: Fire-and-forget. Publisher never blocks waiting for subscribers.
Type safety¶
Every topic has exactly one message type. Mismatches are caught at connection time — not at runtime.
/camera/image → sensor_msgs/Image ✓ all publishers must use this type
/camera/image → sensor_msgs/Compressed ✗ rejected at connection
Quality of Service¶
QoS controls delivery guarantees. Incompatible settings = silent data loss.
| Preset | Reliability | Durability | Use for |
|---|---|---|---|
| Default | Reliable | Volatile | Commands, state |
| Sensor data | Best-effort | Volatile | Camera, lidar (recency > completeness) |
| Transient local | Reliable | Transient-local | Config topics late-joiners must receive |
Warning
Incompatible QoS produces no error — messages silently stop flowing. Run ros2 topic info -v /topic to compare QoS on both ends.
Key Concepts at a Glance¶
Visual Flow¶
graph TD
A[ZContextBuilder] -->|configure| B[ZContext]
B -->|create| C[Node]
C -->|publisher| D[Publisher]
C -->|subscriber| E[Subscriber]
D -->|publish| F[Topic]
F -->|deliver| E
E -->|callback| G[Message Handler]
Message Flow¶
sequenceDiagram
participant P as Publisher (Talker)
participant Z as Eclipse Zenoh
participant S1 as Subscriber 1 (Listener)
participant S2 as Subscriber 2 (Logger)
P->>Z: publish("Hello World #1")
Z->>S1: deliver message
Z->>S2: deliver message
S1-->>P: (no reply — fire and forget)
P->>Z: publish("Hello World #2")
Z->>S1: deliver message
Z->>S2: deliver message
Note over P,S2: Any number of subscribers receive every message.<br/>Publisher never knows who is listening.
Key Features¶
| Feature | Description | Benefit |
|---|---|---|
| Type Safety | Strongly-typed messages using Rust structs | Compile-time error detection |
| Zero-Copy | Efficient message passing via Zenoh | Reduced latency and CPU usage |
| QoS Profiles | Configurable reliability, durability, history | Fine-grained delivery control |
| Async/Blocking | Dual API for both paradigms | Flexible integration patterns |
Publisher Example¶
This example demonstrates publishing "Hello World" messages to a topic. The publisher sends messages periodically, showcasing the fundamental publishing pattern.
/// Talker node that publishes "Hello World" messages to a topic
///
/// # Arguments
/// * `ctx` - The ros-z context
/// * `topic` - The topic name to publish to
/// * `period` - Duration between messages
/// * `max_count` - Optional maximum number of messages to publish. If None, publishes indefinitely.
pub async fn run_talker(
ctx: ZContext,
topic: &str,
period: Duration,
max_count: Option<usize>,
) -> Result<()> {
// Create a node named "talker"
let node = ctx.create_node("talker").build()?;
// Create a publisher with a custom Quality of Service profile
let qos = QosProfile {
history: QosHistory::KeepLast(NonZeroUsize::new(7).unwrap()),
..Default::default()
};
let publisher = node.create_pub::<RosString>(topic).with_qos(qos).build()?;
let mut count = 1;
loop {
// Create the message
let msg = RosString {
data: format!("Hello World: {}", count),
};
// Log the message being published
println!("Publishing: '{}'", msg.data);
// Publish the message (non-blocking)
publisher.async_publish(&msg).await?;
// Check if we've reached the max count
if let Some(max) = max_count
&& count >= max
{
break;
}
// Wait for the next publish cycle
tokio::time::sleep(period).await;
count += 1;
}
Ok(())
}
Key points:
- QoS Configuration: Uses
KeepLast(7)to buffer the last 7 messages - Async Publishing: Non-blocking
async_publish()for efficient I/O. For blocking (non-async) contexts, usepublisher.publish(&msg)?instead. - Rate Control: Uses
tokio::time::sleep()to control publishing frequency - Bounded Operation: Optional
max_countfor testing scenarios
Running the publisher:
# Basic usage
cargo run --example demo_nodes_talker
# Custom topic and rate
cargo run --example demo_nodes_talker -- --topic /my_topic --period 0.5
Subscriber Example¶
This example demonstrates subscribing to messages from a topic. The subscriber receives and displays messages, showing both timeout-based and async reception patterns.
/// Listener node that subscribes to a topic
///
/// # Arguments
/// * `ctx` - The ros-z context
/// * `topic` - The topic name to subscribe to
/// * `max_count` - Optional maximum number of messages to receive. If None, listens indefinitely.
/// * `timeout` - Optional timeout duration. If None, waits indefinitely.
///
/// # Returns
/// A vector of received messages
pub async fn run_listener(
ctx: ZContext,
topic: &str,
max_count: Option<usize>,
timeout: Option<Duration>,
) -> Result<Vec<String>> {
// Create a node named "listener"
let node = ctx.create_node("listener").build()?;
// Create a subscription to the "chatter" topic
let qos = QosProfile {
history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
..Default::default()
};
let subscriber = node.create_sub::<RosString>(topic).with_qos(qos).build()?;
let mut received_messages = Vec::new();
let start = std::time::Instant::now();
// Receive messages in a loop
loop {
// Check timeout
if let Some(t) = timeout
&& start.elapsed() > t
{
break;
}
// Try to receive with a small timeout to allow checking other conditions
let recv_result = if timeout.is_some() || max_count.is_some() {
subscriber.recv_timeout(Duration::from_millis(100))
} else {
// If no limits, use async_recv
subscriber.async_recv().await
};
match recv_result {
Ok(msg) => {
// Log the received message
println!("I heard: [{}]", msg.data);
received_messages.push(msg.data.clone());
// Check if we've received enough messages
if let Some(max) = max_count
&& received_messages.len() >= max
{
break;
}
}
Err(_) => {
// Continue if timeout on recv_timeout
if timeout.is_some() || max_count.is_some() {
continue;
} else {
break;
}
}
}
}
Ok(received_messages)
}
Key points:
- Flexible Reception: Supports timeout-based and indefinite blocking
- Testable Design: Returns received messages for verification
- Bounded Operation: Optional
max_countandtimeoutparameters - QoS Configuration: Uses
KeepLast(10)for message buffering
Running the subscriber:
# Basic usage
cargo run --example demo_nodes_listener
# Custom topic
cargo run --example demo_nodes_listener -- --topic /my_topic
Complete Pub-Sub Workflow¶
To see publishers and subscribers in action together, you'll need to start a Zenoh router first:
Terminal 1 - Start Zenoh Router:
Terminal 2 - Start Subscriber:
Terminal 3 - Start Publisher:
Subscriber Patterns¶
ros-z provides three patterns for receiving messages, each suited for different use cases:
Tip
use ros_z::Builder; must be in scope to call .build() on any ros-z builder type. Add it alongside your other ros-z imports.
Pattern 1: Blocking Receive (Pull Model)¶
Best for: Simple sequential processing, scripting
use ros_z::Builder; // required to call .build()
let subscriber = node
.create_sub::<RosString>("topic_name")
.build()?;
while let Ok(msg) = subscriber.recv() {
println!("Received: {}", msg.data);
}
Pattern 2: Async Receive (Pull Model)¶
Best for: Integration with async codebases, handling multiple streams
use ros_z::Builder; // required to call .build()
let subscriber = node
.create_sub::<RosString>("topic_name")
.build()?;
while let Ok(msg) = subscriber.async_recv().await {
println!("Received: {}", msg.data);
}
Pattern 3: Callback (Push Model)¶
Best for: Event-driven architectures, low-latency response
use ros_z::Builder; // required to call .build_with_callback()
let subscriber = node
.create_sub::<RosString>("topic_name")
.build_with_callback(|msg| {
println!("Received: {}", msg.data);
})?;
// No need to call recv() - callback handles messages automatically
// Your code continues while messages are processed in the background
Tip
Use callbacks for low-latency event-driven processing. Use blocking/async receive when you need explicit control over when you process messages.
Pattern Comparison¶
| Aspect | Blocking Receive | Async Receive | Callback |
|---|---|---|---|
| Control Flow | Sequential | Sequential | Event-driven |
| Latency | Medium (poll-based) | Medium (poll-based) | Low (immediate) |
| Memory | Queue size × message | Queue size × message | No queue |
| Backpressure | Built-in (queue full) | Built-in (queue full) | None (drops if slow) |
| Use Case | Simple scripts | Async applications | Real-time response |
Quality of Service (QoS)¶
QoS profiles control message delivery behavior. Both publishers and subscribers accept a QoS profile:
Publisher QoS:
use std::num::NonZeroUsize;
use ros_z::Builder;
use ros_z::qos::{QosProfile, QosHistory, QosReliability};
let qos = QosProfile {
history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
reliability: QosReliability::Reliable,
..Default::default()
};
let publisher = node
.create_pub::<RosString>("topic")
.with_qos(qos)
.build()?;
Subscriber QoS:
use std::num::NonZeroUsize;
use ros_z::Builder;
use ros_z::qos::{QosProfile, QosHistory, QosReliability};
let qos = QosProfile {
history: QosHistory::KeepLast(NonZeroUsize::new(10).unwrap()),
reliability: QosReliability::Reliable,
..Default::default()
};
let subscriber = node
.create_sub::<RosString>("topic")
.with_qos(qos)
.build()?;
Tip
Use QosHistory::KeepLast(NonZeroUsize::new(1).unwrap()) for sensor data and QosReliability::Reliable for critical commands. Match QoS profiles between publishers and subscribers for optimal message delivery.
Name Remapping¶
ros-z supports ROS 2-style topic remapping via ZContextBuilder::with_remap_rule(). Remapping rules apply to all nodes created from the same context and redirect topic/service names at the context level.
# fn main() -> zenoh::Result<()> {
use ros_z::context::ZContextBuilder;
use ros_z::Builder;
let ctx = ZContextBuilder::default()
.with_remap_rule("/chatter:=/my_chatter")? // redirect /chatter to /my_chatter
.with_remap_rule("__node:=renamed_node")? // rename the node
.build()?;
# Ok(())
# }
Add multiple rules with .with_remap_rules():
# fn main() -> zenoh::Result<()> {
use ros_z::context::ZContextBuilder;
use ros_z::Builder;
let ctx = ZContextBuilder::default()
.with_remap_rules(["/input:=/sensor/data", "/output:=/processed/data"])?
.build()?;
# Ok(())
# }
The rule format follows the ROS 2 convention: from:=to.
ROS 2 Interoperability¶
ros-z publishers and subscribers interoperate with ROS 2 C++ and Python nodes via the shared Zenoh transport. See the dedicated ROS 2 Interoperability chapter for setup instructions covering Rust, Python, and Go.
Resources¶
- Custom Messages - Defining and using custom message types
- Message Generation - Generating Rust types from ROS 2 messages
- Quick Start - Getting started guide
Start with the examples above to understand the basic pub-sub workflow, then explore custom messages for domain-specific communication.