Skip to content

Python Bindings

hiroz provides Python bindings via hiroz-py, enabling Python applications to communicate with Rust and ROS 2 nodes using the same Eclipse Zenoh transport. The bindings use PyO3 for Rust-Python interop and msgspec for efficient message serialization.

Note

Python bindings provide the same pub/sub and service patterns as Rust, with Pythonic APIs. Messages are automatically serialized/deserialized between Python objects and CDR format for ROS 2 compatibility.

Installation

Pre-built wheels are available for Linux (x86_64, aarch64) and macOS (aarch64), Python 3.11+. No Rust toolchain required.

Pick a release version from the Releases page and substitute <version> below (e.g. v0.2.0).

Jazzy / Kilted / Rolling:

pip install https://github.com/ZettaScaleLabs/hiroz/releases/download/<version>/hiroz_msgs_py-<version>-py3-none-any.whl
pip install https://github.com/ZettaScaleLabs/hiroz/releases/download/<version>/hiroz_py-<version>-0jazzy-cp311-abi3-linux_x86_64.whl

Humble:

pip install https://github.com/ZettaScaleLabs/hiroz/releases/download/<version>/hiroz_msgs_py-<version>-py3-none-any.whl
pip install https://github.com/ZettaScaleLabs/hiroz/releases/download/<version>/hiroz_py-<version>-0humble-cp311-abi3-linux_x86_64.whl

Tip

hiroz-msgs-py (message definitions) is a dependency of hiroz-py and is installed automatically when using pip install. The explicit install above is only needed when pinning a specific version or installing from a direct URL.

Local Dry-Run (before releasing)

To verify the wheel build locally before pushing a release tag, use the build script which mirrors the CI release workflow exactly:

# Build jazzy + humble wheels into crates/hiroz-py/dist/
./scripts/build-python-wheels.nu

# Build and immediately install into crates/hiroz-py/.venv
./scripts/build-python-wheels.nu --install jazzy

# Build a single distro only
./scripts/build-python-wheels.nu jazzy

The script produces the same wheel filenames as CI (with the ‑jazzy / ‑humble distro suffix) and prints ready-to-use pip install commands for the local files.

Build from Source

For development or platforms without pre-built wheels:

Prerequisites

  • Python 3.8+
  • Rust 1.85+ — install via rustup: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
  • maturin — pip install maturin

Setup

# Create and activate virtual environment
cd crates/hiroz-py
python -m venv .venv
source .venv/bin/activate

# Install message types
pip install -e ../hiroz-msgs/python/

# Build and install hiroz-py
maturin develop

Tip

Use maturin develop --release for optimized builds when benchmarking or running in production.

Architecture

graph TD
accTitle: Python bindings architecture from Python code to Zenoh network
accDescr: Python code imports hiroz_py which calls through PyO3 into the Rust hiroz core, which uses the Zenoh transport to reach ROS 2 and Rust nodes; Python messages go through msgspec and CDR serialization.
    A[Python Code] -->|import| B[hiroz_py]
    B -->|PyO3| C[Rust hiroz]
    C -->|Zenoh| D[Network]
    D -->|Zenoh| E[ROS 2 / Rust Nodes]
    F[Python Message] -->|msgspec| G[Struct]
    G -->|serialize| H[CDR Bytes]
    H -->|deserialize| I[Rust Struct]

Quick Start

Here's a complete publisher and subscriber example from crates/hiroz-py/examples/topic_demo.py:

Publisher (Talker)

def run_talker(ctx, topic: str, count: int, interval: float):
    """Run the talker (publisher)."""
    node = ctx.create_node("talker").build()
    pub = node.create_publisher(topic, std_msgs.String)

    print(f"Talker started. Publishing to {topic}...")

    i = 0
    while count == 0 or i < count:
        message = f"Hello from Python {i}"
        msg = std_msgs.String(data=message)
        pub.publish(msg)
        print(f"PUB:{i}", flush=True)
        i += 1
        time.sleep(interval)

    print("PUB:DONE", flush=True)

Subscriber (Listener)

def run_listener(ctx, topic: str, timeout: float):
    """Run the listener (subscriber)."""
    node = ctx.create_node("listener").build()
    sub = node.create_subscriber(topic, std_msgs.String)

    print("SUB:READY", flush=True)

    start = time.time()
    received = 0

    while timeout == 0 or (time.time() - start) < timeout:
        msg = sub.recv(timeout=1.0)
        if msg is not None:
            print(f"SUB:{msg.data}", flush=True)
            received += 1

    print(f"SUB:TOTAL:{received}", flush=True)

Key Components

Component Purpose Python API
ZContextBuilder Configure hiroz environment ZContextBuilder().with_domain_id(0).build()
ZContext Manages ROS 2 connections Entry point for creating nodes
Node Logical unit of computation ctx.create_node("name").build()
Publisher Sends messages to topics node.create_publisher(topic, type)
Subscriber Receives messages from topics node.create_subscriber(topic, type)
Client Sends service requests node.create_client(service, type)
Server Handles service requests node.create_server(service, type)

Service Patterns

Examples from crates/hiroz-py/examples/service_demo.py:

Service Server

def run_server(ctx, service: str, max_requests: int):
    """Run the AddTwoInts service server."""
    node = ctx.create_node("add_two_ints_server").build()
    server = node.create_server(service, example_interfaces.AddTwoIntsRequest)

    print("SERVER:READY", flush=True)

    handled = 0
    while max_requests == 0 or handled < max_requests:
        request_id, req = server.take_request()
        result = req.a + req.b
        print(f"SERVER:{req.a}+{req.b}={result}", flush=True)

        resp = example_interfaces.AddTwoIntsResponse(sum=result)
        server.send_response(resp, request_id)
        handled += 1

    print("SERVER:DONE", flush=True)

Service Client

def run_client(ctx, service: str, a: int, b: int, timeout: float):
    """Run the AddTwoInts service client."""
    node = ctx.create_node("add_two_ints_client").build()
    client = node.create_client(service, example_interfaces.AddTwoIntsRequest)

    # Wait for service discovery
    time.sleep(1.0)

    print(f"CLIENT:REQUEST:{a}+{b}", flush=True)

    req = example_interfaces.AddTwoIntsRequest(a=a, b=b)
    try:
        resp = client.call(req, timeout=timeout)
        print(f"CLIENT:RESPONSE:{resp.sum}", flush=True)
    except RuntimeError as e:
        print(f"CLIENT:ERROR:{e}", flush=True)
        sys.exit(1)

Tip

Service servers use a pull model: take_request() blocks until a request arrives. This gives you explicit control over when to process requests.

Action Patterns

Actions extend services with long-running execution, incremental feedback, and cancellation. A goal is sent by the client, accepted or rejected by the server, executed with feedback published at each step, and finally terminated with a result.

Examples from crates/hiroz-py/examples/action_demo.py.

Defining Message Types

Actions require three message structs: Goal, Result, and Feedback. Each must have a __msgtype__ class attribute:

class CountToGoal(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToGoal"
    target: int = 10


class CountToResult(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToResult"
    final_count: int = 0


class CountToFeedback(msgspec.Struct):
    __msgtype__: ClassVar[str] = "action_demo/msg/CountToFeedback"
    current: int = 0

Tip

Types from hiroz_msgs_py already have __msgtype__ and a ROS 2 type hash. Inline msgspec.Struct types (as above) use a zero hash and are Python-to-Python only — they are not compatible with rmw_zenoh_cpp typed actions.

Action Server

def run_server(ctx, action: str):
    node = ctx.create_node("count_to_server").build()
    server = node.create_action_server(
        action, CountToGoal, CountToResult, CountToFeedback
    )

    print("SERVER:READY", flush=True)

    while True:
        request = server.recv_goal(timeout=1.0)
        if request is None:
            continue

        goal = request.goal()
        print(f"SERVER:GOAL:{goal.target}", flush=True)
        executing = request.accept_and_execute()

        count = 0
        while count < goal.target:
            time.sleep(0.2)
            count += 1
            executing.publish_feedback(CountToFeedback(current=count))
            print(f"SERVER:FEEDBACK:{count}", flush=True)

            if executing.is_cancel_requested:
                print(f"SERVER:CANCELED:{count}", flush=True)
                executing.canceled(CountToResult(final_count=count))
                break
        else:
            print(f"SERVER:SUCCEEDED:{count}", flush=True)
            executing.succeed(CountToResult(final_count=count))

Server Lifecycle

  1. recv_goal(timeout) — blocks until a goal arrives; returns None on timeout
  2. request.goal() — deserialize the goal payload
  3. request.accept_and_execute()ServerGoalHandle — or request.reject()
  4. handle.publish_feedback(fb) — send incremental progress
  5. handle.is_cancel_requested — poll for client cancellation
  6. handle.succeed(result) / handle.abort(result) / handle.canceled(result) — terminate

Action Client

def run_client(ctx, action: str, target: int, cancel_after: float | None):
    node = ctx.create_node("count_to_client").build()
    client = node.create_action_client(
        action, CountToGoal, CountToResult, CountToFeedback
    )

    # Give server time to advertise
    time.sleep(1.0)

    print(f"CLIENT:SEND_GOAL:{target}", flush=True)
    handle = client.send_goal(CountToGoal(target=target))

    # Schedule cancellation if requested
    if cancel_after is not None:

        def _cancel():
            time.sleep(cancel_after)
            print("CLIENT:CANCEL", flush=True)
            handle.cancel()

        threading.Thread(target=_cancel, daemon=True).start()

    # Drain feedback
    while True:
        fb = handle.recv_feedback(timeout=0.5)
        if fb is None:
            break
        print(f"CLIENT:FEEDBACK:{fb.current}", flush=True)

    result = handle.get_result(timeout=5.0)
    if result is not None:
        print(f"CLIENT:RESULT:{result.final_count}", flush=True)
    else:
        print("CLIENT:ERROR:no result", flush=True)
        sys.exit(1)

Client Lifecycle

  1. client.send_goal(goal)ActionGoalHandle — blocks until accepted (raises on rejection)
  2. handle.recv_feedback(timeout) — receive next feedback; returns None when channel closes
  3. handle.get_result(timeout) — block until terminal state; returns None on timeout
  4. handle.cancel() — request cancellation (the server decides when to honour it)

Goal Status

GoalStatus constants mirror action_msgs/msg/GoalStatus:

Constant Value is_active() is_terminal()
ACCEPTED 1
EXECUTING 2
CANCELING 3
SUCCEEDED 4
CANCELED 5
ABORTED 6
status = hiroz_py.GoalStatus(handle.status)
if status.is_terminal():
    print("Done:", status)

Warning

Python actions use a byte-wrapping wire format that is not compatible with rmw_zenoh_cpp typed actions. Use the Rust ZAction trait for ROS 2 interop.

Complex Messages

Python bindings support nested message types like geometry_msgs/Twist:

from hiroz_py import geometry_msgs

# Create a Twist message with nested Vector3
twist = geometry_msgs.Twist(
    linear=geometry_msgs.Vector3(x=1.0, y=0.0, z=0.0),
    angular=geometry_msgs.Vector3(x=0.0, y=0.0, z=0.5)
)

pub = node.create_publisher("/cmd_vel", geometry_msgs.Twist)
pub.publish(twist)

Context Configuration

Connect to Specific Endpoint

ctx = (
    hiroz_py.ZContextBuilder()
    .with_connect_endpoints(["tcp/192.168.1.100:7447"])
    .build()
)

Disable Multicast Scouting

ctx = (
    hiroz_py.ZContextBuilder()
    .with_domain_id(0)
    .disable_multicast_scouting()
    .build()
)

Custom Namespace

node = ctx.create_node("my_node").with_namespace("/robot1").build()

Performance: Zero-Copy Large Payloads

When working with large byte arrays (sensor data, images, point clouds), hiroz-py minimizes memory copies using a zero-copy optimization for uint8[] and byte[] fields.

ZBufView

When a subscriber receives a message, hiroz exposes byte array fields as a ZBufView — a zero-copy view into the received network buffer. ZBufView implements Python's buffer protocol:

msg = sub.recv(timeout=1.0)
# msg.data is a ZBufView — no copy has occurred

# Zero-copy access via buffer protocol
mv = memoryview(msg.data)
header = mv[:8]  # Slice without copying the entire payload

# Only copies when explicitly converted
data = bytes(msg.data)

Echo Without Copying

For relay and echo patterns, pass ZBufView fields directly to a new message. The derive macro detects ZBufView and extracts the inner buffer via reference counting — no data copy occurs:

# Receive and re-publish — zero-copy for byte array fields
msg = sub.recv(timeout=1.0)
echo = std_msgs.ByteMultiArray(data=msg.data)  # No copy!
pub.publish(echo)

ZBufView API

Method Description
len(view) Number of bytes
bool(view) True if not empty
view[i] Single byte access
view[start:stop] Slice (returns bytes)
memoryview(view) Zero-copy buffer protocol access
bytes(view) Convert to bytes (copies)
view.is_zero_copy Whether the view avoids internal copying

Tip

For best performance with large payloads, avoid calling bytes() on ZBufView fields. Use memoryview() for read access, or pass the ZBufView directly when re-publishing.

How It Works

The optimization operates at three layers:

  1. Deserialization bypass: When the Python subscriber receives a message, hiroz stores the raw network buffer (ZBuf) in a thread-local. During CDR deserialization, byte array fields create sub-views into this buffer instead of copying (ZSlice::subslice()).

  2. Buffer protocol: ZBufView wraps the ZBuf and exposes its bytes to Python via __getbuffer__/__releasebuffer__. For contiguous buffers (the common case), this is a direct pointer — no copy at all.

  3. Pass-through re-publish: The FromPyMessage derive macro recognizes ZBufView inputs and extracts the inner ZBuf via clone(), which only increments a reference count on the underlying memory.

ROS 2 Interoperability

Python nodes interoperate with ROS 2 C++ nodes via the shared Zenoh transport. See the dedicated ROS 2 Interoperability chapter for setup instructions.

Running Tests

# Python unit tests
cd crates/hiroz-py
source .venv/bin/activate
python -m pytest tests/ -v

# Python-Rust interop tests
cargo test --features python-interop -p hiroz-tests --test python_interop -- --test-threads=1

Troubleshooting

Import errors when using hiroz_py

This error occurs when the package hasn't been built or installed correctly. Solution: Rebuild and install the package:

cd crates/hiroz-py
source .venv/bin/activate
pip install -e ../hiroz-msgs/python/
maturin develop
Message type not found

This error occurs when trying to use a message type that isn't supported by the Python bindings. Solution: Check the supported message types by looking at the match arms in crates/hiroz-py/src/node.rs. Currently supported: std_msgs/String, std_msgs/ByteMultiArray, geometry_msgs/Vector3, geometry_msgs/Twist, sensor_msgs/LaserScan, and example_interfaces/AddTwoInts.

Ensure you are passing a message class object (e.g., std_msgs.String), not a string.

recv() always returns None

This happens when no messages arrive within the timeout period. Solution:

  • Check the topic name matches exactly (including leading /)
  • Verify the publisher is running and connected to the same Zenoh network
  • Increase the timeout value
  • Use --nocapture with pytest to see debug output: python -m pytest tests/ -v --capture=no

Resources

Start with the pub/sub example to understand the basics, then explore services for request-response patterns.