Stroika Library 3.0d18
 
Loading...
Searching...
No Matches
ThreadPool.h
Go to the documentation of this file.
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#ifndef _Stroika_Foundation_Execution_ThreadPool_h_
5#define _Stroika_Foundation_Execution_ThreadPool_h_ 1
6
7#include "Stroika/Foundation/StroikaPreComp.h"
8
9#include <list>
10#include <mutex>
11
12#include "Stroika/Foundation/Containers/Collection.h"
15#include "Stroika/Foundation/Execution/WaitableEvent.h"
16
17/**
18 * \file
19 *
20 * \note Code-Status: <a href="Code-Status.md#Beta">Beta</a>
21 *
22 * TODO:
23 * @todo AddTask () implementation is HORRIBLE (for case where fAddBlockTimeout != 0) and needs to be rewritten with condition variables.
24 *
25 * @todo ThreadPool::WaitForTask () is a very sloppy inefficient implementation (but probably rarely called).
26 *
27 * @todo CONSIDER USE OF blocking q - I think it will help. Or figure out
28 * how these tie together. Or rewrite using condition_variable.
29 *
30 * @todo Current approach to aborting a running task is to abort the thread. But the current
31 * thread code doesn't support restarting a thread once it's been aborted. We PROBABLY
32 * should correct that at some point - and allow a thread to undo its abort-in-progress.
33 * However - no need immediately. Instead - the current ThreadPool implementation simply
34 * drops that thread and builds a new one. Performance overhead yes - but only for the
35 * likely rare case of aborting a running task.
36 *
37 * @todo Consider adding the idea of TaskGroups - which are properties shared by all tasks (or some tasks)
38 * added to a threadpool. If multiple tasks are added to the threadpool with the same
39 * TaskGroup, then they respect that task-group's constraints. One example constraint would be
40 * mutual run exclusion. This would allow you to create lockless threaded procedures, because
41 * they would be guaranteed to not be run all at the same time, and yet could STILL leverage
42 * the benefits of thread pooling.
43 *
44 * For example, if you had 3 threads in the pool, and 5 thread groups, then typically one or
45 * more thread groups would be idle. The thread groups give you lockless execution, and
46 * the threadpool lets the 5 groups run 'at the same time' on only 3 threads.
47 */
48
50
51 /**
52 * The ThreadPool class creates a small fixed number of Thread objects, and lets you use them
53 * as if there were many more. You submit a task (representable as a comparable std::function - @see Function) -
54 * and it gets eventually executed.
55 *
56 * If a Task in the thread pool raises an exception - this will be IGNORED (except for the
57 * special case of Thread::AbortException which is used internally to end the threadpool or
58 * remove some threads). Because of this, your submitted runnable should take care of its own
59 * error handling internally.
60 *
61 * ThreadPool is mainly useful for servicing low-cost calls/threads, where the overhead
62 * of constructing the thread is significant compared to the cost of performing the action,
63 * and where the priority & stacksize can all be predetermined and 'shared'.
64 * Also - where you want to CONTROL the level of thread creation (possibly to avoid
65 * DOS attacks or just accidental overloading).
66 *
67 * \pre Debug::AppearsDuringMainLifetime (); during the lifetime of the ThreadPool
68 *
69 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
70 * all methods can be freely used from any thread, and they will block internally as needed.
71 *
72 * \note pointless to ever create threadpool and not use, so [[nodiscard]] appropriate
73 */
74 class [[nodiscard]] ThreadPool {
75 public:
76 /**
77 * Affects behavior of AddTask command. If a max is provided, then attempts to Add
78 * will check this versus the actual pending Q length, and wait/throw if insufficient space.
79 *
80 * Use this overload to assure thread pools don't 'blow up' with too many tasks.
81 *
82 * \note - often only provide QMax::fLength, since the default of not blocking - just throwing because the Q is full - is quite reasonable.
83 */
84 struct QMax {
85 size_t fLength{0};
86
87 /**
88 * Zero timeout means never wait (AddTask either adds or throws but doesn't wait for Q to empty enough).
89 */
90 Time::DurationSeconds fAddBlockTimeout{0s};
91 };
92
93 /** Configuration for a ThreadPool: accepted by the constructor, and reported back (in its current state) by GetOptions ().
94 */
95 struct Options {
96 /**
97 * number of threads allocated to service tasks.
98 */
99 unsigned int fThreadCount{thread::hardware_concurrency ()};
100
101 /**
102 * Name used in various debug messages.
103 */
104 optional<Characters::String> fThreadPoolName;
105
106 /**
107 * Default to no max Q - infinite queue size. Note, a value of fLength=fThreadCount, or some small multiple, would often be reasonable.
108 */
109 optional<QMax> fQMax;
110
111 /**
112 * defaults false; must be true to use ResetStatistics () / GetCurrentStatistics () (see their \pre).
113 */
114 bool fCollectStatistics{false};
115 };
116
117 public:
118 /**
119 * \par Example Usage
120 * \code
121 * ThreadPool p{ThreadPool::Options{.fThreadCount = 3}};
122 * p.AddTask ([&q, &counter] () {
123 * ..dostuff..
124 * });
125 * // when goes out of scope automatically blocks waiting for threads to complete...
126 * // or call p.WaitForTasksDoneUntil ()
127 * \endcode
128 */
129 ThreadPool ();
130 ThreadPool (const Options& options);
131 ThreadPool (ThreadPool&&) = delete;
132 ThreadPool (const ThreadPool&) = delete;
133
134 public:
135 nonvirtual ThreadPool& operator= (ThreadPool&&) = delete;
136 nonvirtual ThreadPool& operator= (const ThreadPool&) = delete;
137
138 public:
139 /**
140 * Destroying a threadpool implicitly calls AbortAndWaitForDone () and eats any errors (cannot rethrow).
141 *
142 * \note - ThreadPool used to have explicit abort methods, but there was no point. When aborting, you must wait for them all to
143 * shut down to destroy the object. And since you cannot restart the object, there is no point in ever aborting without destroying.
144 * so KISS - just destroy the ThreadPool object.
145 */
146 ~ThreadPool ();
147
148 public:
149 /**
150 * \note It is important (required) that all tasks added to a ThreadPool respond in a timely manner to Thread Abort.
151 * ThreadPool counts on that for clean shutdown.
152 *
153 * This means periodically calling CheckForInterruption () and that any waits respect thread cancelation (stop_token).
154 *
155 * Tasks may exit via exception, but nothing will be done with that exception (beyond DbgTrace logging). So generally
156 * not a good idea, except for ThreadAbort handling.
157 */
158 using TaskType = Function<void ()>;
159
160 public:
161 /**
162 * These options may have been modified by various APIs, and reflect the current state of options, not necessarily those that the
163 * ThreadPool was created with.
164 */
165 nonvirtual Options GetOptions () const;
166
167 public:
168 /**
169 * This returns the number of threads in the pool (not the number of tasks). Note 0 is a legal size.
170 */
171 nonvirtual unsigned int GetPoolSize () const;
172
173 public:
174 /**
175 * SetPoolSize () is advisory. It attempts to add or remove entries as requested. Note - 0 is a legal size.
176 *
177 * But under some circumstances, it will fail. For example, if tasks are busy
178 * running on all threads, the number of threads in the pool cannot be decreased.
179 *
180 * @todo - WE CAN do better than this - at least marking the thread as to be removed when the
181 * task finishes - but NYI
182 */
183 nonvirtual void SetPoolSize (unsigned int poolSize);
184
185 public:
186 /**
187 * Push the given task into the queue (possibly blocking til space in the Q or throwing if no space in Q, but does NOT block waiting for Q to till).
188 *
189 * \par Example Usage
190 * \code
191 * ThreadPool p;
192 * p.AddTask ([] () {doIt ();});
193 * \endcode
194 *
195 * if qmax is provided, it takes precedence over any default value associated with the ThreadPool (constructor). If neither
196 * provided (as an argument or associated with the pool), this is treated as no max, and the addition just proceeds.
197 *
198 * If qMax provided (even indirectly), assure task q length doesn't exceed argument by waiting up to the qMax
199 * duration, and either timing out, or successfully add the task.
200 *
201 * \note Design Note:
202 * The reason this returns a TaskType is that it's easy to convert a lambda or whatever into a TaskType, but if you do
203 * it multiple times you get different (!=) values. So to make the auto conversion work easier without needing
204 * to first create a variable, and then do the add task, you can just do them together. And it avoids mistakes like:
205 * function<void()> f = ...;
206 * p.AddTask(f);
207 * p.AbortTask (f); // fails cuz different 'TaskType' added - f converted to TaskType twice!
208 */
209 nonvirtual TaskType AddTask (const TaskType& task, const optional<Characters::String>& name = nullopt);
210 nonvirtual TaskType AddTask (const TaskType& task, QMax qmax, const optional<Characters::String>& name = nullopt);
211
212 private:
213 nonvirtual TaskType AddTask_ (const TaskType& task, const optional<Characters::String>& name);
214
215 public:
216 /**
217 * It is NOT an error to call this with a task that is not in the Queue
218 * (since it would be a race to try to find out if it was already executed).
219 *
220 * It can cancel a task if it has not yet been started, or EVEN if it's already in
221 * progress (see Thread::Abort - it sends abort signal)
222 *
223 * The function doesn't return until the task has been successfully cancelled, or it throws if timeout.
224 */
225 nonvirtual void AbortTask (const TaskType& task, Time::DurationSeconds timeout = Time::kInfinity);
226
227 public:
228 /**
229 * See AbortTask () - it aborts all tasks - if any.
230 */
231 nonvirtual void AbortTasks (Time::DurationSeconds timeout = Time::kInfinity);
232
233 public:
234 /**
235 * returns true if queued OR actively running.
236 *
237 * \pre task != nullptr
238 */
239 nonvirtual bool IsPresent (const TaskType& task) const;
240
241 public:
242 /**
243 * returns true if actively running
244 *
245 * \pre task != nullptr
246 */
247 nonvirtual bool IsRunning (const TaskType& task) const;
248
249 public:
250 /**
251 * throws if timeout. Returns when task has completed (or if not in task q)
252 *
253 * \pre task != nullptr
254 */
255 nonvirtual void WaitForTask (const TaskType& task, Time::DurationSeconds timeout = Time::kInfinity) const;
256
257 public:
258 /** Per-task record returned by GetTasks (): the task, its optional name, and (only while running) the time since which it has been running.
259 */
260 struct TaskInfo {
261 TaskType fTask;
262 optional<Characters::String> fName;
263 optional<Time::TimePointSeconds> fRunningSince; // missing if the task is not running
264
265 nonvirtual bool IsRunning () const;
266 };
267
268 public:
269 /**
270 * \brief returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
271 */
272 nonvirtual Containers::Collection<TaskInfo> GetTasks () const;
273
274 public:
275 /**
276 * return all tasks which are queued, but haven't yet been assigned to a thread.
277 */
278 nonvirtual Containers::Collection<TaskType> GetPendingTasks () const;
279
280 public:
281 /**
282 * return all tasks which are currently running (assigned to some thread in the thread pool).
283 * \note - this is a snapshot in time of something which is often rapidly changing, so by the time
284 * you look at it, it may have changed (but since we use shared_ptrs, it's always safe to look at).
285 */
286 nonvirtual Containers::Collection<TaskType> GetRunningTasks () const;
287
288 public:
289 /**
290 * \brief return total number of tasks, either pending, or currently running.
291 *
292 * This is GetRunningTasks().size () + GetPendingTasks ().size (), or alternatively GetTasks ().size (), but more efficient.
293 */
294 nonvirtual size_t GetTasksCount () const;
295
296 public:
297 /**
298 * \brief return GetPendingTasks ().size (), except probably much more efficient
299 */
300 nonvirtual size_t GetPendingTasksCount () const;
301
302 public:
303 /**
304 * Wait for the given amount of time for all (either given argument or all tasks in this thread pool) to be done.
305 *
306 * When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task
307 * argument, it waits until GetTasksCount () == 0.
308 *
309 * \note For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too.
310 * But - it's perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after
311 * this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
312 *
313 * \note ***Cancelation Point***
314 */
315 nonvirtual void WaitForTasksDone (const Traversal::Iterable<TaskType>& tasks, Time::DurationSeconds timeout = Time::kInfinity) const;
316 nonvirtual void WaitForTasksDone (Time::DurationSeconds timeout = Time::kInfinity) const;
317
318 public:
319 /**
320 * Wait until the given time point for all (either given argument or all tasks in this thread pool) to be done.
321 *
322 * When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task
323 * argument, it waits until GetTasksCount () == 0.
324 *
325 * \note For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too.
326 * But - it's perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after
327 * this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
328 *
329 * \note ***Cancelation Point***
330 */
331 nonvirtual void WaitForTasksDoneUntil (const Traversal::Iterable<TaskType>& tasks, Time::TimePointSeconds timeoutAt) const;
332 nonvirtual void WaitForTasksDoneUntil (Time::TimePointSeconds timeoutAt) const;
333
334 public:
335 /**
336 * NOTE - see http://stroika-bugs.sophists.com/browse/STK-995 for how we might cheaply significantly improve these stats
337 */
338 struct Statistics {
339 unsigned int fNumberOfTasksAdded{0};
340
341 /**
342 * Reports number of tasks that ran to completion, possibly with exceptions, possibly without
343 */
344 unsigned int fNumberOfTasksCompleted{0};
345
346 /**
347 * This is the divisor for fTotalTimeConsumed, and may not include all tasks for a variety of reasons (canceled etc).
348 * It doesn't contain tasks not yet run. Number of tasks reporting COULD exceed number added (since you can reset counters
349 * while there are tasks in queue).
350 */
351 unsigned int fNumberOfTasksReporting{0};
352
353 Time::DurationSeconds fTotalTimeConsumed{0.0};
354
355 /**
356 * NOTE(review): presumably fTotalTimeConsumed / fNumberOfTasksReporting, with a missing value when no tasks have reported - confirm against the implementation.
357 */
358 optional<Time::DurationSeconds> GetMeanTimeConsumed () const;
359
360 /**
361 * See Characters::ToString ()
362 */
363 nonvirtual Characters::String ToString () const;
364 };
365
366 public:
367 /**
368 * \pre (GetOptions ().fCollectStatistics);
369 */
370 nonvirtual void ResetStatistics ();
371
372 public:
373 /**
374 * \pre (GetOptions ().fCollectStatistics);
375 */
376 nonvirtual Statistics GetCurrentStatistics () const;
377
378 public:
379 /**
380 * a helpful debug dump of the ThreadPool status
381 */
382 nonvirtual Characters::String ToString () const;
383
384 public:
385 [[deprecated ("Since Stroika v3.0d5 use Options")]] ThreadPool (unsigned int tc, const optional<Characters::String>& name = nullopt)
386 : ThreadPool{Options{tc, name}}
387 {
388 }
389
390 private:
391 bool fCollectStatistics_{false};
392 Statistics fCollectedTaskStats_;
393
394 private:
395 nonvirtual void Abort_ () noexcept;
396
397 private:
398 nonvirtual void AbortAndWaitForDone_ () noexcept;
399
400 private:
401 class MyRunnable_;
402
403 private:
404 struct TPInfo_ {
405 Thread::Ptr fThread;
406 shared_ptr<MyRunnable_> fRunnable;
407 };
408
409 private:
410 // Called internally from threadpool tasks - to wait until there is a new task to run.
411 // This will not return UNTIL it has a new task to proceed with (except via exception like Thread::AbortException)
412 nonvirtual void WaitForNextTask_ (TaskType* result, optional<Characters::String>* resultName);
413 nonvirtual TPInfo_ mkThread_ ();
414
415 private:
416 struct PendingTaskInfo_ {
417 TaskType fTask;
418 optional<Characters::String> fName;
419 };
420 mutable mutex fCriticalSection_; // fCriticalSection_ protects fThreads_ and fPendingTasks_ and the fields of the MyRunnable_ members inside each thread (fThreads_).
421 // Each should be a very short critical section, except for SetPoolSize()
422 atomic<bool> fAborted_{false};
423 optional<QMax> fDefaultQMax_;
424 Containers::Collection<TPInfo_> fThreads_; // all threads, and a data member for thread object, and one for running task, if any
425 list<PendingTaskInfo_> fPendingTasks_; // tasks not yet running - somewhat like a queue, but allow remove from middle
426 WaitableEvent fTasksMaybeAdded_{}; // recheck for new tasks (or other events - wakeup waiters on fTasks);
427 atomic<unsigned int> fNextThreadEntryNumber_{1};
428 optional<Characters::String> fThreadPoolName_;
429
430 private:
431 friend class MyRunnable_; // So MyRunnable_ can call WaitForNextTask_()
432 };
433
434}
435
436/*
437 ********************************************************************************
438 ***************************** Implementation Details ***************************
439 ********************************************************************************
440 */
441#include "ThreadPool.inl"
442
443#endif /*_Stroika_Foundation_Execution_ThreadPool_h_*/
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237