Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
BlockingQueue.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4
5#include "Stroika/Foundation/Execution/TimeOutException.h"
7
9
10 /*
11 ********************************************************************************
12 ******************************** BlockingQueue<T> ******************************
13 ********************************************************************************
14 */
15 template <typename T>
16 BlockingQueue<T>::BlockingQueue (const Containers::Queue<T>& useQueue)
17 : fQueue_{useQueue}
18 {
19 Require (useQueue.empty ()); // this constructor is only used to control the 'type' (data structure/backend) used by the Blocking Queue
20 }
21 template <typename T>
22 inline void BlockingQueue<T>::AddTail (const T& e, Time::DurationSeconds /*timeout*/)
23 {
24 // Our locks are short-lived, so its safe to ignore the timeout - this will always be fast
25 //
26 // note also: this must be NotifyAll, not NotifyOne () - because we could wake a useless, ineffective thread, e.g. https://stackoverflow.com/questions/13774802/notify-instead-of-notifyall-for-blocking-queue
27 fConditionVariable_.MutateDataNotifyAll ([&, this] () {
28 Require (not fEndOfInput_);
29 fQueue_.AddTail (e);
30 });
31 }
32 template <typename T>
34 {
35 fConditionVariable_.MutateDataNotifyAll ([this] () { fEndOfInput_ = true; });
36 }
37 template <typename T>
39 {
40 // lock may not always be strictly needed, but could report stale (cross thread) value without lock
41 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
42 return fEndOfInput_;
43 }
44 template <typename T>
45 inline bool BlockingQueue<T>::QAtEOF () const
46 {
47 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
48 return fEndOfInput_ and fQueue_.empty ();
49 }
50 template <typename T>
52 {
53 Time::TimePointSeconds waitTil = Time::GetTickCount () + timeout;
54 while (true) {
55 typename ConditionVariable<>::LockType waitableLock{fConditionVariable_.fMutex}; // despite appearances to the contrary, not holding lock lock cuz condition variable unlocks before waiting
56 if (optional<T> tmp = fQueue_.RemoveHeadIf ()) {
57 // Only notify_all() on additions, cuz waiters just looking for more data
58 return *tmp;
59 }
60 if (fEndOfInput_) [[unlikely]] {
61 Execution::Throw (Streams::EOFException::kThe); // Since we always must return, and know we never will, throw timeout now
62 }
64 (void)fConditionVariable_.wait_until (waitableLock, Time::Pin2SafeSeconds (waitTil),
65 [this] () { return fEndOfInput_ or not fQueue_.empty (); });
66 }
67 }
68 template <typename T>
70 {
71 Time::TimePointSeconds waitTil = Time::GetTickCount () + timeout;
72 while (true) {
73 typename ConditionVariable<>::LockType waitableLock{fConditionVariable_.fMutex}; // despite appearances to the contrary, not holding lock lock cuz condition variable unlocks before waiting
74 if (optional<T> tmp = fQueue_.RemoveHeadIf ()) {
75 return tmp;
76 }
77 if (fEndOfInput_) {
78 return nullopt; // on end of input, no point in waiting
79 }
80 if (Time::GetTickCount () > waitTil) {
81 return nullopt; // on timeout, return 'missing'
82 }
83 (void)fConditionVariable_.wait_until (waitableLock, Time::Pin2SafeSeconds (waitTil),
84 [this] () { return fEndOfInput_ or not fQueue_.empty (); });
85 }
86 }
87 template <typename T>
88 inline optional<T> BlockingQueue<T>::PeekHead () const
89 {
90 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
91 return fQueue_.HeadIf ();
92 }
93 template <typename T>
94 inline bool BlockingQueue<T>::empty () const
95 {
96 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
97 return fQueue_.empty ();
98 }
99 template <typename T>
100 inline size_t BlockingQueue<T>::size () const
101 {
102 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
103 return fQueue_.size ();
104 }
105 template <typename T>
106 inline size_t BlockingQueue<T>::length () const
107 {
108 return size ();
109 }
110 template <typename T>
113 typename ConditionVariable<>::QuickLockType critSection{fConditionVariable_.fMutex};
114 return fQueue_;
115 }
116 template <typename T>
117 inline void BlockingQueue<T>::clear ()
118 {
119 typename ConditionVariable<>::LockType waitableLock{fConditionVariable_.fMutex};
120 fQueue_.clear ();
121 if (fEndOfInput_) {
122 fConditionVariable_.release_and_notify_all (waitableLock); // cuz readers could be waiting and need to know no more
123 }
124 }
125
126}
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
A Queue is a first-in-first-out (FIFO) data structure, where elements are arranged in well-ordered fa...
Definition Queue.h:95
void ThrowTimeoutExceptionAfter(Time::TimePointSeconds afterTickCount, EXCEPTION &&exception2Throw)
Throw TimeOutException if the @Time::GetTickCount () is >= the given value.