aboutsummaryrefslogtreecommitdiff
path: root/src/utils/threadpool.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils/threadpool.h')
-rw-r--r--src/utils/threadpool.h167
1 files changed, 167 insertions, 0 deletions
diff --git a/src/utils/threadpool.h b/src/utils/threadpool.h
new file mode 100644
index 0000000..fac875e
--- /dev/null
+++ b/src/utils/threadpool.h
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2019 The libgav1 Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_
+#define LIBGAV1_SRC_UTILS_THREADPOOL_H_
+
+#include <functional>
+#include <memory>
+
+#if defined(__APPLE__)
+#include <TargetConditionals.h>
+#endif
+
+#if !defined(LIBGAV1_THREADPOOL_USE_STD_MUTEX)
+#if defined(__ANDROID__) || (defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE)
+#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
+#else
+#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0
+#endif
+#endif
+
+#if LIBGAV1_THREADPOOL_USE_STD_MUTEX
+#include <condition_variable> // NOLINT (unapproved c++11 header)
+#include <mutex> // NOLINT (unapproved c++11 header)
+#else
+// absl::Mutex & absl::CondVar are significantly faster than the pthread
+// variants on platforms other than Android. iOS may deadlock on Shutdown()
+// using absl, see b/142251739.
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#endif
+
+#include "src/utils/compiler_attributes.h"
+#include "src/utils/executor.h"
+#include "src/utils/memory.h"
+#include "src/utils/unbounded_queue.h"
+
+namespace libgav1 {
+
+// An implementation of ThreadPool using POSIX threads (pthreads) or Windows
+// threads.
+//
+// - The pool allocates a fixed number of worker threads on instantiation.
+// - The worker threads will pick up work jobs as they arrive.
+// - If all workers are busy, work jobs are queued for later execution.
+//
+// The thread pool is shut down when the pool is destroyed.
+//
+// Example usage of the thread pool:
+// {
+// std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4);
+// for (int i = 0; i < 100; ++i) { // Dispatch 100 jobs.
+// pool->Schedule([&my_data]() { MyFunction(&my_data); });
+// }
+// } // ThreadPool gets destroyed only when all jobs are done.
+class ThreadPool : public Executor, public Allocable {
+ public:
+ // Creates the thread pool with the specified number of worker threads.
+ // If num_threads is 1, the closures are run in FIFO order.
+ static std::unique_ptr<ThreadPool> Create(int num_threads);
+
+ // Like the above factory method, but also sets the name prefix for threads.
+ static std::unique_ptr<ThreadPool> Create(const char name_prefix[],
+ int num_threads);
+
+ // The destructor will shut down the thread pool and all jobs are executed.
+ // Note that after shutdown, the thread pool does not accept further jobs.
+ ~ThreadPool() override;
+
+ // Adds the specified "closure" to the queue for processing. If worker threads
+ // are available, "closure" will run immediately. Otherwise "closure" is
+ // queued for later execution.
+ //
+ // NOTE: If the internal queue is full and cannot be resized because of an
+ // out-of-memory error, the current thread runs "closure" before returning
+ // from Schedule(). For our use cases, this seems better than the
+ // alternatives:
+ // 1. Return a failure status.
+ // 2. Have the current thread wait until the queue is not full.
+ void Schedule(std::function<void()> closure) override;
+
+ int num_threads() const;
+
+ private:
+ class WorkerThread;
+
+ // Creates the thread pool with the specified number of worker threads.
+ // If num_threads is 1, the closures are run in FIFO order.
+ ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads,
+ int num_threads);
+
+ // Starts the worker pool.
+ LIBGAV1_MUST_USE_RESULT bool StartWorkers();
+
+ void WorkerFunction();
+
+ // Shuts down the thread pool, i.e. worker threads finish their work and
+ // pick up new jobs until the queue is empty. This call will block until
+ // the shutdown is complete.
+ //
+ // Note: If a worker encounters an empty queue after this call, it will exit.
+ // Other workers might still be running, and if the queue fills up again, the
+ // thread pool will continue to operate with a decreased number of workers.
+ // It is up to the caller to prevent adding new jobs.
+ void Shutdown();
+
+#if LIBGAV1_THREADPOOL_USE_STD_MUTEX
+
+ void LockMutex() { queue_mutex_.lock(); }
+ void UnlockMutex() { queue_mutex_.unlock(); }
+
+ void Wait() {
+ std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock);
+ condition_.wait(queue_lock);
+ queue_lock.release();
+ }
+
+ void SignalOne() { condition_.notify_one(); }
+ void SignalAll() { condition_.notify_all(); }
+
+ std::condition_variable condition_;
+ std::mutex queue_mutex_;
+
+#else // !LIBGAV1_THREADPOOL_USE_STD_MUTEX
+
+ void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); }
+ void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); }
+ void Wait() { condition_.Wait(&queue_mutex_); }
+ void SignalOne() { condition_.Signal(); }
+ void SignalAll() { condition_.SignalAll(); }
+
+ absl::CondVar condition_;
+ absl::Mutex queue_mutex_;
+
+#endif // LIBGAV1_THREADPOOL_USE_STD_MUTEX
+
+ UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_);
+ // If not all the worker threads are created, the first entry after the
+ // created worker threads is a null pointer.
+ const std::unique_ptr<WorkerThread*[]> threads_;
+
+ bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false;
+ const int num_threads_ = 0;
+ // name_prefix_ is a C string, whose length is restricted to 16 characters,
+ // including the terminating null byte ('\0'). This restriction comes from
+ // the Linux pthread_setname_np() function.
+ char name_prefix_[16];
+};
+
+} // namespace libgav1
+
+#undef LIBGAV1_THREADPOOL_USE_STD_MUTEX
+
+#endif // LIBGAV1_SRC_UTILS_THREADPOOL_H_