Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Python Bindings

ros-z provides Python bindings via ros-z-py, enabling Python applications to communicate with Rust and ROS 2 nodes using the same Zenoh transport. The bindings use PyO3 for Rust-Python interop and msgspec for efficient message serialization.

Note

Python bindings provide the same pub/sub and service patterns as Rust, with Pythonic APIs. Messages are automatically serialized/deserialized between Python objects and CDR format for ROS 2 compatibility.

Visual Flow

graph TD
    A[Python Code] -->|import| B[ros_z_py]
    B -->|PyO3| C[Rust ros-z]
    C -->|Zenoh| D[Network]
    D -->|Zenoh| E[ROS 2 / Rust Nodes]

    F[Python Message] -->|msgspec| G[Struct]
    G -->|serialize| H[CDR Bytes]
    H -->|deserialize| I[Rust Struct]

Installation

Prerequisites

  • Python 3.8+
  • Rust 1.85+ — install via rustup: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
  • maturin — pip install maturin

Using Nix

If you use Nix, run nix develop in the repo root. It provides Rust, maturin, and all build tools automatically — skip the manual install steps above.

Setup

# Create and activate virtual environment
cd crates/ros-z-py
python -m venv .venv
source .venv/bin/activate

# Install message types
pip install -e ../ros-z-msgs/python/

# Build and install ros-z-py
maturin develop

Tip

Use maturin develop --release for optimized builds when benchmarking or running in production.

Quick Start

Here's a complete publisher and subscriber example from crates/ros-z-py/examples/topic_demo.py:

Publisher (Talker)

def run_talker(ctx, topic: str, count: int, interval: float):
    """Run the talker (publisher)."""
    node = ctx.create_node("talker").build()
    pub = node.create_publisher(topic, std_msgs.String)

    print(f"Talker started. Publishing to {topic}...")

    i = 0
    while count == 0 or i < count:
        message = f"Hello from Python {i}"
        msg = std_msgs.String(data=message)
        pub.publish(msg)
        print(f"PUB:{i}", flush=True)
        i += 1
        time.sleep(interval)

    print("PUB:DONE", flush=True)


Subscriber (Listener)

def run_listener(ctx, topic: str, timeout: float):
    """Run the listener (subscriber)."""
    node = ctx.create_node("listener").build()
    sub = node.create_subscriber(topic, std_msgs.String)

    print("SUB:READY", flush=True)

    start = time.time()
    received = 0

    while timeout == 0 or (time.time() - start) < timeout:
        msg = sub.recv(timeout=1.0)
        if msg is not None:
            print(f"SUB:{msg.data}", flush=True)
            received += 1

    print(f"SUB:TOTAL:{received}", flush=True)


Key Components

ComponentPurposePython API
ZContextBuilderConfigure ros-z environmentZContextBuilder().with_domain_id(0).build()
ZContextManages ROS 2 connectionsEntry point for creating nodes
NodeLogical unit of computationctx.create_node("name").build()
PublisherSends messages to topicsnode.create_publisher(topic, type)
SubscriberReceives messages from topicsnode.create_subscriber(topic, type)
ClientSends service requestsnode.create_client(service, type)
ServerHandles service requestsnode.create_server(service, type)

Service Patterns

Examples from crates/ros-z-py/examples/service_demo.py:

Service Server

def run_server(ctx, service: str, max_requests: int):
    """Run the AddTwoInts service server."""
    node = ctx.create_node("add_two_ints_server").build()
    server = node.create_server(service, example_interfaces.AddTwoIntsRequest)

    print("SERVER:READY", flush=True)

    handled = 0
    while max_requests == 0 or handled < max_requests:
        request_id, req = server.take_request()
        result = req.a + req.b
        print(f"SERVER:{req.a}+{req.b}={result}", flush=True)

        resp = example_interfaces.AddTwoIntsResponse(sum=result)
        server.send_response(resp, request_id)
        handled += 1

    print("SERVER:DONE", flush=True)


Service Client

def run_client(ctx, service: str, a: int, b: int, timeout: float):
    """Run the AddTwoInts service client."""
    node = ctx.create_node("add_two_ints_client").build()
    client = node.create_client(service, example_interfaces.AddTwoIntsRequest)

    # Wait for service discovery
    time.sleep(1.0)

    print(f"CLIENT:REQUEST:{a}+{b}", flush=True)

    req = example_interfaces.AddTwoIntsRequest(a=a, b=b)
    client.send_request(req)

    resp = client.take_response(timeout=timeout)

    if resp is not None:
        print(f"CLIENT:RESPONSE:{resp.sum}", flush=True)
    else:
        print("CLIENT:ERROR:no response", flush=True)
        sys.exit(1)


Tip

Service servers use a pull model: take_request() blocks until a request arrives. This gives you explicit control over when to process requests.

Action Patterns

Actions extend services with long-running execution, incremental feedback, and cancellation. A goal is sent by the client, accepted or rejected by the server, executed with feedback published at each step, and finally terminated with a result.

Examples from crates/ros-z-py/examples/action_demo.py.

Defining Message Types

Actions require three message structs: Goal, Result, and Feedback. Each must have a __msgtype__ class attribute:

class CountToGoal(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToGoal"
    target: int = 10


class CountToResult(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToResult"
    final_count: int = 0


class CountToFeedback(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToFeedback"
    current: int = 0


Tip

Types from ros_z_msgs_py already have __msgtype__ and a ROS 2 type hash. Inline msgspec.Struct types (as above) use a zero hash and are Python-to-Python only — they are not compatible with rmw_zenoh_cpp typed actions.

Action Server

def run_server(ctx, action: str):
    node = ctx.create_node("count_to_server").build()
    server = node.create_action_server(
        action, CountToGoal, CountToResult, CountToFeedback
    )

    print("SERVER:READY", flush=True)

    while True:
        request = server.recv_goal(timeout=1.0)
        if request is None:
            continue

        goal = request.goal()
        print(f"SERVER:GOAL:{goal.target}", flush=True)
        executing = request.accept_and_execute()

        count = 0
        while count < goal.target:
            time.sleep(0.2)
            count += 1
            executing.publish_feedback(CountToFeedback(current=count))
            print(f"SERVER:FEEDBACK:{count}", flush=True)

            if executing.is_cancel_requested:
                print(f"SERVER:CANCELED:{count}", flush=True)
                executing.canceled(CountToResult(final_count=count))
                break
        else:
            print(f"SERVER:SUCCEEDED:{count}", flush=True)
            executing.succeed(CountToResult(final_count=count))


Server Lifecycle

  1. recv_goal(timeout) — blocks until a goal arrives; returns None on timeout
  2. request.goal() — deserialize the goal payload
  3. request.accept_and_execute()ServerGoalHandle — or request.reject()
  4. handle.publish_feedback(fb) — send incremental progress
  5. handle.is_cancel_requested — poll for client cancellation
  6. handle.succeed(result) / handle.abort(result) / handle.canceled(result) — terminate

Action Client

def run_client(ctx, action: str, target: int, cancel_after: float | None):
    node = ctx.create_node("count_to_client").build()
    client = node.create_action_client(
        action, CountToGoal, CountToResult, CountToFeedback
    )

    # Give server time to advertise
    time.sleep(1.0)

    print(f"CLIENT:SEND_GOAL:{target}", flush=True)
    handle = client.send_goal(CountToGoal(target=target))

    # Schedule cancellation if requested
    if cancel_after is not None:

        def _cancel():
            time.sleep(cancel_after)
            print("CLIENT:CANCEL", flush=True)
            handle.cancel()

        threading.Thread(target=_cancel, daemon=True).start()

    # Drain feedback
    while True:
        fb = handle.recv_feedback(timeout=0.5)
        if fb is None:
            break
        print(f"CLIENT:FEEDBACK:{fb.current}", flush=True)

    result = handle.get_result(timeout=5.0)
    if result is not None:
        print(f"CLIENT:RESULT:{result.final_count}", flush=True)
    else:
        print("CLIENT:ERROR:no result", flush=True)
        sys.exit(1)


Client Lifecycle

  1. client.send_goal(goal)ActionGoalHandle — blocks until accepted (raises on rejection)
  2. handle.recv_feedback(timeout) — receive next feedback; returns None when channel closes
  3. handle.get_result(timeout) — block until terminal state; returns None on timeout
  4. handle.cancel() — request cancellation (the server decides when to honour it)

Goal Status

GoalStatus constants mirror action_msgs/msg/GoalStatus:

ConstantValueis_active()is_terminal()
ACCEPTED1
EXECUTING2
CANCELING3
SUCCEEDED4
CANCELED5
ABORTED6
status = ros_z_py.GoalStatus(handle.status)
if status.is_terminal():
    print("Done:", status)

Warning

Python actions use a byte-wrapping wire format that is not compatible with rmw_zenoh_cpp typed actions. Use the Rust ZAction trait for ROS 2 interop.

Complex Messages

Python bindings support nested message types like geometry_msgs/Twist:

from ros_z_py import geometry_msgs

# Create a Twist message with nested Vector3
twist = geometry_msgs.Twist(
    linear=geometry_msgs.Vector3(x=1.0, y=0.0, z=0.0),
    angular=geometry_msgs.Vector3(x=0.0, y=0.0, z=0.5)
)

pub = node.create_publisher("/cmd_vel", geometry_msgs.Twist)
pub.publish(twist)

Context Configuration

Connect to Specific Endpoint

ctx = (
    ros_z_py.ZContextBuilder()
    .with_connect_endpoints(["tcp/192.168.1.100:7447"])
    .build()
)

Disable Multicast Scouting

ctx = (
    ros_z_py.ZContextBuilder()
    .with_domain_id(0)
    .disable_multicast_scouting()
    .build()
)

Custom Namespace

node = ctx.create_node("my_node").with_namespace("/robot1").build()

Performance: Zero-Copy Large Payloads

When working with large byte arrays (sensor data, images, point clouds), ros-z-py minimizes memory copies using a zero-copy optimization for uint8[] and byte[] fields.

ZBufView

When a subscriber receives a message, byte array fields are exposed as a ZBufView — a zero-copy view into the received network buffer. ZBufView implements Python's buffer protocol:

msg = sub.recv(timeout=1.0)
# msg.data is a ZBufView — no copy has occurred

# Zero-copy access via buffer protocol
mv = memoryview(msg.data)
header = mv[:8]  # Slice without copying the entire payload

# Only copies when explicitly converted
data = bytes(msg.data)

Echo Without Copying

For relay and echo patterns, pass ZBufView fields directly to a new message. The derive macro detects ZBufView and extracts the inner buffer via reference counting — no data copy occurs:

# Receive and re-publish — zero-copy for byte array fields
msg = sub.recv(timeout=1.0)
echo = std_msgs.ByteMultiArray(data=msg.data)  # No copy!
pub.publish(echo)

ZBufView API

MethodDescription
len(view)Number of bytes
bool(view)True if not empty
view[i]Single byte access
view[start:stop]Slice (returns bytes)
memoryview(view)Zero-copy buffer protocol access
bytes(view)Convert to bytes (copies)
view.is_zero_copyWhether the view avoids internal copying

Tip

For best performance with large payloads, avoid calling bytes() on ZBufView fields. Use memoryview() for read access, or pass the ZBufView directly when re-publishing.

How It Works

The optimization operates at three layers:

  1. Deserialization bypass: When the Python subscriber receives a message, the raw network buffer (ZBuf) is stored in a thread-local. During CDR deserialization, byte array fields create sub-views into this buffer instead of copying (ZSlice::subslice()).

  2. Buffer protocol: ZBufView wraps the ZBuf and exposes its bytes to Python via __getbuffer__/__releasebuffer__. For contiguous buffers (the common case), this is a direct pointer — no copy at all.

  3. Pass-through re-publish: The FromPyMessage derive macro recognizes ZBufView inputs and extracts the inner ZBuf via clone(), which only increments a reference count on the underlying memory.

ROS 2 Interoperability

Python nodes interoperate with ROS 2 C++ nodes via the shared Zenoh transport. See the dedicated ROS 2 Interoperability chapter for setup instructions.

Running Tests

# Python unit tests
cd crates/ros-z-py
source .venv/bin/activate
python -m pytest tests/ -v

# Python-Rust interop tests
cargo test --features python-interop -p ros-z-tests --test python_interop -- --test-threads=1

Troubleshooting

Import errors when using ros_z_py

This error occurs when the package hasn't been built or installed correctly.

Solution:

Rebuild and install the package:

cd crates/ros-z-py
source .venv/bin/activate
pip install -e ../ros-z-msgs/python/
maturin develop

Message type not found

This error occurs when trying to use a message type that isn't supported by the Python bindings.

Solution:

Check the supported message types by looking at the match arms in crates/ros-z-py/src/node.rs. Currently supported: std_msgs/String, std_msgs/ByteMultiArray, geometry_msgs/Vector3, geometry_msgs/Twist, sensor_msgs/LaserScan, and example_interfaces/AddTwoInts.

Ensure you are passing a message class object (e.g., std_msgs.String), not a string.

recv() always returns None

This happens when no messages are being received within the timeout period.

Solution:

  • Check the topic name matches exactly (including leading /)
  • Verify the publisher is running and connected to the same Zenoh network
  • Increase the timeout value
  • Use --nocapture with pytest to see debug output: python -m pytest tests/ -v --capture=no

Resources

Start with the pub/sub example to understand the basics, then explore services for request-response patterns.