Sync Engine
Sync protocols from calimero-node · calimero-node-primitives
Purpose
Two independent synchronization systems keep peers converged: application state sync (context DAG) and group governance sync (governance DAG). The SyncManager runs periodic and on-demand sync cycles. Protocols are selected dynamically via handshake negotiation based on divergence metrics.
Protocol Selection
Peers exchange a SyncHandshake to compare state. The comparison metrics determine which protocol minimizes bandwidth and latency for the current divergence level.
Application State Protocols
Four protocols for synchronizing context application state. Each is optimized for a different divergence scenario.
Hash Comparison — Merkle tree walk
The lightest sync protocol. Both peers share their Merkle tree root hash. If roots differ, they exchange hashes at progressively deeper levels to locate the divergent subtrees. Only the changed subtrees are transferred.
Algorithm
Root Compare
Exchange root hashes. If equal, sync is a no-op. If different, descend to the next level.
Level Descent
Exchange hash nodes at the current depth. Mark subtrees with matching hashes as "synced" and skip them. Continue descending into differing subtrees.
Leaf Transfer
At leaf level, exchange the actual data entries for divergent nodes. Apply incoming entries to local storage and update the Merkle tree.
Best for: Small, localized changes — a few keys updated in a large state tree.
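The three steps above can be sketched as follows. This is a minimal illustration over complete binary trees, assuming both peers index the same key space; `combine`, `build_tree`, and `divergent_leaves` are hypothetical names, not the crate's API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash two child hashes into a parent hash (illustrative helper).
fn combine(a: u64, b: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (a, b).hash(&mut h);
    h.finish()
}

// Build a binary Merkle tree bottom-up; levels[0] are leaves, last is root.
fn build_tree(leaves: &[u64]) -> Vec<Vec<u64>> {
    let mut levels = vec![leaves.to_vec()];
    while levels.last().unwrap().len() > 1 {
        let prev = levels.last().unwrap();
        let next: Vec<u64> = prev
            .chunks(2)
            .map(|c| if c.len() == 2 { combine(c[0], c[1]) } else { c[0] })
            .collect();
        levels.push(next);
    }
    levels
}

// Walk both trees top-down, descending only into subtrees whose hashes
// differ. Returns the leaf indices whose entries must be transferred.
fn divergent_leaves(a: &[Vec<u64>], b: &[Vec<u64>]) -> Vec<usize> {
    let top = a.len() - 1;
    if a[top][0] == b[top][0] {
        return vec![]; // roots match: sync is a no-op
    }
    let mut frontier = vec![0usize];
    for level in (0..top).rev() {
        let mut next = Vec::new();
        for &i in &frontier {
            for child in [2 * i, 2 * i + 1] {
                if child < a[level].len() && a[level][child] != b[level][child] {
                    next.push(child); // only differing subtrees survive
                }
            }
        }
        frontier = next;
    }
    frontier
}
```

Matching subtrees are pruned at the first level where their hashes agree, so the cost scales with the size of the divergence, not the size of the tree.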
Level-wise — wide-tree traversal
Optimized for broad, shallow divergence where many branches differ but the tree isn't deep. Instead of drilling into individual branches, this protocol processes entire levels in parallel.
Algorithm
Level Exchange
Both peers serialize their entire tree at the current level and exchange the batch. This avoids the round-trip overhead of per-branch negotiation.
Batch Diff
Compare the received level against the local level. Identify additions, deletions, and modifications in a single pass.
Descend Parallel
For each differing node, descend to the next level. Process all descendants in parallel rather than sequentially.
Best for: Many keys changed across different parts of the tree — bulk updates, migrations.
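The batch-diff step can be sketched as a single comparison over two exchanged level maps; `diff_level` is an illustrative name and the level is modeled as a key-to-hash map.

```rust
use std::collections::HashMap;

// Diff one exchanged tree level against the local one, classifying
// every key as added, deleted, or modified in one walk over each map.
fn diff_level(
    local: &HashMap<String, u64>,
    remote: &HashMap<String, u64>,
) -> (Vec<String>, Vec<String>, Vec<String>) {
    let mut added = Vec::new();
    let mut modified = Vec::new();
    for (k, v) in remote {
        match local.get(k) {
            None => added.push(k.clone()),                 // remote-only key
            Some(lv) if lv != v => modified.push(k.clone()), // hash mismatch
            _ => {}                                        // identical: skip
        }
    }
    let mut deleted: Vec<String> = local
        .keys()
        .filter(|k| !remote.contains_key(*k))
        .cloned()
        .collect();
    added.sort();
    deleted.sort();
    modified.sort();
    (added, deleted, modified)
}
```

Exchanging whole levels trades bandwidth for round-trips, which is the right trade when many branches differ at once.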
Snapshot — full state transfer
Fallback protocol for maximum divergence or when a peer has no prior state (new joiner). Transfers the entire state as paged chunks with a DeltaBuffer to capture mutations during transfer.
Algorithm
Snapshot Begin
Source peer takes a consistent read snapshot and begins chunking the state into pages of snapshot_chunk_size bytes. A DeltaBuffer is opened to capture any writes that occur during the transfer.
Chunk Transfer
Pages are streamed to the target peer. Each page includes a sequence number and hash for integrity verification. The target applies pages to a staging area.
DeltaBuffer Replay
After all pages are transferred, the accumulated DeltaBuffer entries (mutations that occurred during the snapshot) are replayed on top of the restored state.
Commit
The staging area is atomically swapped into the live state. The Merkle tree is rebuilt from the new state.
Best for: New peers joining a context, or peers that have been offline for a very long time.
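A minimal sketch of the page/verify/replay flow, with state modeled as a byte buffer and the DeltaBuffer as (offset, byte) writes; `Page`, `chunk_snapshot`, and `restore` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct Page {
    seq: u64,      // sequence number for ordering
    hash: u64,     // integrity hash of the page data
    data: Vec<u8>,
}

fn hash_bytes(b: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    b.hash(&mut h);
    h.finish()
}

// Source side: chunk a consistent snapshot into pages of at most
// chunk_size bytes (mirrors snapshot_chunk_size).
fn chunk_snapshot(state: &[u8], chunk_size: usize) -> Vec<Page> {
    state
        .chunks(chunk_size)
        .enumerate()
        .map(|(i, c)| Page { seq: i as u64, hash: hash_bytes(c), data: c.to_vec() })
        .collect()
}

// Target side: verify each page, apply to a staging buffer, then replay
// the delta-buffer mutations captured during the transfer.
fn restore(pages: &[Page], deltas: &[(usize, u8)]) -> Option<Vec<u8>> {
    let mut staging = Vec::new();
    for (i, p) in pages.iter().enumerate() {
        if p.seq != i as u64 || hash_bytes(&p.data) != p.hash {
            return None; // out of order or corrupted page
        }
        staging.extend_from_slice(&p.data);
    }
    for &(offset, byte) in deltas {
        staging[offset] = byte; // write that happened mid-transfer
    }
    Some(staging)
}
```

The staging buffer stands in for the staging area that is atomically swapped into the live state at commit.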
Delta Sync — direct DAG exchange
Direct exchange of causal delta entries between peers. Uses bloom filters to efficiently identify which deltas the remote peer is missing, and subtree prefetch to minimize round-trips.
Algorithm
Bloom Filter Exchange
Each peer builds a bloom filter of their DAG entry hashes and exchanges it. This allows each side to quickly identify entries the other is likely missing without enumerating the full DAG.
Delta Request
Based on bloom filter results, the receiver requests specific delta entries by hash. The sender responds with the full CausalDelta payloads.
Subtree Prefetch
When a requested delta has parent hashes that the receiver also lacks, the sender proactively includes the parent chain (subtree prefetch) to reduce additional round-trips.
Apply + Verify
Received deltas are verified for causal consistency (all parents present) and applied to the local DAG in topological order. The Merkle tree is incrementally updated.
Best for: Moderate divergence with known recent activity — the most commonly used protocol during normal operation.
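The bloom-filter exchange can be sketched with a toy filter over entry hashes; `Bloom` and `missing_for_remote` are illustrative, and a production filter would size `m` and `k` from the expected entry count.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Minimal bloom filter: k salted hashes index into a bit vector.
struct Bloom {
    bits: Vec<bool>,
    k: u64,
}

impl Bloom {
    fn new(m: usize, k: u64) -> Self {
        Bloom { bits: vec![false; m], k }
    }
    fn index(&self, item: u64, salt: u64) -> usize {
        let mut h = DefaultHasher::new();
        (item, salt).hash(&mut h);
        (h.finish() as usize) % self.bits.len()
    }
    fn insert(&mut self, item: u64) {
        for s in 0..self.k {
            let i = self.index(item, s);
            self.bits[i] = true;
        }
    }
    fn maybe_contains(&self, item: u64) -> bool {
        (0..self.k).all(|s| self.bits[self.index(item, s)])
    }
}

// Sender side: everything in the local DAG that the remote's filter
// does not report. A false positive only suppresses a send; it never
// corrupts state, because the remote will re-request what it lacks.
fn missing_for_remote(local_entries: &[u64], remote: &Bloom) -> Vec<u64> {
    local_entries
        .iter()
        .copied()
        .filter(|e| !remote.maybe_contains(*e))
        .collect()
}
```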
Group Governance Sync
Governance operations form a separate DAG and use dedicated sync mechanisms. Three complementary approaches ensure governance state converges across all group members.
Real-time Gossip
SignedGroupOpV1 operations are published via gossipsub on dedicated group topics. Each operation is verified on ingress: signature check, capability check, parent hash validation. Valid operations are fed directly into the local DagStore.
pub fn publish_group_op(
topic: &GroupTopic,
op: &SignedGroupOpV1,
) -> Result<()>
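The ingress checks can be sketched as a short pipeline. Every check here is a stub standing in for real verification (Ed25519 signature, capability lookup, parent presence in the DagStore); `SignedOp` and `ingest` are hypothetical names.

```rust
// Toy stand-in for a gossiped, signed group operation.
struct SignedOp {
    author: u64,
    parents: Vec<u64>,
    capability: &'static str,
}

fn verify_signature(_op: &SignedOp) -> bool {
    true // stub: the node verifies the author's signature here
}

fn has_capability(op: &SignedOp, required: &str) -> bool {
    op.capability == required // stub: real check consults group state
}

fn parents_known(op: &SignedOp, dag: &[u64]) -> bool {
    op.parents.iter().all(|p| dag.contains(p))
}

// Ingress pipeline: reject on the first failed check, otherwise feed
// the operation into the local DAG.
fn ingest(op: &SignedOp, dag: &mut Vec<u64>, op_hash: u64) -> Result<(), &'static str> {
    if !verify_signature(op) {
        return Err("bad signature");
    }
    if !has_capability(op, "write") {
        return Err("missing capability");
    }
    if !parents_known(op, dag) {
        return Err("unknown parent");
    }
    dag.push(op_hash);
    Ok(())
}
```

Rejecting on unknown parents is what forces the catch-up path: a peer that receives an op ahead of its parents must fetch the missing chain before the op can be applied.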
Heartbeat Comparison
GroupStateHeartbeat is broadcast every 30 seconds. It contains the group's current dag_heads (latest operation hashes). Peers compare heads and trigger catch-up if they differ.
pub struct GroupStateHeartbeat {
    group_id: GroupId,
    dag_heads: Vec<Hash>,
    op_count: u64,
}
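The head comparison itself is set equality, since heads carry no meaningful order; `needs_catch_up` is an illustrative helper.

```rust
use std::collections::HashSet;

// Compare local DAG heads against the heads reported in a heartbeat.
// Any difference in the head *sets* means the DAGs have diverged.
fn needs_catch_up(local_heads: &[u64], remote_heads: &[u64]) -> bool {
    let local: HashSet<_> = local_heads.iter().collect();
    let remote: HashSet<_> = remote_heads.iter().collect();
    local != remote
}
```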
Stream Catch-Up
When heartbeat comparison reveals a mismatch, a direct stream is opened to the divergent peer. The requester sends a GroupDeltaRequest specifying its known heads and receives a GroupDeltaResponse with the missing operations scanned from the remote OpLog.
pub struct GroupDeltaRequest {
    group_id: GroupId,
    known_heads: Vec<Hash>,
}
pub struct GroupDeltaResponse {
ops: Vec<SignedGroupOpV1>,
has_more: bool,
}
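The responder's scan can be sketched as a filter over the append-only log, with `has_more` driving pagination; `Op`, `scan_missing`, and the page size parameter are hypothetical.

```rust
// Toy stand-in for an entry in the persisted OpLog.
struct Op {
    hash: u64,
}

// Return up to `page` operations the requester does not already have,
// and whether another request is needed to drain the rest.
fn scan_missing(oplog: &[Op], known: &[u64], page: usize) -> (Vec<u64>, bool) {
    let missing: Vec<u64> = oplog
        .iter()
        .map(|o| o.hash)
        .filter(|h| !known.contains(h))
        .collect();
    let has_more = missing.len() > page;
    (missing.into_iter().take(page).collect(), has_more)
}
```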
Startup Recovery
On node startup, reload_group_dags reads the persistent OpLog from RocksDB and reconstructs all group DAGs in memory. The DAG heads are then compared with peers via the first heartbeat cycle to catch up on any operations missed while offline.
pub async fn reload_group_dags(
store: &RocksDBStore,
) -> Result<HashMap<GroupId, DagState>>
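Rebuilding the heads after a reload reduces to finding ops nobody references as a parent; `LoggedOp` and `rebuild_heads` are illustrative names for that step.

```rust
use std::collections::HashSet;

// Toy stand-in for a persisted operation with its parent links.
struct LoggedOp {
    hash: u64,
    parents: Vec<u64>,
}

// A head is any op whose hash is never referenced as a parent.
fn rebuild_heads(oplog: &[LoggedOp]) -> Vec<u64> {
    let referenced: HashSet<u64> = oplog
        .iter()
        .flat_map(|o| o.parents.iter().copied())
        .collect();
    let mut heads: Vec<u64> = oplog
        .iter()
        .map(|o| o.hash)
        .filter(|h| !referenced.contains(h))
        .collect();
    heads.sort();
    heads
}
```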
SyncManager
Long-running async task (not an Actix actor) that orchestrates all synchronization. Runs periodic sync cycles and handles on-demand sync triggered by incoming streams.
Configuration
pub struct SyncConfig { // struct name assumed; fields as documented
    timeout: Duration,              // per-sync timeout
    interval: Duration,             // periodic sync interval
    frequency: u32,                 // syncs per interval
    max_concurrent: usize,          // max parallel sync streams
    snapshot_chunk_size: usize,     // bytes per snapshot page
    delta_sync_threshold: u64,      // entity count for delta vs snapshot
}
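How delta_sync_threshold gates protocol choice might look like the following. The exact decision rule is an assumption; only the delta-vs-snapshot threshold is documented above, and the other branches are plausible fills.

```rust
#[derive(Debug, PartialEq)]
enum SyncProtocol {
    HashCompare,
    LevelWise,
    DeltaSync,
    Snapshot,
}

// Hypothetical selection logic based on handshake divergence metrics.
fn select_protocol(
    missing_entities: u64,
    delta_sync_threshold: u64,
    has_state: bool,
    wide_shallow: bool,
) -> SyncProtocol {
    if !has_state {
        SyncProtocol::Snapshot // new joiner: nothing to diff against
    } else if missing_entities == 0 {
        SyncProtocol::HashCompare // cheap verification walk
    } else if missing_entities > delta_sync_threshold {
        SyncProtocol::Snapshot // diverged enough that resending wins
    } else if wide_shallow {
        SyncProtocol::LevelWise // many branches differ at shallow depth
    } else {
        SyncProtocol::DeltaSync // moderate divergence, known activity
    }
}
```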
Core Loop
start() — Spawn Loop
The start() method spawns the main sync loop as a Tokio task. It runs indefinitely, waking on a timer tick or when an incoming stream is opened by a remote peer.
Periodic Tick
On each interval tick, iterates over all active contexts. For each context, builds a SyncState (current root_hash, dag_heads, entity count) and selects a peer for synchronization.
Peer Selection
Selects a sync peer based on: most recently seen heartbeat, lowest latency, and whether they reported a different state hash. Avoids peers already in an active sync session.
Concurrent Futures
Sync operations run as bounded concurrent futures via FuturesUnordered, capped at max_concurrent. Each future handles one context-peer sync session from handshake through completion.
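The real loop uses async futures; a self-contained, thread-based analogue of the bounded pool (with `run_bounded` as a hypothetical helper and session IDs standing in for sync sessions) shows the admission pattern:

```rust
use std::sync::mpsc;
use std::thread;

// At most max_concurrent sessions run at once; completing one admits
// the next, mirroring a FuturesUnordered capped at max_concurrent.
fn run_bounded(sessions: Vec<u64>, max_concurrent: usize) -> Vec<u64> {
    let (done_tx, done_rx) = mpsc::channel();
    let mut queue = sessions.into_iter();
    let mut in_flight = 0;
    let mut results = Vec::new();
    loop {
        // Admit sessions up to the concurrency cap.
        while in_flight < max_concurrent {
            match queue.next() {
                Some(id) => {
                    let tx = done_tx.clone();
                    thread::spawn(move || {
                        tx.send(id).unwrap(); // "session complete"
                    });
                    in_flight += 1;
                }
                None => break,
            }
        }
        if in_flight == 0 {
            break; // queue drained and nothing running
        }
        results.push(done_rx.recv().unwrap()); // wait for one to finish
        in_flight -= 1;
    }
    results.sort();
    results
}
```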
handle_opened_stream()
When a remote peer opens a stream, reads the StreamMessage::Init frame, extracts the InitPayload, and dispatches to the appropriate sync protocol handler based on the negotiated protocol type.
Stream Wire Types
pub enum StreamMessage {
    Init(InitPayload),
    Message(MessagePayload),
    OpaqueError,
}
pub struct InitPayload {
    context_id: ContextId,
    protocol: SyncProtocol,
    handshake: SyncHandshake,
}
pub enum MessagePayload {
    HashNodes(Vec<HashNode>),
    LevelData(LevelBatch),
    SnapshotChunk(PageData),
    DeltaEntries(Vec<CausalDelta>),
    BloomFilter(BloomData),
}
Transport
The sync transport layer wraps libp2p streams with framing, encryption, and authentication.
SyncTransport
Wraps a raw libp2p stream with length-delimited framing. Each frame is a Borsh-serialized StreamMessage. Handles backpressure via Tokio's AsyncWrite flow control.
pub struct SyncTransport {
    framed: Framed<Stream, LengthCodec>,
    encryption: EncryptionState,
}
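Length-delimited framing itself is simple; a stdlib-only sketch (a u32 big-endian length prefix, with plain bytes standing in for the Borsh-serialized StreamMessage) looks like:

```rust
use std::io::{self, Read, Write};

// Write one frame: 4-byte big-endian length prefix, then the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    w.write_all(&(payload.len() as u32).to_be_bytes())?;
    w.write_all(payload)
}

// Read one frame: the length prefix tells us exactly how much to read,
// so messages never bleed into each other on the stream.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let mut buf = vec![0u8; u32::from_be_bytes(len) as usize];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```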
EncryptionState
Manages per-stream encryption state. After the initial handshake, streams are encrypted with a shared key derived from the context's encryption domain. Supports both Encrypted and Plaintext modes (configurable per context).
pub enum EncryptionState {
    Plaintext,
    Encrypted {
        cipher: ChaCha20Poly1305,
        nonce_counter: u64,
    },
}
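One way the nonce_counter maps onto the cipher's 96-bit nonce: ChaCha20-Poly1305 requires that a nonce never repeat under the same key, and a monotonic counter guarantees that. The exact layout here is an assumption; `next_nonce` is an illustrative helper.

```rust
// Derive the next 96-bit nonce from a per-stream counter. The counter
// occupies the low 8 bytes; exhausting it must force a rekey rather
// than wrap, since nonce reuse breaks the AEAD's guarantees.
fn next_nonce(counter: &mut u64) -> [u8; 12] {
    let mut nonce = [0u8; 12];
    nonce[4..].copy_from_slice(&counter.to_be_bytes());
    *counter = counter.checked_add(1).expect("nonce counter exhausted; rekey");
    nonce
}
```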
Challenge Domain
Each sync stream authenticates with a challenge derived from the context's encryption domain. The initiating peer signs a challenge nonce, and the responder verifies membership before accepting the stream. This prevents unauthorized peers from syncing context data.