Halide 13.0.2
Halide compiler and libraries
synchronization_common.h
Go to the documentation of this file.
1#include "HalideRuntime.h"
2#include "printer.h"
3#include "scoped_spin_lock.h"
4
5/* This provides an implementation of pthreads-like mutex and
6 * condition variables with fast default case performance. The code
7 * is based on the "parking lot" design and specifically Amanieu
8 * d'Antras' Rust implementation:
9 * https://github.com/Amanieu/parking_lot
10 * and the one in WTF:
11 * https://webkit.org/blog/6161/locking-in-webkit/
12 *
13 * Neither of the above implementations were used directly largely for
14 * dependency management. This implementation lacks a few features
15 * relative to those two. Specifically timeouts are not
16 * supported. Nor is optional fairness or deadlock detection.
17 * This implementation should provide a fairly standalone "one file"
18 * fast synchronization layer on top of readily available system primitives.
19 *
20 * TODO: Implement pthread_once equivalent.
21 * TODO: Add read/write lock and move SharedExclusiveSpinLock from tracing.cpp
22 * to this mechanism.
23 * TODO: Add timeouts and optional fairness if needed.
24 * TODO: Relying on condition variables has issues for old versions of Windows
25 * and likely has portability issues to some very bare bones embedded OSes.
26 * Doing an implementation using only semaphores or event counters should
27 * be doable.
28 */
29
30// Copied from tsan_interface.h
31#ifndef TSAN_ANNOTATIONS
32#define TSAN_ANNOTATIONS 0
33#endif
34
35#if TSAN_ANNOTATIONS
36extern "C" {
37const unsigned __tsan_mutex_linker_init = 1 << 0;
38void __tsan_mutex_pre_lock(void *addr, unsigned flags);
39void __tsan_mutex_post_lock(void *addr, unsigned flags, int recursion);
40int __tsan_mutex_pre_unlock(void *addr, unsigned flags);
41void __tsan_mutex_post_unlock(void *addr, unsigned flags);
42void __tsan_mutex_pre_signal(void *addr, unsigned flags);
43void __tsan_mutex_post_signal(void *addr, unsigned flags);
44}
45#endif
46
47namespace Halide {
48namespace Runtime {
49namespace Internal {
50
51namespace Synchronization {
52
53namespace {
54
55#if TSAN_ANNOTATIONS
56ALWAYS_INLINE void if_tsan_pre_lock(void *mutex) {
57 __tsan_mutex_pre_lock(mutex, __tsan_mutex_linker_init);
58};
59// TODO(zalman|dvyukov): Is 1 the right value for a non-recursive lock? pretty sure value is ignored.
60ALWAYS_INLINE void if_tsan_post_lock(void *mutex) {
61 __tsan_mutex_post_lock(mutex, __tsan_mutex_linker_init, 1);
62}
63// TODO(zalman|dvyukov): Is it safe to ignore return value here if locks are not recursive?
64ALWAYS_INLINE void if_tsan_pre_unlock(void *mutex) {
65 (void)__tsan_mutex_pre_unlock(mutex, __tsan_mutex_linker_init);
66}
67ALWAYS_INLINE void if_tsan_post_unlock(void *mutex) {
68 __tsan_mutex_post_unlock(mutex, __tsan_mutex_linker_init);
69}
70ALWAYS_INLINE void if_tsan_pre_signal(void *cond) {
71 __tsan_mutex_pre_signal(cond, 0);
72}
73ALWAYS_INLINE void if_tsan_post_signal(void *cond) {
74 __tsan_mutex_post_signal(cond, 0);
75}
76#else
77ALWAYS_INLINE void if_tsan_pre_lock(void *) {
78}
79ALWAYS_INLINE void if_tsan_post_lock(void *) {
80}
81ALWAYS_INLINE void if_tsan_pre_unlock(void *) {
82}
83ALWAYS_INLINE void if_tsan_post_unlock(void *) {
84}
85ALWAYS_INLINE void if_tsan_pre_signal(void *) {
86}
87ALWAYS_INLINE void if_tsan_post_signal(void *) {
88}
89#endif
90
91#ifdef BITS_32
92ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
93 return __sync_and_and_fetch(addr, val);
94}
95
96template<typename T>
97ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
98 return __sync_fetch_and_add(addr, val);
99}
100
101template<typename T>
102ALWAYS_INLINE bool cas_strong_sequentially_consistent_helper(T *addr, T *expected, T *desired) {
103 T oldval = *expected;
104 T gotval = __sync_val_compare_and_swap(addr, oldval, *desired);
105 *expected = gotval;
106 return oldval == gotval;
107}
108
109ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
110 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
111}
112
113ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
114 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
115}
116
117template<typename T>
118ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
119 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
120}
121
122ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
123 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
124}
125
126ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
127 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
128}
129
130ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
131 return __sync_fetch_and_and(addr, val);
132}
133
134template<typename T>
135ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
136 *val = *addr;
137}
138
139template<typename T>
140ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
141 __sync_synchronize();
142 *val = *addr;
143}
144
145ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
146 return __sync_or_and_fetch(addr, val);
147}
148
149ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
150 *addr = *val;
151}
152
153template<typename T>
154ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
155 *addr = *val;
156 __sync_synchronize();
157}
158
159ALWAYS_INLINE void atomic_thread_fence_acquire() {
160 __sync_synchronize();
161}
162
163#else
164
165ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
166 return __atomic_and_fetch(addr, val, __ATOMIC_RELEASE);
167}
168
169template<typename T>
170ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
171 return __atomic_fetch_add(addr, val, __ATOMIC_ACQ_REL);
172}
173
174ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
175 return __atomic_compare_exchange(addr, expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
176}
177
178template<typename T>
179ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
180 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
181}
182
183ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
184 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
185}
186
187ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
188 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
189}
190
191ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
192 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
193}
194
195ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
196 return __atomic_fetch_and(addr, val, __ATOMIC_RELEASE);
197}
198
199template<typename T>
200ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
201 __atomic_load(addr, val, __ATOMIC_RELAXED);
202}
203
204template<typename T>
205ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
206 __atomic_load(addr, val, __ATOMIC_ACQUIRE);
207}
208
209ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
210 return __atomic_or_fetch(addr, val, __ATOMIC_RELAXED);
211}
212
213ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
214 __atomic_store(addr, val, __ATOMIC_RELAXED);
215}
216
217template<typename T>
218ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
219 __atomic_store(addr, val, __ATOMIC_RELEASE);
220}
221
222ALWAYS_INLINE void atomic_thread_fence_acquire() {
223 __atomic_thread_fence(__ATOMIC_ACQUIRE);
224}
225
226#endif
227
228} // namespace
229
231 // Everyone says this should be 40. Have not measured it.
232 int spin_count = 40;
233
234public:
236 if (spin_count > 0) {
237 spin_count--;
238 }
239 return spin_count > 0;
240 }
241
243 spin_count = 40;
244 }
245};
246
247// Low order two bits are used for locking state,
248static constexpr uint8_t lock_bit = 0x01;
249static constexpr uint8_t queue_lock_bit = 0x02;
250static constexpr uint8_t parked_bit = 0x02;
251
253 thread_parker parker; // TODO: member or pointer?
254
255 // This design is from the Rust parking lot implementation by Amanieu d'Antras.
256 // Comment from original:
257 //
258 // Linked list of threads in the queue. The queue is split into two parts:
259 // the processed part and the unprocessed part. When new nodes are added to
260 // the list, they only have the next pointer set, and queue_tail is null.
261 //
262 // Nodes are processed with the queue lock held, which consists of setting
263 // the prev pointer for each node and setting the queue_tail pointer on the
264 // first processed node of the list.
265 //
266 // This setup allows nodes to be added to the queue without a lock, while
267 // still allowing O(1) removal of nodes from the processed part of the list.
268 // The only cost is the O(n) processing, but this only needs to be done
269 // once for each node, and therefore isn't too expensive.
270
274};
275
277 uintptr_t state = 0;
278
279 void lock_full();
280 void unlock_full();
281
282public:
284 if_tsan_pre_lock(this);
285
286 uintptr_t expected = 0;
287 uintptr_t desired = lock_bit;
288 // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
289 if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
290 lock_full();
291 }
292
293 if_tsan_post_lock(this);
294 }
295
297 if_tsan_pre_unlock(this);
298
299 uintptr_t val = atomic_fetch_and_release(&state, ~(uintptr_t)lock_bit);
300 // If another thread is currently queueing, that thread will ensure
301 // it acquires the lock or wakes a waiting thread.
302 bool no_thread_queuing = (val & queue_lock_bit) == 0;
303 // Only need to do a wakeup if there are threads waiting.
304 bool some_queued = (val & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0;
305 if (no_thread_queuing && some_queued) {
306 unlock_full();
307 }
308
309 if_tsan_post_unlock(this);
310 }
311};
312
313WEAK void word_lock::lock_full() {
314 spin_control spinner;
315 uintptr_t expected;
316 atomic_load_relaxed(&state, &expected);
317
318 while (true) {
319 if (!(expected & lock_bit)) {
320 uintptr_t desired = expected | lock_bit;
321
322 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
323 return;
324 }
325 continue;
326 }
327
328 if (((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0) && spinner.should_spin()) {
330 atomic_load_relaxed(&state, &expected);
331 continue;
332 }
333
334 word_lock_queue_data node;
335
336 node.parker.prepare_park();
337 // TODO set up prelinkage parking state
338
339 word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
340 if (head == nullptr) {
341 node.tail = &node;
342 // constructor set node.prev = nullptr;
343 } else {
344 // Mark the tail as nullptr. The unlock routine will walk the list and wakeup
345 // the thread at the end.
346 // constructor set node.tail = nullptr;
347 // constructor set node.prev = nullptr;
348 node.next = head;
349 }
350
351 uintptr_t desired = ((uintptr_t)&node) | (expected & (queue_lock_bit | lock_bit));
352 if (atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
353 node.parker.park();
354 spinner.reset();
355 atomic_load_relaxed(&state, &expected);
356 }
357 }
358}
359
360WEAK void word_lock::unlock_full() {
361 uintptr_t expected;
362 atomic_load_relaxed(&state, &expected);
363
364 while (true) {
365 // If another thread is currently queueing, that thread will ensure
366 // it acquires the lock or wakes a waiting thread.
367 bool thread_queuing = (expected & queue_lock_bit);
368 // Only need to do a wakeup if there are threads waiting.
369 bool none_queued = (expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0;
370 if (thread_queuing || none_queued) {
371 return;
372 }
373
374 uintptr_t desired = expected | queue_lock_bit;
375 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
376 break;
377 }
378 }
379
380 while (true) {
381 word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
382 word_lock_queue_data *current = head;
383 word_lock_queue_data *tail = current->tail;
384 int times_through = 0;
385 while (tail == nullptr) {
386 word_lock_queue_data *next = current->next;
387 halide_assert(nullptr, next != nullptr);
388 next->prev = current;
389 current = next;
390 tail = current->tail;
391 times_through++;
392 }
393 head->tail = tail;
394
395 // If the lock is now locked, unlock the queue and have the thread
396 // that currently holds the lock do the wakeup
397 if (expected & lock_bit) {
398 uintptr_t desired = expected & ~(uintptr_t)queue_lock_bit;
399 if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
400 return;
401 }
402 atomic_thread_fence_acquire();
403 continue;
404 }
405
406 word_lock_queue_data *new_tail = tail->prev;
407 if (new_tail == nullptr) {
408 bool continue_outer = false;
409 while (!continue_outer) {
410 uintptr_t desired = expected & lock_bit;
411 if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
412 break;
413 }
414 if ((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0) {
415 continue;
416 } else {
417 atomic_thread_fence_acquire();
418 continue_outer = true;
419 }
420 }
421
422 if (continue_outer) {
423 continue;
424 }
425 } else {
426 head->tail = new_tail;
427 atomic_and_fetch_release(&state, ~(uintptr_t)queue_lock_bit);
428 }
429
430 // TODO: The reason there are three calls here is other things can happen between them.
431 // Also it is not clear if unpark_start has to return the mutex/flag used by unpark
432 // and unpark_finish due to memory lifetime reasons.
433 tail->parker.unpark_start();
434 tail->parker.unpark();
435 tail->parker.unpark_finish();
436 break;
437 }
438}
439
441 thread_parker parker; // TODO: member or pointer?
442
443 uintptr_t sleep_address = 0;
444 queue_data *next = nullptr;
445 uintptr_t unpark_info = 0;
446};
447
448// Must be a power of two.
449constexpr int LOAD_FACTOR = 4;
450
453
454 queue_data *head = nullptr; // Is this queue_data or thread_data?
455 queue_data *tail = nullptr; // Is this queue_data or thread_data?
456};
457
458constexpr int HASH_TABLE_SIZE = MAX_THREADS * LOAD_FACTOR;
461};
463
464constexpr int HASH_TABLE_BITS = 10;
465static_assert((1 << HASH_TABLE_BITS) >= MAX_THREADS * LOAD_FACTOR);
466
467#if 0
468WEAK void dump_hash() {
469 int i = 0;
470 for (auto &bucket : table.buckets) {
471 queue_data *head = bucket.head;
472 while (head != nullptr) {
473 print(nullptr) << "Bucket index " << i << " addr " << (void *)head->sleep_address << "\n";
474 head = head->next;
475 }
476 i++;
477 }
478}
479#endif
480
481static ALWAYS_INLINE uintptr_t addr_hash(uintptr_t addr) {
482 // Fibonacci hashing. The golden ratio is 1.9E3779B97F4A7C15F39...
483 // in hexadecimal.
484 if (sizeof(uintptr_t) >= 8) {
485 return (addr * (uintptr_t)0x9E3779B97F4A7C15) >> (64 - HASH_TABLE_BITS);
486 } else {
487 return (addr * (uintptr_t)0x9E3779B9) >> (32 - HASH_TABLE_BITS);
488 }
489}
490
491#ifdef DEBUG_RUNTIME
492// Any hash calculated by addr_hash() should be incapable of being outside this range.
493ALWAYS_INLINE void check_hash(uintptr_t hash) {
494 halide_assert(nullptr, hash < HASH_TABLE_SIZE);
495}
496#endif // DEBUG_RUNTIME
497
498WEAK hash_bucket &lock_bucket(uintptr_t addr) {
499 uintptr_t hash = addr_hash(addr);
500
501#ifdef DEBUG_RUNTIME
502 check_hash(hash);
503#endif
504
505 // TODO: if resizing is implemented, loop, etc.
506 hash_bucket &bucket = table.buckets[hash];
507
508 bucket.mutex.lock();
509
510 return bucket;
511}
512
516
518 : from(from), to(to) {
519 }
520};
521
522WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to) {
523 // TODO: if resizing is implemented, loop, etc.
524 uintptr_t hash_from = addr_hash(addr_from);
525 uintptr_t hash_to = addr_hash(addr_to);
526
527#ifdef DEBUG_RUNTIME
528 check_hash(hash_from);
529 check_hash(hash_to);
530#endif
531
532 // Lock the bucket with the smaller hash first in order
533 // to prevent deadlock.
534 if (hash_from == hash_to) {
535 hash_bucket &first = table.buckets[hash_from];
536 first.mutex.lock();
537 return bucket_pair(first, first);
538 } else if (hash_from < hash_to) {
539 hash_bucket &first = table.buckets[hash_from];
540 hash_bucket &second = table.buckets[hash_to];
541 first.mutex.lock();
542 second.mutex.lock();
543 return bucket_pair(first, second);
544 } else {
545 hash_bucket &first = table.buckets[hash_to];
546 hash_bucket &second = table.buckets[hash_from];
547 first.mutex.lock();
548 second.mutex.lock();
549 return bucket_pair(second, first);
550 }
551}
552
554 // In the lock routine, the buckets are locked smaller hash index first.
555 // Here we reverse this ordering by comparing the pointers. This works
556 // since the pointers are obtained by indexing an array with the hash
557 // values.
558 if (&buckets.from == &buckets.to) {
559 buckets.from.mutex.unlock();
560 } else if (&buckets.from > &buckets.to) {
561 buckets.from.mutex.unlock();
562 buckets.to.mutex.unlock();
563 } else {
564 buckets.to.mutex.unlock();
565 buckets.from.mutex.unlock();
566 }
567}
568
570 bool unpark_one = false;
571 uintptr_t invalid_unpark_info = 0;
572};
573
575 uintptr_t park(uintptr_t addr);
576 uintptr_t unpark_one(uintptr_t addr);
577 int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info);
578
579protected:
580 virtual bool validate(validate_action &action) {
581 return true;
582 }
583 virtual void before_sleep() {
584 // nothing
585 }
586 virtual uintptr_t unpark(int unparked, bool more_waiters) {
587 return 0;
588 }
589 virtual void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) {
590 // nothing
591 }
592};
593
594// TODO: Do we need a park_result thing here?
595WEAK uintptr_t parking_control::park(uintptr_t addr) {
597
598 hash_bucket &bucket = lock_bucket(addr);
599
600 validate_action action;
601 if (!validate(action)) {
602 bucket.mutex.unlock();
603 return action.invalid_unpark_info;
604 }
605
606 queue_data.next = nullptr;
608 queue_data.parker.prepare_park();
609 if (bucket.head != nullptr) {
610 bucket.tail->next = &queue_data;
611 } else {
612 bucket.head = &queue_data;
613 }
614 bucket.tail = &queue_data;
615 bucket.mutex.unlock();
616
617 before_sleep();
618
619 queue_data.parker.park();
620
621 return queue_data.unpark_info;
622
623 // TODO: handling timeout.
624}
625
626WEAK uintptr_t parking_control::unpark_one(uintptr_t addr) {
627 hash_bucket &bucket = lock_bucket(addr);
628
629 queue_data **data_location = &bucket.head;
630 queue_data *prev = nullptr;
631 queue_data *data = *data_location;
632 while (data != nullptr) {
633 uintptr_t cur_addr;
634 atomic_load_relaxed(&data->sleep_address, &cur_addr);
635 if (cur_addr == addr) {
636 queue_data *next = data->next;
637 *data_location = next;
638
639 bool more_waiters = false;
640
641 if (bucket.tail == data) {
642 bucket.tail = prev;
643 } else {
644 queue_data *data2 = next;
645 while (data2 != nullptr && !more_waiters) {
646 uintptr_t cur_addr2;
647 atomic_load_relaxed(&data2->sleep_address, &cur_addr2);
648 more_waiters = (cur_addr2 == addr);
649 data2 = data2->next;
650 }
651 }
652
653 data->unpark_info = unpark(1, more_waiters);
654
655 data->parker.unpark_start();
656 bucket.mutex.unlock();
657 data->parker.unpark();
658 data->parker.unpark_finish();
659
660 // TODO: Figure out ret type.
661 return more_waiters ? 1 : 0;
662 } else {
663 data_location = &data->next;
664 prev = data;
665 data = data->next;
666 }
667 }
668
669 unpark(0, false);
670
671 bucket.mutex.unlock();
672
673 // TODO: decide if this is the right return value.
674 return 0;
675}
676
677WEAK int parking_control::unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info) {
678 bucket_pair buckets = lock_bucket_pair(addr_from, addr_to);
679
680 validate_action action;
681 if (!validate(action)) {
682 unlock_bucket_pair(buckets);
683 return 0;
684 }
685
686 queue_data **data_location = &buckets.from.head;
687 queue_data *prev = nullptr;
688 queue_data *data = *data_location;
689 queue_data *requeue = nullptr;
690 queue_data *requeue_tail = nullptr;
691 queue_data *wakeup = nullptr;
692
693 while (data != nullptr) {
694 uintptr_t cur_addr;
695 atomic_load_relaxed(&data->sleep_address, &cur_addr);
696
697 queue_data *next = data->next;
698 if (cur_addr == addr_from) {
699 *data_location = next;
700
701 if (buckets.from.tail == data) {
702 buckets.from.tail = prev;
703 }
704
705 if (action.unpark_one && wakeup == nullptr) {
706 wakeup = data;
707 } else {
708 if (requeue == nullptr) {
709 requeue = data;
710 } else {
711 requeue_tail->next = data;
712 }
713
714 requeue_tail = data;
715 atomic_store_relaxed(&data->sleep_address, &addr_to);
716 }
717 data = next;
718 // TODO: prev ptr?
719 } else {
720 data_location = &data->next;
721 prev = data;
722 data = next;
723 }
724 }
725
726 if (requeue != nullptr) {
727 requeue_tail->next = nullptr;
728 if (buckets.to.head == nullptr) {
729 buckets.to.head = requeue;
730 } else {
731 buckets.to.tail->next = requeue;
732 }
733 buckets.to.tail = requeue_tail;
734 }
735
736 requeue_callback(action, wakeup != nullptr, requeue != nullptr);
737
738 if (wakeup != nullptr) {
739 wakeup->unpark_info = unpark_info;
740 wakeup->parker.unpark_start();
741 unlock_bucket_pair(buckets);
742 wakeup->parker.unpark();
743 wakeup->parker.unpark_finish();
744 } else {
745 unlock_bucket_pair(buckets);
746 }
747
748 return wakeup != nullptr && action.unpark_one;
749}
750
752 uintptr_t *const lock_state;
753
756 }
757
758protected:
759 bool validate(validate_action &action) final {
760 uintptr_t result;
761 atomic_load_relaxed(lock_state, &result);
762 return result == (lock_bit | parked_bit);
763 }
764
765 uintptr_t unpark(int unparked, bool more_waiters) final {
766 // TODO: consider handling fairness.
767 uintptr_t return_state = more_waiters ? parked_bit : 0;
768 atomic_store_release(lock_state, &return_state);
769 return 0;
770 }
771};
772
774 uintptr_t state = 0;
775
776 ALWAYS_INLINE void lock_full() {
777 // Everyone says this should be 40. Have not measured it.
778 spin_control spinner;
779 uintptr_t expected;
780 atomic_load_relaxed(&state, &expected);
781
782 while (true) {
783 if (!(expected & lock_bit)) {
784 uintptr_t desired = expected | lock_bit;
785 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
786 return;
787 }
788 continue;
789 }
790
791 // Spin with spin count. Note that this occurs even if
792 // threads are parked. We're prioritizing throughput over
793 // fairness by letting sleeping threads lie.
794 if (spinner.should_spin()) {
796 atomic_load_relaxed(&state, &expected);
797 continue;
798 }
799
800 // Mark mutex as having parked threads if not already done.
801 if ((expected & parked_bit) == 0) {
802 uintptr_t desired = expected | parked_bit;
803 if (!atomic_cas_weak_relaxed_relaxed(&state, &expected, &desired)) {
804 continue;
805 }
806 }
807
808 // TODO: consider handling fairness, timeout
809 mutex_parking_control control(&state);
810 uintptr_t result = control.park((uintptr_t)this);
811 if (result == (uintptr_t)this) {
812 return;
813 }
814
815 spinner.reset();
816 atomic_load_relaxed(&state, &expected);
817 }
818 }
819
820 ALWAYS_INLINE void unlock_full() {
821 uintptr_t expected = lock_bit;
822 uintptr_t desired = 0;
823 // Try for a fast release of the lock. Redundant with code in lock, but done
824 // to make unlock_full a standalone unlock that can be called directly.
825 if (atomic_cas_strong_release_relaxed(&state, &expected, &desired)) {
826 return;
827 }
828
829 mutex_parking_control control(&state);
830 control.unpark_one((uintptr_t)this);
831 }
832
833public:
835 uintptr_t expected = 0;
836 uintptr_t desired = lock_bit;
837 // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
838 if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
839 lock_full();
840 }
841 }
842
844 uintptr_t expected = lock_bit;
845 uintptr_t desired = 0;
846 // Try for a fast release of the lock bit. If this does not work, call the full unlock code, which wakes a parked thread.
847 if (!atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
848 unlock_full();
849 }
850 }
851
853 uintptr_t val;
854 atomic_load_relaxed(&state, &val);
855 while (true) {
856 if (!(val & lock_bit)) {
857 return false;
858 }
859
860 uintptr_t desired = val | parked_bit;
861 if (atomic_cas_weak_relaxed_relaxed(&state, &val, &desired)) {
862 return true;
863 }
864 }
865 }
866
868 atomic_or_fetch_relaxed(&state, parked_bit);
869 }
870};
871
873 uintptr_t *const cond_state;
875
878 }
879
880protected:
881 uintptr_t unpark(int unparked, bool more_waiters) final {
882 if (!more_waiters) {
883 uintptr_t val = 0;
884 atomic_store_relaxed(cond_state, &val);
885 }
886
887#if 0 // TODO: figure out why this was here.
888 return (uintptr_t)mutex;
889#else
890 return 0;
891#endif
892 }
893};
894
896 uintptr_t *const cond_state;
898
901 }
902
903protected:
904 bool validate(validate_action &action) final {
905 uintptr_t val;
906 atomic_load_relaxed(cond_state, &val);
907 // By the time this broadcast locked everything and was processed, the cond
908 // has progressed to a new mutex, do nothing since any waiting threads have
909 // to be waiting on what is effectively a different condition.
910 if (val != (uintptr_t)mutex) {
911 return false;
912 }
913 // Clear the cond's connection to the mutex as all waiting threads are going to requeue onto the mutex.
914 val = 0;
915 atomic_store_relaxed(cond_state, &val);
916 action.unpark_one = !mutex->make_parked_if_locked();
917 return true;
918 }
919
920 void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) final {
921 if (action.unpark_one && some_requeued) {
923 }
924 }
925};
926
928 uintptr_t *const cond_state;
930
933 }
934
935protected:
936 bool validate(validate_action &action) final {
937 uintptr_t val;
938 atomic_load_relaxed(cond_state, &val);
939
940 if (val == 0) {
941 val = (uintptr_t)mutex;
942 atomic_store_relaxed(cond_state, &val);
943 } else if (val != (uintptr_t)mutex) {
944 // TODO: signal error.
945 action.invalid_unpark_info = (uintptr_t)mutex;
946 return false;
947 }
948
949 return true;
950 }
951
952 void before_sleep() final {
953 mutex->unlock();
954 }
955
956 uintptr_t unpark(int unparked, bool more_waiters) final {
957 if (!more_waiters) {
958 uintptr_t val = 0;
959 atomic_store_relaxed(cond_state, &val);
960 }
961 return 0;
962 }
963};
964
966 uintptr_t state = 0;
967
968public:
970 if_tsan_pre_signal(this);
971
972 uintptr_t val;
973 atomic_load_relaxed(&state, &val);
974 if (val == 0) {
975 if_tsan_post_signal(this);
976 return;
977 }
978 signal_parking_control control(&state, (fast_mutex *)val);
979 control.unpark_one((uintptr_t)this);
980 if_tsan_post_signal(this);
981 }
982
984 if_tsan_pre_signal(this);
985 uintptr_t val;
986 atomic_load_relaxed(&state, &val);
987 if (val == 0) {
988 if_tsan_post_signal(this);
989 return;
990 }
991 broadcast_parking_control control(&state, (fast_mutex *)val);
992 control.unpark_requeue((uintptr_t)this, val, 0);
993 if_tsan_post_signal(this);
994 }
995
997 wait_parking_control control(&state, mutex);
998 uintptr_t result = control.park((uintptr_t)this);
999 if (result != (uintptr_t)mutex) {
1000 mutex->lock();
1001 } else {
1002 if_tsan_pre_lock(mutex);
1003
1004 // TODO: this is debug only.
1005 uintptr_t val;
1006 atomic_load_relaxed((uintptr_t *)mutex, &val);
1007 halide_assert(nullptr, val & 0x1);
1008
1009 if_tsan_post_lock(mutex);
1010 }
1011 }
1012};
1013
1014} // namespace Synchronization
1015
1016} // namespace Internal
1017} // namespace Runtime
1018} // namespace Halide
1019
1020extern "C" {
1021
1025 fast_mutex->lock();
1026}
1027
1031 fast_mutex->unlock();
1032}
1033
1037 fast_cond->broadcast();
1038}
1039
1043 fast_cond->signal();
1044}
1045
1046WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex) {
1051 fast_cond->wait(fast_mutex);
1052}
1053
1054// Actual definition of the mutex array.
1057};
1058
1060 // TODO: If sz is huge, we should probably hash it down to something smaller
1061 // in the accessors below. Check for deadlocks before doing so.
1063 nullptr, sizeof(halide_mutex_array));
1064 if (array == nullptr) {
1065 // Will result in a failed assertion and a call to halide_error.
1066 return nullptr;
1067 }
1068 array->array = (halide_mutex *)halide_malloc(
1069 nullptr, sz * sizeof(halide_mutex));
1070 if (array->array == nullptr) {
1071 halide_free(nullptr, array);
1072 // Will result in a failed assertion and a call to halide_error.
1073 return nullptr;
1074 }
1075 memset(array->array, 0, sz * sizeof(halide_mutex));
1076 return array;
1077}
1078
1080 struct halide_mutex_array *arr_ptr = (struct halide_mutex_array *)array;
1081 halide_free(user_context, arr_ptr->array);
1082 halide_free(user_context, arr_ptr);
1083}
1084
1086 halide_mutex_lock(&array->array[entry]);
1087 return 0;
1088}
1089
1091 halide_mutex_unlock(&array->array[entry]);
1092 return 0;
1093}
1094}
This file declares the routines used by Halide internally in its runtime.
void * halide_malloc(void *user_context, size_t x)
Halide calls these functions to allocate and free memory.
void halide_free(void *user_context, void *ptr)
WEAK hash_bucket & lock_bucket(uintptr_t addr)
WEAK void unlock_bucket_pair(bucket_pair &buckets)
WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to)
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
@ Internal
Not visible externally, similar to 'static' linkage in C.
Expr print(const std::vector< Expr > &values)
Create an Expr that prints out its value whenever it is evaluated.
void * user_context
Definition: printer.h:33
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
#define ALWAYS_INLINE
void * memset(void *s, int val, size_t n)
#define halide_assert(user_context, cond)
#define WEAK
ALWAYS_INLINE broadcast_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) final
ALWAYS_INLINE bucket_pair(hash_bucket &from, hash_bucket &to)
virtual uintptr_t unpark(int unparked, bool more_waiters)
virtual void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued)
int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info)
ALWAYS_INLINE signal_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
uintptr_t unpark(int unparked, bool more_waiters) final
ALWAYS_INLINE wait_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
Cross platform condition variable.
struct halide_mutex * array
Cross-platform mutex.
WEAK int halide_mutex_array_lock(struct halide_mutex_array *array, int entry)
WEAK void halide_mutex_array_destroy(void *user_context, void *array)
WEAK void halide_mutex_unlock(halide_mutex *mutex)
WEAK void halide_cond_signal(struct halide_cond *cond)
WEAK int halide_mutex_array_unlock(struct halide_mutex_array *array, int entry)
WEAK void halide_mutex_lock(halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
WEAK void halide_cond_broadcast(struct halide_cond *cond)
WEAK halide_mutex_array * halide_mutex_array_create(int sz)