Stroika Library 3.0d23
 
Loading...
Searching...
No Matches
WaitForIOReady.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
6#if qStroika_Foundation_Common_Platform_POSIX
7#include <poll.h>
8#include <unistd.h>
9#elif qStroika_Foundation_Common_Platform_Windows
10#include <Windows.h>
11
12#include <winsock2.h>
13
14#include <ws2tcpip.h>
15#endif
16
17#include "Stroika/Foundation/Containers/Sequence.h"
21
22#include "Exceptions.h"
23#if qStroika_Foundation_Common_Platform_Windows
24#include "Platform/Windows/Exception.h"
25#include "Platform/Windows/WaitSupport.h"
26#endif
27
28#include "Stroika/Foundation/IO/Network/ConnectionOrientedMasterSocket.h"
29#include "Stroika/Foundation/IO/Network/ConnectionOrientedStreamSocket.h"
30
31#include "WaitForIOReady.h"
32
33using namespace Stroika::Foundation;
36using namespace Stroika::Foundation::Execution;
39
40using std::byte;
41
42using Memory::BLOB;
46
47// Comment this in to turn on aggressive noisy DbgTrace in this module
48// #define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
49
50namespace {
51
52 /*
53 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
54 */
55 struct EventFD_Based_ : public EventFD {
56
57 EventFD_Based_ () = default;
58 virtual bool IsSet () const override
59 {
60 return fIsSet_;
61 }
62 virtual void Set () override
63 {
64 // If already set, nothing todo. To set, we set flag, and write so anybody selecting will wakeup
65 if (not IsSet ()) {
66 fIsSet_ = true;
67 _WriteOne (); // so select calls wake
68 }
69 }
70 virtual void Clear () override
71 {
72 if (IsSet ()) {
73 fIsSet_ = false;
74 _ReadAllAvail (); // so select calls don't prematurely wake
75 }
76 }
77
78 protected:
79 virtual void _ReadAllAvail () = 0;
80 virtual void _WriteOne () = 0;
81
82 private:
83 atomic<bool> fIsSet_{false}; // cuz called from multiple threads - sync
84 };
85
86 /*
87 * This strategy may not be the most efficient (esp to construct) but it should work
88 * portably, so implemented first.
89 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
90 */
91 struct EventFD_Based_SocketPair_ : EventFD_Based_ {
92 static const inline BLOB sSingleEltDatum{BLOB ({1})};
93
94 EventFD_Based_SocketPair_ ()
95 {
96 Debug::TraceContextBumper ctx{Stroika_Foundation_Debug_OptionalizeTraceArgs ("EventFD_Based_SocketPair_::CTOR")};
97 auto [r, w] = ConnectionOrientedStreamSocket::NewPair (SocketAddress::FamilyType::INET, Socket::Type::STREAM);
98 fReadSocket_ = r;
99 fWriteSocket_ = w;
100 }
101 ConnectionOrientedStreamSocket::Ptr fReadSocket_{nullptr};
102 ConnectionOrientedStreamSocket::Ptr fWriteSocket_{nullptr};
103
104 virtual pair<SDKPollableType, WaitForIOReady_Base::TypeOfMonitorSet> GetWaitInfo () override
105 {
106 // Poll on read FD to see if data available to read
107 return pair<SDKPollableType, WaitForIOReady_Base::TypeOfMonitorSet>{
108 fReadSocket_.GetNativeSocket (), WaitForIOReady_Base::TypeOfMonitorSet{WaitForIOReady_Base::TypeOfMonitor::eRead}};
109 }
110 virtual void _ReadAllAvail () override
111 {
112 // thread safety OK cuz only reading from Ptr (nobody writes) and socket rep internally synchronized
113 byte buf[1024];
114 while (fReadSocket_.ReadNonBlocking (buf))
115 ;
116 }
117 virtual void _WriteOne () override
118 {
119 // thread safety OK cuz only reading from Ptr (nobody writes) and socket rep internally synchronized
120 fWriteSocket_.Write (sSingleEltDatum);
121 }
122 };
123
124}
125
126/*
127 ********************************************************************************
128 *********** Execution::WaitForIOReady::WaitForIOReady_Support::mkEventFD *******
129 ********************************************************************************
130 */
132{
133 Debug::TraceContextBumper ctx{"WaitForIOReady_Support::mkEventFD"};
134 // @todo - See http://stroika-bugs.sophists.com/browse/STK-709
135 // to support eventfd and pipe based helper classes
136 /// need ifdefs to allow build based on eventfd, or pipe
137 return make_unique<EventFD_Based_SocketPair_> ();
138}
139
140/*
141 ********************************************************************************
142 **************** Execution::WaitForIOReady::WaitForIOReady_Base ****************
143 ********************************************************************************
144 */
145auto WaitForIOReady_Base::_WaitQuietlyUntil (const pair<SDKPollableType, TypeOfMonitorSet>* start,
146 const pair<SDKPollableType, TypeOfMonitorSet>* end, TimePointSeconds timeoutAt) -> Containers::Set<size_t>
147{
148#if USE_NOISY_TRACE_IN_THIS_MODULE_
149 Debug::TraceContextBumper ctx{"WaitForIOReady_Base::_WaitQuietlyUntil", "args={}"_f, vector<pair<SDKPollableType, TypeOfMonitorSet>>{start, end}};
150#endif
151 DurationSeconds time2Wait = Math::AtLeast<DurationSeconds> (timeoutAt - Time::GetTickCount (), 0s);
153 StackBuffer<pollfd> pollData;
154 {
155 pollData.GrowToSize_uninitialized (end - start);
156 size_t idx = 0;
157 for (auto i = start; i != end; ++i) {
158 short events = 0;
159 for (TypeOfMonitor ii : i->second) {
160 switch (ii) {
161 case TypeOfMonitor::eRead:
162 events |= POLLIN;
163 break;
164 case TypeOfMonitor::eWrite:
165 events |= POLLOUT;
166 break;
167 case TypeOfMonitor::ePriority:
168 events |= POLLPRI;
169 break;
170 }
171 }
172 pollData[idx] = pollfd{i->first, events, 0};
173 Assert (pollData[idx].revents == 0);
174 idx++;
175 }
176 }
177 // USE ppoll? Also verify meaning of timeout, as docs on http://linux.die.net/man/2/poll seem to suggest
178 // I have this wrong but I suspect docs wrong (says "The timeout argument specifies the minimum number of milliseconds that poll() will block"
179 // which sounds backward...
180 [[maybe_unused]] int timeoutMilliseconds = Math::Round<int> (time2Wait.count () * 1000);
181 Assert (timeoutMilliseconds >= 0);
182 int pollResult;
183#if qStroika_Foundation_Common_Platform_Windows
184#if qStroika_Foundation_Execution_WaitForIOReady_BreakWSAPollIntoTimedMillisecondChunks > 0
185 while (true) {
187 DurationSeconds timeLeft2Wait = Math::AtLeast<DurationSeconds> (timeoutAt - Time::GetTickCount (), 0s);
188 DurationSeconds time2WaitThisLoop = clamp<DurationSeconds> (
189 timeLeft2Wait, 0s, DurationSeconds{qStroika_Foundation_Execution_WaitForIOReady_BreakWSAPollIntoTimedMillisecondChunks / 1000.0});
190 int time2WaitMillisecondsThisLoop = static_cast<int> (time2WaitThisLoop.count () * 1000);
191 if ((pollResult = ::WSAPoll (pollData.begin (), static_cast<ULONG> (pollData.GetSize ()), time2WaitMillisecondsThisLoop)) == SOCKET_ERROR) {
192 ThrowSystemErrNo (::WSAGetLastError ());
193 }
194 if (pollResult != 0 or Time::GetTickCount () >= timeoutAt) {
195 break;
196 }
197 }
198#else
199 if ((pollResult = ::WSAPoll (pollData.begin (), static_cast<ULONG> (pollData.GetSize ()), timeoutMilliseconds)) == SOCKET_ERROR) {
200 Platform::Windows::Exception::Throw (::WSAGetLastError ());
201 }
202#endif
203#else
204 pollResult = Handle_ErrNoResultInterruption ([&] () { return ::poll (pollData.begin (), pollData.GetSize (), timeoutMilliseconds); });
205#endif
206 Set<size_t> result;
207#if USE_NOISY_TRACE_IN_THIS_MODULE_
209#endif
210 if (pollResult != 0) {
211 for (size_t i = 0; i < pollData.GetSize (); ++i) {
212 if (pollData[i].revents != 0) {
213#if USE_NOISY_TRACE_IN_THIS_MODULE_
214 dbgResult += start[i].first;
215#endif
216 result.Add (i);
217 }
218 }
219 }
220#if USE_NOISY_TRACE_IN_THIS_MODULE_
221 DbgTrace ("returning {}"_f, dbgResult);
222#endif
223 return result;
224}
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
#define DbgTrace
Definition Trace.h:317
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:278
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual PlatformNativeHandle GetNativeSocket() const
Definition Socket.inl:52
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
auto Handle_ErrNoResultInterruption(CALL call) -> decltype(call())
Handle UNIX EINTR system call behavior - fairly transparently - just effectively removes them from th...