Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
Stroika::Foundation::Execution::ThreadPool Class Reference

#include <ThreadPool.h>

Classes

struct  QMax
 
struct  Statistics
 

Public Types

using TaskType = Function< void()>
 

Public Member Functions

 ThreadPool ()
 
 ~ThreadPool ()
 
nonvirtual Options GetOptions () const
 
nonvirtual unsigned int GetPoolSize () const
 
nonvirtual void SetPoolSize (unsigned int poolSize)
 
nonvirtual TaskType AddTask (const TaskType &task, const optional< Characters::String > &name=nullopt)
 
nonvirtual void AbortTask (const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity)
 
nonvirtual void AbortTasks (Time::DurationSeconds timeout=Time::kInfinity)
 
nonvirtual bool IsPresent (const TaskType &task) const
 
nonvirtual bool IsRunning (const TaskType &task) const
 
nonvirtual void WaitForTask (const TaskType &task, Time::DurationSeconds timeout=Time::kInfinity) const
 
nonvirtual Containers::Collection< TaskInfo > GetTasks () const
 returns GetPendingTasks () + GetRunningTasks () - but also some extra information about each task
 
nonvirtual Containers::Collection< TaskTypeGetPendingTasks () const
 
nonvirtual Containers::Collection< TaskTypeGetRunningTasks () const
 
nonvirtual size_t GetTasksCount () const
 return total number of tasks, either pending, or currently running.
 
nonvirtual size_t GetPendingTasksCount () const
 return GetPendingTasks ().size (), except probably much more efficient
 
nonvirtual void WaitForTasksDone (const Traversal::Iterable< TaskType > &tasks, Time::DurationSeconds timeout=Time::kInfinity) const
 
nonvirtual void WaitForTasksDoneUntil (const Traversal::Iterable< TaskType > &tasks, Time::TimePointSeconds timeoutAt) const
 
nonvirtual void ResetStatistics ()
 
nonvirtual Statistics GetCurrentStatistics () const
 
nonvirtual Characters::String ToString () const
 

Detailed Description

The ThreadPool class creates a small fixed number of Thread objects, and lets you use them as if there were many more. You submit a task (representable as a comparable std::function -

See also
Function) - and it gets eventually executed.

If as Task in the thread pool raises an exception - this will be IGNORED (except for the special case of Thread::AbortException which is used internally to end the threadpool or remove some threads). Because of this, your submitted runnables should take care of their own error handling internally.

ThreadPool mainly useful for servicing lost-cost calls/threads, where the overhead of constructing the thread is significant compared to the cost of performing the action, and where the priority & stacksize can all be predetermined and 'shared'. Also - where you want to CONTROL the level of thread creation (possibly to avoid DOS attacks or just accidental overloading).

Precondition
Debug::AppearsDuringMainLifetime (); during the lifetime of the ThreadPool
Note
Thread-Safety Internally-Synchronized-Thread-Safety all methods can be freely used from any thread, and they will block internally as needed.

Definition at line 72 of file ThreadPool.h.

Member Typedef Documentation

◆ TaskType

Note
It is important (required) that all tasks added to a ThreadPool respond in a timely manner to Thread Abort. ThreadPool counts on that for clean shutdown.

This means periodically calling CheckForInterruption () and that any waits respect thread cancelation (stop_token).

Tasks may exit via exception, but nothing will be done with that exception (beyond DbgTrace logging). So generally not a good idea, except for ThreadAbort handling.

Definition at line 156 of file ThreadPool.h.

Constructor & Destructor Documentation

◆ ThreadPool()

Stroika::Foundation::Execution::ThreadPool::ThreadPool ( )
Example Usage
ThreadPool p{ThreadPool::Options{.fThreadCount = 3}};
p.AddTask ([&q, &counter] () {
..dostuff..
});
// when goes out of scope automatically blocks waiting for threads to complete...
// or call p.WaitForTasksDoneUntil ()
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)

Definition at line 13 of file ThreadPool.inl.

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )

Destroying a threadpool implicitly calls AbortAndWaitForDone () and eats any errors (cannot rethrow).

Note
- ThreadPool used to have explicit abort methods, but there was no point. When aborting, you must wait for them all to shut down to destroy the object. And since you cannot restart the object, there is no point in ever aborting without destroying. so KISS - just destroy the ThreadPool object.

Definition at line 139 of file ThreadPool.cpp.

Member Function Documentation

◆ GetOptions()

auto Stroika::Foundation::Execution::ThreadPool::GetOptions ( ) const

These options have have been modified by various APIs, and reflect the current state of options, not necessarily those that the ThreadPool was created with.

Definition at line 17 of file ThreadPool.inl.

◆ GetPoolSize()

unsigned int ThreadPool::GetPoolSize ( ) const

This returns the number of threads in the pool (not the number of tasks). Note 0 is a legal size.

Definition at line 145 of file ThreadPool.cpp.

◆ SetPoolSize()

void ThreadPool::SetPoolSize ( unsigned int  poolSize)

SetPoolSize () is advisory. It attempts to add or remove entries as requested. Note - 0 is a legal size.

But under some circumstances, it will fail. For example, if tasks are busy running on all threads, the number of threads in the pool cannot be decreased.

Definition at line 151 of file ThreadPool.cpp.

◆ AddTask()

ThreadPool::TaskType Stroika::Foundation::Execution::ThreadPool::AddTask ( const TaskType task,
const optional< Characters::String > &  name = nullopt 
)

Push the given task into the queue (possibly blocking til space in the Q or throwing if no space in Q, but does NOT block waiting for Q to till).

Example Usage
p.AddTask ([] () {doIt ();});

if qmax is provided, it takes precedence over any default value associated with the ThreadPool (constructor). If neither provided (as an argument or associated with the pool, this is treated as no max, and the addition just proceeds.

If qMax provided (even indirectly), assure task q length doesn't exceed argument by waiting up to the qMax duration, and either timing out, or successfully add the task.

Note
Design Note: The reason this returns as TaskType is that its easy to convert a lambda or whatever into a TaskType, but if you do it multiple times you get different (!=) values. So to make the auto conversion work easier without needing to first create a variable, and then do the add task, you can just do them together. And it avoids mistakes like: function<void()> f = ...; p.AddTask(f); p.RemoveTask (p); // fails cuz different 'TaskType' added - f converted to TaskType twice!

Definition at line 24 of file ThreadPool.inl.

◆ AbortTask()

void ThreadPool::AbortTask ( const TaskType task,
Time::DurationSeconds  timeout = Time::kInfinity 
)

It is NOT an error to call this with a task that is not in the Queue (since it would be a race to try to find out if it was already executed.

It can cancel a task if it has not yet been started, or EVEN if its already in progress (see Thread::Abort - it sends abort signal)

The function doesn't return until the task has been successfully cancelled, or it throws if timeout.

Definition at line 229 of file ThreadPool.cpp.

◆ AbortTasks()

void ThreadPool::AbortTasks ( Time::DurationSeconds  timeout = Time::kInfinity)

See AbortTask () - it aborts all tasks - if any.

Definition at line 271 of file ThreadPool.cpp.

◆ IsPresent()

bool ThreadPool::IsPresent ( const TaskType task) const

returns true if queued OR actively running.

Precondition
task != nullptr

Definition at line 295 of file ThreadPool.cpp.

◆ IsRunning()

bool ThreadPool::IsRunning ( const TaskType task) const

returns true actively running

Precondition
task != nullptr

Definition at line 317 of file ThreadPool.cpp.

◆ WaitForTask()

void ThreadPool::WaitForTask ( const TaskType task,
Time::DurationSeconds  timeout = Time::kInfinity 
) const

throws if timeout. Returns when task has completed (or if not in task q)

Precondition
task != nullptr

Definition at line 329 of file ThreadPool.cpp.

◆ GetPendingTasks()

auto ThreadPool::GetPendingTasks ( ) const

return all tasks which are queued, but haven't yet been assigned to a thread.

Definition at line 392 of file ThreadPool.cpp.

◆ GetRunningTasks()

auto ThreadPool::GetRunningTasks ( ) const

return all tasks which are currently running (assigned to some thread in the thread pool).

Note
- this is a snapshot in time of something which is often rapidly changing, so by the time you look at it, it may have changed (but since we use shared_ptrs, its always safe to look at).

Definition at line 365 of file ThreadPool.cpp.

◆ GetTasksCount()

size_t ThreadPool::GetTasksCount ( ) const

return total number of tasks, either pending, or currently running.

This is GetRunningTasks().size () + GetPendingTasks ().size (), or alternatively GetTasks.size (), but more efficient.

Definition at line 378 of file ThreadPool.cpp.

◆ WaitForTasksDone()

void Stroika::Foundation::Execution::ThreadPool::WaitForTasksDone ( const Traversal::Iterable< TaskType > &  tasks,
Time::DurationSeconds  timeout = Time::kInfinity 
) const

Wait for the given amount of time for all (either given argument or all tasks in this thread pool) to be done.

When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task argument, it waits until GetTaskCount () == 0.

Note
For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too. But - its perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
Cancelation Point

Definition at line 33 of file ThreadPool.inl.

◆ WaitForTasksDoneUntil()

void ThreadPool::WaitForTasksDoneUntil ( const Traversal::Iterable< TaskType > &  tasks,
Time::TimePointSeconds  timeoutAt 
) const

Wait for the given amount of time for all (either given argument or all tasks in this thread pool) to be done.

When called with a specific set of tasks, this procedure waits for exactly those tasks. When called with no task argument, it waits until GetTaskCount () == 0.

Note
For the all-tasks overload, if new tasks are added to the thread pool, those are waited for too. But - its perfectly legal to add new tasks after this returns, so the task count could increase (if tasks were added) after this returns without exception (obviously there are likely more tasks if it returns with a timeout exception).
Cancelation Point

Definition at line 408 of file ThreadPool.cpp.

◆ ResetStatistics()

void ThreadPool::ResetStatistics ( )
Precondition
(GetOptions ().fCollectStatistics);

Definition at line 436 of file ThreadPool.cpp.

◆ GetCurrentStatistics()

auto ThreadPool::GetCurrentStatistics ( ) const
Precondition
(GetOptions ().fCollectStatistics);

Definition at line 443 of file ThreadPool.cpp.

◆ ToString()

String ThreadPool::ToString ( ) const

a helpful debug dump of the ThreadPool status

Definition at line 495 of file ThreadPool.cpp.


The documentation for this class was generated from the following files: