Sync Engine

Sync protocols from calimero-node · calimero-node-primitives

Purpose

Two independent synchronization systems keep peers converged: application state sync (context DAG) and group governance sync (governance DAG). The SyncManager runs periodic and on-demand sync cycles. Protocols are selected dynamically via handshake negotiation based on divergence metrics.

  • 4 app sync protocols
  • 3 governance mechanisms
  • 30s heartbeat interval
  • N concurrent syncs (bounded by max_concurrent)

Protocol Selection

Peers exchange a SyncHandshake to compare state. The comparison metrics determine which protocol minimizes bandwidth and latency for the current divergence level.

Figure: Protocol selection. Peer A and Peer B each advertise root_hash, dag_heads, entity count, and tree depth in a SyncHandshake. The divergence level then selects one of four protocols: Hash Comparison (Merkle tree walk; small diffs, few nodes; low bandwidth), Level-wise (wide-tree traversal; broad, shallow divergence; medium bandwidth), Snapshot (full state transfer in paged chunks plus a DeltaBuffer; large divergence; high bandwidth), or Delta Sync (direct DAG exchange with bloom filter and subtree prefetch; optimal bandwidth).
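The selection step can be sketched as a pure function over the handshake metrics. This is a minimal illustration under stated assumptions, not the calimero-node code: HandshakeMetrics, Protocol, select_protocol, SHALLOW_DEPTH, and the order in which the checks run are all hypothetical; only delta_sync_threshold mirrors a SyncConfig field, and hashes are shortened to u64 stand-ins.

```rust
/// Hypothetical handshake metrics: field names follow the diagram, types are stand-ins.
struct HandshakeMetrics {
    root_hash: u64,
    dag_heads: Vec<u64>,
    entity_count: u64,
    tree_depth: u32,
}

#[derive(Debug, PartialEq, Eq)]
enum Protocol {
    AlreadySynced,
    HashComparison,
    LevelWise,
    Snapshot,
    DeltaSync,
}

/// Hypothetical depth at or below which a tree counts as "shallow".
const SHALLOW_DEPTH: u32 = 3;

fn select_protocol(
    local: &HandshakeMetrics,
    remote: &HandshakeMetrics,
    delta_sync_threshold: u64,
) -> Protocol {
    // Equal roots: the states already match.
    if local.root_hash == remote.root_hash {
        return Protocol::AlreadySynced;
    }
    // New joiner, or an entity-count gap above the threshold: full transfer.
    let gap = local.entity_count.abs_diff(remote.entity_count);
    if local.entity_count == 0 || gap > delta_sync_threshold {
        return Protocol::Snapshot;
    }
    // A shared DAG head suggests recent common history: exchange deltas directly.
    if local.dag_heads.iter().any(|h| remote.dag_heads.contains(h)) {
        return Protocol::DeltaSync;
    }
    // Otherwise compare trees: batch whole levels when the tree is shallow.
    if local.tree_depth.max(remote.tree_depth) <= SHALLOW_DEPTH {
        Protocol::LevelWise
    } else {
        Protocol::HashComparison
    }
}
```

Checking for a shared DAG head before falling back to a tree walk reflects the description of Delta Sync as the protocol used most often during normal operation; a real selector may weigh these signals differently.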

Application State Protocols

Four protocols for synchronizing context application state. Each is optimized for a different divergence scenario.

Hash Comparison — Merkle tree walk

The lightest sync protocol. Both peers share their Merkle tree root hash. If roots differ, they exchange hashes at progressively deeper levels to locate the divergent subtrees. Only the changed subtrees are transferred.

Algorithm

1. Root Compare

Exchange root hashes. If equal, sync is a no-op. If different, descend to the next level.

2. Level Descent

Exchange hash nodes at the current depth. Mark subtrees with matching hashes as "synced" and skip them. Continue descending into differing subtrees.

3. Leaf Transfer

At leaf level, exchange the actual data entries for divergent nodes. Apply incoming entries to local storage and update the Merkle tree.

Best for: Small, localized changes — a few keys updated in a large state tree.

Source: crates/node/src/sync

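The descent in Hash Comparison can be sketched over an in-memory tree. This is a toy, assuming both peers' trees have the same shape; Node, digest, and divergent_keys are hypothetical names, and DefaultHasher stands in for the cryptographic hash a real Merkle tree would use. Hashes are recomputed on each call for brevity, whereas a real tree stores them per node.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy Merkle node (hypothetical shape).
enum Node {
    Leaf { key: String, value: String },
    Internal { children: Vec<Node> },
}

impl Node {
    /// Recomputes this subtree's hash; DefaultHasher is a non-cryptographic stand-in.
    fn digest(&self) -> u64 {
        let mut h = DefaultHasher::new();
        match self {
            Node::Leaf { key, value } => {
                0u8.hash(&mut h); // tag leaves and internal nodes differently
                key.hash(&mut h);
                value.hash(&mut h);
            }
            Node::Internal { children } => {
                1u8.hash(&mut h);
                for child in children {
                    child.digest().hash(&mut h);
                }
            }
        }
        h.finish()
    }
}

/// Collects keys of divergent remote leaves, descending only into subtrees
/// whose hashes differ. Assumes both trees have the same shape.
fn divergent_keys(local: &Node, remote: &Node, out: &mut Vec<String>) {
    if local.digest() == remote.digest() {
        return; // matching subtree: already synced, skip it
    }
    match (local, remote) {
        (Node::Internal { children: l }, Node::Internal { children: r }) => {
            for (lc, rc) in l.iter().zip(r) {
                divergent_keys(lc, rc, out);
            }
        }
        (Node::Leaf { .. }, Node::Leaf { key, .. }) => out.push(key.clone()),
        _ => {} // shape mismatch: outside this sketch's assumptions
    }
}
```

Only the keys collected here would move to the leaf-transfer step; every subtree whose hash matches is skipped without visiting its children.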
Level-wise — wide-tree traversal

Optimized for broad, shallow divergence where many branches differ but the tree isn't deep. Instead of drilling into individual branches, this protocol processes entire levels in parallel.

Algorithm

1. Level Exchange

Both peers serialize every node at the current tree level and exchange the whole batch at once. This avoids the round-trip overhead of negotiating branch by branch.

2. Batch Diff

Compare the received level against the local level. Identify additions, deletions, and modifications in a single pass.

3. Descend Parallel

For each differing node, descend to the next level. Process all descendants in parallel rather than sequentially.

Best for: Many keys changed across different parts of the tree — bulk updates, migrations.
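The batch diff step can be sketched as a comparison of two level snapshots. As an assumption, each level is modelled as a map from node path to node hash; Level, LevelDiff, and diff_level are hypothetical names.

```rust
use std::collections::BTreeMap;

/// One tree level: node path -> node hash (hypothetical representation).
type Level = BTreeMap<String, u64>;

#[derive(Debug, Default, PartialEq)]
struct LevelDiff {
    added: Vec<String>,    // present remotely, missing locally
    removed: Vec<String>,  // present locally, missing remotely
    modified: Vec<String>, // present on both sides with different hashes
}

/// Classifies every node of the received level against the local level.
fn diff_level(local: &Level, remote: &Level) -> LevelDiff {
    let mut d = LevelDiff::default();
    for (path, remote_hash) in remote {
        match local.get(path) {
            None => d.added.push(path.clone()),
            Some(local_hash) if local_hash != remote_hash => d.modified.push(path.clone()),
            Some(_) => {} // identical subtree: nothing to descend into
        }
    }
    for path in local.keys() {
        if !remote.contains_key(path) {
            d.removed.push(path.clone());
        }
    }
    d
}
```

Paths in added or modified are the ones to descend into at the next level; because each level arrives as one batch, those descents can be processed in parallel.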

Snapshot — full state transfer

Fallback protocol for maximum divergence or when a peer has no prior state (new joiner). Transfers the entire state as paged chunks with a DeltaBuffer to capture mutations during transfer.

Algorithm

1. Snapshot Begin

Source peer takes a consistent read snapshot and begins chunking the state into pages of snapshot_chunk_size bytes. A DeltaBuffer is opened to capture any writes that occur during the transfer.

2. Chunk Transfer

Pages are streamed to the target peer. Each page includes a sequence number and hash for integrity verification. The target applies pages to a staging area.

3. DeltaBuffer Replay

After all pages are transferred, the accumulated DeltaBuffer entries (mutations that occurred during the snapshot) are replayed on top of the restored state.

4. Commit

The staging area is atomically swapped into the live state. The Merkle tree is rebuilt from the new state.

Best for: New peers joining a context, or peers that have been offline for a very long time.
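The snapshot flow can be sketched end to end in memory. This is a simplified illustration: Page, page_hash, make_pages, and restore are hypothetical; pages are cut by entry count rather than by snapshot_chunk_size bytes, DefaultHasher stands in for a real integrity hash, and the DeltaBuffer is modelled as a list of writes where None means delete. The Merkle rebuild after commit is omitted.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

type State = BTreeMap<String, String>;

/// Hypothetical page: sequence number, entries, and an integrity hash over both.
struct Page {
    seq: u64,
    entries: Vec<(String, String)>,
    hash: u64,
}

fn page_hash(seq: u64, entries: &[(String, String)]) -> u64 {
    let mut h = DefaultHasher::new();
    seq.hash(&mut h);
    entries.hash(&mut h);
    h.finish()
}

/// Source side: split a consistent read snapshot into numbered, hashed pages.
fn make_pages(snapshot: &State, entries_per_page: usize) -> Vec<Page> {
    let all: Vec<(String, String)> =
        snapshot.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
    all.chunks(entries_per_page.max(1))
        .enumerate()
        .map(|(i, chunk)| {
            let seq = i as u64;
            Page { seq, hash: page_hash(seq, chunk), entries: chunk.to_vec() }
        })
        .collect()
}

/// Target side: verify pages into staging, replay buffered writes, then swap.
fn restore(
    live: &mut State,
    pages: &[Page],
    delta_buffer: &[(String, Option<String>)],
) -> Result<(), String> {
    let mut staging = State::new();
    for (expected, p) in pages.iter().enumerate() {
        if p.seq != expected as u64 {
            return Err(format!("out-of-order page {}", p.seq));
        }
        if page_hash(p.seq, &p.entries) != p.hash {
            return Err(format!("corrupt page {}", p.seq));
        }
        staging.extend(p.entries.iter().cloned());
    }
    // Replay mutations captured while the transfer was running.
    for (key, value) in delta_buffer {
        match value {
            Some(v) => {
                staging.insert(key.clone(), v.clone());
            }
            None => {
                staging.remove(key);
            }
        }
    }
    *live = staging; // commit: replace the live state in one step
    Ok(())
}
```

Staging first means a corrupt or out-of-order page leaves the live state untouched; only a fully verified, replayed staging area replaces it.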

Delta Sync — direct DAG exchange

Direct exchange of causal delta entries between peers. Uses bloom filters to efficiently identify which deltas the remote peer is missing, and subtree prefetch to minimize round-trips.

Algorithm

1. Bloom Filter Exchange

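Each peer builds a bloom filter over its DAG entry hashes and sends it to the other. Any local entry that tests negative against the remote filter is definitely missing on the remote side, so missing entries can be found without enumerating the full DAG. Because bloom filters admit false positives, a few missing entries may go unnoticed in a single exchange.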

2. Delta Request

Based on bloom filter results, the receiver requests specific delta entries by hash. The sender responds with the full CausalDelta payloads.

3. Subtree Prefetch

When a requested delta has parent hashes that the receiver also lacks, the sender proactively includes the parent chain (subtree prefetch) to reduce additional round-trips.

4. Apply + Verify

Received deltas are verified for causal consistency (all parents present) and applied to the local DAG in topological order. The Merkle tree is incrementally updated.

Best for: Moderate divergence with known recent activity — the most commonly used protocol during normal operation.
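The bloom filter test and the causal apply step can be sketched together. Bloom, Delta, missing_for_peer, and apply_in_order are hypothetical names; the filter uses k seeded DefaultHasher probes as a simple stand-in for whatever hashing the real filter uses, and deltas carry only an id and parent ids.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

/// Minimal bloom filter: m bits, k probes per item.
struct Bloom {
    bits: Vec<bool>,
    k: u64,
}

impl Bloom {
    fn new(m: usize, k: u64) -> Self {
        Bloom { bits: vec![false; m.max(1)], k }
    }
    fn index(&self, item: u64, probe: u64) -> usize {
        let mut h = DefaultHasher::new();
        (item, probe).hash(&mut h);
        (h.finish() % self.bits.len() as u64) as usize
    }
    fn insert(&mut self, item: u64) {
        for probe in 0..self.k {
            let idx = self.index(item, probe);
            self.bits[idx] = true;
        }
    }
    /// False means definitely absent; true means possibly present.
    fn may_contain(&self, item: u64) -> bool {
        (0..self.k).all(|probe| self.bits[self.index(item, probe)])
    }
}

/// Hypothetical, stripped-down causal delta.
struct Delta {
    id: u64,
    parents: Vec<u64>,
}

/// Sender side: deltas that test negative against the peer's filter.
fn missing_for_peer<'a>(local: &'a [Delta], peer_filter: &Bloom) -> Vec<&'a Delta> {
    local.iter().filter(|d| !peer_filter.may_contain(d.id)).collect()
}

/// Receiver side: apply a delta only once all of its parents are known.
fn apply_in_order(known: &mut HashSet<u64>, incoming: Vec<Delta>) -> Vec<u64> {
    let mut pending: HashMap<u64, Delta> = incoming.into_iter().map(|d| (d.id, d)).collect();
    let mut applied = Vec::new();
    loop {
        let mut ready: Vec<u64> = pending
            .values()
            .filter(|d| d.parents.iter().all(|p| known.contains(p)))
            .map(|d| d.id)
            .collect();
        if ready.is_empty() {
            break; // anything left still lacks parents
        }
        ready.sort_unstable();
        for id in ready {
            pending.remove(&id);
            known.insert(id);
            applied.push(id);
        }
    }
    applied
}
```

Deltas that still lack parents are left unapplied, which is the situation subtree prefetch is meant to avoid by sending the parent chain up front.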

Group Governance Sync

Governance operations form a separate DAG and use dedicated sync mechanisms. Three complementary approaches ensure governance state converges across all group members.

Projection-Authoritative Membership on the Gossip Path

When the live membership resolver rejects a delta (DeltaAuthOutcome::MembershipReject), the node no longer drops it immediately. Instead it calls projection_member_at_cut_authoritative, which first refreshes the projection fold via refresh_projection_for_cut and then reads membership with the authoritative (complete-ancestry, no materialized-fallback) check. If the projection returns Some(true), the delta falls through to the normal apply path; None or Some(false) still reject.

This makes the projection the final arbiter of membership on the primary gossip ingestion path: the live resolver's result is a cross-check, not the last word. A divergence-gate warning (unified_projection_divergence / membership-cut-grant) is emitted whenever the projection overrides a live reject, and observe_peer_identity is called on the grant path so projection-only peers continue to feed sync peer selection.

Scope: This override applies only to the primary gossip path (handle_state_delta). Other MembershipReject sites — sync delta-request and parent-fetch replay — intentionally remain live-only.

Visibility: projection_member_at_cut is now pub(crate) so the sync manager can call it directly, ensuring both the data-write path and the inbound-sync authorization path fold the projection through the same refreshing backfill logic.

// Grant path: refresh fold then authoritative read
pub fn projection_member_at_cut_authoritative(
    ctx: &DeltaContext,
    cut: &Cut,
) -> Option<bool>

// Shared fold refresh (no author_id, no return value)
pub fn refresh_projection_for_cut(
    ctx: &DeltaContext,
    cut: &Cut,
)

Both projection_member_at_cut (deny arm) and projection_member_at_cut_authoritative (grant arm) call refresh_projection_for_cut so both arms see the same fold state.
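The gossip-path decision can be summarised as a small function. This is a sketch with hypothetical types: LiveOutcome collapses DeltaAuthOutcome to two cases, and the projection is passed as a closure standing in for projection_member_at_cut_authoritative, so it only runs after a live reject. Logging, observe_peer_identity, and the fold refresh are left out.

```rust
/// Hypothetical two-case simplification of the live resolver's outcome.
#[derive(Debug, Clone, Copy, PartialEq)]
enum LiveOutcome {
    Accept,
    MembershipReject,
}

#[derive(Debug, PartialEq)]
enum GossipDecision {
    Apply,
    /// Projection overrode a live reject: the caller would warn on
    /// plane "membership-cut-grant" and record the peer identity.
    ApplyProjectionGrant,
    Reject,
}

/// `projection` is only evaluated after a live reject.
fn decide(live: LiveOutcome, projection: impl FnOnce() -> Option<bool>) -> GossipDecision {
    match live {
        LiveOutcome::Accept => GossipDecision::Apply,
        LiveOutcome::MembershipReject => match projection() {
            Some(true) => GossipDecision::ApplyProjectionGrant,
            Some(false) | None => GossipDecision::Reject,
        },
    }
}
```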

Projection-Authoritative Drain (drain_governance_pending)

State deltas that arrived before their governance ancestry was locally available are buffered and re-evaluated by drain_governance_pending once new governance ops are applied. This drain now uses member_at_cut_authoritative (projection) as its sole decision authority, replacing the previous acl_view_at (live DAG walk) primary path.

The three projection outcomes map directly to the three drain outcomes:

  • Some(true) → re-apply: the delta is re-submitted to the normal apply path.
  • Some(false) → drop: the delta is discarded; the author is definitively not a member at that cut.
  • None → re-buffer: governance ancestry is still incomplete; the delta stays pending for the next drain cycle.

Because the projection never returns an error, the previous Err match arm that dropped a buffered delta and recorded a "lookup_error" metric has been eliminated entirely.

Divergence Cross-check

On decisive verdicts (Some(true) or Some(false)), acl_view_at is still called as a secondary cross-check to detect disagreement between the projection and the live resolver. On disagreement a unified_projection_divergence warning is emitted:

  • Some(true) + live returns Removed or NeverMember → plane = "membership-cut-grant" (dangerous grant-plane divergence).
  • Some(false) + live returns Member → plane = "membership-cut" (deny-plane divergence).

acl_view_at is deliberately not called on None to avoid false positives while ancestry is still being folded.

Metric Changes

The governance_drain_outcome metric label for drop events is now best-effort rather than authoritative. It is derived from the live resolver's secondary response: "removed" if live says Removed, "never_member" if live says NeverMember, and the new fallback "not_member" if the live resolver disagrees or is inconclusive. The "lookup_error" label is no longer emitted.

The debug log emitted when a delta is re-buffered no longer includes a needed_count field, because the projection returns a bare None (its result is an Option<bool>) rather than a structured variant enumerating the missing DAG heads.

// Drain outcomes — now driven by projection
// Some(true) → re-apply
// Some(false) → drop (metric label: "removed" | "never_member" | "not_member")
// None → re-buffer (no needed_count in log)
pub fn member_at_cut_authoritative(
    ctx: &DeltaContext,
    cut: &Cut,
) -> Option<bool>

Operators: Monitor for unified_projection_divergence warn logs on the drain path during the F5 migration window. Update any metric dashboards to handle the new "not_member" label and the absence of "lookup_error" on governance_drain_outcome.
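The drain mapping and its gated cross-check can be sketched as a pure function. Live, Drain, and drain_outcome are hypothetical; Inconclusive is an assumed stand-in for any live answer that is neither Member, Removed, nor NeverMember. The live closure plays the role of acl_view_at and is never called when the projection returns None.

```rust
/// Hypothetical live-resolver answers; Inconclusive is an assumption.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Live {
    Member,
    Removed,
    NeverMember,
    Inconclusive,
}

#[derive(Debug, PartialEq)]
enum Drain {
    Reapply,
    Drop { label: &'static str }, // best-effort governance_drain_outcome label
    Rebuffer,
}

/// Returns the drain outcome and the divergence plane to warn on, if any.
fn drain_outcome(
    projection: Option<bool>,
    live: impl FnOnce() -> Live,
) -> (Drain, Option<&'static str>) {
    match projection {
        // Ancestry still folding: no cross-check, so no false positives.
        None => (Drain::Rebuffer, None),
        Some(true) => {
            let plane = match live() {
                Live::Removed | Live::NeverMember => Some("membership-cut-grant"),
                _ => None,
            };
            (Drain::Reapply, plane)
        }
        Some(false) => {
            let l = live();
            let label = match l {
                Live::Removed => "removed",
                Live::NeverMember => "never_member",
                _ => "not_member", // live disagrees or is inconclusive
            };
            let plane = if l == Live::Member { Some("membership-cut") } else { None };
            (Drain::Drop { label }, plane)
        }
    }
}
```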

Inbound-Sync Peer Authorization (peer_is_group_member)

A new private method, SyncManager::peer_is_group_member, encapsulates membership authorization for inbound sync peers. It first checks MembershipRepository::is_member (the live resolver), then resolves the current governance heads via ScopeProjections::namespace_current_heads and calls projection_member_at_cut to obtain the projection's verdict. When the two disagree, a unified_projection_divergence warning is emitted with plane membership-sync. The projection's answer is the authoritative decision. If the projection returns None (cold or partially folded governance), the method falls back to the live result, so no peer is spuriously rejected while governance ancestry is still catching up; the existing catch-up retry re-resolves once governance advances.

Two call sites inside SyncManager that previously called MembershipRepository::new(store).is_member(...) directly — the materialization-wait verification and the inherited-member check in verify_inbound_member — now call self.peer_is_group_member(...), so the projection-first logic with gated cross-check applies uniformly to all inbound peer authorization.

// Authorizes an inbound sync peer: projection-first, live fallback
async fn peer_is_group_member(
    &self,
    group_id: GroupId,
    peer_id: PeerId,
) -> Result<bool>

// projection returns None → fall back to live; Some(x) → authoritative
// divergence logged as unified_projection_divergence plane=membership-sync

Operators: Monitor unified_projection_divergence warn logs with plane = "membership-sync" during governance catch-up windows. A None projection result is not an error — it means governance ancestry is still being folded and the live result is used as a safe temporary answer.
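The decision rule inside peer_is_group_member reduces to a few lines once both answers are in hand. Authz and authorize_peer are hypothetical names; the real method is async, looks up both answers itself, and emits the warning rather than returning a flag.

```rust
/// Hypothetical result of authorizing an inbound sync peer.
#[derive(Debug, PartialEq)]
struct Authz {
    allowed: bool,
    /// True when a unified_projection_divergence warning
    /// (plane = "membership-sync") should be emitted.
    divergence: bool,
}

/// Projection-first authorization with a live fallback.
fn authorize_peer(live_is_member: bool, projection: Option<bool>) -> Authz {
    match projection {
        // Projection is authoritative; flag any disagreement with live.
        Some(p) => Authz { allowed: p, divergence: p != live_is_member },
        // Cold or partially folded governance: use the live answer for now.
        None => Authz { allowed: live_is_member, divergence: false },
    }
}
```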

Real-time Gossip (primary)

SignedGroupOpV1 operations are published via gossipsub on dedicated group topics. Each operation is verified on ingress — signature check, capability check, parent hash validation. Valid operations are fed directly into the local DagStore.

// Gossip on group topic
pub fn publish_group_op(
    topic: &GroupTopic,
    op: &SignedGroupOpV1,
) -> Result<()>
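Ingress verification can be sketched as a DagStore that accepts an op only after its checks pass, then advances its heads. GroupOp, IngressError, and DagStore here are hypothetical simplifications: the signature and capability checks are reduced to precomputed booleans, and an op with an unknown parent is simply rejected, whereas a real node may buffer it or fetch the missing ancestry.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified signed group op.
struct GroupOp {
    hash: u64,
    parents: Vec<u64>,
    signature_ok: bool,   // stand-in for a real signature check
    has_capability: bool, // stand-in for a real capability check
}

#[derive(Debug, PartialEq)]
enum IngressError {
    BadSignature,
    MissingCapability,
    UnknownParent(u64),
}

#[derive(Default)]
struct DagStore {
    ops: HashSet<u64>,
    heads: HashSet<u64>,
}

impl DagStore {
    fn ingest(&mut self, op: &GroupOp) -> Result<(), IngressError> {
        if !op.signature_ok {
            return Err(IngressError::BadSignature);
        }
        if !op.has_capability {
            return Err(IngressError::MissingCapability);
        }
        if let Some(p) = op.parents.iter().find(|p| !self.ops.contains(*p)) {
            return Err(IngressError::UnknownParent(*p));
        }
        // Parents stop being heads; the new op becomes one.
        for p in &op.parents {
            self.heads.remove(p);
        }
        self.ops.insert(op.hash);
        self.heads.insert(op.hash);
        Ok(())
    }
}
```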

Heartbeat Comparison (periodic)

GroupStateHeartbeat is broadcast every 30 seconds. It carries the group's current dag_heads (the hashes of the latest operations) and op_count. Peers compare heads and trigger catch-up if they differ.

pub struct GroupStateHeartbeat {
    group_id: GroupId,
    dag_heads: Vec<Hash>,
    op_count: u64,
}
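Head comparison can be sketched with set semantics, so that heads listed in a different order do not trigger catch-up. heads_differ and unseen_heads are hypothetical helpers, and treating "remote heads this node has never seen" as the signal to pull from that peer is an assumption of this sketch.

```rust
use std::collections::{BTreeSet, HashSet};

/// Heads are compared as sets, so ordering differences never count.
fn heads_differ(local_heads: &[u64], remote_heads: &[u64]) -> bool {
    let l: BTreeSet<&u64> = local_heads.iter().collect();
    let r: BTreeSet<&u64> = remote_heads.iter().collect();
    l != r
}

/// Remote heads absent from the local op set. Assumption: a non-empty
/// result means this node should open a catch-up stream to that peer;
/// if heads differ but this is empty, the local node is the one ahead.
fn unseen_heads(known_ops: &HashSet<u64>, remote_heads: &[u64]) -> Vec<u64> {
    remote_heads.iter().copied().filter(|h| !known_ops.contains(h)).collect()
}
```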

Stream Catch-Up (on-demand)

When heartbeat comparison reveals a mismatch, a direct stream is opened to the divergent peer. The requesting peer sends a GroupDeltaRequest listing its known heads and receives a GroupDeltaResponse containing the missing operations, scanned from the remote peer's OpLog; has_more signals that further operations remain.

pub struct GroupDeltaRequest {
    group_id: GroupId,
    known_heads: Vec<Hash>,
}

pub struct GroupDeltaResponse {
    ops: Vec<SignedGroupOpV1>,
    has_more: bool,
}
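Serving a GroupDeltaRequest can be sketched as an ancestry walk from the requester's known heads. LoggedOp and ops_after are hypothetical; the OpLog is assumed to be stored in causal order, and the page limit and the returned bool (playing the role of has_more) are illustrative.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical OpLog entry, stored in causal (topological) order.
struct LoggedOp {
    hash: u64,
    parents: Vec<u64>,
}

/// Returns up to `limit` op hashes the requester lacks, in log order,
/// plus whether more remain (the role of `has_more`).
fn ops_after(log: &[LoggedOp], known_heads: &[u64], limit: usize) -> (Vec<u64>, bool) {
    let by_hash: HashMap<u64, &LoggedOp> = log.iter().map(|op| (op.hash, op)).collect();
    // Everything reachable from the requester's heads is already known to it.
    let mut have = HashSet::new();
    let mut stack: Vec<u64> = known_heads.to_vec();
    while let Some(h) = stack.pop() {
        if have.insert(h) {
            if let Some(op) = by_hash.get(&h) {
                stack.extend(op.parents.iter().copied());
            }
        }
    }
    let missing: Vec<u64> = log.iter().map(|op| op.hash).filter(|h| !have.contains(h)).collect();
    let has_more = missing.len() > limit;
    (missing.into_iter().take(limit).collect(), has_more)
}
```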

Startup Recovery (boot)

On node startup, reload_group_dags reads the persistent OpLog from RocksDB and reconstructs all group DAGs in memory. The DAG heads are then compared with peers via the first heartbeat cycle to catch up on any operations missed while offline.

// Called at node boot
pub async fn reload_group_dags(
    store: &RocksDBStore,
) -> Result<HashMap<GroupId, DagState>>
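Head reconstruction at boot can be sketched from the persisted records alone. Record and rebuild_heads are hypothetical, with u32 and u64 standing in for GroupId and Hash; the sketch assumes an op is a head exactly when no other op in the same group lists it as a parent.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical persisted OpLog record.
struct Record {
    group: u32,
    hash: u64,
    parents: Vec<u64>,
}

/// Rebuilds per-group DAG heads: ops that no other op names as a parent.
fn rebuild_heads(records: &[Record]) -> HashMap<u32, BTreeSet<u64>> {
    let mut heads: HashMap<u32, BTreeSet<u64>> = HashMap::new();
    let mut referenced: HashMap<u32, BTreeSet<u64>> = HashMap::new();
    for r in records {
        heads.entry(r.group).or_default().insert(r.hash);
        referenced.entry(r.group).or_default().extend(r.parents.iter().copied());
    }
    for (group, set) in heads.iter_mut() {
        if let Some(refs) = referenced.get(group) {
            set.retain(|h| !refs.contains(h));
        }
    }
    heads
}
```

These rebuilt heads are what the first heartbeat cycle would then compare against peers.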
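
Figure: Governance sync flow. A GroupOp is created (signed and applied locally), broadcast via gossip on the group topic, and ingested by peers (verified and inserted into the DagStore). Heartbeat checks compare dag_heads, and a mismatch triggers stream catch-up (GroupDeltaRequest / GroupDeltaResponse). Legend: real-time gossip, periodic heartbeat, stream catch-up, startup recovery.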

SyncManager

Long-running async task (not an Actix actor) that orchestrates all synchronization. Runs periodic sync cycles and handles on-demand sync triggered by incoming streams.

Configuration

use std::time::Duration;

pub struct SyncConfig {
    timeout: Duration,             // per-sync timeout
    interval: Duration,            // periodic sync interval
    frequency: u32,                // syncs per interval
    max_concurrent: usize,         // max parallel sync streams
    snapshot_chunk_size: usize,    // bytes per snapshot page
    delta_sync_threshold: u64,     // entity count for delta vs snapshot
}

Core Loop

1. start(): Spawn Loop

The start() method spawns the main sync loop as a Tokio task. It runs indefinitely, waking on a timer tick or when an incoming stream is opened by a remote peer.

2. Periodic Tick

On each interval tick, the loop iterates over all active contexts. For each context, it builds a SyncState (current root_hash, dag_heads, entity count) and selects a peer to synchronize with.

3. Peer Selection

The loop selects a sync peer based on the most recently seen heartbeat, the lowest latency, and whether the peer reported a different state hash. Peers already in an active sync session are skipped.

4. Concurrent Futures

Sync operations run as bounded concurrent futures via FuturesUnordered, capped at max_concurrent. Each future handles one context-peer sync session from handshake through completion.

5. handle_opened_stream()

When a remote peer opens a stream, handle_opened_stream() reads the StreamMessage::Init frame, extracts the InitPayload, and dispatches to the handler for the negotiated protocol type.
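Peer selection can be sketched as a filter followed by a ranking. Candidate and select_peer are hypothetical, and the priority order (differing state first, then most recent heartbeat, then lowest latency) is an assumption, since the criteria are listed without weights.

```rust
/// Hypothetical view of a candidate sync peer.
struct Candidate {
    id: &'static str,
    last_heartbeat_secs_ago: u64,
    latency_ms: u32,
    state_differs: bool,
    in_active_session: bool,
}

/// Skips busy peers, then ranks by (differing state, freshest heartbeat, lowest latency).
fn select_peer(candidates: &[Candidate]) -> Option<&'static str> {
    candidates
        .iter()
        .filter(|c| !c.in_active_session)
        // `!state_differs` sorts peers with a different state hash first (false < true).
        .min_by_key(|c| (!c.state_differs, c.last_heartbeat_secs_ago, c.latency_ms))
        .map(|c| c.id)
}
```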

Stream Wire Types

pub enum StreamMessage {
    Init(InitPayload),
    Message(MessagePayload),
    OpaqueError,
}
pub struct InitPayload {
    context_id: ContextId,
    protocol: SyncProtocol,
    handshake: SyncHandshake,
}
pub enum MessagePayload {
    HashNodes(Vec<HashNode>),
    LevelData(LevelBatch),
    SnapshotChunk(PageData),
    DeltaEntries(Vec<CausalDelta>),
    BloomFilter(BloomData),
}

Transport

The sync transport layer wraps libp2p streams with framing, encryption, and authentication.

SyncTransport

Wraps a raw libp2p stream with length-delimited framing. Each frame is a Borsh-serialized StreamMessage. Handles backpressure via Tokio's AsyncWrite flow control.

pub struct SyncTransport {
    framed: Framed<Stream, LengthCodec>,
    encryption: EncryptionState,
}
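Length-delimited framing can be sketched with the standard library alone. encode_frame and decode_frame are hypothetical helpers that assume a 4-byte big-endian length prefix; the actual LengthCodec configuration is not stated here. In the real transport each payload would be a Borsh-serialized StreamMessage.

```rust
use std::convert::TryInto;

/// Prefixes the payload with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Pops one complete frame off the front of `buf`, or returns None if more bytes are needed.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // length prefix not fully received yet
    }
    let len = u32::from_be_bytes(buf[..4].try_into().unwrap()) as usize;
    if buf.len() < 4 + len {
        return None; // payload not fully received yet
    }
    let frame = buf[4..4 + len].to_vec();
    buf.drain(..4 + len);
    Some(frame)
}
```

Returning None on a partial frame lets the reader wait for more bytes instead of failing, which is how a stream with arbitrary read boundaries is usually handled.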

EncryptionState

Manages per-stream encryption state. After the initial handshake, streams are encrypted with a shared key derived from the context's encryption domain. Both Encrypted and Plaintext modes are supported, configurable per context.

pub enum EncryptionState {
    Plaintext,
    Encrypted {
        cipher: ChaCha20Poly1305,
        nonce_counter: u64,
    },
}
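The nonce_counter field suggests a per-stream counter feeding the 96-bit ChaCha20Poly1305 nonce. A minimal sketch, assuming a layout of four zero bytes followed by the little-endian counter; NonceCounter and next_nonce are hypothetical, and the real derivation may differ.

```rust
/// Hypothetical per-stream nonce sequence for a 96-bit AEAD nonce.
struct NonceCounter {
    next: u64,
}

impl NonceCounter {
    /// Returns a fresh nonce, or None once the counter would wrap
    /// (reusing a nonce under the same key breaks AEAD security).
    fn next_nonce(&mut self) -> Option<[u8; 12]> {
        let n = self.next;
        self.next = self.next.checked_add(1)?;
        let mut nonce = [0u8; 12];
        nonce[4..].copy_from_slice(&n.to_le_bytes()); // assumed layout: 4 zero bytes + LE counter
        Some(nonce)
    }
}
```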

Challenge Domain

Each sync stream authenticates with a challenge derived from the context's encryption domain. The initiating peer signs a challenge nonce, and the responder verifies membership before accepting the stream. This prevents unauthorized peers from syncing context data.
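The challenge check can be sketched with the signature scheme and membership lookup passed in as closures. challenge_bytes, accept_stream, AuthError, and the domain-tag layout are hypothetical; the sketch only fixes the order of operations: rebuild the challenge, verify the signature, then check membership.

```rust
#[derive(Debug, PartialEq)]
enum AuthError {
    BadSignature,
    NotAMember,
}

/// Bytes the initiator signs: a tag, the context's encryption domain, and a
/// fresh nonce (hypothetical layout).
fn challenge_bytes(domain: &[u8], nonce: &[u8; 32]) -> Vec<u8> {
    let mut msg = b"sync-challenge:".to_vec();
    msg.extend_from_slice(domain);
    msg.extend_from_slice(nonce);
    msg
}

/// Responder side: verify the signature over the challenge, then check membership.
fn accept_stream(
    domain: &[u8],
    nonce: &[u8; 32],
    peer_key: &[u8],
    signature: &[u8],
    verify: impl Fn(&[u8], &[u8], &[u8]) -> bool, // (key, message, signature)
    is_member: impl Fn(&[u8]) -> bool,
) -> Result<(), AuthError> {
    let msg = challenge_bytes(domain, nonce);
    if !verify(peer_key, &msg, signature) {
        return Err(AuthError::BadSignature);
    }
    if !is_member(peer_key) {
        return Err(AuthError::NotAMember);
    }
    Ok(())
}
```

Checking the signature before membership means an unauthenticated peer learns nothing about who is a member.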

Figure: Sync transport stack. The SyncManager opens a stream, which passes challenge authentication (sign + verify), ChaCha20Poly1305 encryption, and length-delimited framing; each frame carries a Borsh-serialized StreamMessage. Legend: transport framing, encryption, authentication, wire format.