diff options
Diffstat (limited to 'absl/synchronization/internal')
-rw-r--r-- | absl/synchronization/internal/futex.h | 70 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.cc | 106 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.h | 3 |
3 files changed, 151 insertions, 28 deletions
diff --git a/absl/synchronization/internal/futex.h b/absl/synchronization/internal/futex.h index 9cf9841d..62bb40f7 100644 --- a/absl/synchronization/internal/futex.h +++ b/absl/synchronization/internal/futex.h @@ -16,9 +16,7 @@ #include "absl/base/config.h" -#ifdef _WIN32 -#include <windows.h> -#else +#ifndef _WIN32 #include <sys/time.h> #include <unistd.h> #endif @@ -85,34 +83,60 @@ namespace synchronization_internal { class FutexImpl { public: - static int WaitUntil(std::atomic<int32_t> *v, int32_t val, + // Atomically check that `*v == val`, and if it is, then sleep until the + // timeout `t` has been reached, or until woken by `Wake()`. + static int WaitUntil(std::atomic<int32_t>* v, int32_t val, KernelTimeout t) { - long err = 0; // NOLINT(runtime/int) - if (t.has_timeout()) { - // https://locklessinc.com/articles/futex_cheat_sheet/ - // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. - struct timespec abs_timeout = t.MakeAbsTimespec(); - // Atomically check that the futex value is still 0, and if it - // is, sleep until abs_timeout or until woken by FUTEX_WAKE. - err = syscall( - SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, - &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); + if (!t.has_timeout()) { + return Wait(v, val); + } else if (t.is_absolute_timeout()) { + auto abs_timespec = t.MakeAbsTimespec(); + return WaitAbsoluteTimeout(v, val, &abs_timespec); } else { - // Atomically check that the futex value is still 0, and if it - // is, sleep until woken by FUTEX_WAKE. - err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr); + auto rel_timespec = t.MakeRelativeTimespec(); + return WaitRelativeTimeout(v, val, &rel_timespec); } - if (ABSL_PREDICT_FALSE(err != 0)) { + } + + // Atomically check that `*v == val`, and if it is, then sleep until the until + // woken by `Wake()`. + static int Wait(std::atomic<int32_t>* v, int32_t val) { + return WaitAbsoluteTimeout(v, val, nullptr); + } + + // Atomically check that `*v == val`, and if it is, then sleep until + // CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`. + static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* abs_timeout) { + // https://locklessinc.com/articles/futex_cheat_sheet/ + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. + auto err = + syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, + val, abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); + if (err != 0) { + return -errno; + } + return 0; + } + + // Atomically check that `*v == val`, and if it is, then sleep until + // `*rel_timeout` has elapsed, or until woken by `Wake()`. + static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* rel_timeout) { + // Atomically check that the futex value is still 0, and if it + // is, sleep until abs_timeout or until woken by FUTEX_WAKE. + auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + FUTEX_PRIVATE_FLAG, val, rel_timeout); + if (err != 0) { return -errno; } return 0; } - static int Wake(std::atomic<int32_t> *v, int32_t count) { - // NOLINTNEXTLINE(runtime/int) - long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + // Wakes at most `count` waiters that have entered the sleep state on `v`. + static int Wake(std::atomic<int32_t>* v, int32_t count) { + auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); if (ABSL_PREDICT_FALSE(err < 0)) { return -errno; diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index f2051d67..883ae811 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -67,11 +67,9 @@ static void MaybeBecomeIdle() { #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX -Waiter::Waiter() { - futex_.store(0, std::memory_order_relaxed); -} +Waiter::Waiter() : futex_(0) {} -bool Waiter::Wait(KernelTimeout t) { +bool Waiter::WaitAbsoluteTimeout(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. // Note that, since the thread ticker is just reset, we don't need to check @@ -90,7 +88,88 @@ bool Waiter::Wait(KernelTimeout t) { } if (!first_pass) MaybeBecomeIdle(); - const int err = Futex::WaitUntil(&futex_, 0, t); + auto abs_timeout = t.MakeAbsTimespec(); + const int err = Futex::WaitAbsoluteTimeout(&futex_, 0, &abs_timeout); + if (err != 0) { + if (err == -EINTR || err == -EWOULDBLOCK) { + // Do nothing, the loop will retry. + } else if (err == -ETIMEDOUT) { + return false; + } else { + ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); + } + } + first_pass = false; + } +} + +#if defined(CLOCK_MONOTONIC) + +// Subtracts the timespec `sub` from `in` if the result would not be negative, +// and returns true. Returns false if the result would be negative, and leaves +// `in` unchanged. +static bool TimespecSubtract(struct timespec& in, const struct timespec& sub) { + if (in.tv_sec < sub.tv_sec) { + return false; + } + if (in.tv_nsec < sub.tv_nsec) { + if (in.tv_sec == sub.tv_sec) { + return false; + } + // Borrow from tv_sec. + in.tv_sec -= 1; + in.tv_nsec += 1'000'000'000; + } + in.tv_sec -= sub.tv_sec; + in.tv_nsec -= sub.tv_nsec; + return true; +} + +// On some platforms a background thread periodically calls `Poke()` to briefly +// wake waiter threads so that they may call `MaybeBecomeIdle()`. This means +// that `WaitRelativeTimeout()` differs slightly from `WaitAbsoluteTimeout()` +// because it must adjust the timeout by the amount of time that it has already +// slept. +bool Waiter::WaitRelativeTimeout(KernelTimeout t) { + struct timespec start; + ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &start) == 0, + "clock_gettime() failed"); + + // Loop until we can atomically decrement futex from a positive + // value, waiting on a futex while we believe it is zero. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + + while (true) { + int32_t x = futex_.load(std::memory_order_relaxed); + while (x != 0) { + if (!futex_.compare_exchange_weak(x, x - 1, + std::memory_order_acquire, + std::memory_order_relaxed)) { + continue; // Raced with someone, retry. + } + return true; // Consumed a wakeup, we are done. + } + + auto relative_timeout = t.MakeRelativeTimespec(); + if (!first_pass) { + MaybeBecomeIdle(); + + // Adjust relative_timeout for `Poke()`s. + struct timespec now; + ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &now) == 0, + "clock_gettime() failed"); + // If TimespecSubstract(now, start) returns false, then the clock isn't + // truly monotonic. + if (TimespecSubtract(now, start)) { + if (!TimespecSubtract(relative_timeout, now)) { + return false; // Timeout. + } + } + } + + const int err = Futex::WaitRelativeTimeout(&futex_, 0, &relative_timeout); if (err != 0) { if (err == -EINTR || err == -EWOULDBLOCK) { // Do nothing, the loop will retry. @@ -104,6 +183,23 @@ bool Waiter::Wait(KernelTimeout t) { } } +#else // CLOCK_MONOTONIC + +// No support for CLOCK_MONOTONIC. +// KernelTimeout will automatically convert to an absolute timeout. +bool Waiter::WaitRelativeTimeout(KernelTimeout t) { + return WaitAbsoluteTimeout(t); +} + +#endif // CLOCK_MONOTONIC + +bool Waiter::Wait(KernelTimeout t) { + if (t.is_absolute_timeout()) { + return WaitAbsoluteTimeout(t); + } + return WaitRelativeTimeout(t); +} + void Waiter::Post() { if (futex_.fetch_add(1, std::memory_order_release) == 0) { // We incremented from 0, need to wake a potential waiter. diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index b8adfeb5..c206cc3f 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -110,6 +110,9 @@ class Waiter { ~Waiter() = delete; #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX + bool WaitAbsoluteTimeout(KernelTimeout t); + bool WaitRelativeTimeout(KernelTimeout t); + // Futexes are defined by specification to be 32-bits. // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. std::atomic<int32_t> futex_; |