Sync Engine

Sync protocols from calimero-node · calimero-node-primitives

Purpose

Two independent synchronization systems keep peers converged: application state sync (context DAG) and group governance sync (governance DAG). The SyncManager runs periodic and on-demand sync cycles. Protocols are selected dynamically via handshake negotiation based on divergence metrics.

- 4 app sync protocols
- 3 governance mechanisms
- 30s heartbeat interval
- N concurrent syncs (bounded by max_concurrent)

Protocol Selection

Peers exchange a SyncHandshake to compare state. The comparison metrics determine which protocol minimizes bandwidth and latency for the current divergence level.

Both peers exchange a SyncHandshake carrying root_hash, dag_heads, entity count, and tree depth. Comparing these yields a divergence estimate that picks the protocol: Hash Comparison (Merkle tree walk) for small diffs at low bandwidth, Level-wise (wide-tree traversal) for broad, shallow divergence at medium bandwidth, Snapshot (full state transfer via paged chunks plus a DeltaBuffer) for large divergence at high bandwidth, and Delta Sync (direct DAG exchange with bloom filters and subtree prefetch) as the bandwidth-optimal choice for direct DAG divergence.
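The selection step can be sketched as a pure function over the handshake comparison. Everything below (the Divergence fields, thresholds, and ordering of checks) is an illustrative assumption, not the actual calimero-node logic:

```rust
// Illustrative sketch only: metric names and thresholds are assumptions.
#[derive(Debug, PartialEq)]
pub enum SyncProtocol {
    HashComparison,
    LevelWise,
    Snapshot,
    DeltaSync,
}

pub struct Divergence {
    pub roots_equal: bool,
    pub remote_heads_unknown: usize, // remote dag_heads we have not seen
    pub diverged_entities: u64,      // estimated differing entity count
    pub tree_depth: u32,             // local Merkle tree depth
}

pub fn select_protocol(d: &Divergence, delta_threshold: u64) -> Option<SyncProtocol> {
    if d.roots_equal {
        return None; // already converged: sync is a no-op
    }
    if d.diverged_entities > delta_threshold {
        return Some(SyncProtocol::Snapshot); // maximum divergence or new joiner
    }
    if d.remote_heads_unknown > 0 {
        return Some(SyncProtocol::DeltaSync); // known recent activity: exchange deltas
    }
    if d.tree_depth <= 3 {
        return Some(SyncProtocol::LevelWise); // broad, shallow divergence
    }
    Some(SyncProtocol::HashComparison) // small, localized diffs
}

fn main() {
    let d = Divergence {
        roots_equal: false,
        remote_heads_unknown: 2,
        diverged_entities: 40,
        tree_depth: 8,
    };
    assert_eq!(select_protocol(&d, 1000), Some(SyncProtocol::DeltaSync));
}
```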

Application State Protocols

Four protocols for synchronizing context application state. Each is optimized for a different divergence scenario.

Hash Comparison — Merkle tree walk

The lightest sync protocol. Both peers share their Merkle tree root hash. If roots differ, they exchange hashes at progressively deeper levels to locate the divergent subtrees. Only the changed subtrees are transferred.

Algorithm

1. Root Compare: Exchange root hashes. If equal, sync is a no-op. If different, descend to the next level.
2. Level Descent: Exchange hash nodes at the current depth. Mark subtrees with matching hashes as "synced" and skip them. Continue descending into differing subtrees.
3. Leaf Transfer: At leaf level, exchange the actual data entries for divergent nodes. Apply incoming entries to local storage and update the Merkle tree.

Best for: Small, localized changes — a few keys updated in a large state tree.
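A minimal in-memory sketch of the walk, assuming toy Node and key types (the real tree is storage-backed and uses cryptographic hashes):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy in-memory Merkle tree; names and hashing are illustrative.
pub enum Node {
    Leaf { key: String, value: String },
    Branch { children: Vec<Node> },
}

impl Node {
    pub fn hash(&self) -> u64 {
        let mut h = DefaultHasher::new();
        match self {
            Node::Leaf { key, value } => {
                key.hash(&mut h);
                value.hash(&mut h);
            }
            Node::Branch { children } => {
                for child in children {
                    child.hash().hash(&mut h);
                }
            }
        }
        h.finish()
    }
}

/// Steps 1-3: compare subtree hashes top-down, skip matching subtrees,
/// and collect the keys of divergent leaves for transfer.
pub fn divergent_keys(local: &Node, remote: &Node, out: &mut Vec<String>) {
    if local.hash() == remote.hash() {
        return; // identical subtree: marked synced, skipped
    }
    match (local, remote) {
        (Node::Branch { children: a }, Node::Branch { children: b })
            if a.len() == b.len() =>
        {
            for (l, r) in a.iter().zip(b.iter()) {
                divergent_keys(l, r, out); // descend only where hashes differ
            }
        }
        (_, Node::Leaf { key, .. }) => out.push(key.clone()),
        _ => {} // shape mismatch: the whole subtree would be transferred
    }
}

fn main() {
    let leaf = |k: &str, v: &str| Node::Leaf { key: k.into(), value: v.into() };
    let local = Node::Branch { children: vec![leaf("a", "1"), leaf("b", "2")] };
    let remote = Node::Branch { children: vec![leaf("a", "1"), leaf("b", "9")] };
    let mut diff = Vec::new();
    divergent_keys(&local, &remote, &mut diff);
    assert_eq!(diff, vec!["b".to_string()]); // only the changed leaf transfers
}
```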

Source: crates/node/src/sync
Level-wise — wide-tree traversal

Optimized for broad, shallow divergence where many branches differ but the tree isn't deep. Instead of drilling into individual branches, this protocol processes entire levels in parallel.

Algorithm

1. Level Exchange: Both peers serialize their entire tree at the current level and exchange the batch, avoiding the round-trip overhead of per-branch negotiation.
2. Batch Diff: Compare the received level against the local level, identifying additions, deletions, and modifications in a single pass.
3. Descend Parallel: For each differing node, descend to the next level, processing all descendants in parallel rather than sequentially.

Best for: Many keys changed across different parts of the tree — bulk updates, migrations.
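The batch-diff step can be sketched over a flattened level, assuming an illustrative path-to-hash representation:

```rust
use std::collections::HashMap;

/// One flattened tree level: node path -> subtree hash (illustrative types).
pub type Level = HashMap<String, u64>;

#[derive(Debug, Default)]
pub struct LevelDiff {
    pub added: Vec<String>,    // on the remote side only
    pub removed: Vec<String>,  // on the local side only
    pub modified: Vec<String>, // on both sides with different hashes
}

/// Step 2 (Batch Diff): compare an entire exchanged level in one pass.
pub fn diff_level(local: &Level, remote: &Level) -> LevelDiff {
    let mut diff = LevelDiff::default();
    for (path, remote_hash) in remote {
        match local.get(path) {
            None => diff.added.push(path.clone()),
            Some(local_hash) if local_hash != remote_hash => diff.modified.push(path.clone()),
            _ => {} // hashes match: subtree already in sync, no descent needed
        }
    }
    for path in local.keys() {
        if !remote.contains_key(path) {
            diff.removed.push(path.clone());
        }
    }
    diff
}

fn main() {
    let local: Level = HashMap::from([("a".to_string(), 1u64), ("b".to_string(), 2)]);
    let remote: Level = HashMap::from([
        ("a".to_string(), 1u64),
        ("b".to_string(), 3),
        ("c".to_string(), 4),
    ]);
    let diff = diff_level(&local, &remote);
    assert_eq!(diff.added, vec!["c".to_string()]);
    assert_eq!(diff.modified, vec!["b".to_string()]);
    assert!(diff.removed.is_empty());
}
```

Each entry in `modified` becomes a parallel descent into the next level.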

Snapshot — full state transfer

Fallback protocol for maximum divergence or when a peer has no prior state (new joiner). Transfers the entire state as paged chunks with a DeltaBuffer to capture mutations during transfer.

Algorithm

1. Snapshot Begin: The source peer takes a consistent read snapshot and chunks the state into pages of snapshot_chunk_size bytes. A DeltaBuffer is opened to capture any writes that occur during the transfer.
2. Chunk Transfer: Pages are streamed to the target peer. Each page carries a sequence number and hash for integrity verification; the target applies pages to a staging area.
3. DeltaBuffer Replay: After all pages are transferred, the accumulated DeltaBuffer entries (mutations that occurred during the snapshot) are replayed on top of the restored state.
4. Commit: The staging area is atomically swapped into the live state, and the Merkle tree is rebuilt from the new state.

Best for: New peers joining a context, or peers that have been offline for a very long time.
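A simplified sketch of chunking and replay, modeling the state as a byte buffer and the DeltaBuffer as plain offset writes (the real buffer captures structured mutations, and pages also carry an integrity hash):

```rust
/// Illustrative page: sequence number plus payload.
pub struct Page {
    pub seq: u64,
    pub data: Vec<u8>,
}

/// Steps 1-2: chunk a consistent snapshot into fixed-size pages.
pub fn chunk_snapshot(state: &[u8], chunk_size: usize) -> Vec<Page> {
    state
        .chunks(chunk_size)
        .enumerate()
        .map(|(i, chunk)| Page { seq: i as u64, data: chunk.to_vec() })
        .collect()
}

/// Steps 3-4: rebuild the state in a staging buffer, then replay the
/// DeltaBuffer before the atomic swap.
pub fn restore(pages: Vec<Page>, delta_buffer: &[(usize, u8)]) -> Vec<u8> {
    let mut staging = Vec::new();
    for page in pages {
        staging.extend(page.data); // pages applied in sequence order
    }
    for &(offset, byte) in delta_buffer {
        staging[offset] = byte; // mutation captured during the transfer
    }
    staging
}

fn main() {
    let state = b"0123456789".to_vec();
    let pages = chunk_snapshot(&state, 4);
    assert_eq!(pages.len(), 3); // 4 + 4 + 2 bytes
    // One write landed while the snapshot was streaming: replay it on top.
    let restored = restore(pages, &[(0, b'X')]);
    assert_eq!(restored, b"X123456789".to_vec());
}
```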

Delta Sync — direct DAG exchange

Direct exchange of causal delta entries between peers. Uses bloom filters to efficiently identify which deltas the remote peer is missing, and subtree prefetch to minimize round-trips.

Algorithm

1. Bloom Filter Exchange: Each peer builds a bloom filter of its DAG entry hashes and exchanges it, letting each side quickly identify entries the other is likely missing without enumerating the full DAG.
2. Delta Request: Based on the bloom filter results, the receiver requests specific delta entries by hash, and the sender responds with the full CausalDelta payloads.
3. Subtree Prefetch: When a requested delta has parent hashes the receiver also lacks, the sender proactively includes the parent chain to reduce additional round-trips.
4. Apply + Verify: Received deltas are verified for causal consistency (all parents present) and applied to the local DAG in topological order. The Merkle tree is incrementally updated.

Best for: Moderate divergence with known recent activity — the most commonly used protocol during normal operation.
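A toy bloom-filter exchange, assuming illustrative parameters (the real filter size, hash count, and wire format will differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal bloom filter over delta hashes with k = 2 hash functions.
pub struct Bloom {
    bits: Vec<bool>,
}

impl Bloom {
    pub fn new(size: usize) -> Self {
        Bloom { bits: vec![false; size] }
    }

    fn positions(&self, item: &str) -> [usize; 2] {
        let n = self.bits.len() as u64;
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let mut h2 = DefaultHasher::new();
        (item, 0x9e37u16).hash(&mut h2); // salted second hash
        [(h1.finish() % n) as usize, (h2.finish() % n) as usize]
    }

    pub fn insert(&mut self, item: &str) {
        for i in self.positions(item) {
            self.bits[i] = true;
        }
    }

    /// May return a false positive, but never a false negative.
    pub fn maybe_contains(&self, item: &str) -> bool {
        self.positions(item).iter().all(|&i| self.bits[i])
    }
}

/// Steps 1-2: send every local delta the remote filter definitely lacks.
pub fn deltas_to_send<'a>(local: &'a [String], remote: &Bloom) -> Vec<&'a String> {
    local.iter().filter(|h| !remote.maybe_contains(h)).collect()
}

fn main() {
    let mut remote = Bloom::new(256);
    remote.insert("delta-a");
    remote.insert("delta-b");
    let local = vec!["delta-a".to_string(), "delta-b".to_string()];
    // No false negatives: the remote provably has everything we have.
    assert!(deltas_to_send(&local, &remote).is_empty());
}
```

False positives only cost a skipped delta, which the next sync round or a head comparison catches; false negatives never occur, so nothing is sent redundantly forever.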

Group Governance Sync

Governance operations form a separate DAG and use dedicated sync mechanisms. Three complementary approaches ensure governance state converges across all group members.

Real-time Gossip (primary)

SignedGroupOpV1 operations are published via gossipsub on dedicated group topics. Each operation is verified on ingress — signature check, capability check, parent hash validation. Valid operations are fed directly into the local DagStore.

// Gossip on group topic
pub fn publish_group_op(
    topic: &GroupTopic,
    op: &SignedGroupOpV1,
) -> Result<()>

Heartbeat Comparison (periodic)

A GroupStateHeartbeat is broadcast every 30 seconds, carrying the group's current dag_heads (latest operation hashes). Peers compare heads and trigger catch-up if they differ.

pub struct GroupStateHeartbeat {
    group_id: GroupId,
    dag_heads: Vec<Hash>,
    op_count: u64,
}
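The comparison itself reduces to a set check; this sketch models hashes as u64 for brevity:

```rust
use std::collections::HashSet;

/// Any head advertised in a peer's heartbeat that we have not seen
/// locally means we are behind and should open a catch-up stream.
pub fn needs_catch_up(local_heads: &[u64], remote_heads: &[u64]) -> bool {
    let local: HashSet<u64> = local_heads.iter().copied().collect();
    remote_heads.iter().any(|h| !local.contains(h))
}

fn main() {
    assert!(!needs_catch_up(&[1, 2], &[2, 1])); // same heads, any order
    assert!(needs_catch_up(&[1, 2], &[2, 3])); // head 3 is unknown locally
}
```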

Stream Catch-Up (on-demand)

When heartbeat comparison reveals a mismatch, a direct stream is opened to the divergent peer. The local peer sends a GroupDeltaRequest specifying its known heads and receives a GroupDeltaResponse containing the missing operations, scanned from the remote OpLog.

pub struct GroupDeltaRequest {
    group_id: GroupId,
    known_heads: Vec<Hash>,
}

pub struct GroupDeltaResponse {
    ops: Vec<SignedGroupOpV1>,
    has_more: bool,
}
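The responder side can be sketched as a paged scan, with a simplified Op and plain known-hash list standing in for SignedGroupOpV1 and the real head semantics:

```rust
/// Simplified signed operation (stand-in for SignedGroupOpV1).
#[derive(Clone, Debug, PartialEq)]
pub struct Op {
    pub hash: u64,
}

/// Scan the OpLog for operations the requester does not already know,
/// cap the response at `page_size`, and set `has_more` so the requester
/// can issue another round (mirroring GroupDeltaResponse.has_more).
pub fn serve_delta_request(op_log: &[Op], known: &[u64], page_size: usize) -> (Vec<Op>, bool) {
    let missing: Vec<Op> = op_log
        .iter()
        .filter(|op| !known.contains(&op.hash))
        .cloned()
        .collect();
    let has_more = missing.len() > page_size;
    (missing.into_iter().take(page_size).collect(), has_more)
}

fn main() {
    let log: Vec<Op> = (1..=5).map(|hash| Op { hash }).collect();
    let (page, has_more) = serve_delta_request(&log, &[1, 2], 2);
    assert_eq!(page, vec![Op { hash: 3 }, Op { hash: 4 }]);
    assert!(has_more); // Op { hash: 5 } waits for the next round
}
```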

Startup Recovery (boot)

On node startup, reload_group_dags reads the persistent OpLog from RocksDB and reconstructs all group DAGs in memory. The DAG heads are then compared with peers via the first heartbeat cycle to catch up on any operations missed while offline.

// Called at node boot
pub async fn reload_group_dags(
    store: &RocksDBStore,
) -> Result<HashMap<GroupId, DagState>>

Flow: a GroupOp is created (signed and applied locally), broadcast via gossip on the group topic, and verified on ingestion into each peer's DagStore. Periodic heartbeats compare dag_heads, and stream catch-up (DeltaRequest / DeltaResponse) repairs any remaining divergence. Startup recovery replays the persisted OpLog before the first heartbeat cycle.

SyncManager

Long-running async task (not an Actix actor) that orchestrates all synchronization. Runs periodic sync cycles and handles on-demand sync triggered by incoming streams.

Configuration

pub struct SyncConfig {
    timeout: Duration,             // per-sync timeout
    interval: Duration,            // periodic sync interval
    frequency: u32,                // syncs per interval
    max_concurrent: usize,         // max parallel sync streams
    snapshot_chunk_size: usize,    // bytes per snapshot page
    delta_sync_threshold: u64,     // entity count for delta vs snapshot
}
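For a sense of scale, a Default impl might look like the following; every value here is an illustrative assumption, not the crate's actual defaults:

```rust
use std::time::Duration;

// The SyncConfig struct from above, repeated for self-containment.
pub struct SyncConfig {
    pub timeout: Duration,
    pub interval: Duration,
    pub frequency: u32,
    pub max_concurrent: usize,
    pub snapshot_chunk_size: usize,
    pub delta_sync_threshold: u64,
}

impl Default for SyncConfig {
    fn default() -> Self {
        SyncConfig {
            timeout: Duration::from_secs(30),   // abort a stuck sync (assumed)
            interval: Duration::from_secs(60),  // periodic cycle cadence (assumed)
            frequency: 1,                       // syncs per interval (assumed)
            max_concurrent: 8,                  // parallel sync streams (assumed)
            snapshot_chunk_size: 64 * 1024,     // 64 KiB pages (assumed)
            delta_sync_threshold: 10_000,       // delta vs snapshot cutoff (assumed)
        }
    }
}

fn main() {
    let cfg = SyncConfig::default();
    // A single sync should comfortably fit inside one periodic cycle.
    assert!(cfg.timeout <= cfg.interval);
}
```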

Core Loop

1. start(): Spawns the main sync loop as a Tokio task. The loop runs indefinitely, waking on a timer tick or when an incoming stream is opened by a remote peer.
2. Periodic Tick: On each interval tick, iterates over all active contexts. For each context, builds a SyncState (current root_hash, dag_heads, entity count) and selects a peer for synchronization.
3. Peer Selection: Chooses a sync peer based on the most recently seen heartbeat, lowest latency, and whether it reported a different state hash, avoiding peers already in an active sync session.
4. Concurrent Futures: Sync operations run as bounded concurrent futures via FuturesUnordered, capped at max_concurrent. Each future handles one context-peer sync session from handshake through completion.
5. handle_opened_stream(): When a remote peer opens a stream, reads the StreamMessage::Init frame, extracts the InitPayload, and dispatches to the appropriate sync protocol handler based on the negotiated protocol type.
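The peer-selection ranking can be sketched as a filter-then-minimize over per-peer bookkeeping; the struct fields and tie-breaking order here are illustrative assumptions:

```rust
/// Illustrative per-peer bookkeeping for the selection step.
#[derive(Debug, PartialEq)]
pub struct PeerInfo {
    pub id: u64,
    pub latency_ms: u32,
    pub heartbeat_age_secs: u64, // time since last heartbeat was seen
    pub state_hash_differs: bool,
    pub in_active_sync: bool,
}

/// Candidates must be divergent and not already in a sync session; among
/// them, prefer the freshest heartbeat, then the lowest latency.
pub fn select_peer(peers: &[PeerInfo]) -> Option<&PeerInfo> {
    peers
        .iter()
        .filter(|p| p.state_hash_differs && !p.in_active_sync)
        .min_by_key(|p| (p.heartbeat_age_secs, p.latency_ms))
}

fn main() {
    let peers = vec![
        PeerInfo { id: 1, latency_ms: 10, heartbeat_age_secs: 5, state_hash_differs: false, in_active_sync: false },
        PeerInfo { id: 2, latency_ms: 80, heartbeat_age_secs: 2, state_hash_differs: true, in_active_sync: false },
        PeerInfo { id: 3, latency_ms: 10, heartbeat_age_secs: 2, state_hash_differs: true, in_active_sync: true },
    ];
    // Peer 1 has identical state and peer 3 is busy, so peer 2 is chosen.
    assert_eq!(select_peer(&peers).map(|p| p.id), Some(2));
}
```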

Stream Wire Types

pub enum StreamMessage {
    Init(InitPayload),
    Message(MessagePayload),
    OpaqueError,
}
pub struct InitPayload {
    context_id: ContextId,
    protocol: SyncProtocol,
    handshake: SyncHandshake,
}
pub enum MessagePayload {
    HashNodes(Vec<HashNode>),
    LevelData(LevelBatch),
    SnapshotChunk(PageData),
    DeltaEntries(Vec<CausalDelta>),
    BloomFilter(BloomData),
}

Transport

The sync transport layer wraps libp2p streams with framing, encryption, and authentication.

SyncTransport

Wraps a raw libp2p stream with length-delimited framing. Each frame is a Borsh-serialized StreamMessage. Handles backpressure via Tokio's AsyncWrite flow control.

pub struct SyncTransport {
    framed: Framed<Stream, LengthCodec>,
    encryption: EncryptionState,
}

EncryptionState

Manages per-stream encryption state. After the initial handshake, streams are encrypted with a shared key derived from the context's encryption domain. Supports both Encrypted and Plaintext modes (configurable per context).

pub enum EncryptionState {
    Plaintext,
    Encrypted {
        cipher: ChaCha20Poly1305,
        nonce_counter: u64,
    },
}

Challenge Domain

Each sync stream authenticates with a challenge derived from the context's encryption domain. The initiating peer signs a challenge nonce, and the responder verifies membership before accepting the stream. This prevents unauthorized peers from syncing context data.

Transport flow: the SyncManager opens a stream, authenticates it with a challenge (sign + verify), encrypts frames with ChaCha20Poly1305, and exchanges Borsh-serialized StreamMessages over a length-delimited framed stream.