aboutsummaryrefslogtreecommitdiff
path: root/src/utils/threadpool.h
blob: fac875e8481112e3a6cfad2de5212b5c57abe69c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
 * Copyright 2019 The libgav1 Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_
#define LIBGAV1_SRC_UTILS_THREADPOOL_H_

#include <functional>
#include <memory>

#if defined(__APPLE__)
#include <TargetConditionals.h>
#endif

#if !defined(LIBGAV1_THREADPOOL_USE_STD_MUTEX)
#if defined(__ANDROID__) || (defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE)
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
#else
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0
#endif
#endif

#if LIBGAV1_THREADPOOL_USE_STD_MUTEX
#include <condition_variable>  // NOLINT (unapproved c++11 header)
#include <mutex>               // NOLINT (unapproved c++11 header)
#else
// absl::Mutex & absl::CondVar are significantly faster than the pthread
// variants on platforms other than Android. iOS may deadlock on Shutdown()
// using absl, see b/142251739.
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#endif

#include "src/utils/compiler_attributes.h"
#include "src/utils/executor.h"
#include "src/utils/memory.h"
#include "src/utils/unbounded_queue.h"

namespace libgav1 {

// An implementation of ThreadPool using POSIX threads (pthreads) or Windows
// threads.
//
// - The pool allocates a fixed number of worker threads on instantiation.
// - The worker threads will pick up work jobs as they arrive.
// - If all workers are busy, work jobs are queued for later execution.
//
// The thread pool is shut down when the pool is destroyed.
//
// Example usage of the thread pool:
//   {
//     std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4);
//     for (int i = 0; i < 100; ++i) {  // Dispatch 100 jobs.
//       pool->Schedule([&my_data]() { MyFunction(&my_data); });
//     }
//   } // ThreadPool gets destroyed only when all jobs are done.
class ThreadPool : public Executor, public Allocable {
 public:
  // Creates the thread pool with the specified number of worker threads.
  // If num_threads is 1, the closures are run in FIFO order.
  static std::unique_ptr<ThreadPool> Create(int num_threads);

  // Like the above factory method, but also sets the name prefix for threads.
  static std::unique_ptr<ThreadPool> Create(const char name_prefix[],
                                            int num_threads);

  // The destructor will shut down the thread pool and all jobs are executed.
  // Note that after shutdown, the thread pool does not accept further jobs.
  ~ThreadPool() override;

  // Adds the specified "closure" to the queue for processing. If worker threads
  // are available, "closure" will run immediately. Otherwise "closure" is
  // queued for later execution.
  //
  // NOTE: If the internal queue is full and cannot be resized because of an
  // out-of-memory error, the current thread runs "closure" before returning
  // from Schedule(). For our use cases, this seems better than the
  // alternatives:
  //   1. Return a failure status.
  //   2. Have the current thread wait until the queue is not full.
  void Schedule(std::function<void()> closure) override;

  int num_threads() const;

 private:
  class WorkerThread;

  // Creates the thread pool with the specified number of worker threads.
  // If num_threads is 1, the closures are run in FIFO order.
  ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads,
             int num_threads);

  // Starts the worker pool.
  LIBGAV1_MUST_USE_RESULT bool StartWorkers();

  void WorkerFunction();

  // Shuts down the thread pool, i.e. worker threads finish their work and
  // pick up new jobs until the queue is empty. This call will block until
  // the shutdown is complete.
  //
  // Note: If a worker encounters an empty queue after this call, it will exit.
  // Other workers might still be running, and if the queue fills up again, the
  // thread pool will continue to operate with a decreased number of workers.
  // It is up to the caller to prevent adding new jobs.
  void Shutdown();

#if LIBGAV1_THREADPOOL_USE_STD_MUTEX

  void LockMutex() { queue_mutex_.lock(); }
  void UnlockMutex() { queue_mutex_.unlock(); }

  void Wait() {
    std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock);
    condition_.wait(queue_lock);
    queue_lock.release();
  }

  void SignalOne() { condition_.notify_one(); }
  void SignalAll() { condition_.notify_all(); }

  std::condition_variable condition_;
  std::mutex queue_mutex_;

#else  // !LIBGAV1_THREADPOOL_USE_STD_MUTEX

  void LockMutex() ABSL_EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); }
  void UnlockMutex() ABSL_UNLOCK_FUNCTION() { queue_mutex_.Unlock(); }
  void Wait() { condition_.Wait(&queue_mutex_); }
  void SignalOne() { condition_.Signal(); }
  void SignalAll() { condition_.SignalAll(); }

  absl::CondVar condition_;
  absl::Mutex queue_mutex_;

#endif  // LIBGAV1_THREADPOOL_USE_STD_MUTEX

  UnboundedQueue<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_);
  // If not all the worker threads are created, the first entry after the
  // created worker threads is a null pointer.
  const std::unique_ptr<WorkerThread*[]> threads_;

  bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false;
  const int num_threads_ = 0;
  // name_prefix_ is a C string, whose length is restricted to 16 characters,
  // including the terminating null byte ('\0'). This restriction comes from
  // the Linux pthread_setname_np() function.
  char name_prefix_[16];
};

}  // namespace libgav1

#undef LIBGAV1_THREADPOOL_USE_STD_MUTEX

#endif  // LIBGAV1_SRC_UTILS_THREADPOOL_H_