Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Python Bindings

ros-z provides Python bindings via ros-z-py, enabling Python applications to communicate with Rust and ROS 2 nodes using the same Zenoh transport. The bindings use PyO3 for Rust-Python interop and msgspec for efficient message serialization.

Note

Python bindings provide the same pub/sub and service patterns as Rust, with Pythonic APIs. Messages are automatically serialized/deserialized between Python objects and CDR format for ROS 2 compatibility.

Visual Flow

graph TD
    A[Python Code] -->|import| B[ros_z_py]
    B -->|PyO3| C[Rust ros-z]
    C -->|Zenoh| D[Network]
    D -->|Zenoh| E[ROS 2 / Rust Nodes]

    F[Python Message] -->|msgspec| G[Struct]
    G -->|serialize| H[CDR Bytes]
    H -->|deserialize| I[Rust Struct]

Installation

Prerequisites

  • Python 3.8+
  • Rust toolchain
  • maturin (pip install maturin)

Setup

# Create and activate virtual environment
cd ros-z-py
python -m venv .venv
source .venv/bin/activate

# Install message types
pip install -e ../crates/ros-z-msgs/python/

# Build and install ros-z-py
maturin develop

Tip

Use maturin develop --release for optimized builds when benchmarking or running in production.

Quick Start

Here's a complete publisher and subscriber example from ros-z-py/examples/topic_demo.py:

Publisher (Talker)

def run_talker(ctx, topic: str, count: int, interval: float):
    """Run the talker (publisher)."""
    node = ctx.create_node("talker").build()
    pub = node.create_publisher(topic, std_msgs.String)

    print(f"Talker started. Publishing to {topic}...")

    i = 0
    while count == 0 or i < count:
        message = f"Hello from Python {i}"
        msg = std_msgs.String(data=message)
        pub.publish(msg)
        print(f"PUB:{i}", flush=True)
        i += 1
        time.sleep(interval)

    print("PUB:DONE", flush=True)


Subscriber (Listener)

def run_listener(ctx, topic: str, timeout: float):
    """Run the listener (subscriber)."""
    node = ctx.create_node("listener").build()
    sub = node.create_subscriber(topic, std_msgs.String)

    print("SUB:READY", flush=True)

    start = time.time()
    received = 0

    while timeout == 0 or (time.time() - start) < timeout:
        msg = sub.recv(timeout=1.0)
        if msg is not None:
            print(f"SUB:{msg.data}", flush=True)
            received += 1

    print(f"SUB:TOTAL:{received}", flush=True)


Key Components

ComponentPurposePython API
ZContextBuilderConfigure ros-z environmentZContextBuilder().with_domain_id(0).build()
ZContextManages ROS 2 connectionsEntry point for creating nodes
NodeLogical unit of computationctx.create_node("name").build()
PublisherSends messages to topicsnode.create_publisher(topic, type)
SubscriberReceives messages from topicsnode.create_subscriber(topic, type)
ClientSends service requestsnode.create_client(service, type)
ServerHandles service requestsnode.create_server(service, type)

Service Patterns

Examples from ros-z-py/examples/service_demo.py:

Service Server

def run_server(ctx, service: str, max_requests: int):
    """Run the AddTwoInts service server."""
    node = ctx.create_node("add_two_ints_server").build()
    server = node.create_server(service, example_interfaces.AddTwoIntsRequest)

    print("SERVER:READY", flush=True)

    handled = 0
    while max_requests == 0 or handled < max_requests:
        request_id, req = server.take_request()
        result = req.a + req.b
        print(f"SERVER:{req.a}+{req.b}={result}", flush=True)

        resp = example_interfaces.AddTwoIntsResponse(sum=result)
        server.send_response(resp, request_id)
        handled += 1

    print("SERVER:DONE", flush=True)


Service Client

def run_client(ctx, service: str, a: int, b: int, timeout: float):
    """Run the AddTwoInts service client."""
    node = ctx.create_node("add_two_ints_client").build()
    client = node.create_client(service, example_interfaces.AddTwoIntsRequest)

    # Wait for service discovery
    time.sleep(1.0)

    print(f"CLIENT:REQUEST:{a}+{b}", flush=True)

    req = example_interfaces.AddTwoIntsRequest(a=a, b=b)
    client.send_request(req)

    resp = client.take_response(timeout=timeout)

    if resp is not None:
        print(f"CLIENT:RESPONSE:{resp.sum}", flush=True)
    else:
        print("CLIENT:ERROR:no response", flush=True)
        sys.exit(1)


Tip

Service servers use a pull model: take_request() blocks until a request arrives. This gives you explicit control over when to process requests.

Complex Messages

Python bindings support nested message types like geometry_msgs/Twist:

from ros_z_py import geometry_msgs

# Create a Twist message with nested Vector3
twist = geometry_msgs.Twist(
    linear=geometry_msgs.Vector3(x=1.0, y=0.0, z=0.0),
    angular=geometry_msgs.Vector3(x=0.0, y=0.0, z=0.5)
)

pub = node.create_publisher("/cmd_vel", "geometry_msgs/msg/Twist")
pub.publish(twist)

Context Configuration

Connect to Specific Endpoint

ctx = (
    ros_z_py.ZContextBuilder()
    .with_connect_endpoints(["tcp/192.168.1.100:7447"])
    .build()
)

Disable Multicast Scouting

ctx = (
    ros_z_py.ZContextBuilder()
    .with_domain_id(0)
    .disable_multicast_scouting()
    .build()
)

Custom Namespace

node = ctx.create_node("my_node").with_namespace("/robot1").build()

Performance: Zero-Copy Large Payloads

When working with large byte arrays (sensor data, images, point clouds), ros-z-py minimizes memory copies using a zero-copy optimization for uint8[] and byte[] fields.

ZBufView

When a subscriber receives a message, byte array fields are exposed as a ZBufView — a zero-copy view into the received network buffer. ZBufView implements Python's buffer protocol:

msg = sub.recv(timeout=1.0)
# msg.data is a ZBufView — no copy has occurred

# Zero-copy access via buffer protocol
mv = memoryview(msg.data)
header = mv[:8]  # Slice without copying the entire payload

# Only copies when explicitly converted
data = bytes(msg.data)

Echo Without Copying

For relay and echo patterns, pass ZBufView fields directly to a new message. The derive macro detects ZBufView and extracts the inner buffer via reference counting — no data copy occurs:

# Receive and re-publish — zero-copy for byte array fields
msg = sub.recv(timeout=1.0)
echo = std_msgs.ByteMultiArray(data=msg.data)  # No copy!
pub.publish(echo)

ZBufView API

MethodDescription
len(view)Number of bytes
bool(view)True if not empty
view[i]Single byte access
view[start:stop]Slice (returns bytes)
memoryview(view)Zero-copy buffer protocol access
bytes(view)Convert to bytes (copies)
view.is_zero_copyWhether the view avoids internal copying

Tip

For best performance with large payloads, avoid calling bytes() on ZBufView fields. Use memoryview() for read access, or pass the ZBufView directly when re-publishing.

How It Works

The optimization operates at three layers:

  1. Deserialization bypass: When the Python subscriber receives a message, the raw network buffer (ZBuf) is stored in a thread-local. During CDR deserialization, byte array fields create sub-views into this buffer instead of copying (ZSlice::subslice()).

  2. Buffer protocol: ZBufView wraps the ZBuf and exposes its bytes to Python via __getbuffer__/__releasebuffer__. For contiguous buffers (the common case), this is a direct pointer — no copy at all.

  3. Pass-through re-publish: The FromPyMessage derive macro recognizes ZBufView inputs and extracts the inner ZBuf via clone(), which only increments a reference count on the underlying memory.

ROS 2 Interoperability

Python nodes work seamlessly with Rust and ROS 2 nodes:

# Terminal 1: Start Zenoh router
cargo run --example zenoh_router

# Terminal 2: Run Python publisher (using topic_demo.py)
cd ros-z-py
source .venv/bin/activate
python examples/topic_demo.py -r talker

# Terminal 3: Listen with ROS 2 CLI
ros2 topic echo /chatter std_msgs/msg/String

Success

Messages published from Python are received by ROS 2 CLI, Rust nodes, and any other ROS 2-compatible system connected via Zenoh.

Running Tests

# Python unit tests
cd ros-z-py
source .venv/bin/activate
python -m pytest tests/ -v

# Python-Rust interop tests
cargo test --features python-interop -p ros-z-tests --test python_interop -- --test-threads=1

Troubleshooting

Import errors when using ros_z_py

This error occurs when the package hasn't been built or installed correctly.

Solution:

Rebuild and install the package:

cd ros-z-py
source .venv/bin/activate
pip install -e ../crates/ros-z-msgs/python/
maturin develop

Message type not found

This error occurs when trying to use a message type that isn't registered.

Solution:

Check which message types are available:

from ros_z_py import ros_z_msgs
print(ros_z_msgs.list_registered_types())

Ensure the message package is enabled in ros-z-msgs/Cargo.toml features.

recv() always returns None

This happens when no messages are being received within the timeout period.

Solution:

  • Check the topic name matches exactly (including leading /)
  • Verify the publisher is running and connected to the same Zenoh network
  • Increase the timeout value
  • Use --nocapture with pytest to see debug output: python -m pytest tests/ -v --capture=no

Resources

Start with the pub/sub example to understand the basics, then explore services for request-response patterns.