Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
BlockingQueue.h
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#ifndef _Stroika_Foundation_Execution_BlockingQueue_h_
5#define _Stroika_Foundation_Execution_BlockingQueue_h_ 1
6
7#include "Stroika/Foundation/StroikaPreComp.h"
8
9#include "Stroika/Foundation/Common/Common.h"
12
13/*
14 * \note Code-Status: <a href="Code-Status.md#Beta">Beta</a>
15 *
16 * TODO:
17 * @todo Perhaps have PeekHead() take timeout=0 optional param?
18 *
19 * @todo Consider linking this to ThreadPools - so that instead of having a single
20 * thread running the Q, you have an entire threadpool. Maybe that's an attachable
21 * attribute of the Q?
22 *
23 */
24
26
27 /**
28 *
29 * SEE http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
30 *
31 * Throws exception Special value Blocks Times out
32 * Insert add(e) offer(e) put(e) offer(e, time, unit)
33 * Remove remove() poll() take() poll(time, unit)
34 * Examine Front() peek() not applicable not applicable
35 *
36 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
37 *
38 * \par Example Usage (RegressionTest10_BlockingQueue_)
39 * \code
40 * enum { START = 0, END = 100 };
41 * int expectedValue = (START + END) * (END - START + 1) / 2;
42 * int counter = 0;
43 * BlockingQueue<function<void()>> q;
44 *
45 * Verify (q.size () == 0);
46 *
47 * Thread::Ptr producerThread = Thread::New (
48 * [&q, &counter] () {
49 * for (int incBy = START; incBy <= END; ++incBy) {
50 * q.AddTail ([&counter, incBy] () { counter += incBy; });
51 * }
52 * q.SignalEndOfInput ();
53 * },
54 * Thread::eAutoStart,
55 * "Producer"_k
56 * );
57 * Thread::Ptr consumerThread = Thread::New (
58 * [&q] () {
59 * while (true) {
60 * function<void()> f = q.RemoveHead ();
61 * f();
62 * }
63 * },
64 * Thread::eAutoStart,
65 * "Consumer"_k
66 * );
67 * Time::TimePointSeconds killAt = 10.0 + Time::GetTickCount ();
68 * while (counter != expectedValue and Time::GetTickCount () < killAt) {
69 * Execution::Sleep (500ms);
70 * }
71 * Verify (counter == expectedValue);
72 * producerThread.WaitForDone (); // producer already set to run off the end...
73 * consumerThread.WaitForDone (); // consumer will end due to exception reading from end
74 * \endcode
75 *
76 * \note Aliases
77 * This could easily be called EventQueue or MessageQueue, as its well suited to those sorts of uses.
78 */
79 template <typename T>
81 public:
82 /*
83 * Note - you may want to pass in a specific queue object, to require use of a particular concrete implementation
84 * for the Queue (such as one that doesn't allocate memory). But when constructing a blocking Q (even with another Q)
85 * the 'useQueue' must be empty.
86 */
87 BlockingQueue () = default;
88 BlockingQueue (const Containers::Queue<T>& useQueue);
89
90 public:
91 /**
92 * Blocks until item added, and throws if timeout exceeded. About the only way the
93 * throw can happen is if the Q is full (or timeout is very small).
94 *
95 * Typically this will return almost instantly.
96 *
97 * Analogous to the java BlockingQueue<T>::put(e) and similar to the java
98 * BlockingQueue<T>::offer() or BlockingQueue<T>::add () method.
99 *
100 * \note this is illegal to call (assertion error) if SignalEndOfInput () has been called on this BlockingQueue.
101 */
102 nonvirtual void AddTail (const T& e, Time::DurationSeconds timeout = Time::kInfinity);
103
104 public:
105 /**
106 * Cause any future calls to AddTail () to be illegal, and any pending (while BlockingQueue empty) calls to RemoveHead to not block but
107 * throw a timeout error (no matter the timeout provided).
108 *
109 * \note This doesn't delete the current entries in the blocking queue, so they will still get consumed. This just prevents
110 * the Q from blocking while its being emptied out. This is like adding the special value 'EOF' to the end of Q.
111 */
112 nonvirtual void SignalEndOfInput ();
113
114 public:
115 /**
116 * This returns true iff SignalEndOfInput () has been called. It does NOT imply the BlockingQueue is empty.
117 *
118 * This routine exist because there is no other non-blocking way to check (peek) at see if you are at end of input.
119 */
120 nonvirtual bool EndOfInputHasBeenQueued () const;
121
122 public:
123 /**
124 * This returns true iff SignalEndOfInput () has been called and the BlockingQueue is empty.
125 *
126 * \note Equivalent to EndOfInputHasBeenQueued () and empty ()
127 *
128 * \note Once this is true, it will always remain true.
129 *
130 * \note This function is non-blocking.
131 *
132 * \see also empty ()
133 */
134 nonvirtual bool QAtEOF () const;
135
136 public:
137 /**
138 * Blocks until item removed, and throws if timeout exceeded.
139 *
140 * If there are currently no items in the Q, this may wait indefinitely (up to timeout provided).
141 *
142 * If there are no available entries, and SignalEndOfInput () has been called, this will throw a Streams::EOFException
143 * no matter what the timeout value given.
144 *
145 * Similar to the java BlockingQueue<T>::take() or BlockingQueue<T>::poll (time) method.
146 *
147 * @see RemoveHeadIfPossible()
148 */
149 nonvirtual T RemoveHead (Time::DurationSeconds timeout = Time::kInfinity);
150
151 public:
152 /**
153 * Like RemoveHead() except that on timeout, returns empty optional<T> instead
154 * of throwing.
155 *
156 * If there is an entry at the head of the Q, return it immediately. Wait up til
157 * 'timeout' seconds for an entry to appear. Return 'missing' value if none appears.
158 *
159 * If timeout == 0 (the default) this amounts to peeking (but with remove), and never waits.
160 *
161 * Analogous to the java BlockingQueue<T>::poll () method.
162 */
163 nonvirtual optional<T> RemoveHeadIfPossible (Time::DurationSeconds timeout = 0s);
164
165 public:
166 /**
167 * Returns the front element from the Q, if there is one, and an empty optional<T> if
168 * there is none (without blocking).
169 *
170 * Analogous to the java BlockingQueue<T>::peek() method.
171 */
172 nonvirtual optional<T> PeekHead () const;
173
174 public:
175 /**
176 * Returns true if the Q contains no items. Equivalent to PeekHead ().empty ()
177 *
178 * \see also QAtEOF ()
179 */
180 nonvirtual bool empty () const;
181
182 public:
183 /**
184 * Returns the number of elements in the blocking queue (zero if empty).
185 */
186 nonvirtual size_t size () const;
187
188 public:
189 /**
190 * @aliases size()
191 */
192 nonvirtual size_t length () const;
193
194 public:
195 /**
196 * Get a copy of the entire owned Queue. NOTE - modifications of the returned copy have no effect on Queue associated
197 * with the BlockingQueue.
198 */
199 nonvirtual Containers::Queue<T> GetQueue () const;
200
201 public:
202 /**
203 */
204 nonvirtual void clear ();
205
206 private:
207 mutable ConditionVariable<> fConditionVariable_;
208 bool fEndOfInput_{false};
209 Containers::Queue<T> fQueue_;
210 };
211
212}
213
214/*
215 ********************************************************************************
216 ***************************** Implementation Details ***************************
217 ********************************************************************************
218 */
219#include "BlockingQueue.inl"
220
221#endif /*_Stroika_Foundation_Execution_BlockingQueue_h_*/
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
A Queue is a first-in-first-out (FIFO) data structure, where elements are arranged in well-ordered fa...
Definition Queue.h:95
nonvirtual Containers::Queue< T > GetQueue() const
nonvirtual optional< T > PeekHead() const
nonvirtual void AddTail(const T &e, Time::DurationSeconds timeout=Time::kInfinity)
nonvirtual optional< T > RemoveHeadIfPossible(Time::DurationSeconds timeout=0s)
nonvirtual T RemoveHead(Time::DurationSeconds timeout=Time::kInfinity)