Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Message Cache

ZCache<T> is a timestamp-indexed, capacity-bounded sliding-window cache of received messages. It mirrors the ROS 2 message_filters::Cache<T> API, letting you query a time-ordered history of messages without a background thread or executor.

Note

ZCache<T> wraps a Zenoh subscriber and retains the N most recent messages in a BTreeMap keyed by timestamp. Queries run in O(log n) using Rust's range API.

Quick Start

use ros_z::prelude::*;
use ros_z_msgs::sensor_msgs::Imu;
use std::time::{Duration, SystemTime};

let ctx = ZContextBuilder::default().build()?;
let node = ctx.create_node("cache_node").build()?;

// Retain the 200 most recent messages, indexed by Zenoh transport timestamp.
let cache = node.create_cache::<Imu>("/imu/data", 200).build()?;

// Query the last 100 ms.
let now = SystemTime::now();
let window = cache.get_interval(now - Duration::from_millis(100), now);
println!("Messages in window: {}", window.len());

Tip

use ros_z::Builder; must be in scope to call .build(). It is re-exported by ros_z::prelude::*.

Stamp Strategies

Two indexing strategies are available and selected at compile time via the type-state builder:

StrategyTypeWhen to use
ZenohStamp (default)ZenohStampZero-config; works for any message type. Indexed by the Zenoh transport timestamp.
ExtractorStampExtractorStamp<T, F>Sensor fusion: align by logical time (e.g. header.stamp).

ZenohStamp (default)

No configuration needed. The cache reads the uhlc::Timestamp that the Zenoh transport attaches to every published sample and converts it to SystemTime. If timestamping is disabled on the peer, the cache falls back to SystemTime::now() at receive time and logs a one-time warning.

use ros_z::prelude::*;
use ros_z_msgs::sensor_msgs::LaserScan;

let cache = node.create_cache::<LaserScan>("/scan", 100).build()?;

ExtractorStamp (application-level)

Supply a closure that extracts a SystemTime from the deserialized message. Use this when you need messages aligned by their logical capture time rather than network arrival time — the classic sensor fusion use case.

use ros_z::prelude::*;
use ros_z_msgs::sensor_msgs::Imu;
use std::time::{Duration, SystemTime};

let cache = node
    .create_cache::<Imu>("/imu/data", 200)
    .with_stamp(|msg: &Imu| {
        // Read header.stamp from the message.
        SystemTime::UNIX_EPOCH
            + Duration::new(msg.header.stamp.sec as u64, msg.header.stamp.nanosec)
    })
    .build()?;

Query API

All query methods take and return SystemTime values and return clones of the stored messages. They are safe to call from multiple threads.

get_interval(t_start, t_end) -> Vec<T>

Returns all messages with timestamp in [t_start, t_end], inclusive, ordered by timestamp ascending. If t_start > t_end the result is empty (no panic).

let msgs = cache.get_interval(
    SystemTime::now() - Duration::from_millis(500),
    SystemTime::now(),
);

get_before(t) -> Option<T>

The most recent message with timestamp ≤ t. Returns None if the cache is empty or all messages are strictly after t.

let latest = cache.get_before(SystemTime::now());

get_after(t) -> Option<T>

The earliest message with timestamp ≥ t. Returns None if the cache is empty or all messages are strictly before t.

let next = cache.get_after(camera_stamp);

get_nearest(t) -> Option<T>

The message whose timestamp is nearest to t (either side). When two messages are equidistant, the one with the earlier timestamp is returned. Returns None if the cache is empty.

// Align an IMU sample to a camera frame timestamp.
let imu = imu_cache.get_nearest(camera_stamp);

Introspection

cache.oldest_stamp()   // Option<SystemTime> — timestamp of the oldest entry
cache.newest_stamp()   // Option<SystemTime> — timestamp of the newest entry
cache.len()            // usize — number of messages currently stored
cache.is_empty()       // bool
cache.clear()          // remove all messages

Capacity and Eviction

The cache holds at most capacity messages. When a new message arrives and the cache is full, the oldest message (smallest timestamp) is evicted first, regardless of insertion order. Capacity can be changed at build time:

let cache = node
    .create_cache::<RosString>("/topic", 10)  // initial capacity
    .with_capacity(50)                         // override
    .build()?;

Comparison with message_filters::Cache<T>

ZCache<T> covers the core message_filters::Cache<T> API with some differences:

Featuremessage_filters::Cache<T>ZCache<T>
Storagestd::deque (insertion-sorted)BTreeMap (always sorted, O(log n) range)
Stamp strategiesRuntime function pointer (hasHeader / receive time)Compile-time type-state (ZenohStamp / ExtractorStamp)
Headerless messagesallow_headerless flag, falls back to receive timeZenohStamp handles any type — no header needed
getIntervalget_interval
getElemBeforeTimeStrict < t (exclusive)get_before: inclusive ≤ t
getElemAfterTimeStrict > t (exclusive)get_after: inclusive ≥ t
getSurroundingIntervalNot implemented
get_nearestNot present✅ nearest-neighbor lookup
clear / len / is_emptyNot present
Filter chaining (signalMessage)✅ (passes through to downstream)Not applicable (ros-z has no filter chain)
Subscriber lifecycleExternal, via connectInputOwned by ZCache, dropped on ZCache drop

Note

get_before and get_after use inclusive bounds ( / ), unlike the C++ getElemBeforeTime / getElemAfterTime which are exclusive (< / >). This matches the query semantics of get_interval.

Running the Examples

Three focused examples demonstrate the cache. Run them in separate terminals:

# Terminal 1 — publisher (sends msg-0, msg-1, … every 100 ms)
cargo run --example z_cache_talker

# Terminal 2 — cache consumer with Zenoh transport timestamp (default)
cargo run --example z_cache_zenoh_stamp

# Terminal 2 — cache consumer with application-level timestamp
cargo run --example z_cache_app_stamp

Resources

  • Pub/Sub — Publisher and subscriber patterns that ZCache builds on
  • Services — Request/reply communication
  • API docscargo doc -p ros-z --open, then navigate to ros_z::cache