4#include "Stroika/Foundation/StroikaPreComp.h"
9#include "Stroika/Foundation/Debug/Main.h"
15#include "TimeOutException.h"
31class ThreadPool::MyRunnable_ {
34 : fThreadPool{threadPool}
39 tuple<TaskType, Time::TimePointSeconds, optional<String>> GetCurTaskInfo ()
const
42 return make_tuple (fCurTask, fCurTaskStartedAt, fCurName);
46 nonvirtual
void Run ()
54 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
55 Assert (fCurTask ==
nullptr);
59 fThreadPool.WaitForNextTask_ (&fCurTask, &fCurName);
61 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
62 Assert (fCurTask !=
nullptr);
65 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept {
68 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
70 taskStartedAt = fCurTaskStartedAt;
72 if (fThreadPool.fCollectStatistics_) {
73 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
74 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksCompleted;
75 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksReporting;
76 fThreadPool.fCollectedTaskStats_.fTotalTimeConsumed += Time::GetTickCount () - taskStartedAt;
83 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
85 fCurTaskStartedAt = Time::GetTickCount ();
89 catch (
const Thread::AbortException&) {
94 if constexpr (kEmitDbgTraceOnThreadPoolEntryExceptions_ and qStroika_Foundation_Debug_DefaultTracingOn) {
95 DbgTrace (
"in threadpool, ignoring exception running task: {}"_f, current_exception ());
106 optional<Characters::String> fCurName;
118 sb <<
"NumberOfTasksAdded: "sv << fNumberOfTasksAdded;
120 sb <<
", TotalTimeConsumed: "sv << fTotalTimeConsumed;
131 : fCollectStatistics_{options.fCollectStatistics}
132 , fDefaultQMax_{options.fQMax}
133 , fThreadPoolName_{options.fThreadPoolName}
135 Require (Debug::AppearsDuringMainLifetime ());
141 Require (Debug::AppearsDuringMainLifetime ());
142 AbortAndWaitForDone_ ();
147 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
148 return static_cast<unsigned int> (fThreads_.
size ());
154 Require (not fAborted_);
155 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
157 while (poolSize > fThreads_.
size ()) {
158 fThreads_.
Add (mkThread_ ());
162 while (poolSize < fThreads_.
size ()) {
164 bool anyFoundToKill =
false;
166 TaskType ct{i->fRunnable->fCurTask};
170 anyFoundToKill =
true;
174 if (not anyFoundToKill) {
176 DbgTrace (
"Failed to lower the loop size - cuz all threads busy - giving up"_f);
187#if qStroika_Foundation_Debug_AssertionsChecked
188 bool blockedAtLeastOnce =
false;
192#if qStroika_Foundation_Debug_AssertionsChecked
193 if (not blockedAtLeastOnce) {
194 DbgTrace (
"Blocking inside ThreadPool::AddTask due to excessive pending task count"_f);
195 blockedAtLeastOnce =
true;
202 return AddTask_ (task, name);
207auto ThreadPool::AddTask_ (
const TaskType& task,
const optional<Characters::String>& name) ->
TaskType
209#if USE_NOISY_TRACE_IN_THIS_MODULE_
212 Require (not fAborted_);
214 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
215 fPendingTasks_.push_back (PendingTaskInfo_{.fTask = task, .fName = name});
216#if USE_NOISY_TRACE_IN_THIS_MODULE_
217 DbgTrace (
"fPendingTasks.size () now = {}"_f, (
int)fPendingTasks_.size ());
219 ++fCollectedTaskStats_.fNumberOfTasksAdded;
223 fTasksMaybeAdded_.
Set ();
225 Require (not fAborted_);
234 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
235 for (
auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
236 if (i->fTask == task) {
237 fPendingTasks_.erase (i);
256 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
258 TaskType ct{i->fRunnable->fCurTask};
260 thread2Kill = i->fThread;
261 fThreads_.
Update (i, mkThread_ ());
266 if (thread2Kill !=
nullptr) {
276 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
277 fPendingTasks_.clear ();
278 for (
const TPInfo_& ti : fThreads_) {
284 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
285 for (
const TPInfo_& ti : fThreads_) {
287 ti.fThread.AbortAndWaitForDone (timeout);
297 Require (task !=
nullptr);
300 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
301 for (
auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
302 if (i->fTask == task) {
307 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
308 TaskType rTask{i->fRunnable->fCurTask};
319 Require (task !=
nullptr);
320 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
321 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
322 if (task == i->fRunnable->fCurTask) {
334 TimePointSeconds timeoutAt = timeout + Time::GetTickCount ();
348 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
349 for (
const auto& ti : fPendingTasks_) {
350 result.
Add (TaskInfo{.fTask = ti.fTask, .fName = ti.fName});
352 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
353 tuple<TaskType, Time::TimePointSeconds, optional<String>> curTaskInfo = i->fRunnable->GetCurTaskInfo ();
354 if (get<TaskType> (curTaskInfo) !=
nullptr) {
355 result.
Add (TaskInfo{
356 .fTask = get<TaskType> (curTaskInfo),
357 .fName = get<optional<String>> (curTaskInfo),
358 .fRunningSince = get<Time::TimePointSeconds> (curTaskInfo),
368 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
369 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
370 TaskType task{i->fRunnable->fCurTask};
371 if (task !=
nullptr) {
381 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
382 size_t count = fPendingTasks_.size ();
383 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
385 if (i->fRunnable->fCurTask !=
nullptr) {
395 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
396 for (
const auto& i : fPendingTasks_) {
397 result.
Add (i.fTask);
404 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
405 return fPendingTasks_.size ();
410#if USE_NOISY_TRACE_IN_THIS_MODULE_
412 "ThreadPool::WaitForTasksDoneUntil",
"*this={}, tasks={}, timeoutAt={}"_f,
ToString (),
ToString (tasks), timeoutAt)};
415 for (
const auto& task : tasks) {
416 auto now = Time::GetTickCount ();
424#if USE_NOISY_TRACE_IN_THIS_MODULE_
426 "*this={}, timeoutAt={}"_f,
ToString (), timeoutAt)};
438 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
439 Require (fCollectStatistics_);
440 fCollectedTaskStats_ = {};
445 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
446 Require (fCollectStatistics_);
447 return fCollectedTaskStats_;
450void ThreadPool::Abort_ () noexcept
459 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
460 fPendingTasks_.clear ();
461 for (
const TPInfo_& ti : fThreads_) {
467void ThreadPool::AbortAndWaitForDone_ () noexcept
470#if USE_NOISY_TRACE_IN_THIS_MODULE_
478 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
479 fThreads_.
Apply ([&] (
const TPInfo_& i) { threadsToShutdown.
Add (i.fThread); });
483 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
484 fThreads_.
Apply ([&] (
const TPInfo_& i) { Assert (i.fThread.IsDone ()); });
489 DbgTrace (
"ThreadPool::AbortAndWaitForDone_: serious bug/exception"_f);
492 Ensure (fThreads_.
empty ());
500 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
501 if (fThreadPoolName_) {
502 sb <<
"pool-name: '{}'"_f(*fThreadPoolName_);
505 if (fThreadPoolName_) {
511 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
512 sb <<
", pool-thread-count: {}"_f(fThreads_.
size ());
519void ThreadPool::WaitForNextTask_ (
TaskType* result, optional<Characters::String>* resultName)
523 Require (*result ==
nullptr);
525 if (fAborted_) [[unlikely]] {
526 Throw (Thread::AbortException::kThe);
530 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
531 if (not fPendingTasks_.empty ()) {
532 *result = fPendingTasks_.front ().fTask;
533 *resultName = fPendingTasks_.front ().fName;
534 fPendingTasks_.pop_front ();
535 DbgTrace (
"ThreadPool::WaitForNextTask_ () pulled a new task from 'pending-tasks' to run on this thread, leaving pending-task-list-size = {}"_f,
536 fPendingTasks_.size ());
537 Ensure (*result !=
nullptr);
549ThreadPool::TPInfo_ ThreadPool::mkThread_ ()
551 shared_ptr<MyRunnable_> r{make_shared<ThreadPool::MyRunnable_> (*
this)};
552 StringBuilder entryName =
"TPE #{}"_f(fNextThreadEntryNumber_++);
553 entryName +=
" {"sv + fThreadPoolName_.value_or (
"anonymous-thread-pool"sv) +
"}"sv;
555 return TPInfo_{t, r};
#define qStroika_Foundation_Debug_AssertionsChecked
The qStroika_Foundation_Debug_AssertionsChecked flag determines if assertions are checked and validat...
#define RequireNotNull(p)
#define AssertNotReached()
Results< FLOAT_TYPE > Run(const TargetFunction< FLOAT_TYPE > &function2Minimize, const Sequence< FLOAT_TYPE > &initialValues, const Options< FLOAT_TYPE > &options=Options< FLOAT_TYPE >{})
Downhill Simplex Minimization, AKA Nelder-Mead algorithm, to compute minimization.
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
nonvirtual void Update(const Iterator< value_type > &i, ArgByValueType< value_type > newValue, Iterator< value_type > *nextI=nullptr)
nonvirtual void Remove(ArgByValueType< value_type > item, EQUALS_COMPARER &&equalsComparer={})
Remove () the argument value (which must exist)
nonvirtual void Add(ArgByValueType< value_type > item)
nonvirtual void RemoveAll()
RemoveAll removes all, or all matching (predicate, iterator range, equals comparer or whatever) items...
Thread::Ptr is a (unsynchronized) smart pointer referencing an internally synchronized std::thread ob...
nonvirtual void AbortAndWaitForDone(Time::DurationSeconds timeout=Time::kInfinity) const
Abort () the thread, and then WaitForDone () - but if doesn't finish fast enough, send extra aborts (...
nonvirtual size_t GetPendingTasksCount() const
return GetPendingTasks ().size (), except probably much more efficient
nonvirtual bool IsRunning(const TaskType &task) const
nonvirtual Containers::Collection< TaskType > GetRunningTasks() const
nonvirtual void AbortTasks(Time::DurationSeconds timeout=Time::kInfinity)
nonvirtual Containers::Collection< TaskInfo > GetTasks() const
returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual void AbortTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity)
Function< void()> TaskType
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual size_t GetTasksCount() const
return total number of tasks, either pending, or currently running.
nonvirtual Characters::String ToString() const
nonvirtual bool IsPresent(const TaskType &task) const
nonvirtual void WaitForTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity) const
nonvirtual void ResetStatistics()
nonvirtual void SetPoolSize(unsigned int poolSize)
nonvirtual void WaitForTasksDoneUntil(const Traversal::Iterable< TaskType > &tasks, Time::TimePointSeconds timeoutAt) const
nonvirtual Containers::Collection< TaskType > GetPendingTasks() const
nonvirtual unsigned int GetPoolSize() const
nonvirtual void WaitAndReset(Time::Duration timeout=Time::kInfinity)
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
nonvirtual void Apply(const function< void(ArgByValueType< T > item)> &doToElement, Execution::SequencePolicy seq=Execution::SequencePolicy::eDEFAULT) const
Run the argument function (or lambda) on each element of the container.
nonvirtual size_t size() const
Returns the number of items contained.
nonvirtual Iterator< T > begin() const
Support for ranged for, and STL syntax in general.
nonvirtual bool empty() const
Returns true iff size() == 0.
static constexpr default_sentinel_t end() noexcept
Support for ranged for, and STL syntax in general.
An Iterator<T> is a copyable object which allows traversing the contents of some container....
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
void CheckForInterruption()
void AbortAndWaitForDone(const Traversal::Iterable< Ptr > &threads, Time::DurationSeconds timeout=Time::kInfinity)
shorthand for AbortAndWaitForDoneUntil (Time::GetTickCount () + timeout)
void Sleep(Time::Duration seconds2Wait)
void ThrowTimeoutExceptionAfter(Time::TimePointSeconds afterTickCount, EXCEPTION &&exception2Throw)
Throw TimeOutException if the @Time::GetTickCount () is >= the given value.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
unsigned int fNumberOfTasksCompleted
nonvirtual Characters::String ToString() const