4#include "Stroika/Foundation/StroikaPreComp.h"
9#include "Stroika/Foundation/Debug/Main.h"
11#include "Stroika/Foundation/Execution/Common.h"
14#include "Stroika/Foundation/Execution/TimeOutException.h"
24using Memory::MakeSharedPtr;
33class ThreadPool::MyRunnable_ {
36 : fThreadPool{threadPool}
41 tuple<TaskType, Time::TimePointSeconds, optional<String>> GetCurTaskInfo ()
const
44 return make_tuple (fCurTask, fCurTaskStartedAt, fCurName);
48 nonvirtual
void Run ()
56 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
57 Assert (fCurTask ==
nullptr);
61 fThreadPool.WaitForNextTask_ (&fCurTask, &fCurName);
63 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
64 Assert (fCurTask !=
nullptr);
67 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept {
70 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
72 taskStartedAt = fCurTaskStartedAt;
74 if (fThreadPool.fCollectStatistics_) {
75 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
76 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksCompleted;
77 ++fThreadPool.fCollectedTaskStats_.fNumberOfTasksReporting;
78 fThreadPool.fCollectedTaskStats_.fTotalTimeConsumed += Time::GetTickCount () - taskStartedAt;
85 [[maybe_unused]] lock_guard critSec{fThreadPool.fCriticalSection_};
87 fCurTaskStartedAt = Time::GetTickCount ();
91 catch (
const Thread::AbortException&) {
96 if constexpr (kEmitDbgTraceOnThreadPoolEntryExceptions_ and qStroika_Foundation_Debug_DefaultTracingOn) {
97 DbgTrace (
"in threadpool, ignoring exception running task: {}"_f, current_exception ());
108 optional<Characters::String> fCurName;
120 sb <<
"NumberOfTasksAdded: "sv << fNumberOfTasksAdded;
122 sb <<
", TotalTimeConsumed: "sv << fTotalTimeConsumed;
133 : fCollectStatistics_{options.fCollectStatistics}
134 , fDefaultQMax_{options.fQMax}
135 , fThreadPoolName_{options.fThreadPoolName}
137 Require (Debug::AppearsDuringMainLifetime ());
143 Require (Debug::AppearsDuringMainLifetime ());
144 AbortAndWaitForDone_ ();
149 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
150 return static_cast<unsigned int> (fThreads_.
size ());
156 Require (not fAborted_);
157 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
159 while (poolSize > fThreads_.
size ()) {
160 fThreads_.
Add (mkThread_ ());
164 while (poolSize < fThreads_.
size ()) {
166 bool anyFoundToKill =
false;
168 TaskType ct{i->fRunnable->fCurTask};
172 anyFoundToKill =
true;
176 if (not anyFoundToKill) {
178 DbgTrace (
"Failed to lower the loop size - cuz all threads busy - giving up"_f);
189#if qStroika_Foundation_Debug_AssertionsChecked
190 bool blockedAtLeastOnce =
false;
194#if qStroika_Foundation_Debug_AssertionsChecked
195 if (not blockedAtLeastOnce) {
196 DbgTrace (
"Blocking inside ThreadPool::AddTask due to excessive pending task count"_f);
197 blockedAtLeastOnce =
true;
204 return AddTask_ (task, name);
209auto ThreadPool::AddTask_ (
const TaskType& task,
const optional<Characters::String>& name) ->
TaskType
211#if USE_NOISY_TRACE_IN_THIS_MODULE_
214 Require (not fAborted_);
216 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
217 fPendingTasks_.push_back (PendingTaskInfo_{.fTask = task, .fName = name});
218#if USE_NOISY_TRACE_IN_THIS_MODULE_
219 DbgTrace (
"fPendingTasks.size () now = {}"_f, (
int)fPendingTasks_.size ());
221 ++fCollectedTaskStats_.fNumberOfTasksAdded;
225 fTasksMaybeAdded_.
Set ();
227 Require (not fAborted_);
236 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
237 for (
auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
238 if (i->fTask == task) {
239 fPendingTasks_.erase (i);
258 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
260 TaskType ct{i->fRunnable->fCurTask};
262 thread2Kill = i->fThread;
263 fThreads_.
Update (i, mkThread_ ());
268 if (thread2Kill !=
nullptr) {
278 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
279 fPendingTasks_.clear ();
280 for (
const TPInfo_& ti : fThreads_) {
286 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
287 for (
const TPInfo_& ti : fThreads_) {
289 ti.fThread.AbortAndWaitForDone (timeout);
299 Require (task !=
nullptr);
302 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
303 for (
auto i = fPendingTasks_.begin (); i != fPendingTasks_.end (); ++i) {
304 if (i->fTask == task) {
309 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
310 TaskType rTask{i->fRunnable->fCurTask};
321 Require (task !=
nullptr);
322 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
323 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
324 if (task == i->fRunnable->fCurTask) {
336 TimePointSeconds timeoutAt = timeout + Time::GetTickCount ();
350 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
351 for (
const auto& ti : fPendingTasks_) {
352 result.
Add (TaskInfo{.fTask = ti.fTask, .fName = ti.fName});
354 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
355 tuple<TaskType, Time::TimePointSeconds, optional<String>> curTaskInfo = i->fRunnable->GetCurTaskInfo ();
356 if (get<TaskType> (curTaskInfo) !=
nullptr) {
357 result.
Add (TaskInfo{
358 .fTask = get<TaskType> (curTaskInfo),
359 .fName = get<optional<String>> (curTaskInfo),
360 .fRunningSince = get<Time::TimePointSeconds> (curTaskInfo),
370 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
371 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
372 TaskType task{i->fRunnable->fCurTask};
373 if (task !=
nullptr) {
383 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
384 size_t count = fPendingTasks_.size ();
385 for (
auto i = fThreads_.
begin (); i != fThreads_.
end (); ++i) {
387 if (i->fRunnable->fCurTask !=
nullptr) {
397 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
398 for (
const auto& i : fPendingTasks_) {
399 result.
Add (i.fTask);
406 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
407 return fPendingTasks_.size ();
412#if USE_NOISY_TRACE_IN_THIS_MODULE_
414 "ThreadPool::WaitForTasksDoneUntil",
"*this={}, tasks={}, timeoutAt={}"_f,
ToString (),
ToString (tasks), timeoutAt)};
417 for (
const auto& task : tasks) {
418 auto now = Time::GetTickCount ();
426#if USE_NOISY_TRACE_IN_THIS_MODULE_
428 "*this={}, timeoutAt={}"_f,
ToString (), timeoutAt)};
440 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
441 Require (fCollectStatistics_);
442 fCollectedTaskStats_ = {};
447 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
448 Require (fCollectStatistics_);
449 return fCollectedTaskStats_;
452void ThreadPool::Abort_ () noexcept
461 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
462 fPendingTasks_.clear ();
463 for (
const TPInfo_& ti : fThreads_) {
469void ThreadPool::AbortAndWaitForDone_ () noexcept
472#if USE_NOISY_TRACE_IN_THIS_MODULE_
480 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
481 fThreads_.
Apply ([&] (
const TPInfo_& i) { threadsToShutdown.
Add (i.fThread); });
485 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
486 fThreads_.
Apply ([&] (
const TPInfo_& i) { Assert (i.fThread.IsDone ()); });
491 DbgTrace (
"ThreadPool::AbortAndWaitForDone_: serious bug/exception"_f);
494 Ensure (fThreads_.
empty ());
502 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
503 if (fThreadPoolName_) {
504 sb <<
"pool-name: '{}'"_f(*fThreadPoolName_);
507 if (fThreadPoolName_) {
513 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
514 sb <<
", pool-thread-count: {}"_f(fThreads_.
size ());
521void ThreadPool::WaitForNextTask_ (
TaskType* result, optional<Characters::String>* resultName)
525 Require (*result ==
nullptr);
527 if (fAborted_) [[unlikely]] {
528 Throw (Thread::AbortException::kThe);
532 [[maybe_unused]] lock_guard critSec{fCriticalSection_};
533 if (not fPendingTasks_.empty ()) {
534 *result = fPendingTasks_.front ().fTask;
535 *resultName = fPendingTasks_.front ().fName;
536 fPendingTasks_.pop_front ();
537 DbgTrace (
"ThreadPool::WaitForNextTask_ () pulled a new task from 'pending-tasks' to run on this thread, leaving pending-task-list-size = {}"_f,
538 fPendingTasks_.size ());
539 Ensure (*result !=
nullptr);
551ThreadPool::TPInfo_ ThreadPool::mkThread_ ()
553 shared_ptr<MyRunnable_> r{MakeSharedPtr<ThreadPool::MyRunnable_> (*
this)};
554 StringBuilder entryName =
"TPE #{}"_f(fNextThreadEntryNumber_++);
555 entryName +=
" {"sv + fThreadPoolName_.value_or (
"anonymous-thread-pool"sv) +
"}"sv;
557 return TPInfo_{t, r};
#define qStroika_Foundation_Debug_AssertionsChecked
The qStroika_Foundation_Debug_AssertionsChecked flag determines if assertions are checked and validat...
#define RequireNotNull(p)
#define AssertNotReached()
Results< FLOAT_TYPE > Run(const TargetFunction< FLOAT_TYPE > &function2Minimize, const Sequence< FLOAT_TYPE > &initialValues, const Options< FLOAT_TYPE > &options=Options< FLOAT_TYPE >{})
Downhill Simplex Minimization, AKA Nelder-Mead algorithm, to compute minimization.
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
nonvirtual void Update(const Iterator< value_type > &i, ArgByValueType< value_type > newValue, Iterator< value_type > *nextI=nullptr)
nonvirtual void Remove(ArgByValueType< value_type > item, EQUALS_COMPARER &&equalsComparer={})
Remove () the argument value (which must exist)
nonvirtual void Add(ArgByValueType< value_type > item)
nonvirtual void RemoveAll()
RemoveAll removes all, or all matching (predicate, iterator range, equals comparer or whatever) items...
Thread::Ptr is a (unsynchronized) smart pointer referencing an internally synchronized std::thread ob...
nonvirtual void AbortAndWaitForDone(Time::DurationSeconds timeout=Time::kInfinity) const
Abort () the thread, and then WaitForDone () - but if doesn't finish fast enough, send extra aborts (...
nonvirtual size_t GetPendingTasksCount() const
return GetPendingTasks ().size (), except probably much more efficient
nonvirtual bool IsRunning(const TaskType &task) const
nonvirtual Containers::Collection< TaskType > GetRunningTasks() const
nonvirtual void AbortTasks(Time::DurationSeconds timeout=Time::kInfinity)
nonvirtual Containers::Collection< TaskInfo > GetTasks() const
returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual void AbortTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity)
Function< void()> TaskType
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual size_t GetTasksCount() const
return total number of tasks, either pending, or currently running.
nonvirtual Characters::String ToString() const
nonvirtual bool IsPresent(const TaskType &task) const
nonvirtual void WaitForTask(const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity) const
nonvirtual void ResetStatistics()
nonvirtual void SetPoolSize(unsigned int poolSize)
nonvirtual void WaitForTasksDoneUntil(const Traversal::Iterable< TaskType > &tasks, Time::TimePointSeconds timeoutAt) const
nonvirtual Containers::Collection< TaskType > GetPendingTasks() const
nonvirtual unsigned int GetPoolSize() const
nonvirtual void WaitAndReset(Time::Duration timeout=Time::kInfinity)
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
nonvirtual void Apply(const function< void(ArgByValueType< T > item)> &doToElement, Execution::SequencePolicy seq=Execution::SequencePolicy::eDEFAULT) const
Run the argument function (or lambda) on each element of the container.
nonvirtual size_t size() const
Returns the number of items contained.
nonvirtual Iterator< T > begin() const
Support for ranged for, and STL syntax in general.
nonvirtual bool empty() const
Returns true iff size() == 0.
static constexpr default_sentinel_t end() noexcept
Support for ranged for, and STL syntax in general.
An Iterator<T> is a copyable object which allows traversing the contents of some container....
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
void CheckForInterruption()
void AbortAndWaitForDone(const Traversal::Iterable< Ptr > &threads, Time::DurationSeconds timeout=Time::kInfinity)
shorthand for AbortAndWaitForDoneUntil (Time::GetTickCount () + timeout)
void Sleep(Time::Duration seconds2Wait)
void ThrowTimeoutExceptionAfter(Time::TimePointSeconds afterTickCount, EXCEPTION &&exception2Throw)
Throw TimeOutException if the @Time::GetTickCount () is >= the given value.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
unsigned int fNumberOfTasksCompleted
nonvirtual Characters::String ToString() const