Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
WaitForIOReady.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
6#if qStroika_Foundation_Common_Platform_POSIX
7#include <poll.h>
8#include <unistd.h>
9#elif qStroika_Foundation_Common_Platform_Windows
10#include <Windows.h>
11
12#include <winsock2.h>
13
14#include <ws2tcpip.h>
15#endif
16
20
21#include "Exceptions.h"
22#if qStroika_Foundation_Common_Platform_Windows
23#include "Platform/Windows/Exception.h"
24#include "Platform/Windows/WaitSupport.h"
25#endif
26
27#include "Stroika/Foundation/IO/Network/ConnectionOrientedMasterSocket.h"
28#include "Stroika/Foundation/IO/Network/ConnectionOrientedStreamSocket.h"
29
30#include "WaitForIOReady.h"
31
32using namespace Stroika::Foundation;
34using namespace Stroika::Foundation::Execution;
37
38using std::byte;
39
40using Memory::BLOB;
44
45namespace {
46
47 /*
48 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
49 */
50 struct EventFD_Based_ : public EventFD {
51
52 EventFD_Based_ () = default;
53 virtual bool IsSet () const override
54 {
55 return fIsSet_;
56 }
57 virtual void Set () override
58 {
59 // If already set, nothing todo. To set, we set flag, and write so anybody selecting will wakeup
60 if (not IsSet ()) {
61 fIsSet_ = true;
62 _WriteOne (); // so select calls wake
63 }
64 }
65 virtual void Clear () override
66 {
67 if (IsSet ()) {
68 fIsSet_ = false;
69 _ReadAllAvail (); // so select calls don't prematurely wake
70 }
71 }
72
73 protected:
74 virtual void _ReadAllAvail () = 0;
75 virtual void _WriteOne () = 0;
76
77 private:
78 atomic<bool> fIsSet_{false}; // cuz called from multiple threads - sync
79 };
80
81 /*
82 * This strategy may not be the most efficient (esp to construct) but it should work
83 * portably, so implemented first.
84 * \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
85 */
86 struct EventFD_Based_SocketPair_ : EventFD_Based_ {
87 static const inline BLOB sSingleEltDatum{BLOB ({1})};
88
89 EventFD_Based_SocketPair_ ()
90 {
91 Debug::TraceContextBumper ctx{Stroika_Foundation_Debug_OptionalizeTraceArgs ("EventFD_Based_SocketPair_::CTOR")};
92 auto [r, w] = ConnectionOrientedStreamSocket::NewPair (SocketAddress::FamilyType::INET, Socket::Type::STREAM);
93 fReadSocket_ = r;
94 fWriteSocket_ = w;
95 }
96 ConnectionOrientedStreamSocket::Ptr fReadSocket_{nullptr};
97 ConnectionOrientedStreamSocket::Ptr fWriteSocket_{nullptr};
98
99 virtual pair<SDKPollableType, WaitForIOReady_Base::TypeOfMonitorSet> GetWaitInfo () override
100 {
101 // Poll on read FD to see if data available to read
102 return pair<SDKPollableType, WaitForIOReady_Base::TypeOfMonitorSet>{
103 fReadSocket_.GetNativeSocket (), WaitForIOReady_Base::TypeOfMonitorSet{WaitForIOReady_Base::TypeOfMonitor::eRead}};
104 }
105 virtual void _ReadAllAvail () override
106 {
107 // thread safety OK cuz only reading from Ptr (nobody writes) and socket rep internally synchronized
108 byte buf[1024];
109 while (fReadSocket_.ReadNonBlocking (buf))
110 ;
111 }
112 virtual void _WriteOne () override
113 {
114 // thread safety OK cuz only reading from Ptr (nobody writes) and socket rep internally synchronized
115 fWriteSocket_.Write (sSingleEltDatum);
116 }
117 };
118
119}
120
121/*
122 ********************************************************************************
123 *********** Execution::WaitForIOReady::WaitForIOReady_Support::mkEventFD *******
124 ********************************************************************************
125 */
127{
128 Debug::TraceContextBumper ctx{"WaitForIOReady_Support::mkEventFD"};
    // Currently the socket-pair based implementation is the only one returned here.
129 // @todo - See http://stroika-bugs.sophists.com/browse/STK-709
130 // to support eventfd-based and pipe-based helper classes as alternatives.
131 /// That will need ifdefs to allow the build to select the eventfd or pipe implementation.
132 return make_unique<EventFD_Based_SocketPair_> ();
133}
134
135/*
136 ********************************************************************************
137 **************** Execution::WaitForIOReady::WaitForIOReady_Base ****************
138 ********************************************************************************
139 */
140auto WaitForIOReady_Base::_WaitQuietlyUntil (const pair<SDKPollableType, TypeOfMonitorSet>* start,
141 const pair<SDKPollableType, TypeOfMonitorSet>* end, TimePointSeconds timeoutAt) -> Containers::Set<size_t>
142{
143 DurationSeconds time2Wait = Math::AtLeast<DurationSeconds> (timeoutAt - Time::GetTickCount (), 0s);
145 StackBuffer<pollfd> pollData;
146 {
147 pollData.GrowToSize_uninitialized (end - start);
148 size_t idx = 0;
149 for (auto i = start; i != end; ++i) {
150 short events = 0;
151 for (TypeOfMonitor ii : i->second) {
152 switch (ii) {
153 case TypeOfMonitor::eRead:
154 events |= POLLIN;
155 break;
156 case TypeOfMonitor::eWrite:
157 events |= POLLOUT;
158 break;
159 case TypeOfMonitor::eError:
160 events |= POLLERR;
161 break;
162 case TypeOfMonitor::eHUP:
163 events |= POLLHUP;
164 break;
165 }
166 }
167 pollData[idx] = pollfd{i->first, events, 0};
168 Assert (pollData[idx].revents == 0);
169 idx++;
170 }
171 }
172 // USE ppoll? Also verify meaning of timeout, as docs on http://linux.die.net/man/2/poll seem to suggest
173 // I have this wrong but I suspect docs wrong (says "The timeout argument specifies the minimum number of milliseconds that poll() will block"
174 // which sounds backward...
175 [[maybe_unused]] int timeoutMilliseconds = Math::Round<int> (time2Wait.count () * 1000);
176 Assert (timeoutMilliseconds >= 0);
177 int pollResult;
178#if qStroika_Foundation_Common_Platform_Windows
179#if qStroika_Foundation_Execution_WaitForIOReady_BreakWSAPollIntoTimedMillisecondChunks > 0
180 while (true) {
182 DurationSeconds timeLeft2Wait = Math::AtLeast<DurationSeconds> (timeoutAt - Time::GetTickCount (), 0s);
183 DurationSeconds time2WaitThisLoop = clamp<DurationSeconds> (
184 timeLeft2Wait, 0s, DurationSeconds{qStroika_Foundation_Execution_WaitForIOReady_BreakWSAPollIntoTimedMillisecondChunks / 1000.0});
185 int time2WaitMillisecondsThisLoop = static_cast<int> (time2WaitThisLoop.count () * 1000);
186 if ((pollResult = ::WSAPoll (pollData.begin (), static_cast<ULONG> (pollData.GetSize ()), time2WaitMillisecondsThisLoop)) == SOCKET_ERROR) {
187 ThrowSystemErrNo (::WSAGetLastError ());
188 }
189 if (pollResult != 0 or Time::GetTickCount () >= timeoutAt) {
190 break;
191 }
192 }
193#else
194 if ((pollResult = ::WSAPoll (pollData.begin (), static_cast<ULONG> (pollData.GetSize ()), timeoutMilliseconds)) == SOCKET_ERROR) {
195 Platform::Windows::Exception::Throw (::WSAGetLastError ());
196 }
197#endif
198#else
199 pollResult = Handle_ErrNoResultInterruption ([&] () { return ::poll (pollData.begin (), pollData.GetSize (), timeoutMilliseconds); });
200#endif
201 Set<size_t> result;
202 if (pollResult != 0) {
203 for (size_t i = 0; i < pollData.GetSize (); ++i) {
204 if (pollData[i].revents != 0) {
205 result.Add (i);
206 }
207 }
208 }
209 return result;
210}
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:270
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
Definition Set.h:105
nonvirtual void Add(ArgByValueType< value_type > item)
Definition Set.inl:138
nonvirtual PlatformNativeHandle GetNativeSocket() const
Definition Socket.inl:52
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
auto Handle_ErrNoResultInterruption(CALL call) -> decltype(call())
Handle UNIX EINTR system call behavior - fairly transparently - just effectively removes them from th...