diff options
Diffstat (limited to 'src/utils/threadpool.cc')
-rw-r--r-- | src/utils/threadpool.cc | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/src/utils/threadpool.cc b/src/utils/threadpool.cc new file mode 100644 index 0000000..8c8f4fe --- /dev/null +++ b/src/utils/threadpool.cc @@ -0,0 +1,323 @@ +// Copyright 2019 The libgav1 Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/utils/threadpool.h" + +#if defined(_MSC_VER) +#include <process.h> +#include <windows.h> +#else // defined(_MSC_VER) +#include <pthread.h> +#endif // defined(_MSC_VER) +#if defined(__ANDROID__) || defined(__GLIBC__) +#include <sys/types.h> +#include <unistd.h> +#endif +#include <algorithm> +#include <cassert> +#include <cinttypes> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <new> +#include <utility> + +#if defined(__ANDROID__) +#include <chrono> // NOLINT (unapproved c++11 header) +#endif + +// The glibc wrapper for the gettid() system call was added in glibc 2.30. +// Emulate it for older versions of glibc. +#if defined(__GLIBC_PREREQ) +#if !__GLIBC_PREREQ(2, 30) + +#include <sys/syscall.h> + +static pid_t gettid() { return static_cast<pid_t>(syscall(SYS_gettid)); } + +#endif +#endif // defined(__GLIBC_PREREQ) + +namespace libgav1 { + +#if defined(__ANDROID__) +namespace { + +using Clock = std::chrono::steady_clock; +using Duration = Clock::duration; +constexpr Duration kBusyWaitDuration = + std::chrono::duration_cast<Duration>(std::chrono::duration<double>(2e-3)); + +} // namespace +#endif // defined(__ANDROID__) + +// static +std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) { + return Create(/*name_prefix=*/"", num_threads); +} + +// static +std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[], + int num_threads) { + if (name_prefix == nullptr || num_threads <= 0) return nullptr; + std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow) + WorkerThread*[num_threads]); + if (threads == nullptr) return nullptr; + std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool( + name_prefix, std::move(threads), num_threads)); + if (pool != nullptr && !pool->StartWorkers()) { + pool = nullptr; + } + return pool; +} + +ThreadPool::ThreadPool(const char name_prefix[], + std::unique_ptr<WorkerThread*[]> threads, + int num_threads) + : threads_(std::move(threads)), num_threads_(num_threads) { + threads_[0] = nullptr; + assert(name_prefix != nullptr); + const size_t name_prefix_len = + std::min(strlen(name_prefix), sizeof(name_prefix_) - 1); + memcpy(name_prefix_, name_prefix, name_prefix_len); + name_prefix_[name_prefix_len] = '\0'; +} + +ThreadPool::~ThreadPool() { Shutdown(); } + +void ThreadPool::Schedule(std::function<void()> closure) { + LockMutex(); + if (!queue_.GrowIfNeeded()) { + // queue_ is full and we can't grow it. Run |closure| directly. + UnlockMutex(); + closure(); + return; + } + queue_.Push(std::move(closure)); + UnlockMutex(); + SignalOne(); +} + +int ThreadPool::num_threads() const { return num_threads_; } + +// A simple implementation that mirrors the non-portable Thread. We may +// choose to expand this in the future as a portable implementation of +// Thread, or replace it at such a time as one is implemented. +class ThreadPool::WorkerThread : public Allocable { + public: + // Creates and starts a thread that runs pool->WorkerFunction(). + explicit WorkerThread(ThreadPool* pool); + + // Not copyable or movable. + WorkerThread(const WorkerThread&) = delete; + WorkerThread& operator=(const WorkerThread&) = delete; + + // REQUIRES: Join() must have been called if Start() was called and + // succeeded. + ~WorkerThread() = default; + + LIBGAV1_MUST_USE_RESULT bool Start(); + + // Joins with the running thread. + void Join(); + + private: +#if defined(_MSC_VER) + static unsigned int __stdcall ThreadBody(void* arg); +#else + static void* ThreadBody(void* arg); +#endif + + void SetupName(); + void Run(); + + ThreadPool* pool_; +#if defined(_MSC_VER) + HANDLE handle_; +#else + pthread_t thread_; +#endif +}; + +ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool) : pool_(pool) {} + +#if defined(_MSC_VER) + +bool ThreadPool::WorkerThread::Start() { + // Since our code calls the C run-time library (CRT), use _beginthreadex + // rather than CreateThread. Microsoft documentation says "If a thread + // created using CreateThread calls the CRT, the CRT may terminate the + // process in low-memory conditions." + uintptr_t handle = _beginthreadex( + /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this, + /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr); + if (handle == 0) return false; + handle_ = reinterpret_cast<HANDLE>(handle); + ResumeThread(handle_); + return true; +} + +void ThreadPool::WorkerThread::Join() { + WaitForSingleObject(handle_, INFINITE); + CloseHandle(handle_); +} + +unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) { + auto* thread = static_cast<WorkerThread*>(arg); + thread->Run(); + return 0; +} + +void ThreadPool::WorkerThread::SetupName() { + // Not currently supported on Windows. +} + +#else // defined(_MSC_VER) + +bool ThreadPool::WorkerThread::Start() { + return pthread_create(&thread_, nullptr, ThreadBody, this) == 0; +} + +void ThreadPool::WorkerThread::Join() { pthread_join(thread_, nullptr); } + +void* ThreadPool::WorkerThread::ThreadBody(void* arg) { + auto* thread = static_cast<WorkerThread*>(arg); + thread->Run(); + return nullptr; +} + +void ThreadPool::WorkerThread::SetupName() { + if (pool_->name_prefix_[0] != '\0') { +#if defined(__APPLE__) + // Apple's version of pthread_setname_np takes one argument and operates on + // the current thread only. Also, pthread_mach_thread_np is Apple-specific. + // The maximum size of the |name| buffer was noted in the Chromium source + // code and was confirmed by experiments. + char name[64]; + mach_port_t id = pthread_mach_thread_np(pthread_self()); + int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_, + static_cast<int64_t>(id)); + assert(rv >= 0); + rv = pthread_setname_np(name); + assert(rv == 0); + static_cast<void>(rv); +#elif defined(__ANDROID__) || defined(__GLIBC__) + // If the |name| buffer is longer than 16 bytes, pthread_setname_np fails + // with error 34 (ERANGE) on Android. + char name[16]; + pid_t id = gettid(); + int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_, + static_cast<int64_t>(id)); + assert(rv >= 0); + rv = pthread_setname_np(pthread_self(), name); + assert(rv == 0); + static_cast<void>(rv); +#endif + } +} + +#endif // defined(_MSC_VER) + +void ThreadPool::WorkerThread::Run() { + SetupName(); + pool_->WorkerFunction(); +} + +bool ThreadPool::StartWorkers() { + if (!queue_.Init()) return false; + for (int i = 0; i < num_threads_; ++i) { + threads_[i] = new (std::nothrow) WorkerThread(this); + if (threads_[i] == nullptr) return false; + if (!threads_[i]->Start()) { + delete threads_[i]; + threads_[i] = nullptr; + return false; + } + } + return true; +} + +void ThreadPool::WorkerFunction() { + LockMutex(); + while (true) { + if (queue_.Empty()) { + if (exit_threads_) { + break; // Queue is empty and exit was requested. + } +#if defined(__ANDROID__) + // On android, if we go to a conditional wait right away, the CPU governor + // kicks in and starts shutting the cores down. So we do a very small busy + // wait to see if we get our next job within that period. This + // significantly improves the performance of common cases of tile parallel + // decoding. If we don't receive a job in the busy wait time, we then go + // to an actual conditional wait as usual. + UnlockMutex(); + bool found_job = false; + const auto wait_start = Clock::now(); + while (Clock::now() - wait_start < kBusyWaitDuration) { + LockMutex(); + if (!queue_.Empty()) { + found_job = true; + break; + } + UnlockMutex(); + } + // If |found_job| is true, we simply continue since we already hold the + // mutex and we know for sure that the |queue_| is not empty. + if (found_job) continue; + // Since |found_job_| was false, the mutex is not being held at this + // point. + LockMutex(); + // Ensure that the queue is still empty. + if (!queue_.Empty()) continue; + if (exit_threads_) { + break; // Queue is empty and exit was requested. + } +#endif // defined(__ANDROID__) + // Queue is still empty, wait for signal or broadcast. + Wait(); + } else { + // Take a job from the queue. + std::function<void()> job = std::move(queue_.Front()); + queue_.Pop(); + + UnlockMutex(); + // Note that it is good practice to surround this with a try/catch so + // the thread pool doesn't go to hell if the job throws an exception. + // This is omitted here because Google3 doesn't like exceptions. + std::move(job)(); + job = nullptr; + + LockMutex(); + } + } + UnlockMutex(); +} + +void ThreadPool::Shutdown() { + // Tell worker threads how to exit. + LockMutex(); + exit_threads_ = true; + UnlockMutex(); + SignalAll(); + + // Join all workers. This will block. + for (int i = 0; i < num_threads_; ++i) { + if (threads_[i] == nullptr) break; + threads_[i]->Join(); + delete threads_[i]; + } +} + +} // namespace libgav1 |