CPS Walk: The Downward Pass
walk_cps is the core of the funnel executor. It processes one node
at a time: initializes the fold heap, iterates children through the
graph’s push-based visitor, and branches on the child count. It is a
void function — results flow through continuations, not return
values. This is what makes cross-thread result delivery possible
without blocking.
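The CPS shape can be seen in isolation with a toy example (names here are illustrative, not the crate's API): the function returns nothing, and the caller receives the result only through the continuation it supplied.

```rust
// Minimal sketch of the CPS convention: compute a value, but deliver
// it through a continuation instead of a return value. Because the
// callee never returns a result, the continuation can just as well
// run on another thread later.
fn compute_cps(n: i32, cont: impl FnOnce(i32)) {
    cont(n * n); // result delivered, not returned
}

fn main() {
    let mut out = None;
    compute_cps(7, |r| out = Some(r));
    assert_eq!(out, Some(49));
}
```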
The algorithm
#![allow(unused)]
fn main() {
pub(crate) fn walk_cps<N, H, R, F, G, P: FunnelPolicy>(
wctx: &WorkerCtx<N, H, R, F, G, P>,
mut node: N,
mut cont: Cont<H, R>,
) where
F: FoldOps<N, H, R> + 'static,
G: TreeOps<N> + 'static,
N: Clone + Send + 'static,
H: 'static,
R: Send + 'static,
{
let ctx = wctx.ctx;
loop {
let fold = ctx.fold_ref();
let graph = ctx.graph_ref();
let chain_arena = ctx.chain_arena();
let cont_arena = ctx.cont_arena();
let heap = fold.init(&node);
let mut child_count = 0u32;
let mut first_child: Option<N> = None;
let mut chain_idx: Option<super::super::infra::arena::ArenaIdx> = None;
let mut heap_opt = Some(heap);
let mut cont_opt = Some(cont);
wctx.reset_wake();
graph.visit(&node, &mut |child: &N| {
child_count += 1;
if child_count == 1 {
first_child = Some(child.clone());
} else {
if child_count == 2 {
let cn = ChainNode::new(heap_opt.take().unwrap(), cont_opt.take().unwrap());
let idx = chain_arena.alloc(cn);
// SAFETY: idx was just returned by chain_arena.alloc
// (or a prior iteration within this visit closure) and
// the arena lives for the pool duration.
let node_ref = unsafe { chain_arena.get(idx) };
node_ref.chain.append_slot();
chain_idx = Some(idx);
}
let idx = chain_idx.unwrap();
// SAFETY: idx was just returned by chain_arena.alloc
// (or a prior iteration within this visit closure) and
// the arena lives for the pool duration.
let node_ref = unsafe { chain_arena.get(idx) };
let slot = node_ref.chain.append_slot();
wctx.push_task(FunnelTask::Walk {
child: child.clone(),
cont: Cont::Slot { node: idx, slot },
});
}
});
match child_count {
0 => {
let heap = heap_opt.take().unwrap();
let cont = cont_opt.take().unwrap();
let result = fold.finalize(&heap);
fire_cont::<N, H, R, F, G, P>(ctx, cont, result);
return;
}
1 => {
let child = first_child.unwrap();
let heap = heap_opt.take().unwrap();
let parent_cont = cont_opt.take().unwrap();
let parent_idx = cont_arena.alloc(parent_cont);
node = child;
cont = Cont::Direct { heap, parent_idx };
}
_ => {
let idx = chain_idx.unwrap();
// SAFETY: idx came from chain_arena.alloc above.
let cn = unsafe { chain_arena.get(idx) };
let fold = ctx.fold_ref();
let set_total_result = P::Accumulate::set_total(&cn.chain, fold);
if let Some(finalized) = set_total_result {
let parent = cn.take_parent_cont();
fire_cont::<N, H, R, F, G, P>(ctx, parent, finalized);
return;
}
let child = first_child.unwrap();
node = child;
cont = Cont::Slot { node: idx, slot: SlotRef(0) };
}
}
}
}
}
The function takes (wctx, node, cont):
- wctx: per-worker context (queue handle + wake state)
- node: the graph node to process
- cont: what to do with this node's result
It loops (trampolined for the inline child case), processing one node per iteration.
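The trampoline matters because a deep single-child spine would otherwise grow the call stack by one frame per node. A minimal sketch of the pattern, with the fold reduced to a running sum and the names invented for illustration:

```rust
// Illustrative sketch (not the crate's API) of the trampoline in
// walk_cps: the single-child case rebinds the loop variables instead
// of recursing, so stack depth stays constant however deep the spine.
fn walk_trampolined(mut node: u64, mut acc: u64) -> u64 {
    loop {
        if node == 0 {
            // Leaf: "fire the continuation" by producing the fold result.
            return acc;
        }
        // Single child: fold this node, then continue with the child.
        acc += node;
        node -= 1;
    }
}

fn main() {
    // A 1_000_000-deep single-child spine; a recursive walk would
    // risk overflowing the stack here.
    assert_eq!(walk_trampolined(1_000_000, 0), 500_000_500_000);
}
```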
Child-count branching
After graph.visit returns, the child count determines the control flow:
Leaf (0 children): Finalize the heap and call
fire_cont with the original continuation. This is
the base case — the upward cascade begins here.
Single child (1): No ChainNode needed. The heap moves into a
Cont::Direct, the parent continuation is stored in the ContArena,
and the loop continues with the child. Zero queue interaction, zero
atomic operations.
Multi-child (2+): A ChainNode is allocated in the arena
(lazily, when the second child appears, not the first). Children
1..K are pushed as FunnelTask::Walk to the queue. Then set_total
records the child count in the ticket system. The loop continues
with child 0 (inline walk).
First-child inlining
Child 0 is ALWAYS walked inline — a continuation of the current thread’s DFS spine, with zero queue overhead. Siblings are pushed to the queue for workers to steal. This gives every active thread a guaranteed DFS path from its entry point to a leaf:
Red edges = inline walks (zero queue cost). Dashed = queue submissions. Thread 0 walks root → c0 → c00 → … → leaf without touching the queue at any level. This is structurally equivalent to Cilk’s continuation-stealing, inverted: we push sibling tasks (child stealing) instead of stealing the parent’s continuation.
Three compounding effects make this critical:
- Zero-queue spine. For depth D, one thread processes D nodes with no push/pop overhead (~20-50ns saved per level).
- Cache warmth. ChainNodes allocated on the way down are in L1 cache on the way up via fire_cont.
- Reduced contention. One fewer task per level competing for deque access.
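The spine-vs-queue split can be sketched with a toy walk over a "comb" tree, one inline child plus one queued sibling per level. The function and types here are illustrative, not the executor's:

```rust
// Sketch of first-child inlining: child 0 continues on the current
// thread, the sibling goes to a queue. The inline spine of a
// depth-D comb touches the queue zero times while producing one
// stealable sibling per level.
fn walk(depth: u32, queue: &mut Vec<u32>, inline_nodes: &mut u32) {
    *inline_nodes += 1;
    if depth == 0 {
        return; // leaf
    }
    queue.push(depth - 1); // sibling: queued for workers to steal
    walk(depth - 1, queue, inline_nodes); // child 0: walked inline
}

fn main() {
    let mut queue = Vec::new();
    let mut inline_nodes = 0;
    walk(10, &mut queue, &mut inline_nodes);
    // The spine processed depth+1 nodes and queued one sibling per level.
    assert_eq!(inline_nodes, 11);
    assert_eq!(queue.len(), 10);
}
```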
Defunctionalization
Tasks are data, not closures:
#![allow(unused)]
fn main() {
pub enum FunnelTask<N, H, R> {
Walk { child: N, cont: Cont<H, R> },
}
}
FunnelTask::Walk pairs a child node with its continuation — plain
data stored inline in deque slots. No Box<dyn FnOnce>, no closure
capture, no vtable. The execute_task function is the apply:
#![allow(unused)]
fn main() {
pub(crate) fn execute_task<N, H, R, F, G, P: FunnelPolicy>(
wctx: &WorkerCtx<N, H, R, F, G, P>,
task: FunnelTask<N, H, R>,
) where
F: FoldOps<N, H, R> + 'static,
G: TreeOps<N> + 'static,
N: Clone + Send + 'static,
H: 'static,
R: Send + 'static,
{
match task {
FunnelTask::Walk { child, cont } => walk_cps(wctx, child, cont),
}
}
}
This is the Reynolds/Danvy defunctionalization transformation applied to parallel work items.
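A minimal before/after view of the transformation, using toy types rather than the crate's: the closure's captured environment becomes enum fields, and a single apply function reunites the data with the code it stood for.

```rust
// Closure form: each task heap-allocates and carries a vtable.
type ClosureTask = Box<dyn FnOnce() -> i32>;

// Defunctionalized form: one enum variant per distinct closure
// shape; captured variables become plain fields.
enum Task {
    Walk { child: i32, offset: i32 },
}

// The `apply` function: matches on the data and runs the code
// the closure would have run.
fn execute(task: Task) -> i32 {
    match task {
        Task::Walk { child, offset } => child + offset,
    }
}

fn main() {
    let (child, offset) = (40, 2);
    let closure: ClosureTask = Box::new(move || child + offset);
    // Same behavior, but Task is inert data: Send-able, inspectable,
    // and storable inline in a deque slot.
    assert_eq!(closure(), execute(Task::Walk { child: 40, offset: 2 }));
}
```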
Streaming submission
Children are pushed to the queue during graph.visit, not after.
Workers can steal siblings while the parent is still discovering
more children. append_slot is called per child inside the callback;
set_total is called after graph.visit returns. Between these two
events, workers may deliver results to already-appended slots. The
ticket system handles this race.
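One way to make such a race safe, shown here as an illustrative sketch rather than the crate's actual ticket implementation, is a single signed counter: each delivery adds one, set_total subtracts the total, and exactly one operation moves the counter to zero, whatever the interleaving.

```rust
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread;

// Returns true if this delivery made the count complete.
fn deliver(counter: &AtomicIsize) -> bool {
    counter.fetch_add(1, Ordering::AcqRel) + 1 == 0
}

// Returns true if all children had already delivered, i.e. the
// caller itself becomes the finalizer.
fn set_total(counter: &AtomicIsize, total: isize) -> bool {
    counter.fetch_sub(total, Ordering::AcqRel) - total == 0
}

fn main() {
    let counter = Arc::new(AtomicIsize::new(0));
    let total = 8;
    let handles: Vec<_> = (0..total)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || deliver(&c))
        })
        .collect();
    // The parent records the total while deliveries are in flight.
    let parent_finalizes = set_total(&counter, total as isize);
    let finalizers = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&f| f)
        .count()
        + parent_finalizes as usize;
    // Exactly one thread is elected finalizer.
    assert_eq!(finalizers, 1);
}
```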
Task submission and wake
#![allow(unused)]
fn main() {
pub(crate) fn push_task(&self, task: FunnelTask<N, H, R>) {
if let Some(overflow) = self.handle.push(task) {
execute_task(self, overflow);
return;
}
let mut state = self.wake_state.get();
if P::Wake::should_notify(&mut state) {
self.view().notify_idle();
}
self.wake_state.set(state);
}
}
push_task goes through the policy's queue handle. If the queue is
full, the task is handed back and executed inline (the Cilk overflow
protocol). Otherwise, the wake strategy decides whether to notify a
parked worker.
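The full-queue fallback can be sketched with a toy bounded queue (illustrative types; the real handle is policy-defined): push hands a rejected task back to the caller, which runs it inline instead of blocking or dropping it.

```rust
use std::collections::VecDeque;

// Toy bounded queue whose push returns the task when full,
// mirroring the overflow protocol used by push_task.
struct BoundedQueue<T> {
    items: VecDeque<T>,
    cap: usize,
}

impl<T> BoundedQueue<T> {
    fn new(cap: usize) -> Self {
        Self { items: VecDeque::new(), cap }
    }
    // Returns Some(task) if the queue is full; None on success.
    fn push(&mut self, task: T) -> Option<T> {
        if self.items.len() == self.cap {
            Some(task)
        } else {
            self.items.push_back(task);
            None
        }
    }
}

fn main() {
    let mut q = BoundedQueue::new(2);
    let mut executed_inline = Vec::new();
    for task in 0..4 {
        if let Some(overflow) = q.push(task) {
            // Full queue: the submitting thread runs the task itself.
            executed_inline.push(overflow);
        }
    }
    assert_eq!(q.items.len(), 2);
    assert_eq!(executed_inline, vec![2, 3]);
}
```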
Worked example
A sum fold over tree R(A(D,E), B, C) where D, E, B, C are leaves.
Thread 0 is the caller; threads 1-2 are workers.
- Thread 0 walks the left spine (R→A→D) inline
- Thread 1 steals B, then E — becomes finalizer for A, cascades A's result to R
- Thread 2 steals C — becomes finalizer for R, fires Cont::Root
- The fold completes when any thread fires Cont::Root
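As a sequential reference for this example, a plain recursive sum fold over the same shape (leaf values invented for illustration) pins down the answer the parallel executor must reproduce regardless of steal order:

```rust
// Reference fold for the worked example: R(A(D, E), B, C).
enum Tree {
    Leaf(i64),
    Node(Vec<Tree>),
}

fn fold_sum(t: &Tree) -> i64 {
    match t {
        Tree::Leaf(v) => *v,
        Tree::Node(children) => children.iter().map(fold_sum).sum(),
    }
}

fn main() {
    use Tree::*;
    // R(A(D=1, E=2), B=3, C=4); leaf values are arbitrary.
    let r = Node(vec![Node(vec![Leaf(1), Leaf(2)]), Leaf(3), Leaf(4)]);
    assert_eq!(fold_sum(&r), 10);
}
```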
Cross-references
- Continuations — Cont, FunnelTask, ChainNode
- Cascade — fire_cont: the trampolined upward pass
- Ticket system — how set_total determines the finalizer
- Queue strategies — how push_task dispatches to PerWorker or Shared