CPS Walk: The Downward Pass

walk_cps is the core of the funnel executor. It processes one node at a time: it initializes the fold heap, iterates children through the graph's push-based visitor, and branches on the child count. It returns () rather than a value: results flow through continuations, not return values, which is what makes cross-thread result delivery possible without blocking.
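The shape is easy to see in miniature. A hedged sketch (toy types, not the crate's API) of a walk that delivers its result through a continuation instead of a return value:

```rust
// Toy CPS walk: walk_sum returns (), and the sum reaches the caller
// only through `cont`, which is what lets a real executor fire the
// continuation later, from whichever thread finishes last.
struct Leafy {
    weight: i64,
    children: Vec<Leafy>,
}

fn walk_sum(node: &Leafy, cont: &mut dyn FnMut(i64)) {
    let mut total = node.weight;
    for child in &node.children {
        // each child's result also arrives via a continuation
        walk_sum(child, &mut |r| total += r);
    }
    cont(total);
}
```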

The algorithm

pub(crate) fn walk_cps<N, H, R, F, G, P: FunnelPolicy>(
    wctx: &WorkerCtx<N, H, R, F, G, P>,
    mut node: N,
    mut cont: Cont<H, R>,
) where
    F: FoldOps<N, H, R> + 'static,
    G: TreeOps<N> + 'static,
    N: Clone + Send + 'static,
    H: 'static,
    R: Send + 'static,
{
    let ctx = wctx.ctx;
    loop {
        let fold = ctx.fold_ref();
        let graph = ctx.graph_ref();
        let chain_arena = ctx.chain_arena();
        let cont_arena = ctx.cont_arena();
        let heap = fold.init(&node);

        let mut child_count = 0u32;
        let mut first_child: Option<N> = None;
        let mut chain_idx: Option<super::super::infra::arena::ArenaIdx> = None;
        let mut heap_opt = Some(heap);
        let mut cont_opt = Some(cont);

        wctx.reset_wake();
        graph.visit(&node, &mut |child: &N| {
            child_count += 1;
            if child_count == 1 {
                first_child = Some(child.clone());
            } else {
                if child_count == 2 {
                    let cn = ChainNode::new(heap_opt.take().unwrap(), cont_opt.take().unwrap());
                    let idx = chain_arena.alloc(cn);
                    // SAFETY: idx was just returned by chain_arena.alloc
                    // (or a prior iteration within this visit closure) and
                    // the arena lives for the pool duration.
                    let node_ref = unsafe { chain_arena.get(idx) };
                    node_ref.chain.append_slot(); // slot 0: the inline first child
                    chain_idx = Some(idx);
                }
                let idx = chain_idx.unwrap();
                // SAFETY: idx was just returned by chain_arena.alloc
                // (or a prior iteration within this visit closure) and
                // the arena lives for the pool duration.
                let node_ref = unsafe { chain_arena.get(idx) };
                let slot = node_ref.chain.append_slot();
                wctx.push_task(FunnelTask::Walk {
                    child: child.clone(),
                    cont: Cont::Slot { node: idx, slot },
                });
            }
        });

        match child_count {
            0 => {
                let heap = heap_opt.take().unwrap();
                let cont = cont_opt.take().unwrap();
                let result = fold.finalize(&heap);
                fire_cont::<N, H, R, F, G, P>(ctx, cont, result);
                return;
            }
            1 => {
                let child = first_child.unwrap();
                let heap = heap_opt.take().unwrap();
                let parent_cont = cont_opt.take().unwrap();
                let parent_idx = cont_arena.alloc(parent_cont);
                node = child;
                cont = Cont::Direct { heap, parent_idx };
            }
            _ => {
                let idx = chain_idx.unwrap();
                // SAFETY: idx came from chain_arena.alloc above.
                let cn = unsafe { chain_arena.get(idx) };
                let fold = ctx.fold_ref();
                let set_total_result = P::Accumulate::set_total(&cn.chain, fold);
                if let Some(finalized) = set_total_result {
                    let parent = cn.take_parent_cont();
                    fire_cont::<N, H, R, F, G, P>(ctx, parent, finalized);
                    return;
                }
                let child = first_child.unwrap();
                node = child;
                cont = Cont::Slot { node: idx, slot: SlotRef(0) };
            }
        }
    }
}

The function takes (wctx, node, cont):

  • wctx: per-worker context (queue handle + wake state)
  • node: the graph node to process
  • cont: what to do with this node’s result

It loops (trampolined for the inline child case), processing one node per iteration.
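The trampoline itself is a standard pattern; a minimal generic sketch (names are illustrative, not from the crate):

```rust
// A tail call becomes a loop iteration: `state = next` plays the role
// of `node = child; cont = …; continue` in walk_cps, keeping stack
// depth constant regardless of how deep the walk goes.
enum Step<T, R> {
    Continue(T), // keep walking with new state
    Done(R),     // base case: a result
}

fn trampoline<T, R>(mut state: T, mut step: impl FnMut(T) -> Step<T, R>) -> R {
    loop {
        match step(state) {
            Step::Continue(next) => state = next,
            Step::Done(r) => return r,
        }
    }
}
```

For example, summing 10 + 9 + … + 1 with constant stack depth: `trampoline((0, 10), |(acc, n)| if n == 0 { Step::Done(acc) } else { Step::Continue((acc + n, n - 1)) })` yields 55.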

Child-count branching

After graph.visit returns, the child count determines the control flow:

[Diagram: walk_cps(node, cont) → fold.init(&node) → graph.visit(callback). During the visit, children 1..K are pushed as push_task(Walk{child, Slot{i}}). After the visit: 0 children (leaf) → finalize → fire_cont(cont, result); 1 child → Cont::Direct { heap }, loop continues with the child; 2+ children → ChainNode + FoldChain, set_total, loop continues with child₀.]

Leaf (0 children): Finalize the heap and call fire_cont with the original continuation. This is the base case — the upward cascade begins here.

Single child (1): No ChainNode needed. The heap moves into a Cont::Direct, the parent continuation is stored in the ContArena, and the loop continues with the child. Zero queue interaction, zero atomic operations.

Multi-child (2+): A ChainNode is allocated in the arena (lazily, on child 2 — not child 1). Children 1..K are pushed as FunnelTask::Walk to the queue. Then set_total records the child count in the ticket system. The loop continues with child 0 (inline walk).
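The lazy-allocation rule can be distilled to a few lines. A sketch with a hypothetical `Join` type (the real ChainNode also carries the heap and the parent continuation):

```rust
// Join state is created only when a second child proves a join is
// needed; a leaf or single-child node never pays for it.
struct Join {
    slots: usize, // one result slot per child; slot 0 is the inline child
}

fn visit_children(children: &[i32]) -> Option<Join> {
    let mut join: Option<Join> = None;
    for (i, _child) in children.iter().enumerate() {
        if i == 0 {
            continue; // child 0 is walked inline; no allocation yet
        }
        // allocated on child 2 (i == 1), reused for later siblings
        let j = join.get_or_insert_with(|| Join { slots: 1 });
        j.slots += 1; // reserve a slot for this stolen sibling
    }
    join
}
```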

First-child inlining

Child 0 is ALWAYS walked inline — a continuation of the current thread’s DFS spine, with zero queue overhead. Siblings are pushed to the queue for workers to steal. This gives every active thread a guaranteed DFS path from its entry point to a leaf:

[Diagram: a tree whose root has children c0, c1, c2, and c0 has children c00, c01. Thread 0 walks root → c0 → c00 inline; c1 and c2 are pushed and stolen by threads 1 and 2; c01 is likewise stolen.]

Inline walks cost nothing at the queue; every other edge is a queue submission. Thread 0 walks root → c0 → c00 → … → leaf without touching the queue at any level. This is structurally equivalent to Cilk's continuation stealing, inverted: we push sibling tasks (child stealing) rather than exposing the parent's continuation for theft.

Three compounding effects make this critical:

  • Zero-queue spine. For depth D, one thread processes D nodes with no push/pop overhead (~20-50ns saved per level).
  • Cache warmth. ChainNodes allocated on the way down are in L1 cache on the way up via fire_cont.
  • Reduced contention. One fewer task per level competing for deque access.
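The scheduling rule itself fits in a few lines. A toy sketch (simplified tree type, no continuations) of "walk child 0, queue the rest":

```rust
struct Tree {
    children: Vec<Tree>,
}

/// Walks the first-child spine inline and returns the spine length.
/// Siblings land in `queue` for other workers to steal.
fn walk<'a>(mut node: &'a Tree, queue: &mut Vec<&'a Tree>) -> u32 {
    let mut spine = 0;
    loop {
        spine += 1;
        match node.children.split_first() {
            None => return spine, // reached a leaf with zero queue traffic
            Some((first, siblings)) => {
                queue.extend(siblings); // children 1..K: stealable
                node = first;           // child 0: stays on this thread
            }
        }
    }
}
```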

Defunctionalization

Tasks are data, not closures:

pub enum FunnelTask<N, H, R> {
    Walk { child: N, cont: Cont<H, R> },
}

FunnelTask::Walk pairs a child node with its continuation — plain data stored inline in deque slots. No Box<dyn FnOnce>, no closure capture, no vtable. The execute_task function is the apply:

pub(crate) fn execute_task<N, H, R, F, G, P: FunnelPolicy>(
    wctx: &WorkerCtx<N, H, R, F, G, P>,
    task: FunnelTask<N, H, R>,
) where
    F: FoldOps<N, H, R> + 'static,
    G: TreeOps<N> + 'static,
    N: Clone + Send + 'static,
    H: 'static,
    R: Send + 'static,
{
    match task {
        FunnelTask::Walk { child, cont } => walk_cps(wctx, child, cont),
    }
}

This is the Reynolds/Danvy defunctionalization transformation applied to parallel work items.
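A miniature of the transformation (toy example, unrelated to the crate's types): every closure shape becomes an enum variant, and one `apply` function interprets the data.

```rust
// Instead of Box<dyn FnOnce(i64) -> i64>, each possible behavior is a
// variant: plain data that is Send, fixed-size, with no heap
// allocation and no vtable.
enum Op {
    AddConst(i64),
    MulConst(i64),
}

// The single "apply" of the Reynolds/Danvy transformation.
fn apply(op: Op, x: i64) -> i64 {
    match op {
        Op::AddConst(k) => x + k,
        Op::MulConst(k) => x * k,
    }
}
```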

Streaming submission

Children are pushed to the queue during graph.visit, not after. Workers can steal siblings while the parent is still discovering more children. append_slot is called per child inside the callback; set_total is called after graph.visit returns. Between these two events, workers may deliver results to already-appended slots. The ticket system handles this race.
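One way such a race can be resolved with a single atomic; this is a hedged sketch only, and the crate's actual ticket scheme may differ:

```rust
use std::sync::atomic::{AtomicIsize, Ordering};

// Deliveries decrement, set_total adds the child count. Exactly one
// operation moves the balance to zero, and that caller owns
// finalization, whether set_total ran before, between, or after
// the deliveries.
struct Tickets {
    balance: AtomicIsize,
}

impl Tickets {
    fn new() -> Self {
        Tickets { balance: AtomicIsize::new(0) }
    }

    /// A child result arrived. True if this delivery completed the join.
    fn deliver(&self) -> bool {
        self.balance.fetch_sub(1, Ordering::AcqRel) == 1
    }

    /// Discovery finished with `total` children. True if every child
    /// had already been delivered.
    fn set_total(&self, total: isize) -> bool {
        self.balance.fetch_add(total, Ordering::AcqRel) == -total
    }
}
```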

Task submission and wake

pub(crate) fn push_task(&self, task: FunnelTask<N, H, R>) {
    if let Some(overflow) = self.handle.push(task) {
        execute_task(self, overflow);
        return;
    }
    let mut state = self.wake_state.get();
    if P::Wake::should_notify(&mut state) {
        self.view().notify_idle();
    }
    self.wake_state.set(state);
}

push goes through the policy’s queue handle. If the queue is full, the task is executed inline (Cilk overflow protocol). Otherwise, the wake strategy decides whether to notify a parked worker.
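The fallback is simple to state in isolation (toy bounded queue, not the crate's deque):

```rust
// A full queue hands the task back instead of blocking or growing;
// the caller then runs it inline, bounding memory at the cost of
// briefly deeper recursion.
struct BoundedQueue<T> {
    buf: Vec<T>,
    cap: usize,
}

impl<T> BoundedQueue<T> {
    fn new(cap: usize) -> Self {
        BoundedQueue { buf: Vec::new(), cap }
    }

    /// Returns `Some(task)` to the caller when the queue is full.
    fn push(&mut self, task: T) -> Option<T> {
        if self.buf.len() == self.cap {
            Some(task) // overflow: execute inline
        } else {
            self.buf.push(task);
            None
        }
    }
}
```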

Worked example

A sum fold over tree R(A(D,E), B, C) where D, E, B, C are leaves. Thread 0 is the caller; threads 1-2 are workers.

[Diagram: execution timeline. Thread 0: walk_cps(R, Root): init heap_R; visit: A = child₀, push Walk{B, Slot{R,1}}, push Walk{C, Slot{R,2}}, set_total(3). Then walk_cps(A, Slot{R,0}) inline: D = child₀, push Walk{E, Slot{A,1}}, set_total(2). Then walk_cps(D, Slot{A,0}): leaf → fire_cont, deliver → not last; return to help loop. Thread 1: steal Walk{B, Slot{R,1}}: leaf → fire_cont, deliver → not last; steal Walk{E, Slot{A,1}}: leaf → fire_cont, LAST for A → sweep → fire_cont(Slot{R,0}), not last for R. Thread 2: steal Walk{C, Slot{R,2}}: leaf → fire_cont, LAST for R → sweep → fire_cont(Root), fold_done = true.]

  • Thread 0 walks the left spine (R→A→D) inline
  • Thread 1 steals B, then E — becomes finalizer for A, cascades A’s result to R
  • Thread 2 steals C — becomes finalizer for R, fires Cont::Root
  • The fold completes when any thread fires Cont::Root
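Whatever the interleaving, the schedule computes the same value as a plain sequential fold. A reference version for a sum fold (toy tree; node values are illustrative):

```rust
struct Node {
    value: i64,
    children: Vec<Node>,
}

// Sequential reference: the parallel walk must agree with this.
fn sum(node: &Node) -> i64 {
    node.value + node.children.iter().map(sum).sum::<i64>()
}
```

For R(A(D,E), B, C) with each leaf worth 1 and internal nodes worth 0, both the sequential fold and the parallel schedule yield 4.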

Cross-references