solidc
Robust collection of general-purpose cross-platform C libraries and data structures designed for rapid and safe development in C
Loading...
Searching...
No Matches
threadpool.c
1#ifndef _GNU_SOURCE
2#define _GNU_SOURCE
3#endif
4
5#include "../include/threadpool.h"
6
7#include "../include/align.h"
8#include "../include/aligned_alloc.h"
9#include "../include/lock.h"
10#include "../include/thread.h"
11
12#include <stdatomic.h>
13#include <stdint.h>
14#include <stdio.h>
15#include <stdlib.h>
16#include <time.h>
17
18#ifdef _WIN32
19#include <windows.h>
20#define thread_yield() SwitchToThread()
21#else
22#include <sched.h>
23#include <unistd.h>
24#define thread_yield() sched_yield()
25#endif
26
27/*
28 * ============================================================================
29 * Work-Stealing Threadpool — Chase-Lev Deque
30 * ============================================================================
31 *
32 * BUG / PERF HISTORY
33 * ------------------
34 * Bug #1: External thread + worker both called deque_push_bottom on the same
35 * deque concurrently → corrupted bottom.
36 * Fix: external threads use a separate GlobalQueue (mutex-protected).
37 *
38 * Bug #2: tls_worker_index not reset between pool lifetimes → stale index
39 * used against a new pool's workers[] → out-of-bounds access.
40 * Fix: TLS only marks worker identity; external threads use GlobalQueue.
41 *
42 * Bug #3: Workers called try_steal() before pool->workers[] was fully written,
43 * reading NULL pointers → SIGSEGV in deque_steal_top.
44 * Fix: workers_ready startup barrier; every worker spins until
45 * workers_ready == num_workers before entering the main loop.
46 *
47 * Perf #1: gq_pull_batch pulled only 1 task per mutex acquisition.
48 * With 8 workers all draining the global queue, that is 8 mutex
49 * round-trips per 8 tasks — O(N) lock acquisitions per batch.
50 * Fix: pull up to BATCH_SIZE tasks per acquisition, push extras
51 * into the worker's own deque so subsequent iterations are lock-free.
52 *
53 * Perf #2: deque_pop_bottom used `b == 0` as the empty guard. After any
54 * non-trivial run bottom and top both sit at some value > 0, so the
55 * guard never triggers. Every idle loop iteration still executed the
56 * full seq_cst store + seq_cst load + comparison on an empty deque.
57 * Fix: check `b == t` BEFORE decrementing bottom. If already empty,
58 * return false immediately with zero atomic RMWs.
59 *
60 * Perf #3: unpark_one() acquired park_lock on every submission even when
61 * workers are active. The num_parked fast-path skips the lock when
62 * nobody is parked, which is fine — but on the transition from busy
63 * to idle each submit still pays a lock round-trip.
64 * Fix: no change to unpark_one; the real gain is in Perf #1 because
65 * pulling BATCH_SIZE tasks at once means unpark_one is called once
66 * per batch rather than once per task.
67 *
68 * Perf #4: threadpool_submit() is called once per task from external threads.
69 * Each call acquires gq_mutex, writes one task, releases it — O(N)
70 * mutex round-trips for N tasks. At 10M tasks that is 10M separate
71 * mutex acquisitions on the submission side alone.
72 * Fix: threadpool_submit_batch() accepts an array of (function, arg)
73 * pairs and writes up to GLOBAL_Q_SIZE slots under a single mutex
74 * acquisition, flushing in chunks if the batch exceeds queue capacity.
75 * This reduces submission mutex acquisitions from O(N) to
76 * O(N / GLOBAL_Q_SIZE) — a 16384× reduction for large workloads.
77 *
78 * ============================================================================
79 * DESIGN
80 * ============================================================================
81 *
82 * Every worker owns a private Chase-Lev deque:
83 * OWNER — pushes/pops at BOTTOM (no lock, no CAS; just a release store)
84 * THIEVES — steal from TOP (one seq_cst CAS per attempt)
85 *
86 * bottom and top live on separate cache lines (no false sharing between owner
87 * and thieves).
88 *
89 * bottom ──────────────────────────────>
90 * [ tN | ... | t1 | t0 ]
91 * <─── top
92 *
93 * External submitters push to a GlobalQueue (single mutex).
94 * Workers drain it in batches during their steal scan.
95 *
96 * MEMORY ORDERS
97 * deque_push_bottom: task[b]=relaxed, bottom=release
98 * deque_pop_bottom: early-exit if b==t (no atomics); else bottom=seq_cst,
99 * top=seq_cst, last-item CAS=seq_cst
100 * deque_steal_top: top=acquire, bottom=acquire, CAS=seq_cst
101 * ============================================================================
102 */
103
104#define DEQUE_SIZE (1u << 12) /* 4096 slots per private deque */
105#define DEQUE_MASK (DEQUE_SIZE - 1)
106#define GLOBAL_Q_SIZE (1u << 14) /* 16384 slots in the global queue */
107#define GLOBAL_Q_MASK (GLOBAL_Q_SIZE - 1)
108
109#ifndef BATCH_SIZE
110#define BATCH_SIZE 64
111#endif
112
113#define CACHE_LINE_SIZE 64
114#define YIELD_THRESHOLD 8
115
116#define CACHE_ALIGNED ALIGN(CACHE_LINE_SIZE)
117
118typedef enum { STEAL_SUCCESS, STEAL_EMPTY, STEAL_ABORT } StealResult;
119
120/* ── Chase-Lev private deque ─────────────────────────────────────────────── */
121typedef struct {
122 CACHE_ALIGNED atomic_size_t bottom;
123 CACHE_ALIGNED atomic_size_t top;
124 CACHE_ALIGNED Task tasks[DEQUE_SIZE];
125} WorkStealDeque;
126
127/* ── Global submission queue ─────────────────────────────────────────────── */
128typedef struct {
129 CACHE_ALIGNED Lock mutex;
130 CACHE_ALIGNED atomic_uint head;
131 CACHE_ALIGNED atomic_uint tail;
132 CACHE_ALIGNED Task tasks[GLOBAL_Q_SIZE];
133 Condition not_full;
134 struct Threadpool* pool;
135} GlobalQueue;
136
137/* ── Per-worker ──────────────────────────────────────────────────────────── */
138typedef struct worker {
139 CACHE_ALIGNED WorkStealDeque deque;
140 CACHE_ALIGNED Thread pthread;
141 CACHE_ALIGNED size_t index;
142 struct Threadpool* pool;
143} worker;
144
145/* ── Threadpool ──────────────────────────────────────────────────────────── */
146struct Threadpool {
147 CACHE_ALIGNED atomic_int shutdown;
148 CACHE_ALIGNED atomic_int num_threads_alive;
149
150 CACHE_ALIGNED worker** workers;
151 CACHE_ALIGNED size_t num_workers;
152
153 /* Startup barrier — see Bug #3 fix. */
154 CACHE_ALIGNED atomic_size_t workers_ready;
155
156 CACHE_ALIGNED GlobalQueue gq;
157
158 /* Parking */
159 CACHE_ALIGNED Lock park_lock;
160 CACHE_ALIGNED atomic_int num_parked;
161 Condition work_available;
162
163 /* Idle detection */
164 CACHE_ALIGNED Lock idle_lock;
165 CACHE_ALIGNED atomic_int num_active;
166 Condition all_idle;
167};
168
169/* TLS: SIZE_MAX = external thread; anything else = worker index. */
170static _Thread_local size_t tls_worker_index = SIZE_MAX;
171
172/* ============================================================================
173 * Global queue
174 * ============================================================================ */
175
176static void gq_init(GlobalQueue* gq, struct Threadpool* pool) {
177 lock_init(&gq->mutex);
178 cond_init(&gq->not_full);
179 atomic_store_explicit(&gq->head, 0, memory_order_relaxed);
180 atomic_store_explicit(&gq->tail, 0, memory_order_relaxed);
181 gq->pool = pool;
182}
183
184static void gq_destroy(GlobalQueue* gq) {
185 lock_free(&gq->mutex);
186 cond_free(&gq->not_full);
187}
188
189/* Push one task. Blocks if full (only happens if all 16384 slots are in-flight). */
190static bool gq_push(GlobalQueue* gq, Task task) {
191 lock_acquire(&gq->mutex);
192
193 while (((atomic_load_explicit(&gq->head, memory_order_relaxed) + 1) & GLOBAL_Q_MASK) ==
194 atomic_load_explicit(&gq->tail, memory_order_relaxed)) {
195 if (atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire)) {
196 lock_release(&gq->mutex);
197 return false;
198 }
199 cond_wait(&gq->not_full, &gq->mutex);
200 }
201
202 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
203 gq->tasks[h] = task;
204 atomic_store_explicit(&gq->head, (h + 1) & GLOBAL_Q_MASK, memory_order_release);
205 lock_release(&gq->mutex);
206 return true;
207}
208
209/*
210 * gq_push_batch — push up to `count` tasks in one mutex acquisition.
211 *
212 * Perf #4 fix: the single-task gq_push pays the full mutex round-trip for
213 * every task. With 10M external submissions that is 10M lock acquisitions.
214 *
215 * Here we hold the lock for the entire write of min(count, free_slots) tasks,
216 * releasing only when the queue is full (at which point we wait on not_full
217 * and continue from where we left off). The caller sees a single logical
218 * operation even if it internally flushes in multiple chunks.
219 *
220 * Returns the number of tasks successfully pushed (== count unless shutdown).
221 */
222static size_t gq_push_batch(GlobalQueue* gq, Task* tasks, size_t count) {
223 size_t pushed = 0;
224
225 while (pushed < count) {
226 lock_acquire(&gq->mutex);
227
228 /* Wait while full. */
229 while (((atomic_load_explicit(&gq->head, memory_order_relaxed) + 1) & GLOBAL_Q_MASK) ==
230 atomic_load_explicit(&gq->tail, memory_order_relaxed)) {
231 if (atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire)) {
232 lock_release(&gq->mutex);
233 return pushed;
234 }
235 cond_wait(&gq->not_full, &gq->mutex);
236 }
237
238 /*
239 * Write as many tasks as fit without releasing the lock.
240 * free_slots = GLOBAL_Q_SIZE - 1 - current occupancy (one slot
241 * wasted as the full/empty sentinel).
242 */
243 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
244 uint32_t t = atomic_load_explicit(&gq->tail, memory_order_relaxed);
245 size_t free = (GLOBAL_Q_SIZE - 1) - ((h - t) & GLOBAL_Q_MASK);
246 size_t todo = count - pushed;
247 size_t n = todo < free ? todo : free;
248
249 for (size_t i = 0; i < n; i++) {
250 gq->tasks[(h + i) & GLOBAL_Q_MASK] = tasks[pushed + i];
251 }
252 atomic_store_explicit(&gq->head, (h + (uint32_t)n) & GLOBAL_Q_MASK, memory_order_release);
253 pushed += n;
254
255 lock_release(&gq->mutex);
256 }
257
258 return pushed;
259}
260
261/*
262 * gq_pull_batch — pull up to `max` tasks in one mutex acquisition.
263 *
264 * Perf #1 fix: the previous version pulled only 1 task per call. With N
265 * workers all hitting the global queue, that was N mutex acquisitions per
266 * N tasks — every task paid the full lock round-trip.
267 *
268 * Now we pull min(available, max) tasks at once. The caller pushes the
269 * extras into its own private deque so subsequent iterations are lock-free.
270 *
271 * Returns the number of tasks pulled (0 = empty, -1 = shutdown).
272 */
273static int gq_pull_batch(GlobalQueue* gq, Task* out, size_t max) {
274 lock_acquire(&gq->mutex);
275
276 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
277 uint32_t t = atomic_load_explicit(&gq->tail, memory_order_relaxed);
278
279 if (h == t) {
280 lock_release(&gq->mutex);
281 return atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire) ? -1 : 0;
282 }
283
284 size_t available = (h - t) & GLOBAL_Q_MASK;
285 size_t to_take = available < max ? available : max;
286
287 for (size_t i = 0; i < to_take; i++) {
288 out[i] = gq->tasks[(t + i) & GLOBAL_Q_MASK];
289 }
290 atomic_store_explicit(&gq->tail, (t + (uint32_t)to_take) & GLOBAL_Q_MASK, memory_order_release);
291
292 /* Signal any blocked producers now that slots are free. */
293 cond_broadcast(&gq->not_full);
294 lock_release(&gq->mutex);
295 return (int)to_take;
296}
297
298/* ============================================================================
299 * Chase-Lev deque
300 * ============================================================================ */
301
302static void deque_init(WorkStealDeque* dq) {
303 atomic_store_explicit(&dq->bottom, 0, memory_order_relaxed);
304 atomic_store_explicit(&dq->top, 0, memory_order_relaxed);
305}
306
307/* Owner only. Returns false if full (DEQUE_SIZE=4096; should never happen). */
308static bool deque_push_bottom(WorkStealDeque* dq, Task task) {
309 size_t b = atomic_load_explicit(&dq->bottom, memory_order_relaxed);
310 size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);
311
312 if (b - t >= DEQUE_SIZE) return false;
313
314 dq->tasks[b & DEQUE_MASK] = task;
315 atomic_store_explicit(&dq->bottom, b + 1, memory_order_release);
316 return true;
317}
318
/*
 * deque_pop_bottom — owner reclaims its own work. OWNER THREAD ONLY.
 *
 * Perf #2 fix: the previous version used `if (b == 0) return false` as the
 * empty guard. After any non-trivial run, bottom and top are both nonzero
 * but equal (deque empty), so that guard never fired. Every idle iteration
 * still executed the seq_cst store + seq_cst load before discovering the
 * deque was empty.
 *
 * Correct fix: read both bottom and top first. If b == t the deque is empty
 * and we return immediately with zero atomic RMWs — no seq_cst traffic on the
 * cache line at all. Only when b > t do we proceed with the full protocol.
 *
 * Safety of the fast path: thieves only ever INCREASE top, so a stale read
 * of top can only be LOWER than the true value. A stale-low top makes the
 * deque look non-empty and we fall through to the full protocol, which
 * resolves the race via the seq_cst store/load and CAS below. A b == t
 * observation therefore never misses real work.
 *
 * Returns true and fills *out on success; false when the deque is empty or
 * a thief won the race for the last item.
 */
static bool deque_pop_bottom(WorkStealDeque* dq, Task* out) {
    /* bottom is owner-private (only we write it) — relaxed read suffices. */
    size_t b = atomic_load_explicit(&dq->bottom, memory_order_relaxed);
    size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);

    /* Fast-path empty check: no atomics, no memory barriers needed. */
    if (b == t) return false;

    b--;

    /*
     * seq_cst store: must be totally ordered with any thief's seq_cst CAS
     * on top. This is the linearisation point for the owner's pop.
     */
    atomic_store_explicit(&dq->bottom, b, memory_order_seq_cst);
    t = atomic_load_explicit(&dq->top, memory_order_seq_cst);

    if ((ptrdiff_t)(b - t) > 0) {
        /* More than one item: no race with thieves is possible. */
        *out = dq->tasks[b & DEQUE_MASK];
        return true;
    }

    if (b == t) {
        /* Exactly one item: race with at most one thief via CAS. */
        *out = dq->tasks[b & DEQUE_MASK];
        size_t expected = t;
        /* Winning the CAS advances top past the item, claiming it for us;
         * losing means a thief already took it and *out must be discarded. */
        bool won = atomic_compare_exchange_strong_explicit(&dq->top, &expected, t + 1, memory_order_seq_cst,
                                                           memory_order_relaxed);
        /* Restore bottom to a consistent empty state regardless of outcome. */
        atomic_store_explicit(&dq->bottom, b + 1, memory_order_relaxed);
        return won;
    }

    /* b < t: a thief stole the last item while we were decrementing. */
    atomic_store_explicit(&dq->bottom, b + 1, memory_order_relaxed);
    return false;
}
369
/*
 * deque_steal_top — take one task from the TOP of another worker's deque.
 * Callable from any thread EXCEPT the deque's owner.
 *
 * Returns:
 *   STEAL_SUCCESS — *out holds a stolen task (CAS on top won).
 *   STEAL_EMPTY   — deque observed empty; caller should try another victim.
 *   STEAL_ABORT   — lost the CAS race to the owner or another thief;
 *                   caller may retry (the deque may still hold work).
 */
static StealResult deque_steal_top(WorkStealDeque* dq, Task* out) {
    /* top before bottom: if bottom were read first, a concurrent owner push
     * could make the deque appear to hold an item that top has passed. */
    size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);
    size_t b = atomic_load_explicit(&dq->bottom, memory_order_acquire);

    if ((ptrdiff_t)(b - t) <= 0) return STEAL_EMPTY;

    /* Read the task BEFORE the CAS: once the CAS succeeds the owner may
     * reuse the slot, so this is the only window in which it is ours. */
    *out = dq->tasks[t & DEQUE_MASK];

    /* seq_cst CAS: totally ordered with the owner's seq_cst bottom store in
     * deque_pop_bottom — the linearisation point for the steal. */
    if (!atomic_compare_exchange_strong_explicit(&dq->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) {
        return STEAL_ABORT;
    }
    return STEAL_SUCCESS;
}
384
385/* ============================================================================
386 * Parking
387 * ============================================================================ */
388
389static void worker_park(Threadpool* pool) {
390 atomic_fetch_add_explicit(&pool->num_parked, 1, memory_order_relaxed);
391 lock_acquire(&pool->park_lock);
392
393 /*
394 * Re-check under lock: work or shutdown could have arrived between our
395 * decision to park and acquiring park_lock. The signal would have been
396 * sent before we could receive it. Skipping cond_wait here avoids
397 * sleeping on a non-empty queue (lost-wakeup prevention).
398 */
399 bool all_empty = true;
400 {
401 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
402 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
403 if (h != t) all_empty = false;
404 }
405 for (size_t i = 0; i < pool->num_workers && all_empty; i++) {
406 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
407 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
408 if ((ptrdiff_t)(b - t) > 0) all_empty = false;
409 }
410
411 if (all_empty && !atomic_load_explicit(&pool->shutdown, memory_order_acquire)) {
412 cond_wait(&pool->work_available, &pool->park_lock);
413 }
414
415 lock_release(&pool->park_lock);
416 atomic_fetch_sub_explicit(&pool->num_parked, 1, memory_order_relaxed);
417}
418
419static inline void unpark_one(Threadpool* pool) {
420 if (atomic_load_explicit(&pool->num_parked, memory_order_relaxed) > 0) {
421 lock_acquire(&pool->park_lock);
422 cond_signal(&pool->work_available);
423 lock_release(&pool->park_lock);
424 }
425}
426
427static inline void unpark_all(Threadpool* pool) {
428 lock_acquire(&pool->park_lock);
429 cond_broadcast(&pool->work_available);
430 lock_release(&pool->park_lock);
431}
432
433/* ============================================================================
434 * Worker thread
435 * ============================================================================ */
436
437/*
438 * try_steal — attempt to find work from another worker or the global queue.
439 *
440 * Private deques are scanned first in randomised order (xorshift64 per
441 * worker, no shared state). The global queue is last: it requires a mutex
442 * so we only pay that cost after exhausting all lock-free steal attempts.
443 *
444 * When the global queue has tasks, we pull a full BATCH_SIZE chunk in one
445 * mutex acquisition (Perf #1 fix). The first task is returned immediately;
446 * the remaining tasks are pushed into the caller's own deque so subsequent
447 * iterations are purely lock-free pops from bottom.
448 */
449static bool try_steal(worker* self, Task* out) {
450 Threadpool* pool = self->pool;
451 size_t n = pool->num_workers;
452
453 /* xorshift64: register-local, zero synchronisation cost. */
454 static _Thread_local uint64_t rng = 0;
455 if (rng == 0) rng = ((uint64_t)self->index + 1) * 6364136223846793005ULL;
456 rng ^= rng << 13;
457 rng ^= rng >> 7;
458 rng ^= rng << 17;
459 size_t start = (size_t)(rng % n);
460
461 /* 1. Scan private deques (lock-free). */
462 for (size_t i = 0; i < n; i++) {
463 size_t v = (start + i) % n;
464 if (v == self->index) continue;
465 StealResult r = deque_steal_top(&pool->workers[v]->deque, out);
466 if (r == STEAL_SUCCESS) return true;
467 }
468
469 /*
470 * 2. Drain global queue in a batch.
471 *
472 * Pull up to BATCH_SIZE tasks in one mutex acquisition. Distribute
473 * them: return tasks[0] to the caller immediately, push tasks[1..N-1]
474 * into our own deque so they are consumed lock-free on subsequent
475 * iterations. One mutex lock amortised over up to 64 tasks.
476 */
477 Task batch[BATCH_SIZE];
478 int got = gq_pull_batch(&pool->gq, batch, BATCH_SIZE);
479 if (got <= 0) return false;
480
481 /* Push extras into own deque (owner-only, no lock needed). */
482 for (int i = 1; i < got; i++) {
483 if (!deque_push_bottom(&self->deque, batch[i])) {
484 /*
485 * Own deque full (extremely unlikely at DEQUE_SIZE=4096).
486 * Push back the remaining tasks to the global queue so they
487 * are not lost. We accept the lock cost here as a rare path.
488 */
489 for (int j = i; j < got; j++) {
490 gq_push(&pool->gq, batch[j]);
491 }
492 break;
493 }
494 }
495
496 *out = batch[0];
497 return true;
498}
499
/*
 * worker_thread — main loop for each pool worker.
 *
 * Lifecycle: mark TLS identity → startup barrier → raise num_threads_alive
 * → run until shutdown → lower num_threads_alive (last one out broadcasts
 * all_idle so threadpool_destroy can finish).
 *
 * Each loop iteration tries, in order of decreasing cheapness:
 *   1. pop own deque (lock-free, best locality)
 *   2. steal from siblings / drain global queue (try_steal)
 *   3. spin-yield up to YIELD_THRESHOLD times
 *   4. park on the work_available condition
 */
static void* worker_thread(void* arg) {
    worker* self = (worker*)arg;
    Threadpool* pool = self->pool;
    Task task;
    int spin = 0; /* consecutive fruitless iterations since last task/park */

    /* Mark this thread as a worker so threadpool_submit uses the deque path. */
    tls_worker_index = self->index;

    /*
     * Startup barrier (Bug #3 fix).
     *
     * Increment workers_ready to signal we have started, then spin until
     * every slot of pool->workers[] is written. The acquire on the final
     * load synchronises with the release stores inside each worker's own
     * fetch_add, ensuring we see all pool->workers[] writes before
     * proceeding into try_steal().
     */
    atomic_fetch_add_explicit(&pool->workers_ready, 1, memory_order_release);
    while (atomic_load_explicit(&pool->workers_ready, memory_order_acquire) < pool->num_workers) {
        thread_yield();
    }

    atomic_fetch_add_explicit(&pool->num_threads_alive, 1, memory_order_relaxed);

    while (!atomic_load_explicit(&pool->shutdown, memory_order_acquire)) {
        /* Own deque first — zero contention, best cache locality. */
        if (deque_pop_bottom(&self->deque, &task)) goto execute;

        /* Steal or drain global queue. */
        if (try_steal(self, &task)) goto execute;

        /* Brief spin before paying the parking overhead. */
        if (spin < YIELD_THRESHOLD) {
            spin++;
            thread_yield();
            continue;
        }

        spin = 0;
        worker_park(pool);
        continue;

    execute:
        spin = 0;
        atomic_fetch_add_explicit(&pool->num_active, 1, memory_order_relaxed);
        task.function(task.arg);
        /* acq_rel: our task's writes happen-before whoever observes active==0. */
        int active = atomic_fetch_sub_explicit(&pool->num_active, 1, memory_order_acq_rel) - 1;
        if (active == 0) {
            /* Potential transition to fully idle — check and signal. */
            bool any_work = false;
            {
                uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_relaxed);
                uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_relaxed);
                if (h != t) any_work = true;
            }
            for (size_t i = 0; i < pool->num_workers && !any_work; i++) {
                size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_relaxed);
                size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_relaxed);
                if ((ptrdiff_t)(b - t) > 0) any_work = true;
            }
            if (!any_work) {
                /* Re-check num_active under idle_lock so a waiter that just
                 * tested the condition cannot miss this broadcast. */
                lock_acquire(&pool->idle_lock);
                if (atomic_load_explicit(&pool->num_active, memory_order_relaxed) == 0) {
                    cond_broadcast(&pool->all_idle);
                }
                lock_release(&pool->idle_lock);
            }
        }
    }

    /* Last worker to exit wakes threadpool_destroy's alive-wait. */
    int alive = atomic_fetch_sub_explicit(&pool->num_threads_alive, 1, memory_order_acq_rel) - 1;
    if (alive == 0) {
        lock_acquire(&pool->idle_lock);
        cond_broadcast(&pool->all_idle);
        lock_release(&pool->idle_lock);
    }

    return NULL;
}
579
580/* ============================================================================
581 * Worker init
582 * ============================================================================ */
583
584static int worker_init(Threadpool* pool, worker** w, size_t index) {
585 *w = (worker*)ALIGNED_ALLOC(CACHE_LINE_SIZE, sizeof(worker));
586 if (!*w) return -1;
587 (*w)->pool = pool;
588 (*w)->index = index;
589 deque_init(&(*w)->deque);
590 return thread_create(&(*w)->pthread, worker_thread, *w);
591}
592
593/* ============================================================================
594 * Public API
595 * ============================================================================ */
596
597Threadpool* threadpool_create(size_t num_threads) {
598 if (num_threads == 0) num_threads = 1;
599
600 Threadpool* pool = (Threadpool*)ALIGNED_ALLOC(CACHE_LINE_SIZE, sizeof(Threadpool));
601 if (!pool) return NULL;
602
603 atomic_store_explicit(&pool->shutdown, 0, memory_order_relaxed);
604 atomic_store_explicit(&pool->num_threads_alive, 0, memory_order_relaxed);
605 atomic_store_explicit(&pool->num_parked, 0, memory_order_relaxed);
606 atomic_store_explicit(&pool->num_active, 0, memory_order_relaxed);
607 atomic_store_explicit(&pool->workers_ready, 0, memory_order_relaxed);
608
609 pool->num_workers = num_threads;
610 gq_init(&pool->gq, pool);
611
612 lock_init(&pool->park_lock);
613 cond_init(&pool->work_available);
614 lock_init(&pool->idle_lock);
615 cond_init(&pool->all_idle);
616
617 pool->workers = (worker**)malloc(num_threads * sizeof(worker*));
618 if (!pool->workers) {
619 gq_destroy(&pool->gq);
620 free(pool);
621 return NULL;
622 }
623
624 /* Zero array before spawning; see Bug #3 fix. */
625 for (size_t i = 0; i < num_threads; i++) pool->workers[i] = NULL;
626
627 for (size_t i = 0; i < num_threads; i++) {
628 if (worker_init(pool, &pool->workers[i], i) != 0) {
629 /* Release barrier so partially-started workers can exit. */
630 atomic_store_explicit(&pool->workers_ready, num_threads, memory_order_release);
631 pool->num_workers = i;
632 threadpool_destroy(pool, -1);
633 return NULL;
634 }
635 }
636
637 /*
638 * All pool->workers[] entries are now valid. The barrier in each
639 * worker's fetch_add + the acquire load guarantees they will see all
640 * pool->workers[] writes before entering try_steal().
641 */
642 return pool;
643}
644
645/*
646 * threadpool_submit
647 *
648 * Worker thread → push directly into own deque (zero-contention hot path).
649 * External thread → push to global queue (single mutex, cold path).
650 *
651 * After pushing, wake one parked worker if any are sleeping. Because
652 * try_steal now pulls BATCH_SIZE tasks at once from the global queue,
653 * a single wakeup is sufficient — the woken worker will pick up a full
654 * batch and wake neighbours via the work propagation in its deque.
655 */
656bool threadpool_submit(Threadpool* pool, void (*function)(void*), void* arg) {
657 if (!pool || !function) return false;
658
659 Task task = {function, arg};
660
661 if (tls_worker_index != SIZE_MAX) {
662 if (deque_push_bottom(&pool->workers[tls_worker_index]->deque, task)) {
663 unpark_one(pool);
664 return true;
665 }
666 }
667
668 bool ok = gq_push(&pool->gq, task);
669 if (ok) unpark_one(pool);
670 return ok;
671}
672
673/*
674 * threadpool_submit_batch — push N tasks in one call.
675 *
676 * Worker thread: each task is pushed into the caller's own deque one at a
677 * time (deque_push_bottom is already lock-free, so no batching needed there).
678 * If the deque fills up, remaining tasks spill to the global queue via
679 * gq_push_batch.
680 *
681 * External thread: all tasks go to the global queue via gq_push_batch, which
682 * holds gq_mutex for the entire write of each chunk. For a batch of N tasks
683 * this costs ceil(N / (GLOBAL_Q_SIZE-1)) mutex acquisitions instead of N —
684 * at GLOBAL_Q_SIZE=16384 that is a 16384× reduction in lock traffic for large
685 * submissions.
686 *
687 * Returns the number of tasks successfully submitted. On shutdown this may
688 * be less than count; the caller should treat a short return as an error.
689 */
690size_t threadpool_submit_batch(Threadpool* pool, void (**functions)(void*), void** args, size_t count) {
691 if (!pool || !functions || count == 0) return 0;
692
693 if (tls_worker_index != SIZE_MAX) {
694 /*
695 * Worker thread: push directly into own deque one at a time.
696 * Spill to global queue if the deque fills (extremely rare at
697 * DEQUE_SIZE=4096 but handled correctly).
698 */
699 size_t pushed = 0;
700 for (size_t i = 0; i < count; i++) {
701 if (!functions[i]) continue;
702 Task task = {functions[i], args ? args[i] : NULL};
703 if (deque_push_bottom(&pool->workers[tls_worker_index]->deque, task)) {
704 pushed++;
705 } else {
706 /* Deque full — spill remainder to global queue. */
707 Task* spill = (Task*)malloc((count - i) * sizeof(Task));
708 if (!spill) break;
709 size_t nspill = 0;
710 for (size_t j = i; j < count; j++) {
711 if (functions[j]) {
712 spill[nspill++] = (Task){functions[j], args ? args[j] : NULL};
713 }
714 }
715 pushed += gq_push_batch(&pool->gq, spill, nspill);
716 free(spill);
717 break;
718 }
719 }
720 if (pushed > 0) unpark_one(pool);
721 return pushed;
722 }
723
724 /*
725 * External thread: build a flat Task array and hand it to gq_push_batch
726 * in one call. gq_push_batch flushes in chunks that fit the queue,
727 * blocking only when the queue is completely full.
728 *
729 * Optimisation: if count <= BATCH_SIZE we use a stack-allocated array
730 * to avoid the malloc entirely on small batches.
731 */
732 Task stack_buf[BATCH_SIZE];
733 Task* tasks = (count <= BATCH_SIZE) ? stack_buf : (Task*)malloc(count * sizeof(Task));
734 if (!tasks) return 0;
735
736 size_t ntasks = 0;
737 for (size_t i = 0; i < count; i++) {
738 if (functions[i]) {
739 tasks[ntasks++] = (Task){functions[i], args ? args[i] : NULL};
740 }
741 }
742
743 size_t pushed = gq_push_batch(&pool->gq, tasks, ntasks);
744 if (tasks != stack_buf) free(tasks);
745
746 if (pushed > 0) unpark_one(pool);
747 return pushed;
748}
749
750void threadpool_wait(Threadpool* pool) {
751 if (!pool) return;
752
753 lock_acquire(&pool->idle_lock);
754 for (;;) {
755 if (atomic_load_explicit(&pool->num_active, memory_order_relaxed) == 0) {
756 bool any_work = false;
757 {
758 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
759 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
760 if (h != t) any_work = true;
761 }
762 for (size_t i = 0; i < pool->num_workers && !any_work; i++) {
763 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
764 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
765 if ((ptrdiff_t)(b - t) > 0) any_work = true;
766 }
767 if (!any_work) break;
768 }
769 cond_wait(&pool->all_idle, &pool->idle_lock);
770 }
771 lock_release(&pool->idle_lock);
772}
773
774void threadpool_destroy(Threadpool* pool, int timeout_ms) {
775 if (!pool) return;
776
777 lock_acquire(&pool->idle_lock);
778 for (;;) {
779 bool any_work = false;
780 {
781 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
782 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
783 if (h != t) any_work = true;
784 }
785 for (size_t i = 0; i < pool->num_workers && !any_work; i++) {
786 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
787 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
788 if ((ptrdiff_t)(b - t) > 0) any_work = true;
789 }
790 bool busy = atomic_load_explicit(&pool->num_active, memory_order_relaxed) > 0;
791 if (!any_work && !busy) break;
792
793 int r = cond_wait_timeout(&pool->all_idle, &pool->idle_lock, timeout_ms);
794 if (r == -1) perror("cond_wait_timeout");
795 }
796
797 atomic_store_explicit(&pool->shutdown, 1, memory_order_seq_cst);
798 lock_release(&pool->idle_lock);
799
800 unpark_all(pool);
801
802 while (atomic_load_explicit(&pool->num_threads_alive, memory_order_acquire) > 0) {
803 thread_yield();
804 }
805
806 for (size_t i = 0; i < pool->num_workers; i++) {
807 if (pool->workers[i]) {
808 thread_join(pool->workers[i]->pthread, NULL);
809 free(pool->workers[i]);
810 }
811 }
812 free(pool->workers);
813
814 gq_destroy(&pool->gq);
815 lock_free(&pool->park_lock);
816 cond_free(&pool->work_available);
817 lock_free(&pool->idle_lock);
818 cond_free(&pool->all_idle);
819 free(pool);
820}
pthread_mutex_t Lock
Definition lock.h:32
int lock_init(Lock *lock)
Definition lock.c:132
int cond_wait(Condition *condition, Lock *lock)
Definition lock.c:262
int cond_free(Condition *condition)
Definition lock.c:316
int cond_init(Condition *condition)
Definition lock.c:220
int lock_acquire(Lock *lock)
Definition lock.c:145
int cond_wait_timeout(Condition *condition, Lock *lock, int timeout_ms)
Definition lock.c:275
int lock_release(Lock *lock)
Definition lock.c:168
int cond_signal(Condition *condition)
Definition lock.c:234
int cond_broadcast(Condition *condition)
Definition lock.c:248
int lock_free(Lock *lock)
Definition lock.c:182
pthread_cond_t Condition
Definition lock.h:34
A unit of work submitted to the pool.
Definition threadpool.h:108
void * arg
Definition threadpool.h:110
void(* function)(void *arg)
Definition threadpool.h:109
int thread_create(Thread *thread, ThreadStartRoutine start_routine, void *data)
Definition thread.c:95
int thread_join(Thread tid, void **retval)
Definition thread.c:184
pthread_t Thread
Definition thread.h:78
bool threadpool_submit(Threadpool *pool, void(*function)(void *), void *arg)
Submit a single task to the pool.
Definition threadpool.c:656
size_t threadpool_submit_batch(Threadpool *pool, void(**functions)(void *), void **args, size_t count)
Submit multiple tasks to the pool in a single call.
Definition threadpool.c:690
Threadpool * threadpool_create(size_t num_threads)
Create a new thread pool.
Definition threadpool.c:597
void threadpool_destroy(Threadpool *pool, int timeout_ms)
Drain all pending tasks, stop all workers, and free the pool.
Definition threadpool.c:774
void threadpool_wait(Threadpool *pool)
Block until all currently submitted tasks have completed.
Definition threadpool.c:750
struct Threadpool Threadpool
Opaque handle to a thread pool instance.
Definition threadpool.h:99