Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
ThreadPool.h
Go to the documentation of this file.
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#ifndef _Stroika_Foundation_Execution_ThreadPool_h_
5#define _Stroika_Foundation_Execution_ThreadPool_h_ 1
6
7#include "Stroika/Foundation/StroikaPreComp.h"
8
9#include <list>
10#include <mutex>
11
12#include "Stroika/Foundation/Containers/Collection.h"
15#include "Stroika/Foundation/Execution/WaitableEvent.h"
16
17/**
18 * \file
19 *
20 * \note Code-Status: <a href="Code-Status.md#Beta">Beta</a>
21 *
22 * TODO:
23 * @todo AddTask () implementation is HORRIBLE (for case where fAddBlockTimeout != 0) and needs to be rewritten with condition variables.
24 *
25 * @todo ThreadPool::WaitForTask () is a very sloppy inefficient implementation (but probably rarely called).
26 *
27 * @todo CONSIDER USE OF blocking q - I think it will help. Or figure out
28 * how these tie together. Or rewrite using condition_variable.
29 *
30 * @todo Current approach to aborting a running task is to abort the thread. But the current
31 * thread code doesn't support restarting a thread once it's been aborted. We PROBABLY
32 * should correct that at some point - and allow a thread to undo its abort-in-progress.
33 * However - no need immediately. Instead - the current ThreadPool implementation simply
34 * drops that thread and builds a new one. Performance overhead yes - but only for the
35 * likely rare case of aborting a running task.
36 *
37 * @todo Consider adding the idea of TaskGroups - which are properties shared by all tasks (or some tasks)
38 * added to a threadpool. If multiple tasks are added to the threadpool with the same
39 * TaskGroup, then they respect that task-group's constraints. One example constraint would be
40 * mutual run exclusion. This would allow you to create lockless threaded procedures, because
41 * they would be guaranteed to not be run all at the same time, and yet could STILL leverage
42 * the benefits of thread pooling.
43 *
44 * For example, if you had 3 threads in the pool, and 5 thread groups, then typically one or
45 * more thread groups would be idle. The thread groups give you lockless execution, and
46 * the threadpool lets the 5 groups run 'at the same time' on only 3 threads.
47 */
48
50
51 /**
52 * The ThreadPool class creates a small fixed number of Thread objects, and lets you use them
53 * as if there were many more. You submit a task (representable as a comparable std::function - @see Function) -
54 * and it gets eventually executed.
55 *
56 * If a Task in the thread pool raises an exception - this will be IGNORED (except for the
57 * special case of Thread::AbortException which is used internally to end the threadpool or
58 * remove some threads). Because of this, your submitted runnables should take care of their own
59 * error handling internally.
60 *
61 * ThreadPool is mainly useful for servicing low-cost calls/threads, where the overhead
62 * of constructing the thread is significant compared to the cost of performing the action,
63 * and where the priority & stacksize can all be predetermined and 'shared'.
64 * Also - where you want to CONTROL the level of thread creation (possibly to avoid
65 * DOS attacks or just accidental overloading).
66 *
67 * \pre Debug::AppearsDuringMainLifetime (); during the lifetime of the ThreadPool
68 *
69 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
70 * all methods can be freely used from any thread, and they will block internally as needed.
71 */
72 class [[nodiscard]] ThreadPool {
73 public:
74 /**
75 * Affects behavior of AddTask command. If a max is provided, then attempts to Add
76 * will check this versus the actual pending Q length, and wait/throw if insufficient space.
77 *
78 * Use this overload to assure thread pools don't 'blow up' with too many tasks.
79 *
80 * \note - often only provide QMax::fLength, since the default of not blocking - just throwing cuz Q full - is quite reasonable.
81 */
82 struct QMax {
83 size_t fLength{0};
84
85 /**
86 * Zero timeout means never wait (AddTask either adds or throws but doesn't wait for Q to empty enough).
87 */
88 Time::DurationSeconds fAddBlockTimeout{0s};
89 };
90
91 /**
 * Configuration for a ThreadPool: passed to the ThreadPool (const Options&) constructor; the current values
 * are reported by GetOptions ().
92 */
93 struct Options {
94 /**
95 * number of threads allocated to service tasks.
96 */
97 unsigned int fThreadCount{thread::hardware_concurrency ()};
98
99 /**
100 * Name used in various debug messages.
101 */
102 optional<Characters::String> fThreadPoolName;
103
104 /**
105 * Default to no max Q - infinite queue size. Note, a value of fLength=fThreadCount, or some small multiple, would often be reasonable.
106 */
107 optional<QMax> fQMax;
108
109 /**
110 * defaults false;
111 */
112 bool fCollectStatistics{false};
113 };
114
115 public:
116 /**
117 * \par Example Usage
118 * \code
119 * ThreadPool p{ThreadPool::Options{.fThreadCount = 3}};
120 * p.AddTask ([&q, &counter] () {
121 * ..dostuff..
122 * });
123 * // when goes out of scope automatically blocks waiting for threads to complete...
124 * // or call p.WaitForTasksDoneUntil ()
125 * \endcode
126 */
127 ThreadPool ();
128 ThreadPool (const Options& options);
129 ThreadPool (ThreadPool&&) = delete;
130 ThreadPool (const ThreadPool&) = delete;
131
132 public:
133 nonvirtual ThreadPool& operator= (ThreadPool&&) = delete;
134 nonvirtual ThreadPool& operator= (const ThreadPool&) = delete;
135
136 public:
137 /**
138 * Destroying a threadpool implicitly calls AbortAndWaitForDone () and eats any errors (cannot rethrow).
139 *
140 * \note - ThreadPool used to have explicit abort methods, but there was no point. When aborting, you must wait for them all to
141 * shut down to destroy the object. And since you cannot restart the object, there is no point in ever aborting without destroying.
142 * so KISS - just destroy the ThreadPool object.
143 */
144 ~ThreadPool ();
145
146 public:
147 /**
148 * \note It is important (required) that all tasks added to a ThreadPool respond in a timely manner to Thread Abort.
149 * ThreadPool counts on that for clean shutdown.
150 *
151 * This means periodically calling CheckForInterruption () and that any waits respect thread cancelation (stop_token).
152 *
153 * Tasks may exit via exception, but nothing will be done with that exception (beyond DbgTrace logging). So generally
154 * not a good idea, except for ThreadAbort handling.
155 */
156 using TaskType = Function<void ()>;
157
158 public:
159 /**
160 * These options may have been modified by various APIs, and reflect the current state of options, not necessarily those that the
161 * ThreadPool was created with.
162 */
163 nonvirtual Options GetOptions () const;
164
165 public:
166 /**
167 * This returns the number of threads in the pool (not the number of tasks). Note 0 is a legal size.
168 */
169 nonvirtual unsigned int GetPoolSize () const;
170
171 public:
172 /**
173 * SetPoolSize () is advisory. It attempts to add or remove entries as requested. Note - 0 is a legal size.
174 *
175 * But under some circumstances, it will fail. For example, if tasks are busy
176 * running on all threads, the number of threads in the pool cannot be decreased.
177 *
178 * @todo - WE CAN do better than this - at least marking the thread as to be removed when the
179 * task finishes - but NYI
180 */
181 nonvirtual void SetPoolSize (unsigned int poolSize);
182
183 public:
184 /**
185 * Push the given task into the queue (possibly blocking til space in the Q or throwing if no space in Q, but does NOT block waiting for Q to till).
186 *
187 * \par Example Usage
188 * \code
189 * ThreadPool p;
190 * p.AddTask ([] () {doIt ();});
191 * \endcode
192 *
193 * if qmax is provided, it takes precedence over any default value associated with the ThreadPool (constructor). If neither
194 * provided (as an argument or associated with the pool), this is treated as no max, and the addition just proceeds.
195 *
196 * If qMax provided (even indirectly), assure task q length doesn't exceed argument by waiting up to the qMax
197 * duration, and either timing out, or successfully add the task.
198 *
199 * \note Design Note:
200 * The reason this returns as TaskType is that it's easy to convert a lambda or whatever into a TaskType, but if you do
201 * it multiple times you get different (!=) values. So to make the auto conversion work easier without needing
202 * to first create a variable, and then do the add task, you can just do them together. And it avoids mistakes like:
203 * function<void()> f = ...;
204 * p.AddTask(f);
205 * p.AbortTask (f); // fails cuz different 'TaskType' added - f converted to TaskType twice!
206 */
207 nonvirtual TaskType AddTask (const TaskType& task, const optional<Characters::String>& name = nullopt);
208 nonvirtual TaskType AddTask (const TaskType& task, QMax qmax, const optional<Characters::String>& name = nullopt);
209
210 private:
211 nonvirtual TaskType AddTask_ (const TaskType& task, const optional<Characters::String>& name);
212
213 public:
214 /**
215 * It is NOT an error to call this with a task that is not in the Queue
216 * (since it would be a race to try to find out if it was already executed).
217 *
218 * It can cancel a task if it has not yet been started, or EVEN if it's already in
219 * progress (see Thread::Abort - it sends abort signal)
220 *
221 * The function doesn't return until the task has been successfully cancelled, or it throws if timeout.
222 */
223 nonvirtual void AbortTask (const TaskType& task, Time::DurationSeconds timeout = Time::kInfinity);
224
225 public:
226 /**
227 * See AbortTask () - it aborts all tasks - if any.
228 */
229 nonvirtual void AbortTasks (Time::DurationSeconds timeout = Time::kInfinity);
230
231 public:
232 /**
233 * returns true if queued OR actively running.
234 *
235 * \pre task != nullptr
236 */
237 nonvirtual bool IsPresent (const TaskType& task) const;
238
239 public:
240 /**
241 * returns true if actively running
242 *
243 * \pre task != nullptr
244 */
245 nonvirtual bool IsRunning (const TaskType& task) const;
246
247 public:
248 /**
249 * throws if timeout. Returns when task has completed (or if not in task q)
250 *
251 * \pre task != nullptr
252 */
253 nonvirtual void WaitForTask (const TaskType& task, Time::DurationSeconds timeout = Time::kInfinity) const;
254
255 public:
256 /**
 * Per-task record (the task, its optional name, and when it started running, if it has) - the element type
 * of the collection returned by GetTasks ().
257 */
258 struct TaskInfo {
259 TaskType fTask;
260 optional<Characters::String> fName;
261 optional<Time::TimePointSeconds> fRunningSince; // if missing, cuz not running
262
263 nonvirtual bool IsRunning () const;
264 };
265
266 public:
267 /**
268 * \brief returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
269 */
270 nonvirtual Containers::Collection<TaskInfo> GetTasks () const;
271
272 public:
273 /**
274 * return all tasks which are queued, but haven't yet been assigned to a thread.
275 */
276 nonvirtual Containers::Collection<TaskType> GetPendingTasks () const;
277
278 public:
279 /**
280 * return all tasks which are currently running (assigned to some thread in the thread pool).
281 * \note - this is a snapshot in time of something which is often rapidly changing, so by the time
282 * you look at it, it may have changed (but since we use shared_ptrs, it's always safe to look at).
283 */
284 nonvirtual Containers::Collection<TaskType> GetRunningTasks () const;
285
286 public:
287 /**
288 * \brief return total number of tasks, either pending, or currently running.
289 *
290 * This is GetRunningTasks().size () + GetPendingTasks ().size (), or alternatively GetTasks ().size (), but more efficient.
291 */
292 nonvirtual size_t GetTasksCount () const;
293
294 public:
295 /**
296 * \brief return GetPendingTasks ().size (), except probably much more efficient
297 */
298 nonvirtual size_t GetPendingTasksCount () const;
299
300 public:
301 /**
302 * Wait for the given amount of time for all (either given argument or all tasks in this thread pool) to be done.
303 *
304 * When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task
305 * argument, it waits until GetTasksCount () == 0.
306 *
307 * \note For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too.
308 * But - it's perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after
309 * this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
310 *
311 * \note ***Cancelation Point***
312 */
313 nonvirtual void WaitForTasksDone (const Traversal::Iterable<TaskType>& tasks, Time::DurationSeconds timeout = Time::kInfinity) const;
314 nonvirtual void WaitForTasksDone (Time::DurationSeconds timeout = Time::kInfinity) const;
315
316 public:
317 /**
318 * Wait until the given time point (timeoutAt) for all (either given argument or all tasks in this thread pool) to be done.
319 *
320 * When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task
321 * argument, it waits until GetTasksCount () == 0.
322 *
323 * \note For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too.
324 * But - it's perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after
325 * this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
326 *
327 * \note ***Cancelation Point***
328 */
329 nonvirtual void WaitForTasksDoneUntil (const Traversal::Iterable<TaskType>& tasks, Time::TimePointSeconds timeoutAt) const;
330 nonvirtual void WaitForTasksDoneUntil (Time::TimePointSeconds timeoutAt) const;
331
332 public:
333 /**
334 * NOTE - see http://stroika-bugs.sophists.com/browse/STK-995 for how we might cheaply significantly improve these stats
335 */
336 struct Statistics {
337 unsigned int fNumberOfTasksAdded{0};
338
339 /**
340 * Reports number of tasks that ran to completion, possibly with exceptions, possibly without
341 */
342 unsigned int fNumberOfTasksCompleted{0};
343
344 /**
345 * This is the divisor for fTotalTimeConsumed, and may not include all tasks for a variety of reasons (canceled etc).
346 * It doesn't contain tasks not yet run. Number of tasks reporting COULD exceed number added (since you can reset counters
347 * while there are tasks in queue).
348 */
349 unsigned int fNumberOfTasksReporting{0};
350
351 Time::DurationSeconds fTotalTimeConsumed{0.0};
352
353 /**
354 * Mean time consumed per reporting task. NOTE(review): presumably fTotalTimeConsumed / fNumberOfTasksReporting, and
 * missing when no tasks have reported - confirm against the implementation in ThreadPool.inl.
355 */
356 optional<Time::DurationSeconds> GetMeanTimeConsumed () const;
357
358 /**
359 * See Characters::ToString ()
360 */
361 nonvirtual Characters::String ToString () const;
362 };
363
364 public:
365 /**
366 * \pre (GetOptions ().fCollectStatistics);
367 */
368 nonvirtual void ResetStatistics ();
369
370 public:
371 /**
372 * \pre (GetOptions ().fCollectStatistics);
373 */
374 nonvirtual Statistics GetCurrentStatistics () const;
375
376 public:
377 /**
378 * a helpful debug dump of the ThreadPool status
379 */
380 nonvirtual Characters::String ToString () const;
381
382 public:
383 [[deprecated ("Since Stroika v3.0d5 use Options")]] ThreadPool (unsigned int tc, const optional<Characters::String>& name = nullopt)
384 : ThreadPool{Options{tc, name}}
385 {
386 }
387
388 private:
389 bool fCollectStatistics_{false};
390 Statistics fCollectedTaskStats_;
391
392 private:
393 nonvirtual void Abort_ () noexcept;
394
395 private:
396 nonvirtual void AbortAndWaitForDone_ () noexcept;
397
398 private:
399 class MyRunnable_;
400
401 private:
402 struct TPInfo_ {
403 Thread::Ptr fThread;
404 shared_ptr<MyRunnable_> fRunnable;
405 };
406
407 private:
408 // Called internally from threadpool tasks - to wait until there is a new task to run.
409 // This will not return UNTIL it has a new task to proceed with (except via exception like Thread::AbortException)
410 nonvirtual void WaitForNextTask_ (TaskType* result, optional<Characters::String>* resultName);
411 nonvirtual TPInfo_ mkThread_ ();
412
413 private:
414 struct PendingTaskInfo_ {
415 TaskType fTask;
416 optional<Characters::String> fName;
417 };
418 mutable mutex fCriticalSection_; // fCriticalSection_ protects fThreads_ and fPendingTasks_ and the fields of the MyRunnable_ members inside each thread (fThreads_).
419 // Each should be a very short critical section, except for SetPoolSize()
420 atomic<bool> fAborted_{false};
421 optional<QMax> fDefaultQMax_;
422 Containers::Collection<TPInfo_> fThreads_; // all threads, and a data member for thread object, and one for running task, if any
423 list<PendingTaskInfo_> fPendingTasks_; // tasks not yet running - somewhat like a queue, but allow remove from middle
424 WaitableEvent fTasksMaybeAdded_{}; // recheck for new tasks (or other events - wakeup waiters on fTasks);
425 atomic<unsigned int> fNextThreadEntryNumber_{1};
426 optional<Characters::String> fThreadPoolName_;
427
428 private:
429 friend class MyRunnable_; // So MyRunnable_ can call WaitForNextTask_()
430 };
431
432}
433
434/*
435 ********************************************************************************
436 ***************************** Implementation Details ***************************
437 ********************************************************************************
438 */
439#include "ThreadPool.inl"
440
441#endif /*_Stroika_Foundation_Execution_ThreadPool_h_*/
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
Definition Collection.h:102
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237