Python Bindings
ros-z provides Python bindings via ros-z-py, enabling Python applications to communicate with Rust and ROS 2 nodes using the same Zenoh transport. The bindings use PyO3 for Rust-Python interop and msgspec for efficient message serialization.
Python bindings provide the same pub/sub and service patterns as Rust, with Pythonic APIs. Messages are automatically serialized/deserialized between Python objects and CDR format for ROS 2 compatibility.
Visual Flow
graph TD
A[Python Code] -->|import| B[ros_z_py]
B -->|PyO3| C[Rust ros-z]
C -->|Zenoh| D[Network]
D -->|Zenoh| E[ROS 2 / Rust Nodes]
F[Python Message] -->|msgspec| G[Struct]
G -->|serialize| H[CDR Bytes]
H -->|deserialize| I[Rust Struct]
Installation
Prerequisites
- Python 3.8+
- Rust 1.85+ — install via rustup:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh - maturin —
pip install maturin
If you use Nix, run nix develop in the repo root. It provides Rust, maturin, and all build tools automatically — skip the manual install steps above.
Setup
# Create and activate virtual environment
cd crates/ros-z-py
python -m venv .venv
source .venv/bin/activate
# Install message types
pip install -e ../ros-z-msgs/python/
# Build and install ros-z-py
maturin develop
Quick Start
Here's a complete publisher and subscriber example from crates/ros-z-py/examples/topic_demo.py:
Publisher (Talker)
def run_talker(ctx, topic: str, count: int, interval: float):
"""Run the talker (publisher)."""
node = ctx.create_node("talker").build()
pub = node.create_publisher(topic, std_msgs.String)
print(f"Talker started. Publishing to {topic}...")
i = 0
while count == 0 or i < count:
message = f"Hello from Python {i}"
msg = std_msgs.String(data=message)
pub.publish(msg)
print(f"PUB:{i}", flush=True)
i += 1
time.sleep(interval)
print("PUB:DONE", flush=True)
Subscriber (Listener)
def run_listener(ctx, topic: str, timeout: float):
"""Run the listener (subscriber)."""
node = ctx.create_node("listener").build()
sub = node.create_subscriber(topic, std_msgs.String)
print("SUB:READY", flush=True)
start = time.time()
received = 0
while timeout == 0 or (time.time() - start) < timeout:
msg = sub.recv(timeout=1.0)
if msg is not None:
print(f"SUB:{msg.data}", flush=True)
received += 1
print(f"SUB:TOTAL:{received}", flush=True)
Key Components
| Component | Purpose | Python API |
|---|---|---|
| ZContextBuilder | Configure ros-z environment | ZContextBuilder().with_domain_id(0).build() |
| ZContext | Manages ROS 2 connections | Entry point for creating nodes |
| Node | Logical unit of computation | ctx.create_node("name").build() |
| Publisher | Sends messages to topics | node.create_publisher(topic, type) |
| Subscriber | Receives messages from topics | node.create_subscriber(topic, type) |
| Client | Sends service requests | node.create_client(service, type) |
| Server | Handles service requests | node.create_server(service, type) |
Service Patterns
Examples from crates/ros-z-py/examples/service_demo.py:
Service Server
def run_server(ctx, service: str, max_requests: int):
"""Run the AddTwoInts service server."""
node = ctx.create_node("add_two_ints_server").build()
server = node.create_server(service, example_interfaces.AddTwoIntsRequest)
print("SERVER:READY", flush=True)
handled = 0
while max_requests == 0 or handled < max_requests:
request_id, req = server.take_request()
result = req.a + req.b
print(f"SERVER:{req.a}+{req.b}={result}", flush=True)
resp = example_interfaces.AddTwoIntsResponse(sum=result)
server.send_response(resp, request_id)
handled += 1
print("SERVER:DONE", flush=True)
Service Client
def run_client(ctx, service: str, a: int, b: int, timeout: float):
"""Run the AddTwoInts service client."""
node = ctx.create_node("add_two_ints_client").build()
client = node.create_client(service, example_interfaces.AddTwoIntsRequest)
# Wait for service discovery
time.sleep(1.0)
print(f"CLIENT:REQUEST:{a}+{b}", flush=True)
req = example_interfaces.AddTwoIntsRequest(a=a, b=b)
client.send_request(req)
resp = client.take_response(timeout=timeout)
if resp is not None:
print(f"CLIENT:RESPONSE:{resp.sum}", flush=True)
else:
print("CLIENT:ERROR:no response", flush=True)
sys.exit(1)
Service servers use a pull model: take_request() blocks until a request arrives. This gives you explicit control over when to process requests.
Action Patterns
Actions extend services with long-running execution, incremental feedback, and cancellation. A goal is sent by the client, accepted or rejected by the server, executed with feedback published at each step, and finally terminated with a result.
Examples from crates/ros-z-py/examples/action_demo.py.
Defining Message Types
Actions require three message structs: Goal, Result, and Feedback.
Each must have a __msgtype__ class attribute:
class CountToGoal(msgspec.Struct):
__msgtype__: ClassVar[str] = "action_demo/msg/CountToGoal"
target: int = 10
class CountToResult(msgspec.Struct):
__msgtype__: ClassVar[str] = "action_demo/msg/CountToResult"
final_count: int = 0
class CountToFeedback(msgspec.Struct):
__msgtype__: ClassVar[str] = "action_demo/msg/CountToFeedback"
current: int = 0
Types from ros_z_msgs_py already have __msgtype__ and a ROS 2 type hash. Inline
msgspec.Struct types (as above) use a zero hash and are Python-to-Python only —
they are not compatible with rmw_zenoh_cpp typed actions.
Action Server
def run_server(ctx, action: str):
node = ctx.create_node("count_to_server").build()
server = node.create_action_server(
action, CountToGoal, CountToResult, CountToFeedback
)
print("SERVER:READY", flush=True)
while True:
request = server.recv_goal(timeout=1.0)
if request is None:
continue
goal = request.goal()
print(f"SERVER:GOAL:{goal.target}", flush=True)
executing = request.accept_and_execute()
count = 0
while count < goal.target:
time.sleep(0.2)
count += 1
executing.publish_feedback(CountToFeedback(current=count))
print(f"SERVER:FEEDBACK:{count}", flush=True)
if executing.is_cancel_requested:
print(f"SERVER:CANCELED:{count}", flush=True)
executing.canceled(CountToResult(final_count=count))
break
else:
print(f"SERVER:SUCCEEDED:{count}", flush=True)
executing.succeed(CountToResult(final_count=count))
Server Lifecycle
recv_goal(timeout)— blocks until a goal arrives; returnsNoneon timeoutrequest.goal()— deserialize the goal payloadrequest.accept_and_execute()→ServerGoalHandle— orrequest.reject()handle.publish_feedback(fb)— send incremental progresshandle.is_cancel_requested— poll for client cancellationhandle.succeed(result)/handle.abort(result)/handle.canceled(result)— terminate
Action Client
def run_client(ctx, action: str, target: int, cancel_after: float | None):
node = ctx.create_node("count_to_client").build()
client = node.create_action_client(
action, CountToGoal, CountToResult, CountToFeedback
)
# Give server time to advertise
time.sleep(1.0)
print(f"CLIENT:SEND_GOAL:{target}", flush=True)
handle = client.send_goal(CountToGoal(target=target))
# Schedule cancellation if requested
if cancel_after is not None:
def _cancel():
time.sleep(cancel_after)
print("CLIENT:CANCEL", flush=True)
handle.cancel()
threading.Thread(target=_cancel, daemon=True).start()
# Drain feedback
while True:
fb = handle.recv_feedback(timeout=0.5)
if fb is None:
break
print(f"CLIENT:FEEDBACK:{fb.current}", flush=True)
result = handle.get_result(timeout=5.0)
if result is not None:
print(f"CLIENT:RESULT:{result.final_count}", flush=True)
else:
print("CLIENT:ERROR:no result", flush=True)
sys.exit(1)
Client Lifecycle
client.send_goal(goal)→ActionGoalHandle— blocks until accepted (raises on rejection)handle.recv_feedback(timeout)— receive next feedback; returnsNonewhen channel closeshandle.get_result(timeout)— block until terminal state; returnsNoneon timeouthandle.cancel()— request cancellation (the server decides when to honour it)
Goal Status
GoalStatus constants mirror action_msgs/msg/GoalStatus:
| Constant | Value | is_active() | is_terminal() |
|---|---|---|---|
ACCEPTED | 1 | ✓ | — |
EXECUTING | 2 | ✓ | — |
CANCELING | 3 | ✓ | — |
SUCCEEDED | 4 | — | ✓ |
CANCELED | 5 | — | ✓ |
ABORTED | 6 | — | ✓ |
status = ros_z_py.GoalStatus(handle.status)
if status.is_terminal():
print("Done:", status)
Python actions use a byte-wrapping wire format that is not compatible with
rmw_zenoh_cpp typed actions. Use the Rust ZAction trait for ROS 2 interop.
Complex Messages
Python bindings support nested message types like geometry_msgs/Twist:
from ros_z_py import geometry_msgs
# Create a Twist message with nested Vector3
twist = geometry_msgs.Twist(
linear=geometry_msgs.Vector3(x=1.0, y=0.0, z=0.0),
angular=geometry_msgs.Vector3(x=0.0, y=0.0, z=0.5)
)
pub = node.create_publisher("/cmd_vel", geometry_msgs.Twist)
pub.publish(twist)
Context Configuration
Connect to Specific Endpoint
ctx = (
ros_z_py.ZContextBuilder()
.with_connect_endpoints(["tcp/192.168.1.100:7447"])
.build()
)
Disable Multicast Scouting
ctx = (
ros_z_py.ZContextBuilder()
.with_domain_id(0)
.disable_multicast_scouting()
.build()
)
Custom Namespace
node = ctx.create_node("my_node").with_namespace("/robot1").build()
Performance: Zero-Copy Large Payloads
When working with large byte arrays (sensor data, images, point clouds), ros-z-py minimizes memory copies using a zero-copy optimization for uint8[] and byte[] fields.
ZBufView
When a subscriber receives a message, byte array fields are exposed as a ZBufView — a zero-copy view into the received network buffer. ZBufView implements Python's buffer protocol:
msg = sub.recv(timeout=1.0)
# msg.data is a ZBufView — no copy has occurred
# Zero-copy access via buffer protocol
mv = memoryview(msg.data)
header = mv[:8] # Slice without copying the entire payload
# Only copies when explicitly converted
data = bytes(msg.data)
Echo Without Copying
For relay and echo patterns, pass ZBufView fields directly to a new message. The derive macro detects ZBufView and extracts the inner buffer via reference counting — no data copy occurs:
# Receive and re-publish — zero-copy for byte array fields
msg = sub.recv(timeout=1.0)
echo = std_msgs.ByteMultiArray(data=msg.data) # No copy!
pub.publish(echo)
ZBufView API
| Method | Description |
|---|---|
len(view) | Number of bytes |
bool(view) | True if not empty |
view[i] | Single byte access |
view[start:stop] | Slice (returns bytes) |
memoryview(view) | Zero-copy buffer protocol access |
bytes(view) | Convert to bytes (copies) |
view.is_zero_copy | Whether the view avoids internal copying |
For best performance with large payloads, avoid calling bytes() on ZBufView fields. Use memoryview() for read access, or pass the ZBufView directly when re-publishing.
How It Works
The optimization operates at three layers:
-
Deserialization bypass: When the Python subscriber receives a message, the raw network buffer (ZBuf) is stored in a thread-local. During CDR deserialization, byte array fields create sub-views into this buffer instead of copying (
ZSlice::subslice()). -
Buffer protocol:
ZBufViewwraps the ZBuf and exposes its bytes to Python via__getbuffer__/__releasebuffer__. For contiguous buffers (the common case), this is a direct pointer — no copy at all. -
Pass-through re-publish: The
FromPyMessagederive macro recognizesZBufViewinputs and extracts the inner ZBuf viaclone(), which only increments a reference count on the underlying memory.
ROS 2 Interoperability
Python nodes interoperate with ROS 2 C++ nodes via the shared Zenoh transport. See the dedicated ROS 2 Interoperability chapter for setup instructions.
Running Tests
# Python unit tests
cd crates/ros-z-py
source .venv/bin/activate
python -m pytest tests/ -v
# Python-Rust interop tests
cargo test --features python-interop -p ros-z-tests --test python_interop -- --test-threads=1
Troubleshooting
Import errors when using ros_z_py
Import errors when using ros_z_py
This error occurs when the package hasn't been built or installed correctly.
Solution:
Rebuild and install the package:
cd crates/ros-z-py
source .venv/bin/activate
pip install -e ../ros-z-msgs/python/
maturin develop
Message type not found
Message type not found
This error occurs when trying to use a message type that isn't supported by the Python bindings.
Solution:
Check the supported message types by looking at the match arms in crates/ros-z-py/src/node.rs.
Currently supported: std_msgs/String, std_msgs/ByteMultiArray, geometry_msgs/Vector3,
geometry_msgs/Twist, sensor_msgs/LaserScan, and example_interfaces/AddTwoInts.
Ensure you are passing a message class object (e.g., std_msgs.String), not a string.
recv() always returns None
recv() always returns None
This happens when no messages are being received within the timeout period.
Solution:
- Check the topic name matches exactly (including leading
/) - Verify the publisher is running and connected to the same Zenoh network
- Increase the timeout value
- Use
--nocapturewith pytest to see debug output:python -m pytest tests/ -v --capture=no
Resources
- Code Generation Internals - How Python bindings are generated
- Pub/Sub - Deep dive into pub-sub patterns
- Services - Request-response communication
- Message Generation - How message types work
- Networking - Zenoh router setup and options
Start with the pub/sub example to understand the basics, then explore services for request-response patterns.