Pool and Dispatch
The pool provides persistent threads. The executor provides the work.
A thin Job struct (two words) bridges them. The dispatch function
encapsulates the full lifecycle: publish → body → seal → latch.
PoolState
#![allow(unused)]
fn main() {
/// Shared state for the persistent worker pool. One instance per pool,
/// referenced by every pool thread and by `dispatch`.
pub(crate) struct PoolState {
    /// Set at pool teardown; workers exit their loop when they observe it.
    pub shutdown: AtomicBool,
    /// Points to the stack-local `Job` while a dispatch is in flight,
    /// null otherwise (the "seal" clears it).
    pub job_ptr: AtomicPtr<()>,
    /// Futex-based event count used to park and wake the pool threads.
    pub wake: EventCount,
    /// Threads currently between loading job_ptr and returning from
    /// the job call. dispatch waits for this to reach 0 before returning.
    pub in_job: AtomicU32,
    /// Number of persistent pool threads.
    pub n_threads: usize,
    /// Serializes dispatches: one fold at a time per pool.
    pub dispatch_lock: Mutex<()>,
}
}
- `job_ptr`: points to a stack-local `Job` during dispatch, null otherwise
- `in_job`: threads currently in the job-handling region (the latch counter)
- `wake`: futex-based `EventCount` for thread parking
- `dispatch_lock`: serializes folds (one fold at a time per pool)
Job
#![allow(unused)]
fn main() {
/// Type-erased unit of work handed from `dispatch` to pool threads.
/// Two words, no allocation; lives on the dispatching caller's stack
/// for exactly the duration of one `dispatch` call.
#[repr(C)]
pub(crate) struct Job {
    /// Monomorphized entry point; invoked as `call(data, thread_idx)`.
    pub call: unsafe fn(*const (), usize),
    /// Type-erased pointer to the stack-local `FoldState` for this fold.
    pub data: *const (),
}
}
call is a monomorphized worker_entry::<N, H, R, F, G, P> —
a concrete function pointer, not vtable dispatch. data points to
a stack-local FoldState. Two words, no allocation.
The dispatch lifecycle
#![allow(unused)]
fn main() {
// CPS lifecycle: publish → body → seal → latch.
// The body just does fold work and returns a result.
// All pool-thread synchronization is dispatch's responsibility.
// CPS lifecycle: publish → body → seal → latch.
// The body just does fold work and returns a result.
// All pool-thread synchronization is dispatch's responsibility.
//
// Safety contract: `job` (and whatever `job.data` points to) lives on the
// caller's stack; this function must not return until no pool thread can
// still observe either. The seal + latch below provide that guarantee.
pub(crate) fn dispatch<R>(state: &PoolState, job: &Job, body: impl FnOnce() -> R) -> R {
    // One fold at a time per pool.
    let _guard = state.dispatch_lock.lock().unwrap();
    // Publish: make job visible to workers, then wake them.
    state.job_ptr.store(job as *const Job as *mut (), Ordering::Release);
    state.wake.notify_all();
    // Body: caller participates in the fold.
    let result = body();
    // Seal: prevent new workers from entering.
    state.job_ptr.store(std::ptr::null_mut(), Ordering::Release);
    // FIX: the seal store above and the latch load below, versus the
    // worker's in_job increment then job_ptr load, form the classic
    // store-buffering (Dekker) pattern. With Release/Acquire only — even
    // on x86-TSO, where a store may be delayed past a later load — the
    // latch could read in_job == 0 while a worker still observes the old
    // non-null job_ptr and enters the job after we return: use-after-return.
    // A SeqCst fence here (the worker's increment/load pair needs the
    // matching SeqCst ordering on its side) forbids that outcome.
    std::sync::atomic::fence(Ordering::SeqCst);
    // Latch: wait for all workers to leave the job region in pool_thread.
    // in_job brackets the entire load-job_ptr → call-worker_entry → return
    // sequence, so in_job == 0 guarantees no thread holds a reference to
    // the stack-local Job or FoldState.
    let mut spins = 0u32;
    while state.in_job.load(Ordering::Acquire) > 0 {
        spins += 1;
        if spins > 5_000_000 {
            // Diagnostic of last resort: a worker is wedged inside the job.
            // NOTE(review): unwinding here destroys the stack-local Job
            // while a thread may still reference it — consider
            // std::process::abort() instead; confirm intent.
            panic!("dispatch latch: {} threads still in job region",
                   state.in_job.load(Ordering::Relaxed));
        }
        std::hint::spin_loop();
    }
    result
}
````<div class="mdbook-graphviz-output"><!-- Generated by graphviz version 2.43.0 (0) --><!-- Title: %3 Pages: 1 --><svg width="156pt" height="342pt" viewBox="0.00 0.00 156.00 342.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink"><g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 338)"><title>%3</title><polygon fill="white" stroke="transparent" points="-4,4 -4,-338 152,-338 152,4 -4,4"/><!-- publish --><g id="node1" class="node"><title>publish</title><path fill="#fff3cd" stroke="black" d="M124,-334C124,-334 24,-334 24,-334 18,-334 12,-328 12,-322 12,-322 12,-305 12,-305 12,-299 18,-293 24,-293 24,-293 124,-293 124,-293 130,-293 136,-299 136,-305 136,-305 136,-322 136,-322 136,-328 130,-334 124,-334"/><text text-anchor="middle" x="74" y="-322" font-family="sans-serif" font-size="10.00">Publish</text><text text-anchor="middle" x="74" y="-311" font-family="sans-serif" font-size="10.00">job_ptr.store(Release)</text><text text-anchor="middle" x="74" y="-300" font-family="sans-serif" font-size="10.00">wake.notify_all()</text></g><!-- body --><g id="node2" class="node"><title>body</title><path fill="#d4edda" stroke="black" d="M122.5,-257C122.5,-257 25.5,-257 25.5,-257 19.5,-257 13.5,-251 13.5,-245 13.5,-245 13.5,-228 13.5,-228 13.5,-222 19.5,-216 25.5,-216 25.5,-216 122.5,-216 122.5,-216 128.5,-216 134.5,-222 134.5,-228 134.5,-228 134.5,-245 134.5,-245 134.5,-251 128.5,-257 122.5,-257"/><text text-anchor="middle" x="74" y="-245" font-family="sans-serif" font-size="10.00">Body</text><text text-anchor="middle" x="74" y="-234" font-family="sans-serif" font-size="10.00">walk_cps + help loop</text><text text-anchor="middle" x="74" y="-223" font-family="sans-serif" font-size="10.00">(caller is a worker)</text></g><!-- publish->body --><g id="edge1" class="edge"><title>publish->body</title><path fill="none" stroke="black" d="M74,-292.79C74,-284.96 74,-275.77 74,-267.16"/><polygon fill="black" stroke="black" 
points="77.5,-267.07 74,-257.07 70.5,-267.07 77.5,-267.07"/></g><!-- seal --><g id="node3" class="node"><title>seal</title><path fill="#cce5ff" stroke="black" d="M136,-180C136,-180 12,-180 12,-180 6,-180 0,-174 0,-168 0,-168 0,-156 0,-156 0,-150 6,-144 12,-144 12,-144 136,-144 136,-144 142,-144 148,-150 148,-156 148,-156 148,-168 148,-168 148,-174 142,-180 136,-180"/><text text-anchor="middle" x="74" y="-165" font-family="sans-serif" font-size="10.00">Seal</text><text text-anchor="middle" x="74" y="-154" font-family="sans-serif" font-size="10.00">job_ptr.store(null, Release)</text></g><!-- body->seal --><g id="edge2" class="edge"><title>body->seal</title><path fill="none" stroke="black" d="M74,-215.69C74,-207.91 74,-198.84 74,-190.45"/><polygon fill="black" stroke="black" points="77.5,-190.32 74,-180.32 70.5,-190.32 77.5,-190.32"/></g><!-- latch --><g id="node4" class="node"><title>latch</title><path fill="#f8d7da" stroke="black" d="M123,-108C123,-108 25,-108 25,-108 19,-108 13,-102 13,-96 13,-96 13,-84 13,-84 13,-78 19,-72 25,-72 25,-72 123,-72 123,-72 129,-72 135,-78 135,-84 135,-84 135,-96 135,-96 135,-102 129,-108 123,-108"/><text text-anchor="middle" x="74" y="-93" font-family="sans-serif" font-size="10.00">Latch</text><text text-anchor="middle" x="74" y="-82" font-family="sans-serif" font-size="10.00">while in_job > 0: spin</text></g><!-- seal->latch --><g id="edge3" class="edge"><title>seal->latch</title><path fill="none" stroke="black" d="M74,-143.7C74,-135.98 74,-126.71 74,-118.11"/><polygon fill="black" stroke="black" points="77.5,-118.1 74,-108.1 70.5,-118.1 77.5,-118.1"/></g><!-- ret --><g id="node5" class="node"><title>ret</title><path fill="#d4edda" stroke="black" d="M121.5,-36C121.5,-36 26.5,-36 26.5,-36 20.5,-36 14.5,-30 14.5,-24 14.5,-24 14.5,-12 14.5,-12 14.5,-6 20.5,0 26.5,0 26.5,0 121.5,0 121.5,0 127.5,0 133.5,-6 133.5,-12 133.5,-12 133.5,-24 133.5,-24 133.5,-30 127.5,-36 121.5,-36"/><text text-anchor="middle" x="74" y="-21" 
font-family="sans-serif" font-size="10.00">Return result</text><text text-anchor="middle" x="74" y="-10" font-family="sans-serif" font-size="10.00">stack safe to destroy</text></g><!-- latch->ret --><g id="edge4" class="edge"><title>latch->ret</title><path fill="none" stroke="black" d="M74,-71.7C74,-63.98 74,-54.71 74,-46.11"/><polygon fill="black" stroke="black" points="77.5,-46.1 74,-36.1 70.5,-46.1 77.5,-46.1"/></g></g></svg></div>
1. **Publish**: store the `Job` pointer, wake all threads
1. **Body**: the caller participates in the fold (walk root, help loop)
1. **Seal**: clear `job_ptr` — no new threads can enter
1. **Latch**: spin until `in_job == 0` — all threads have left the
job-handling region
1. **Return**: the `Job` and `FoldState` on the stack are safe to drop
The body knows nothing about pool lifecycle — it’s pure fold logic.
All synchronization is dispatch’s responsibility.
# Pool thread
````rust
/// Worker loop for one persistent pool thread: park until a dispatch
/// publishes a job, run it, repeat until shutdown.
fn pool_thread(state: &PoolState, thread_idx: usize) {
    let mut last_epoch = 0u32;
    loop {
        // Park on the event count until shutdown is flagged or the epoch
        // advances past the last dispatch this thread handled (so a stale
        // notify from a previous fold does not re-trigger work).
        loop {
            let token = state.wake.prepare();
            if state.shutdown.load(Ordering::Acquire) { return; }
            if token.epoch() > last_epoch {
                last_epoch = token.epoch();
                break;
            }
            state.wake.wait(token);
        }
        // in_job MUST be incremented BEFORE loading job_ptr.
        // This closes the TOCTOU gap: the body cannot return (destroying
        // the Job/FoldState on the stack) while any thread is between
        // loading job_ptr and finishing the job call.
        state.in_job.fetch_add(1, Ordering::Acquire);
        // FIX: program order alone is not enough. The increment (a store)
        // followed by the job_ptr read (a load) is the worker half of a
        // store-buffering pattern against dispatch's seal store + latch
        // load; without SeqCst ordering the increment may not be visible
        // to the latch even though we read a stale non-null job_ptr. This
        // fence (dispatch needs the matching SeqCst fence between its seal
        // and latch) guarantees: either the latch observes our increment,
        // or we observe the sealed (null) job_ptr — never neither.
        std::sync::atomic::fence(Ordering::SeqCst);
        let ptr = state.job_ptr.load(Ordering::Acquire);
        if !ptr.is_null() {
            // SAFETY: non-null ptr was published by dispatch, which
            // holds the dispatch_lock and does not seal (nor drop the
            // Job) until `in_job` returns to zero. We incremented
            // `in_job` before loading ptr (with a SeqCst fence between),
            // so the seal cannot have happened yet — the referent is live.
            let job = unsafe { &*(ptr as *const Job) };
            // SAFETY: `job.call` is the worker_entry function for the
            // matching FoldState; `job.data` is a `*const FoldState<…>`
            // cast erased at the Job boundary. The caller (dispatch)
            // guarantees the FoldState is live for the duration of
            // this call via the same `in_job` latch.
            unsafe { (job.call)(job.data, thread_idx); }
        }
        // Release pairs with the latch's Acquire load: everything done
        // inside the job happens-before dispatch observing the decrement.
        state.in_job.fetch_sub(1, Ordering::Release);
    }
}
}
The critical ordering: in_job increment happens before
job_ptr load. This closes the TOCTOU gap:
Without this ordering, a thread could load job_ptr (valid), then
the body returns and destroys the stack, then the thread dereferences
the destroyed pointer → SIGSEGV. The in_job counter makes the
thread visible to the latch before it touches the pointer.
run_fold
#![allow(unused)]
fn main() {
/// Runs one complete fold on the pool: builds the per-fold state on this
/// stack frame, erases it into a `Job`, and delegates to `dispatch`.
/// The caller thread itself participates as a worker inside the body.
pub(crate) fn run_fold<N, H, R, F, G, P: FunnelPolicy>(
    fold: &F, graph: &G, root: &N,
    pool_state: &PoolState, spec: &Spec<P>,
) -> R
where
    F: FoldOps<N, H, R> + 'static, G: TreeOps<N> + 'static,
    N: Clone + Send + 'static, H: 'static, R: Send + 'static,
{
    // Per-fold working memory — all locals of this frame, so everything
    // below must outlive the dispatch call (the latch guarantees no pool
    // thread still references it when dispatch returns).
    let store = P::Queue::create_store(&spec.queue, pool_state.n_threads);
    let chain_arena = Arena::<ChainNode<H, R>>::new();
    let cont_arena = ContArena::<Cont<H, R>>::new();
    let root_cell = RootCell::new();
    let view = FoldView {
        pool_state,
        fold_done: AtomicBool::new(false),
        idle_count: AtomicU32::new(0),
        n_workers: pool_state.n_threads,
    };
    let ctx = WalkCtx {
        fold,
        graph,
        view: &view,
        chain_arena: &chain_arena,
        cont_arena: &cont_arena,
        _policy: std::marker::PhantomData,
    };
    // FoldState bundles the references the type-erased worker entry needs.
    let state = FoldState::<N, H, R, F, G, P> {
        ctx: &ctx,
        store: &store,
    };
    // The ONE unsafe boundary: erase typed FoldState to *const () for the Job.
    // worker_entry::<…> re-asserts the concrete type on the other side.
    let job = Job {
        call: worker_entry::<N, H, R, F, G, P>,
        data: &state as *const FoldState<N, H, R, F, G, P> as *const (),
    };
    dispatch(pool_state, &job, || {
        // The caller acts as an extra worker, using the queue slot at
        // index n_workers (past the pool threads' 0..n_workers slots).
        let caller_idx = view.n_workers;
        let handle = P::Queue::handle(&store, caller_idx);
        let wake_state = Cell::new(P::Wake::init_state(&spec.wake));
        let wctx = WorkerCtx::<N, H, R, F, G, P> { ctx: &ctx, handle, wake_state };
        // Walk the root with the Root continuation, which deposits the
        // final result into root_cell when the fold completes.
        walk_cps(&wctx, root.clone(), Cont::Root(&root_cell as *const RootCell<R>));
        // Help loop: execute queued tasks until the root result lands.
        let mut spins = 0u64;
        while !root_cell.is_done() {
            if let Some(task) = wctx.handle.try_acquire() {
                execute_task(&wctx, task);
                spins = 0;
            } else {
                spins += 1;
                if spins > 10_000_000 {
                    // Diagnostic of last resort: no tasks and no result.
                    panic!("run_fold hung: root_done={}", root_cell.is_done());
                }
                std::hint::spin_loop();
            }
        }
        root_cell.take()
    })
}
}
Creates per-fold state (store, arenas, root cell, view, context),
erases it to *const () for the Job, and delegates to dispatch.
The body walks the root and help-loops until root_cell.is_done().
Scoped pool
Pool::with(n, |pool| ...) uses std::thread::scope — threads
are joined when the closure returns. No leaked threads, no lifetime
footguns.
The pool is the executor’s Resource (defined by the Resource
GAT on ExecutorSpec). It can be provided explicitly via .attach(),
or created internally by .run() / .session():
#![allow(unused)]
fn main() {
use hylic::prelude::*;
// One-shot: pool created + destroyed per fold
exec(funnel::Spec::default(8)).run(&fold, &graph, &root);
// Session: pool shared across folds
exec(funnel::Spec::default(8)).session(|s| {
s.run(&fold, &graph, &root);
s.run(&fold, &graph, &root);
});
// Explicit attach: manual pool, multiple policies
funnel::Pool::with(8, |pool| {
let pw = exec(funnel::Spec::default(8)).attach(pool);
let sh = exec(funnel::Spec::for_wide_light(8)).attach(pool);
pw.run(&fold, &graph, &root);
sh.run(&fold, &graph, &root);
});
}
Thread spawn/join cost is paid once per pool scope. Each .run()
allocates working memory fresh — only threads are shared.