5#include "../include/threadpool.h"
7#include "../include/align.h"
8#include "../include/aligned_alloc.h"
9#include "../include/lock.h"
10#include "../include/thread.h"
/* Platform CPU-yield primitive.
 * NOTE(review): both definitions appear here; the surrounding
 * #ifdef _WIN32 / #else that selects exactly one seems to be elided
 * from this view -- confirm in the full file. */
20#define thread_yield() SwitchToThread()
24#define thread_yield() sched_yield()
/* Ring capacities must remain powers of two: all indexing below is
 * done with `& *_MASK`. */
104#define DEQUE_SIZE (1u << 12)
105#define DEQUE_MASK (DEQUE_SIZE - 1)
106#define GLOBAL_Q_SIZE (1u << 14)
107#define GLOBAL_Q_MASK (GLOBAL_Q_SIZE - 1)
/* Alignment unit used to pad hot atomics onto separate cache lines. */
113#define CACHE_LINE_SIZE 64
/* Idle spin iterations before a worker starts yielding the CPU. */
114#define YIELD_THRESHOLD 8
116#define CACHE_ALIGNED ALIGN(CACHE_LINE_SIZE)
/* Outcome of one attempt to steal a task from another worker's deque. */
typedef enum {
    STEAL_SUCCESS, /* a task was taken and written to *out */
    STEAL_EMPTY,   /* the victim's deque held no tasks */
    STEAL_ABORT    /* lost a race (CAS on top failed); caller may retry */
} StealResult;
/* Per-worker Chase-Lev style deque storage (fragment: the enclosing
 * typedef header/footer of WorkStealDeque is elided from this view).
 * bottom is advanced by the owning worker (push/pop), top by thieves;
 * each lives on its own cache line to limit false sharing. */
122 CACHE_ALIGNED atomic_size_t bottom;
123 CACHE_ALIGNED atomic_size_t top;
/* Ring buffer of tasks, indexed with `& DEQUE_MASK`. */
124 CACHE_ALIGNED
Task tasks[DEQUE_SIZE];
/* Shared overflow/injection queue (fragment: typedef header/footer and,
 * presumably, condition-variable members are elided from this view).
 * head is the producer index (gq_push/gq_push_batch advance it), tail the
 * consumer index (gq_pull_batch advances it); both wrap via GLOBAL_Q_MASK.
 * NOTE(review): the relaxed atomics in the gq_* functions appear to rely
 * on this mutex for mutual exclusion -- confirm in the full file. */
129 CACHE_ALIGNED
Lock mutex;
130 CACHE_ALIGNED atomic_uint head;
131 CACHE_ALIGNED atomic_uint tail;
132 CACHE_ALIGNED
Task tasks[GLOBAL_Q_SIZE];
/* One worker thread's state (fragment: closing brace/typedef name is
 * elided from this view). */
138typedef struct worker {
/* Private work-stealing deque; owner pushes/pops bottom, others steal top. */
139 CACHE_ALIGNED WorkStealDeque deque;
/* OS thread handle for this worker. */
140 CACHE_ALIGNED
Thread pthread;
/* Slot in pool->workers; also seeds the steal RNG and tls_worker_index. */
141 CACHE_ALIGNED
size_t index;
/* Pool-wide shared state (fragment: the struct header and the
 * work_available / all_idle condition members referenced elsewhere are
 * elided from this view -- confirm layout in the full file). */
/* Nonzero once threadpool_destroy begins; workers exit their main loop. */
147 CACHE_ALIGNED atomic_int shutdown;
/* Count of workers between startup and exit of worker_thread. */
148 CACHE_ALIGNED atomic_int num_threads_alive;
150 CACHE_ALIGNED worker** workers;
151 CACHE_ALIGNED
size_t num_workers;
/* Startup barrier: workers spin until this reaches num_workers. */
154 CACHE_ALIGNED atomic_size_t workers_ready;
156 CACHE_ALIGNED GlobalQueue gq;
/* Protects parking; paired with the work_available condition. */
159 CACHE_ALIGNED
Lock park_lock;
160 CACHE_ALIGNED atomic_int num_parked;
/* Protects the idle wait in threadpool_wait; paired with all_idle. */
164 CACHE_ALIGNED
Lock idle_lock;
/* Number of workers currently executing a task. */
165 CACHE_ALIGNED atomic_int num_active;
/* Slot of the calling thread in pool->workers; SIZE_MAX on any thread
 * that is not a pool worker (workers assign it on startup, and submit
 * paths test it to pick the fast per-worker deque path). */
static _Thread_local size_t tls_worker_index = SIZE_MAX;
/* Initialize the global queue to empty (head == tail).
 * NOTE(review): lines storing the pool back-pointer (gq->pool, read in
 * gq_push) and initializing the mutex are elided from this view. */
176static void gq_init(GlobalQueue* gq,
struct Threadpool* pool) {
179 atomic_store_explicit(&gq->head, 0, memory_order_relaxed);
180 atomic_store_explicit(&gq->tail, 0, memory_order_relaxed);
/* Tear down global-queue resources. Body elided from this view --
 * presumably frees the mutex/condition members; confirm in full file. */
184static void gq_destroy(GlobalQueue* gq) {
/* Push one task into the global queue, waiting while it is full.
 * Full test: advancing head by one would meet tail (one slot is
 * sacrificed to distinguish full from empty).  Bails out when the pool
 * is shutting down.
 * NOTE(review): the lock acquire/release, the store into gq->tasks[h],
 * the wait inside the full-loop, and both return statements are elided
 * from this view; the relaxed loads below presumably run under
 * gq->mutex -- confirm in the full file. */
190static bool gq_push(GlobalQueue* gq,
Task task) {
193 while (((atomic_load_explicit(&gq->head, memory_order_relaxed) + 1) & GLOBAL_Q_MASK) ==
194 atomic_load_explicit(&gq->tail, memory_order_relaxed)) {
195 if (atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire)) {
/* Room available: claim slot h, then publish by advancing head (release). */
202 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
204 atomic_store_explicit(&gq->head, (h + 1) & GLOBAL_Q_MASK, memory_order_release);
/* Push up to `count` tasks, in chunks sized by the queue's current free
 * space; returns how many were pushed (elided return presumed).
 * Free space is (GLOBAL_Q_SIZE - 1) minus occupancy: one slot stays
 * unused so full and empty are distinguishable.
 * NOTE(review): locking, the wait in the full-loop, `pushed`'s
 * declaration/update, and the function tail are elided from this view. */
222static size_t gq_push_batch(GlobalQueue* gq,
Task* tasks,
size_t count) {
225 while (pushed < count) {
/* Wait while completely full, unless the pool is shutting down. */
229 while (((atomic_load_explicit(&gq->head, memory_order_relaxed) + 1) & GLOBAL_Q_MASK) ==
230 atomic_load_explicit(&gq->tail, memory_order_relaxed)) {
231 if (atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire)) {
243 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
244 uint32_t t = atomic_load_explicit(&gq->tail, memory_order_relaxed);
245 size_t free = (GLOBAL_Q_SIZE - 1) - ((h - t) & GLOBAL_Q_MASK);
246 size_t todo = count - pushed;
247 size_t n = todo < free ? todo : free;
/* Copy the chunk, then publish it with a single release store of head. */
249 for (
size_t i = 0; i < n; i++) {
250 gq->tasks[(h + i) & GLOBAL_Q_MASK] = tasks[pushed + i];
252 atomic_store_explicit(&gq->head, (h + (uint32_t)n) & GLOBAL_Q_MASK, memory_order_release);
/* Pull up to `max` tasks into `out`.  When the queue is empty, returns
 * -1 if the pool is shutting down, else 0; otherwise copies the
 * available tasks and advances tail (elided positive return presumed to
 * be the count taken).
 * NOTE(review): locking and the empty-check `if (h == t)` line are
 * elided from this view. */
273static int gq_pull_batch(GlobalQueue* gq,
Task* out,
size_t max) {
276 uint32_t h = atomic_load_explicit(&gq->head, memory_order_relaxed);
277 uint32_t t = atomic_load_explicit(&gq->tail, memory_order_relaxed);
281 return atomic_load_explicit(&gq->pool->shutdown, memory_order_acquire) ? -1 : 0;
284 size_t available = (h - t) & GLOBAL_Q_MASK;
285 size_t to_take = available < max ? available : max;
287 for (
size_t i = 0; i < to_take; i++) {
288 out[i] = gq->tasks[(t + i) & GLOBAL_Q_MASK];
/* Publish consumption with a release store of the new tail. */
290 atomic_store_explicit(&gq->tail, (t + (uint32_t)to_take) & GLOBAL_Q_MASK, memory_order_release);
/* Reset a worker deque to empty (bottom == top == 0). */
302static void deque_init(WorkStealDeque* dq) {
303 atomic_store_explicit(&dq->bottom, 0, memory_order_relaxed);
304 atomic_store_explicit(&dq->top, 0, memory_order_relaxed);
/* Owner-only push onto the bottom of the deque.  Fails (false) when the
 * ring is full; otherwise writes the slot and publishes it to thieves
 * with a release store of bottom.  (The `return true` tail is elided
 * from this view.) */
308static bool deque_push_bottom(WorkStealDeque* dq,
Task task) {
309 size_t b = atomic_load_explicit(&dq->bottom, memory_order_relaxed);
310 size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);
/* Occupancy b - t relies on indices growing without masking. */
312 if (b - t >= DEQUE_SIZE)
return false;
314 dq->tasks[b & DEQUE_MASK] = task;
315 atomic_store_explicit(&dq->bottom, b + 1, memory_order_release);
/* Owner-only pop from the bottom (Chase-Lev pop).
 * NOTE(review): several interior lines are elided in this view -- the
 * decrement of b before the seq_cst store, the declaration of
 * `expected` (presumably t), and the returns.  The shape matches the
 * canonical algorithm: reserve the slot by lowering bottom, re-read top
 * with seq_cst, take the task outright if more than one remains, and
 * CAS on top for the last element to arbitrate with thieves. */
332static bool deque_pop_bottom(WorkStealDeque* dq,
Task* out) {
333 size_t b = atomic_load_explicit(&dq->bottom, memory_order_relaxed);
334 size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);
337 if (b == t)
return false;
/* Reserve the bottom slot; the seq_cst store/load pair orders this
 * against concurrent steals. */
345 atomic_store_explicit(&dq->bottom, b, memory_order_seq_cst);
346 t = atomic_load_explicit(&dq->top, memory_order_seq_cst);
348 if ((ptrdiff_t)(b - t) > 0) {
350 *out = dq->tasks[b & DEQUE_MASK];
/* Exactly one task left: race thieves for it via CAS on top. */
356 *out = dq->tasks[b & DEQUE_MASK];
358 bool won = atomic_compare_exchange_strong_explicit(&dq->top, &expected, t + 1, memory_order_seq_cst,
359 memory_order_relaxed);
361 atomic_store_explicit(&dq->bottom, b + 1, memory_order_relaxed);
366 atomic_store_explicit(&dq->bottom, b + 1, memory_order_relaxed);
/* Thief-side steal from the top (Chase-Lev steal): read the candidate
 * task, then CAS top forward to claim it.  (The STEAL_ABORT return on
 * CAS failure is elided from this view.) */
371static StealResult deque_steal_top(WorkStealDeque* dq,
Task* out) {
372 size_t t = atomic_load_explicit(&dq->top, memory_order_acquire);
373 size_t b = atomic_load_explicit(&dq->bottom, memory_order_acquire);
/* Signed comparison: b may transiently be < t during an owner pop. */
375 if ((ptrdiff_t)(b - t) <= 0)
return STEAL_EMPTY;
/* Read before the CAS, per the algorithm: the CAS validates the read. */
377 *out = dq->tasks[t & DEQUE_MASK];
379 if (!atomic_compare_exchange_strong_explicit(&dq->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) {
382 return STEAL_SUCCESS;
/* Block the calling worker until work may be available.  Re-checks all
 * queues under park_lock before sleeping so a wakeup signal cannot be
 * missed between the emptiness check and cond_wait.
 * NOTE(review): the park_lock acquire/release lines are elided from
 * this view -- the wait below presumably runs with it held. */
389static void worker_park(Threadpool* pool) {
390 atomic_fetch_add_explicit(&pool->num_parked, 1, memory_order_relaxed);
/* Verify both the global queue and every deque look empty. */
399 bool all_empty =
true;
401 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
402 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
403 if (h != t) all_empty =
false;
405 for (
size_t i = 0; i < pool->num_workers && all_empty; i++) {
406 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
407 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
408 if ((ptrdiff_t)(b - t) > 0) all_empty =
false;
/* Sleep only if still nothing to do and not shutting down. */
411 if (all_empty && !atomic_load_explicit(&pool->shutdown, memory_order_acquire)) {
412 cond_wait(&pool->work_available, &pool->park_lock);
416 atomic_fetch_sub_explicit(&pool->num_parked, 1, memory_order_relaxed);
/* Wake one parked worker, if any.  The signal call (presumably
 * cond_signal on work_available under park_lock) is elided from this
 * view; num_parked > 0 is a cheap pre-filter to skip the lock/signal
 * when nobody is sleeping. */
419static inline void unpark_one(Threadpool* pool) {
420 if (atomic_load_explicit(&pool->num_parked, memory_order_relaxed) > 0) {
/* Wake every parked worker.  Body elided from this view -- presumably a
 * cond_broadcast on work_available; confirm in the full file. */
427static inline void unpark_all(Threadpool* pool) {
/* Try to obtain a task for `self`: first by stealing from a random
 * sibling deque, then by pulling a batch from the global queue.
 * Returns true with *out filled on success.
 * NOTE(review): the `pool` local's definition, the RNG advance step,
 * BATCH_SIZE, and the final return are elided from this view. */
449static bool try_steal(worker* self,
Task* out) {
451 size_t n = pool->num_workers;
/* Per-thread LCG state; seeded from the worker index with a standard
 * 64-bit LCG multiplier so workers start at different victims. */
454 static _Thread_local uint64_t rng = 0;
455 if (rng == 0) rng = ((uint64_t)self->index + 1) * 6364136223846793005ULL;
459 size_t start = (size_t)(rng % n);
/* One full round-robin pass over the other workers' deques. */
462 for (
size_t i = 0; i < n; i++) {
463 size_t v = (start + i) % n;
464 if (v == self->index)
continue;
465 StealResult r = deque_steal_top(&pool->workers[v]->deque, out);
466 if (r == STEAL_SUCCESS)
return true;
/* No luck stealing: drain a batch from the global queue, keep batch[0]
 * for the caller and stash the rest in our own deque. */
477 Task batch[BATCH_SIZE];
478 int got = gq_pull_batch(&pool->gq, batch, BATCH_SIZE);
479 if (got <= 0)
return false;
482 for (
int i = 1; i < got; i++) {
483 if (!deque_push_bottom(&self->deque, batch[i])) {
/* Deque full: spill the remainder back to the global queue. */
489 for (
int j = i; j < got; j++) {
490 gq_push(&pool->gq, batch[j]);
/* Worker main loop: pop own deque, else steal, else spin/yield/park;
 * maintains num_active and the all-idle detection used by
 * threadpool_wait.
 * NOTE(review): many interior lines are elided in this view -- the
 * `pool`/`task`/`spin` declarations, the `execute:` label and task
 * invocation, the spin/park branches, and the idle broadcast presumably
 * taken when `active == 0` / `alive == 0`.  Comments below describe
 * only the visible skeleton. */
500static void* worker_thread(
void* arg) {
501 worker* self = (worker*)arg;
506 tls_worker_index = self->index;
/* Startup barrier: wait until every worker has checked in. */
517 atomic_fetch_add_explicit(&pool->workers_ready, 1, memory_order_release);
518 while (atomic_load_explicit(&pool->workers_ready, memory_order_acquire) < pool->num_workers) {
522 atomic_fetch_add_explicit(&pool->num_threads_alive, 1, memory_order_relaxed);
524 while (!atomic_load_explicit(&pool->shutdown, memory_order_acquire)) {
/* Fast path: our own deque; fallback: steal / global queue. */
526 if (deque_pop_bottom(&self->deque, &task))
goto execute;
529 if (try_steal(self, &task))
goto execute;
532 if (spin < YIELD_THRESHOLD) {
/* Bracket task execution with num_active accounting. */
544 atomic_fetch_add_explicit(&pool->num_active, 1, memory_order_relaxed);
546 int active = atomic_fetch_sub_explicit(&pool->num_active, 1, memory_order_acq_rel) - 1;
/* After finishing a task, probe whether any work remains anywhere. */
549 bool any_work =
false;
551 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_relaxed);
552 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_relaxed);
553 if (h != t) any_work =
true;
555 for (
size_t i = 0; i < pool->num_workers && !any_work; i++) {
556 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_relaxed);
557 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_relaxed);
558 if ((ptrdiff_t)(b - t) > 0) any_work =
true;
562 if (atomic_load_explicit(&pool->num_active, memory_order_relaxed) == 0) {
/* Shutdown path: deregister from the alive count. */
570 int alive = atomic_fetch_sub_explicit(&pool->num_threads_alive, 1, memory_order_acq_rel) - 1;
/* Allocate and initialize one cache-aligned worker slot.
 * NOTE(review): the allocation-failure check, remaining field
 * initialization, thread creation, and return are elided from view. */
584static int worker_init(Threadpool* pool, worker** w,
size_t index) {
585 *w = (worker*)ALIGNED_ALLOC(CACHE_LINE_SIZE,
sizeof(worker));
589 deque_init(&(*w)->deque);
/* Interior of threadpool_create (fragment: the function header, the
 * pool allocation, lock/condition initialization, and the
 * failure-cleanup tail are elided from this view). */
/* A pool always has at least one worker. */
598 if (num_threads == 0) num_threads = 1;
601 if (!pool)
return NULL;
603 atomic_store_explicit(&pool->shutdown, 0, memory_order_relaxed);
604 atomic_store_explicit(&pool->num_threads_alive, 0, memory_order_relaxed);
605 atomic_store_explicit(&pool->num_parked, 0, memory_order_relaxed);
606 atomic_store_explicit(&pool->num_active, 0, memory_order_relaxed);
607 atomic_store_explicit(&pool->workers_ready, 0, memory_order_relaxed);
609 pool->num_workers = num_threads;
610 gq_init(&pool->gq, pool);
617 pool->workers = (worker**)malloc(num_threads *
sizeof(worker*));
618 if (!pool->workers) {
619 gq_destroy(&pool->gq);
/* NULL-fill first so a partial-failure teardown can free safely. */
625 for (
size_t i = 0; i < num_threads; i++) pool->workers[i] = NULL;
627 for (
size_t i = 0; i < num_threads; i++) {
628 if (worker_init(pool, &pool->workers[i], i) != 0) {
/* On failure, release already-started workers from the ready barrier
 * and shrink the pool to the workers actually created. */
630 atomic_store_explicit(&pool->workers_ready, num_threads, memory_order_release);
631 pool->num_workers = i;
/* Interior of threadpool_submit (fragment: function header and returns
 * elided from this view).  Fast path: a worker submitting from inside
 * the pool pushes straight onto its own deque; otherwise (or if the
 * deque is full) the task goes through the global queue and one parked
 * worker is woken. */
657 if (!pool || !function)
return false;
659 Task task = {function, arg};
661 if (tls_worker_index != SIZE_MAX) {
662 if (deque_push_bottom(&pool->workers[tls_worker_index]->deque, task)) {
668 bool ok = gq_push(&pool->gq, task);
669 if (ok) unpark_one(pool);
/* Interior of threadpool_submit_batch (fragment: function header,
 * `pushed`/`nspill`/`ntasks` declarations, spill NULL-check, free of
 * `spill`, and returns are elided from this view).
 * Worker path: push each task onto the caller's own deque; on overflow,
 * spill the remainder to the global queue in one batch.
 * Non-worker path: stage all tasks (stack buffer up to BATCH_SIZE,
 * heap beyond) and batch-push to the global queue.
 * NOTE(review): the NULL-check for the `spill` malloc is not visible --
 * verify it exists in the full file. */
691 if (!pool || !functions || count == 0)
return 0;
693 if (tls_worker_index != SIZE_MAX) {
700 for (
size_t i = 0; i < count; i++) {
/* Skip NULL function slots rather than submitting them. */
701 if (!functions[i])
continue;
702 Task task = {functions[i], args ? args[i] : NULL};
703 if (deque_push_bottom(&pool->workers[tls_worker_index]->deque, task)) {
/* Own deque full: collect everything remaining and spill it. */
707 Task* spill = (
Task*)malloc((count - i) *
sizeof(
Task));
710 for (
size_t j = i; j < count; j++) {
712 spill[nspill++] = (
Task){functions[j], args ? args[j] : NULL};
715 pushed += gq_push_batch(&pool->gq, spill, nspill);
720 if (pushed > 0) unpark_one(pool);
/* Non-worker caller: stage tasks, then one batched global-queue push. */
732 Task stack_buf[BATCH_SIZE];
733 Task* tasks = (count <= BATCH_SIZE) ? stack_buf : (
Task*)malloc(count *
sizeof(
Task));
734 if (!tasks)
return 0;
737 for (
size_t i = 0; i < count; i++) {
739 tasks[ntasks++] = (
Task){functions[i], args ? args[i] : NULL};
743 size_t pushed = gq_push_batch(&pool->gq, tasks, ntasks);
744 if (tasks != stack_buf) free(tasks);
746 if (pushed > 0) unpark_one(pool);
/* Interior of threadpool_wait (fragment: function header, idle_lock
 * acquire/release, and the enclosing loop are elided from this view).
 * Under idle_lock: if no worker is active and neither the global queue
 * nor any deque holds work, all submitted tasks are done -- break;
 * otherwise sleep on the all_idle condition until workers signal. */
755 if (atomic_load_explicit(&pool->num_active, memory_order_relaxed) == 0) {
756 bool any_work =
false;
758 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
759 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
760 if (h != t) any_work =
true;
762 for (
size_t i = 0; i < pool->num_workers && !any_work; i++) {
763 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
764 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
765 if ((ptrdiff_t)(b - t) > 0) any_work =
true;
767 if (!any_work)
break;
769 cond_wait(&pool->all_idle, &pool->idle_lock);
/* Interior of threadpool_destroy (fragment: function header, the drain
 * loop/timeout bookkeeping, unpark_all calls, cond_wait_timeout call
 * whose result is `r`, lock teardown, and free(pool) are elided from
 * this view).
 * Drain phase: loop until no queued work remains and no worker is
 * active (or the caller's timeout elapses). */
779 bool any_work =
false;
781 uint32_t h = atomic_load_explicit(&pool->gq.head, memory_order_acquire);
782 uint32_t t = atomic_load_explicit(&pool->gq.tail, memory_order_acquire);
783 if (h != t) any_work =
true;
785 for (
size_t i = 0; i < pool->num_workers && !any_work; i++) {
786 size_t b = atomic_load_explicit(&pool->workers[i]->deque.bottom, memory_order_acquire);
787 size_t t = atomic_load_explicit(&pool->workers[i]->deque.top, memory_order_acquire);
788 if ((ptrdiff_t)(b - t) > 0) any_work =
true;
790 bool busy = atomic_load_explicit(&pool->num_active, memory_order_relaxed) > 0;
791 if (!any_work && !busy)
break;
794 if (r == -1) perror(
"cond_wait_timeout");
/* Shutdown phase: raise the flag (seq_cst so sleeping checks see it),
 * then wait for every worker thread to deregister before freeing. */
797 atomic_store_explicit(&pool->shutdown, 1, memory_order_seq_cst);
802 while (atomic_load_explicit(&pool->num_threads_alive, memory_order_acquire) > 0) {
806 for (
size_t i = 0; i < pool->num_workers; i++) {
807 if (pool->workers[i]) {
809 free(pool->workers[i]);
814 gq_destroy(&pool->gq);
int lock_init(Lock *lock)
int cond_wait(Condition *condition, Lock *lock)
int cond_free(Condition *condition)
int cond_init(Condition *condition)
int lock_acquire(Lock *lock)
int cond_wait_timeout(Condition *condition, Lock *lock, int timeout_ms)
int lock_release(Lock *lock)
int cond_signal(Condition *condition)
int cond_broadcast(Condition *condition)
int lock_free(Lock *lock)
A unit of work submitted to the pool.
void (*function)(void *arg)
int thread_create(Thread *thread, ThreadStartRoutine start_routine, void *data)
int thread_join(Thread tid, void **retval)
bool threadpool_submit(Threadpool *pool, void(*function)(void *), void *arg)
Submit a single task to the pool.
size_t threadpool_submit_batch(Threadpool *pool, void(**functions)(void *), void **args, size_t count)
Submit multiple tasks to the pool in a single call.
Threadpool * threadpool_create(size_t num_threads)
Create a new thread pool.
void threadpool_destroy(Threadpool *pool, int timeout_ms)
Drain all pending tasks, stop all workers, and free the pool.
void threadpool_wait(Threadpool *pool)
Block until all currently submitted tasks have completed.
struct Threadpool Threadpool
Opaque handle to a thread pool instance.