Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
ThreadPool.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
9#include "Stroika/Foundation/Debug/Main.h"
11#include "Stroika/Foundation/Execution/Common.h"
14#include "Stroika/Foundation/Execution/TimeOutException.h"
16
17#include "ThreadPool.h"
18
19using namespace Stroika::Foundation;
22using namespace Stroika::Foundation::Execution;
23
24using Memory::MakeSharedPtr;
25
26// Comment this in to turn on aggressive noisy DbgTrace in this module
27//#define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
28
29namespace {
30 constexpr bool kEmitDbgTraceOnThreadPoolEntryExceptions_ = qStroika_Foundation_Debug_AssertionsChecked;
31}
32
/*
 *  MyRunnable_ is the body executed by each thread owned by a ThreadPool.
 *
 *  It keeps a reference to the owning pool and loops forever: fetch the next task through
 *  ThreadPool::WaitForNextTask_ (), run it, then clear the 'current task' slot again.
 *
 *  fCurTask / fCurTaskStartedAt / fCurName are shared with the pool (the pool reads them to report
 *  running tasks); every access to them is done while holding fThreadPool.fCriticalSection_.
 */
33class ThreadPool::MyRunnable_ {
34public:
    // Bind this runnable to the pool whose pending-task queue it will service.
35 MyRunnable_ (ThreadPool& threadPool)
36 : fThreadPool{threadPool}
37 {
38 }
39
40public:
    // Snapshot of {current task, time it started, its optional name}.
    // Does no locking itself - the caller is expected to already hold fThreadPool.fCriticalSection_.
41 tuple<TaskType, Time::TimePointSeconds, optional<String>> GetCurTaskInfo () const
42 {
43 // assume caller holds lock
44 return make_tuple (fCurTask, fCurTaskStartedAt, fCurName); // note curTask can be null, in which case these other things are meaningless
45 }
46
47public:
    // Thread main loop. Never returns normally: it only exits by an exception propagating out
    // (Thread::AbortException is deliberately re-thrown below; every other exception from a task is swallowed).
48 nonvirtual void Run ()
49 {
50 // For NOW - allow ThreadAbort to just kill this thread. In the future - we may want to implement some sort of restartability
51
52 // Keep grabbing new tasks, and running them
53 while (true) {
54 {
    // sanity check (under lock): no task may be assigned to this runnable while it is about to wait for one
56 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
57 Assert (fCurTask == nullptr);
58 }
59 // Subtle point, but we must copy directly into fCurTask (WaitForNextTask_ call) so its filled in under lock
60 // while being moved so task moves from pending to in-use without ever temporarily disappearing from known tasks lists
61 fThreadPool.WaitForNextTask_ (&fCurTask, &fCurName); // This will block INDEFINITELY until ThreadAbort throws out or we have a new task to run
    // sanity check (under lock): WaitForNextTask_ only returns once it has stored a task in fCurTask
63 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
64 Assert (fCurTask != nullptr);
65 }
66 }
    // Runs when this loop iteration is left, whether the task returned or threw:
    //  - clears fCurTask (under lock) so the pool no longer reports the task as running
    //  - if the pool collects statistics, bumps the completed/reporting counters and adds the elapsed time
    // NOTE: fCurName is not reset here; per GetCurTaskInfo () it is meaningless while fCurTask is null.
67 [[maybe_unused]] auto&& cleanup = Finally ([this] () noexcept {
68 Time::TimePointSeconds taskStartedAt;
69 {
70 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
71 fCurTask = nullptr;
72 taskStartedAt = fCurTaskStartedAt;
73 }
74 if (fThreadPool.fCollectStatistics_) {
75 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
76 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksCompleted;
77 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksReporting;
78 fThreadPool.fCollectedTaskStats_.fTotalTimeConsumed += Time::GetTickCount () - taskStartedAt;
79 }
80 });
81 try {
82 // Use lock to access fCurTask, but don't hold the lock during run, so others can call getcurrenttask
83 ThreadPool::TaskType task2Run;
84 {
85 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
86 task2Run = fCurTask;
    // record the start time in the same critical section, so readers see task and start time together
87 fCurTaskStartedAt = Time::GetTickCount ();
88 }
    // run a private copy of the task, with the lock released
89 task2Run ();
90 }
91 catch (const Thread::AbortException&) {
92 throw; // cancel this thread
93 }
94 catch (...) {
95 // other exceptions WARNING WITH DEBUG MESSAGE - but otherwise - EAT/IGNORE
96 if constexpr (kEmitDbgTraceOnThreadPoolEntryExceptions_ and qStroika_Foundation_Debug_DefaultTracingOn) {
97 DbgTrace ("in threadpool, ignoring exception running task: {}"_f, current_exception ());
98 }
99 }
100 }
101 }
102
103public:
    // owning pool (source of tasks, and owner of the lock + statistics updated by Run)
104 ThreadPool& fThreadPool;
105 // fThreadPool.fCriticalSection_ protect access to fCurTask/fCurTaskStartedAt/fCurName - very short duration
    // task currently being run by this thread; nullptr while the thread is idle
106 ThreadPool::TaskType fCurTask;
107 Time::TimePointSeconds fCurTaskStartedAt{0s}; // meaningless if fCurTask==nullptr
    // optional name supplied with the current task (filled in by WaitForNextTask_ together with fCurTask)
108 optional<Characters::String> fCurName; // ""
109};
110
111/*
112 ********************************************************************************
113 ************************ Execution::ThreadPool::Statistics *********************
114 ********************************************************************************
115 */
117{
    // Render the collected counters as a single "{name: value, ...}" string.
    // NOTE(review): fNumberOfTasksReporting (incremented by the worker cleanup code) is not part of the
    // output - confirm that omission is intentional.
118 StringBuilder sb;
119 sb << "{"sv;
120 sb << "NumberOfTasksAdded: "sv << fNumberOfTasksAdded;
121 sb << ", NumberOfTasksCompleted: "sv << fNumberOfTasksCompleted;
122 sb << ", TotalTimeConsumed: "sv << fTotalTimeConsumed;
123 sb << "}"sv;
124 return sb;
125}
126
127/*
128 ********************************************************************************
129 ****************************** Execution::ThreadPool ***************************
130 ********************************************************************************
131 */
132ThreadPool::ThreadPool (const Options& options)
133 : fCollectStatistics_{options.fCollectStatistics}
134 , fDefaultQMax_{options.fQMax}
135 , fThreadPoolName_{options.fThreadPoolName}
136{
137 Require (Debug::AppearsDuringMainLifetime ());
138 SetPoolSize (options.fThreadCount);
139}
140
142{
    // Shut the pool down: AbortAndWaitForDone_ () aborts every worker thread and does not return
    // until they have all finished and fThreads_ is empty.
143 Require (Debug::AppearsDuringMainLifetime ());
144 AbortAndWaitForDone_ ();
145}
146
147unsigned int ThreadPool::GetPoolSize () const
148{
149 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
150 return static_cast<unsigned int> (fThreads_.size ());
151}
152
153void ThreadPool::SetPoolSize (unsigned int poolSize)
154{
155 Debug::TraceContextBumper ctx{"ThreadPool::SetPoolSize", "newPoolSize={}, *this={}"_f, poolSize, ToString ()};
156 Require (not fAborted_);
157 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
158 DbgTrace ("fThreads_.size ()={}"_f, fThreads_.size ());
159 while (poolSize > fThreads_.size ()) {
160 fThreads_.Add (mkThread_ ());
161 }
162
163 // Still quite weak implementation of REMOVAL
164 while (poolSize < fThreads_.size ()) {
165 // iterate over threads if any not busy, remove that them first
166 bool anyFoundToKill = false;
167 for (Iterator<TPInfo_> i = fThreads_.begin (); i != fThreads_.end (); ++i) {
168 TaskType ct{i->fRunnable->fCurTask};
169 if (ct == nullptr) {
170 // since we have fCriticalSection_ - we can safely remove this thread
171 fThreads_.Remove (i);
172 anyFoundToKill = true;
173 break;
174 }
175 }
176 if (not anyFoundToKill) {
177 // @todo - fix this better/eventually - either throw or wait...
178 DbgTrace ("Failed to lower the loop size - cuz all threads busy - giving up"_f);
179 return;
180 }
181 }
182}
183
184auto ThreadPool::AddTask (const TaskType& task, QMax qmax, const optional<Characters::String>& name) -> TaskType
185{
186 // also INTENTIONALLY dont hold lock long enuf to make this work 100% reliably cuz these magic numbers dont need to be precise, just approximate
187 // @todo rewrite this with condition variables, so more efficient and wakes up/times out appropriately...
188 Time::TimePointSeconds timeoutAt = Time::GetTickCount () + qmax.fAddBlockTimeout;
189#if qStroika_Foundation_Debug_AssertionsChecked
190 bool blockedAtLeastOnce = false;
191#endif
192 while (true) {
193 if (GetPendingTasksCount () >= qmax.fLength) [[unlikely]] {
194#if qStroika_Foundation_Debug_AssertionsChecked
195 if (not blockedAtLeastOnce) {
196 DbgTrace ("Blocking inside ThreadPool::AddTask due to excessive pending task count"_f);
197 blockedAtLeastOnce = true;
198 }
199#endif
200 ThrowTimeoutExceptionAfter (timeoutAt);
201 Execution::Sleep (500ms); // @todo fix and use condition variable - but good news is can only happen if fAddBlockTimeout != 0s
202 }
203 else {
204 return AddTask_ (task, name);
205 }
206 }
207}
208
209auto ThreadPool::AddTask_ (const TaskType& task, const optional<Characters::String>& name) -> TaskType
210{
211#if USE_NOISY_TRACE_IN_THIS_MODULE_
212 Debug::TraceContextBumper ctx{"ThreadPool::AddTask_"};
213#endif
214 Require (not fAborted_);
215 {
216 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
217 fPendingTasks_.push_back (PendingTaskInfo_{.fTask = task, .fName = name});
218#if USE_NOISY_TRACE_IN_THIS_MODULE_
219 DbgTrace ("fPendingTasks.size () now = {}"_f, (int)fPendingTasks_.size ());
220#endif
221 ++fCollectedTaskStats_.fNumberOfTasksAdded;
222 }
223
224 // Notify any waiting threads to wakeup and claim the next task
225 fTasksMaybeAdded_.Set ();
226 // this would be a race - if aborting and adding tasks at the same time
227 Require (not fAborted_);
228 return task;
229}
230
232{
    // Cancel a single task:
    //  1) if it is still queued, just erase it from fPendingTasks_ and return;
    //  2) otherwise, if some worker is currently running it, swap a fresh worker thread into that slot
    //     (under lock) and then - with the lock released - abort the old thread and wait up to 'timeout'
    //     for it to finish.
    // If the task is neither pending nor running, nothing happens.
233 Debug::TraceContextBumper ctx{"ThreadPool::AbortTask"};
234 {
235 // First see if its in the Q
236 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
237 for (auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
238 if (i->fTask == task) {
    // returning immediately after erase means the invalidated iterator is never used again
239 fPendingTasks_.erase (i);
240 return;
241 }
242 }
243 }
244
245 // If we got here - its NOT in the task Q, so maybe its already running.
246 //
247 //
248
249 // TODO:
250 // We walk the list of existing threads and ask each one if its (indirected - running task) is the given one and abort that task.
251 // But that requires we can RESTART an ABORTED thread (or that we remove it from the list - maybe thats better). That COULD be OK
252 // actually since it involves on API changes and makes sense. The only slight issue is a performance one but probably for something
253 // quite rare.
254 //
255 // Anyhow SB OK for now to just not allow aborting a task which has already started....
    // NOTE(review): the TODO text above looks stale - the code below DOES abort an already-started task,
    // by replacing its worker thread with a new one and aborting the old thread.
256 Thread::Ptr thread2Kill;
257 {
258 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
259 for (Iterator<TPInfo_> i = fThreads_.begin (); i != fThreads_.end (); ++i) {
260 TaskType ct{i->fRunnable->fCurTask};
261 if (task == ct) {
    // keep the old thread alive in thread2Kill, and put a replacement worker in its place so the pool size is unchanged
262 thread2Kill = i->fThread;
263 fThreads_.Update (i, mkThread_ ());
264 break;
265 }
266 }
267 }
    // abort + wait is done without holding fCriticalSection_, since the worker's own cleanup takes that lock
268 if (thread2Kill != nullptr) {
269 thread2Kill.AbortAndWaitForDone (timeout);
270 }
271}
272
274{
275 Debug::TraceContextBumper ctx{"ThreadPool::AbortTasks"};
276 auto tps = GetPoolSize ();
277 {
278 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
279 fPendingTasks_.clear ();
280 for (const TPInfo_& ti : fThreads_) {
281 ti.fThread.Abort ();
282 }
283 }
284 {
285 // @todo maybe fix unsafe - waiting here holding the critsec lock - seems deadlock waiting to happen - LGP 2023-11-05
286 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
287 for (const TPInfo_& ti : fThreads_) {
288 // @todo fix wrong timeout value here
289 ti.fThread.AbortAndWaitForDone (timeout);
290 }
291 fThreads_.RemoveAll ();
292 }
293 // hack - not a good design or impl!!! - waste to recreate if not needed!
294 SetPoolSize (tps);
295}
296
297bool ThreadPool::IsPresent (const TaskType& task) const
298{
299 Require (task != nullptr);
300 {
301 // First see if its in the Q
302 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
303 for (auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
304 if (i->fTask == task) {
305 return true;
306 }
307 }
308 // then check if running
309 for (auto i = fThreads_.begin (); i != fThreads_.end (); ++i) {
310 TaskType rTask{i->fRunnable->fCurTask};
311 if (task == rTask) {
312 return true;
313 }
314 }
315 }
316 return false;
317}
318
319bool ThreadPool::IsRunning (const TaskType& task) const
320{
321 Require (task != nullptr);
322 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
323 for (auto i = fThreads_.begin (); i != fThreads_.end (); ++i) {
324 if (task == i->fRunnable->fCurTask) {
325 return true;
326 }
327 }
328 return false;
329}
330
332{
    // Block until 'task' is no longer known to the pool (neither pending nor running, per IsPresent),
    // or throw a timeout exception once 'timeout' has elapsed. Implemented by polling.
333 Debug::TraceContextBumper ctx{"ThreadPool::WaitForTask"};
334 // Inefficient / VERY SLOPPY impl - @todo fix use WaitableEvent or condition variables...
336 TimePointSeconds timeoutAt = timeout + Time::GetTickCount ();
337 while (true) {
338 if (not IsPresent (task)) {
339 return;
340 }
    // 'remaining' is computed before the timeout check, so it is positive whenever the check passes;
    // sleep at most 1s per poll, and never past the deadline
341 Time::DurationSeconds remaining = timeoutAt - Time::GetTickCount ();
342 ThrowTimeoutExceptionAfter (timeoutAt);
343 Execution::Sleep (min<Time::DurationSeconds> (remaining, 1.0s));
344 }
345}
346
// Snapshot (taken under fCriticalSection_) of every task known to the pool:
//  - each pending task, with its task object and name only
//  - each task currently running on a worker, additionally with the time it started (fRunningSince)
347auto ThreadPool::GetTasks () const -> Collection<TaskInfo>
348{
350 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
    // queued tasks first
351 for (const auto& ti : fPendingTasks_) {
352 result.Add (TaskInfo{.fTask = ti.fTask, .fName = ti.fName});
353 }
    // then whatever each worker is running; idle workers (null current task) contribute nothing
354 for (auto i = fThreads_.begin (); i != fThreads_.end (); ++i) {
    // GetCurTaskInfo () does no locking of its own - fine here, since we already hold the lock
355 tuple<TaskType, Time::TimePointSeconds, optional<String>> curTaskInfo = i->fRunnable->GetCurTaskInfo ();
356 if (get<TaskType> (curTaskInfo) != nullptr) {
357 result.Add (TaskInfo{
358 .fTask = get<TaskType> (curTaskInfo),
359 .fName = get<optional<String>> (curTaskInfo),
360 .fRunningSince = get<Time::TimePointSeconds> (curTaskInfo),
361 });
362 }
363 }
364 return result;
365}
366
368{
    // Collect (under fCriticalSection_) the task each worker thread is currently running;
    // idle workers - whose current task is null - are skipped.
370 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
371 for (auto i = fThreads_.begin (); i != fThreads_.end (); ++i) {
372 TaskType task{i->fRunnable->fCurTask};
373 if (task != nullptr) {
374 result.Add (task);
375 }
376 }
377 return result;
378}
379
381{
382 // Total = number of queued (pending) tasks + one for each worker that currently has a task
383 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
384 size_t count = fPendingTasks_.size ();
385 for (auto i = fThreads_.begin (); i != fThreads_.end (); ++i) {
386 AssertNotNull (i->fRunnable);
387 if (i->fRunnable->fCurTask != nullptr) {
388 ++count;
389 }
390 }
391 return count;
392}
393
395{
    // Copy (under fCriticalSection_) the task object of every entry still waiting in the pending queue;
    // tasks already running on a worker are not included.
397 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
398 for (const auto& i : fPendingTasks_) {
399 result.Add (i.fTask);
400 }
401 return result;
402}
403
405{
    // Number of tasks queued but not yet picked up by a worker (read under fCriticalSection_).
406 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
407 return fPendingTasks_.size ();
408}
409
411{
    // Wait for each task in 'tasks', one after another, sharing a single absolute deadline 'timeoutAt'.
412#if USE_NOISY_TRACE_IN_THIS_MODULE_
414 "ThreadPool::WaitForTasksDoneUntil", "*this={}, tasks={}, timeoutAt={}"_f, ToString (), ToString (tasks), timeoutAt)};
415#endif
417 for (const auto& task : tasks) {
    // 'now' is sampled before the deadline check, so the duration handed to WaitForTask is positive
    // whenever the check passes
418 auto now = Time::GetTickCount ();
419 ThrowTimeoutExceptionAfter (timeoutAt);
420 WaitForTask (task, timeoutAt - now);
421 }
422}
423
425{
    // Wait until the pool has no tasks at all (pending or running, per GetTasksCount), polling every 100ms;
    // throws a timeout exception once 'timeoutAt' has been reached.
426#if USE_NOISY_TRACE_IN_THIS_MODULE_
427 Debug::TraceContextBumper ctx{Stroika_Foundation_Debug_OptionalizeTraceArgs ("ThreadPool::WaitForTasksDoneUntil",
428 "*this={}, timeoutAt={}"_f, ToString (), timeoutAt)};
429#endif
431 // @todo - use WaitableEvent - this is a horrible implementation
432 while (GetTasksCount () != 0) {
433 ThrowTimeoutExceptionAfter (timeoutAt);
    // NOTE(review): fixed 100ms sleep, so the call can return/throw up to ~100ms after the deadline
434 Execution::Sleep (100ms);
435 }
436}
437
439{
    // Zero all collected task statistics (under fCriticalSection_).
    // Requires that the pool was configured to collect statistics.
440 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
441 Require (fCollectStatistics_);
442 fCollectedTaskStats_ = {};
443}
444
446{
    // Return a copy of the statistics collected so far (copied under fCriticalSection_).
    // Requires that the pool was configured to collect statistics.
447 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
448 Require (fCollectStatistics_);
449 return fCollectedTaskStats_;
450}
451
452void ThreadPool::Abort_ () noexcept
453{
454 Thread::SuppressInterruptionInContext suppressCtx; // must cleanly shut down each of our sub-threads - even if our thread is aborting... don't be half-way aborted
455 Debug::TraceContextBumper ctx{Stroika_Foundation_Debug_OptionalizeTraceArgs ("ThreadPool::Abort_", "*this={}"_f, ToString ())};
456 Debug::TimingTrace tt{1.0s};
457 fAborted_ = true; // No race, because fAborted never 'unset'
458 // no need to set fTasksMaybeAdded_, since aborting each thread should be sufficient
459 {
460 // Clear the task Q and then abort each thread
461 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
462 fPendingTasks_.clear ();
463 for (const TPInfo_& ti : fThreads_) {
464 ti.fThread.Abort ();
465 }
466 }
467}
468
469void ThreadPool::AbortAndWaitForDone_ () noexcept
470{
471 Thread::SuppressInterruptionInContext suppressCtx; // cuz we must shutdown owned threads
472#if USE_NOISY_TRACE_IN_THIS_MODULE_
473 Debug::TraceContextBumper ctx{Stroika_Foundation_Debug_OptionalizeTraceArgs ("ThreadPool::AbortAndWaitForDone_", "*this={}"_f, ToString ())};
474 Debug::TimingTrace tt{1.0s};
475#endif
476 try {
477 Abort_ (); // to get the rest of the threadpool abort stuff triggered - flag saying aborting
478 Collection<Thread::Ptr> threadsToShutdown;
479 {
480 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
481 fThreads_.Apply ([&] (const TPInfo_& i) { threadsToShutdown.Add (i.fThread); });
482 }
483 Thread::AbortAndWaitForDone (threadsToShutdown);
484 {
485 [[maybe_unused]] lock_guard critSec{fCriticalSection_}; // they should all be shutdown now
486 fThreads_.Apply ([&] (const TPInfo_& i) { Assert (i.fThread.IsDone ()); });
487 fThreads_.RemoveAll ();
488 }
489 }
490 catch (...) {
491 DbgTrace ("ThreadPool::AbortAndWaitForDone_: serious bug/exception"_f);
492 AssertNotReached (); // this should never happen due to the SuppressInterruptionInContext...
493 }
494 Ensure (fThreads_.empty ());
495}
496
498{
    // Build a one-line description of the pool: optional name, pending-task count, running-task count
    // and thread count. Each piece is read under its own short hold of fCriticalSection_ (directly, or
    // inside the accessor called), so the values are not guaranteed to be mutually consistent.
499 StringBuilder sb;
500 sb << "{"sv;
501 {
502 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
503 if (fThreadPoolName_) {
504 sb << "pool-name: '{}'"_f(*fThreadPoolName_);
505 }
506 }
    // separator is only needed when a name was emitted above
507 if (fThreadPoolName_) {
508 sb << ", "sv;
509 }
    // these accessors take fCriticalSection_ themselves, hence called outside the locked scopes
510 sb << "pending-task-count: {}"_f(GetPendingTasksCount ());
511 sb << ", running-task-count: {}"_f(GetRunningTasks ().size ());
512 {
513 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
514 sb << ", pool-thread-count: {}"_f(fThreads_.size ());
515 }
516 sb << "}"sv;
517 return sb;
518}
519
520// THIS is called NOT from 'this' - but from the context of an OWNED thread of the pool
521void ThreadPool::WaitForNextTask_ (TaskType* result, optional<Characters::String>* resultName)
522{
523 RequireNotNull (result);
524 RequireNotNull (resultName);
525 Require (*result == nullptr);
526 while (true) {
527 if (fAborted_) [[unlikely]] {
528 Throw (Thread::AbortException::kThe);
529 }
530
531 {
532 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
533 if (not fPendingTasks_.empty ()) {
534 *result = fPendingTasks_.front ().fTask;
535 *resultName = fPendingTasks_.front ().fName;
536 fPendingTasks_.pop_front ();
537 DbgTrace ("ThreadPool::WaitForNextTask_ () pulled a new task from 'pending-tasks' to run on this thread, leaving pending-task-list-size = {}"_f,
538 fPendingTasks_.size ());
539 Ensure (*result != nullptr);
540 return;
541 }
542 }
543
544 // Prevent spinwaiting... This event is SET when any new item arrives
545 //DbgTrace ("ThreadPool::WaitForNextTask_ () - about to wait for added tasks");
546 fTasksMaybeAdded_.WaitAndReset ();
547 //DbgTrace ("ThreadPool::WaitForNextTask_ () - completed wait for added tasks");
548 }
549}
550
551ThreadPool::TPInfo_ ThreadPool::mkThread_ ()
552{
553 shared_ptr<MyRunnable_> r{MakeSharedPtr<ThreadPool::MyRunnable_> (*this)};
554 StringBuilder entryName = "TPE #{}"_f(fNextThreadEntryNumber_++); // make name so short cuz unix only shows first 15 chars - http://man7.org/linux/man-pages/man3/pthread_setname_np.3.html
555 entryName += " {"sv + fThreadPoolName_.value_or ("anonymous-thread-pool"sv) + "}"sv;
556 Thread::Ptr t = Thread::New ([r] () { r->Run (); }, Thread::eAutoStart, entryName); // race condition for updating this number, but who cares - its purely cosmetic...
557 return TPInfo_{t, r};
558}
#define AssertNotNull(p)
Definition Assertions.h:333
#define qStroika_Foundation_Debug_AssertionsChecked
The qStroika_Foundation_Debug_AssertionsChecked flag determines if assertions are checked and validat...
Definition Assertions.h:48
#define RequireNotNull(p)
Definition Assertions.h:347
#define AssertNotReached()
Definition Assertions.h:355
Results< FLOAT_TYPE > Run(const TargetFunction< FLOAT_TYPE > &function2Minimize, const Sequence< FLOAT_TYPE > &initialValues, const Options< FLOAT_TYPE > &options=Options< FLOAT_TYPE >{})
Downhill Simplex Minimization, AKA Nelder-Mead algorithm, to compute minimization.
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
#define DbgTrace
Definition Trace.h:309
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:270
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
nonvirtual void Update(const Iterator< value_type > &i, ArgByValueType< value_type > newValue, Iterator< value_type > *nextI=nullptr)
nonvirtual void Remove(ArgByValueType< value_type > item, EQUALS_COMPARER &&equalsComparer={})
Remove () the argument value (which must exist)
nonvirtual void Add(ArgByValueType< value_type > item)
nonvirtual void RemoveAll()
RemoveAll removes all, or all matching (predicate, iterator range, equals comparer or whatever) items...
Thread::Ptr is a (unsynchronized) smart pointer referencing an internally synchronized std::thread ob...
Definition Thread.h:334
nonvirtual void AbortAndWaitForDone(Time::DurationSeconds timeout=Time::kInfinity) const
Abort () the thread, and then WaitForDone () - but if doesn't finish fast enough, send extra aborts (...
Definition Thread.inl:291
nonvirtual size_t GetPendingTasksCount() const
return GetPendingTasks ().size (), except probably much more efficient
nonvirtual bool IsRunning(const TaskType &task) const
nonvirtual Containers::Collection< TaskType > GetRunningTasks() const
nonvirtual void AbortTasks(Time::DurationSeconds timeout=Time::kInfinity)
nonvirtual Containers::Collection< TaskInfo > GetTasks() const
returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual void AbortTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity)
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual size_t GetTasksCount() const
return total number of tasks, either pending, or currently running.
nonvirtual Characters::String ToString() const
nonvirtual bool IsPresent(const TaskType &task) const
nonvirtual void WaitForTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity) const
nonvirtual void SetPoolSize(unsigned int poolSize)
nonvirtual void WaitForTasksDoneUntil(const Traversal::Iterable< TaskType > &tasks, Time::TimePointSeconds timeoutAt) const
nonvirtual Containers::Collection< TaskType > GetPendingTasks() const
nonvirtual unsigned int GetPoolSize() const
nonvirtual void WaitAndReset(Time::Duration timeout=Time::kInfinity)
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237
nonvirtual void Apply(const function< void(ArgByValueType< T > item)> &doToElement, Execution::SequencePolicy seq=Execution::SequencePolicy::eDEFAULT) const
Run the argument function (or lambda) on each element of the container.
nonvirtual size_t size() const
Returns the number of items contained.
Definition Iterable.inl:303
nonvirtual Iterator< T > begin() const
Support for ranged for, and STL syntax in general.
nonvirtual bool empty() const
Returns true iff size() == 0.
Definition Iterable.inl:309
static constexpr default_sentinel_t end() noexcept
Support for ranged for, and STL syntax in general.
An Iterator<T> is a copyable object which allows traversing the contents of some container....
Definition Iterator.h:225
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
Definition Thread.cpp:960
void AbortAndWaitForDone(const Traversal::Iterable< Ptr > &threads, Time::DurationSeconds timeout=Time::kInfinity)
shorthand for AbortAndWaitForDoneUntil (Time::GetTickCount () + timeout)
Definition Thread.inl:343
void Sleep(Time::Duration seconds2Wait)
Definition Sleep.inl:97
void ThrowTimeoutExceptionAfter(Time::TimePointSeconds afterTickCount, EXCEPTION &&exception2Throw)
Throw TimeOutException if the @Time::GetTickCount () is >= the given value.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Definition Throw.inl:43
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
Definition Finally.inl:31
nonvirtual Characters::String ToString() const