4#include "Stroika/Foundation/StroikaPreComp.h"
6#if qStroika_Foundation_Common_Platform_POSIX
9#include <netinet/tcp.h>
10#include <sys/socket.h>
18#include "Stroika/Foundation/IO/Network/ConnectionOrientedStreamSocket.h"
19#include "Stroika/Foundation/IO/Network/Socket-Private_.h"
27using namespace Stroika::Foundation::IO::Network::PRIVATE_;
29using namespace ConnectionOrientedStreamSocket;
// Rep_: socket representation for this module, derived from
// BackSocketImpl_<ConnectionOrientedStreamSocket::_IRep>.
// NOTE(review): this listing omits some original lines (constructor/destructor
// signatures, braces), so only the visible fragments are annotated.
34 struct Rep_ : BackSocketImpl_<ConnectionOrientedStreamSocket::_IRep> {
// Shorthand for the base class.
35 using inherited = BackSocketImpl_<ConnectionOrientedStreamSocket::_IRep>;
// Optional trace of the native descriptor `sd` (printed in hex); compiled in only
// when USE_NOISY_TRACE_IN_THIS_MODULE_ is set.
39#if USE_NOISY_TRACE_IN_THIS_MODULE_
40 DbgTrace (
"Constructed BackSocketImpl_<ConnectionOrientedStreamSocket>::Rep_ with sd={:x}"_f, (
int)sd);
// Copy construction is explicitly disabled.
44 Rep_ (
const Rep_&) =
delete;
// Guard: act only while fSD_ is still a valid native handle.
// NOTE(review): presumably inside the destructor (its signature is not visible here) - confirm.
48 if (fSD_ != kINVALID_NATIVE_HANDLE_) {
// Close override. The visible part handles the case where the socket handle is still
// valid and fAutomaticTCPDisconnectOnClose_ is set: it shuts down the write side, waits
// (bounded by timeOutAt) for the socket to become ready, and reads whatever is pending.
// NOTE(review): several original lines (loop/try structure, buffer declaration) are not visible here.
52 virtual void Close ()
override
55 if (fSD_ != kINVALID_NATIVE_HANDLE_ and fAutomaticTCPDisconnectOnClose_) {
// Shut down only the write direction of the socket.
56 Shutdown (Socket::ShutdownTarget::eWrites);
// The result of the wait is deliberately discarded (void cast).
61 (void)ioReady.WaitUntil (timeOutAt);
// Platform-specific read of pending data into `data`.
63#if qStroika_Foundation_Common_Platform_POSIX
64 int nb = ::read (fSD_, data, NEltsOf (data));
65#elif qStroika_Foundation_Common_Platform_Windows
67 int nb = ::recv (fSD_, data, (
int)NEltsOf (data), flags);
// Trace a warning about bytes that were still unread at close time.
70 DbgTrace (
"Warning: {} unread bytes to be read on socket when it was closed."_f,
// NOTE(review): this trace uses a wide literal (L"...") while the other DbgTrace calls
// in this file use the "_f" suffix; it is only compiled when
// USE_NOISY_TRACE_IN_THIS_MODULE_ is set - confirm it still builds.
76#if USE_NOISY_TRACE_IN_THIS_MODULE_
77 DbgTrace (L
"timeout closing down socket - not serious - just means client didn't send close ACK quickly enough");
// Connect_Sync_: connect this socket to sockAddr with a direct ::connect call; no
// timeout handling appears in the visible code.
// NOTE(review): the POSIX branch body is not visible in this listing.
83 nonvirtual
void Connect_Sync_ (
const SocketAddress& sockAddr)
const
// Convert the address to the native sockaddr_storage form expected by ::connect.
86 sockaddr_storage useSockAddr = sockAddr.
As<sockaddr_storage> ();
87#if qStroika_Foundation_Common_Platform_POSIX
89#elif qStroika_Foundation_Common_Platform_Windows
// Windows: a SOCKET_ERROR result is passed to ThrowWSASystemErrorIfSOCKET_ERROR.
90 ThrowWSASystemErrorIfSOCKET_ERROR (::connect (fSD_, (sockaddr*)&useSockAddr,
static_cast<int> (sockAddr.
GetRequiredSize ())));
// Body of the connect-with-timeout helper.
// NOTE(review): the signature is not visible in this listing; presumably this is
// Connect_AsyncWTimeout_ (sockAddr, timeout), as called from Connect - confirm.
// Convert the address to the native sockaddr_storage form expected by ::connect.
98 sockaddr_storage useSockAddr = sockAddr.
As<sockaddr_storage> ();
99#if qStroika_Foundation_Common_Platform_POSIX
// On scope exit, re-apply the previously saved file status flags (savedFlags) to fSD_.
105 [[maybe_unused]]
auto&& cleanup =
Finally ([
this, savedFlags] ()
noexcept {
107 if (::fcntl (fSD_, F_SETFL, savedFlags) < 0) {
// Loop while ::connect reports failure.
// NOTE(review): the errno checks inside the loop are only partly visible.
111 while (::connect (fSD_, (sockaddr*)&useSockAddr, sockAddr.
GetRequiredSize ()) < 0) {
// Wait, bounded by `timeout`, for fSD_ to be reported in the write set by select.
118 FD_SET (fSD_, &myset);
119 timeval time_out = timeout.
As<timeval> ();
121 auto r = ::select (fSD_ + 1, NULL, &myset,
nullptr, &time_out);
// A non-zero SO_ERROR on the socket is thrown as a system errno.
129 if (
auto err = getsockopt<int> (SOL_SOCKET, SO_ERROR)) {
130 Execution::ThrowSystemErrNo (err);
// Throw based on the current errno.
135 Execution::ThrowSystemErrNo ();
139#elif qStroika_Foundation_Common_Platform_Windows
// Change the socket's FIONBIO (non-blocking I/O) mode; failure is thrown using WSAGetLastError.
143 if (::ioctlsocket (fSD_, FIONBIO, &block) == SOCKET_ERROR) {
144 Execution::ThrowSystemErrNo (::WSAGetLastError ());
// On scope exit, call ioctlsocket (FIONBIO) again.
// NOTE(review): presumably this restores blocking mode; the value of `block` is not visible - confirm.
147 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept {
149 if (::ioctlsocket (fSD_, FIONBIO, &block) == SOCKET_ERROR) {
// Start the connect; any error other than WSAEWOULDBLOCK is thrown immediately.
154 if (::connect (fSD_, (sockaddr*)&useSockAddr,
static_cast<int> (sockAddr.
GetRequiredSize ())) == SOCKET_ERROR) {
155 if (::WSAGetLastError () != WSAEWOULDBLOCK) {
156 Execution::ThrowSystemErrNo (::WSAGetLastError ());
// Wait, bounded by `timeout`, on both a write set and an exception set containing fSD_.
161 FD_SET (fSD_, &setW);
164 FD_SET (fSD_, &setE);
165 timeval time_out = timeout.
As<timeval> ();
166 int ret = ::select (0, NULL, &setW, &setE, &time_out);
// NOTE(review): presumably the timeout case (ret == 0): the last error is set to
// WSAETIMEDOUT before throwing - the guarding condition is not visible, confirm.
170 WSASetLastError (WSAETIMEDOUT);
172 Execution::ThrowSystemErrNo (::WSAGetLastError ());
// A non-zero SO_ERROR on the socket is thrown as a system errno.
175 if (
auto err = getsockopt<int> (SOL_SOCKET, SO_ERROR)) {
176 Execution::ThrowSystemErrNo (err);
// Connect override: connect to sockAddr, optionally bounded by `timeout`.
// The visible code dispatches to Connect_AsyncWTimeout_ (dereferencing the optional
// timeout) or to Connect_Sync_.
// NOTE(review): the condition selecting between the two calls is not visible here;
// presumably it tests whether `timeout` has a value - confirm.
184 virtual void Connect (
const SocketAddress& sockAddr,
const optional<Time::Duration>& timeout)
const override
// Scoped trace context recording the arguments.
186 Debug::TraceContextBumper ctx{
"ConnectionOrientedStreamSocket_IMPL_::Connect",
"sockAddr={}, timeout={}"_f, sockAddr, timeout};
188 Connect_AsyncWTimeout_ (sockAddr, *timeout);
191 Connect_Sync_ (sockAddr);
// Read override: reads from the socket into `into` and returns a subspan of `into`.
194 virtual span<byte> Read (span<byte> into)
const override
// Assertion-checked builds only: assert the pending-reads counter was 0 on entry
// (post-increment) and is 0 again after the decrement performed on scope exit.
198#if qStroika_Foundation_Debug_AssertionsChecked
199 Assert (fCurrentPendingReadsCount++ == 0);
200 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept { Assert (--fCurrentPendingReadsCount == 0); });
// NOTE(review): the arguments of the POSIX subspan call are not visible in this listing.
203#if qStroika_Foundation_Common_Platform_POSIX
204 return into.subspan (
206#elif qStroika_Foundation_Common_Platform_Windows
// The requested byte count is clamped to numeric_limits<int>::max () and passed to recv as an int.
208 int nBytesToRead =
static_cast<int> (min<size_t> (into.size (), numeric_limits<int>::max ()));
// Return the prefix of `into` whose length is the value recv returned;
// a SOCKET_ERROR result is passed to ThrowWSASystemErrorIfSOCKET_ERROR.
209 return into.subspan (0,
static_cast<size_t> (ThrowWSASystemErrorIfSOCKET_ERROR (
210 ::recv (fSD_,
reinterpret_cast<char*
> (into.data ()), nBytesToRead, flags))));
// ReadNonBlocking override: returns an optional span.
// The visible code proceeds only when AvailableToRead () yields a non-empty optional.
// NOTE(review): the guarded body and the fall-through return are not visible in this listing.
215 virtual optional<span<byte>> ReadNonBlocking (span<byte> into)
const override
218 if (AvailableToRead ()) {
// AvailableToRead override: returns an optional byte count.
// NOTE(review): the return statements are not visible in this listing.
223 virtual optional<size_t> AvailableToRead ()
const override
// Assertion-checked builds only: assert the pending-reads counter was 0 on entry
// (post-increment) and is 0 again after the decrement performed on scope exit.
226#if qStroika_Foundation_Debug_AssertionsChecked
227 Assert (fCurrentPendingReadsCount++ == 0);
228 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept { Assert (--fCurrentPendingReadsCount == 0); });
230#if qStroika_Foundation_Common_Platform_POSIX or qStroika_Foundation_Common_Platform_Windows
// select on a read set containing fSD_, with a value-initialized (zero) timeval;
// a result of 1 means select reported one ready descriptor.
234 FD_SET (fSD_, &input);
235 struct timeval timeout{};
236 if (::select (
static_cast<int> (fSD_) + 1, &input, NULL, NULL, &timeout) == 1) {
// NOTE(review): the POSIX branch body is not visible in this listing.
240#if qStroika_Foundation_Common_Platform_POSIX
242#elif qStroika_Foundation_Common_Platform_Windows
// Windows: recv with MSG_PEEK into buf; a SOCKET_ERROR result is passed to ThrowWSASystemErrorIfSOCKET_ERROR.
243 int tmp = ThrowWSASystemErrorIfSOCKET_ERROR (::recv (fSD_, buf,
static_cast<int> (NEltsOf (buf)), MSG_PEEK));
// Write override: sends `data` through BreakWriteIntoParts_, supplying a lambda that
// writes one piece and returns the number of bytes it wrote.
// NOTE(review): presumably BreakWriteIntoParts_ invokes the lambda on successive pieces
// no larger than the given maximum - its definition is not visible here.
258 virtual void Write (span<const byte> data)
const override
261#if USE_NOISY_TRACE_IN_THIS_MODULE_
265#if qStroika_Foundation_Common_Platform_POSIX
// POSIX: the maximum piece size passed is numeric_limits<int>::max ().
// NOTE(review): the actual write call inside the lambda is not visible in this listing.
271 BreakWriteIntoParts_<byte> (data, numeric_limits<int>::max (), [
this] (span<const byte> data) ->
size_t {
272 Assert (data.size () < numeric_limits<int>::max ());
276 Assert (0 <= n and n <= data.size ());
277 return static_cast<size_t> (n);
279#elif qStroika_Foundation_Common_Platform_Windows
// Windows: the maximum piece size is the socket's SO_MAX_MSG_SIZE option value.
286 size_t maxSendAtATime = getsockopt<unsigned int> (SOL_SOCKET, SO_MAX_MSG_SIZE);
287 BreakWriteIntoParts_<byte> (data, maxSendAtATime, [
this, maxSendAtATime] (span<const byte> data) ->
size_t {
288 Require (data.size () <= maxSendAtATime);
289 Assert (data.size () <
static_cast<size_t> (numeric_limits<int>::max ()));
// ::send takes an int length, hence the narrowing cast (guarded by the Assert above).
290 int len =
static_cast<int> (data.size ());
292 int n = ThrowWSASystemErrorIfSOCKET_ERROR (::send (fSD_,
reinterpret_cast<const char*
> (data.data ()), len, flags));
// Report how many bytes this send call accepted.
293 Assert (0 <= n and
static_cast<size_t> (n) <= data.size ());
294 return static_cast<size_t> (n);
// GetPeerAddress override: queries the peer address with ::getpeername into a
// sockaddr_storage buffer; the guarded branch is taken when getpeername returns 0.
// NOTE(review): the return statements are not visible in this listing.
300 virtual optional<IO::Network::SocketAddress> GetPeerAddress ()
const override
303 struct sockaddr_storage radr;
// len is initialized to the buffer size and passed by address to getpeername.
304 socklen_t len =
sizeof (radr);
305 if (::getpeername (
static_cast<int> (fSD_), (
struct sockaddr*)&radr, &len) == 0) {
// Returns the stored fAutomaticTCPDisconnectOnClose_ setting (an optional duration).
311 virtual optional<Time::DurationSeconds> GetAutomaticTCPDisconnectOnClose ()
const override
314 return fAutomaticTCPDisconnectOnClose_;
// Stores waitFor in fAutomaticTCPDisconnectOnClose_; Close () tests this member
// before doing its shutdown-and-wait sequence.
316 virtual void SetAutomaticTCPDisconnectOnClose (
const optional<Time::DurationSeconds>& waitFor)
override
319 fAutomaticTCPDisconnectOnClose_ = waitFor;
// GetKeepAlives override: builds a KeepAliveOptions from the socket's current options.
// NOTE(review): the Windows branch body and the final return are not visible in this listing.
321 virtual KeepAliveOptions GetKeepAlives ()
const override
324 KeepAliveOptions result;
// fEnabled is the SO_KEEPALIVE option value converted to bool.
325 result.fEnabled = !!getsockopt<int> (SOL_SOCKET, SO_KEEPALIVE);
326#if qStroika_Foundation_Common_Platform_Linux
// Linux: read TCP_KEEPCNT, TCP_KEEPIDLE and TCP_KEEPINTVL; the latter two are
// wrapped as Time::DurationSeconds.
328 result.fMaxProbesSentBeforeDrop = getsockopt<int> (SOL_TCP, TCP_KEEPCNT);
329 result.fTimeIdleBeforeSendingKeepalives =
Time::DurationSeconds{getsockopt<int> (SOL_TCP, TCP_KEEPIDLE)};
330 result.fTimeBetweenIndividualKeepaliveProbes =
Time::DurationSeconds{getsockopt<int> (SOL_TCP, TCP_KEEPINTVL)};
331#elif qStroika_Foundation_Common_Platform_Windows
// SetKeepAlives override: applies keepAliveOptions to the socket.
337 virtual void SetKeepAlives (
const KeepAliveOptions& keepAliveOptions)
override
// SO_KEEPALIVE is always set from fEnabled.
340 setsockopt<int> (SOL_SOCKET, SO_KEEPALIVE, keepAliveOptions.fEnabled);
341#if qStroika_Foundation_Common_Platform_Linux
// Linux: each TCP_KEEP* option is set only when the corresponding optional field has a value.
343 if (keepAliveOptions.fMaxProbesSentBeforeDrop) {
344 setsockopt<int> (SOL_TCP, TCP_KEEPCNT, *keepAliveOptions.fMaxProbesSentBeforeDrop);
// The duration's count () is converted to int with static_cast (fractional part dropped).
346 if (keepAliveOptions.fTimeIdleBeforeSendingKeepalives) {
347 setsockopt<int> (SOL_TCP, TCP_KEEPIDLE,
static_cast<int> (keepAliveOptions.fTimeIdleBeforeSendingKeepalives->count ()));
349 if (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes) {
350 setsockopt<int> (SOL_TCP, TCP_KEEPINTVL,
static_cast<int> (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes->count ()));
352#elif qStroika_Foundation_Common_Platform_Windows
// Windows: only when keepalives are enabled and at least one timing field is provided.
// fMaxProbesSentBeforeDrop is not used in the visible Windows code.
354 if (keepAliveOptions.fEnabled and
355 (keepAliveOptions.fTimeIdleBeforeSendingKeepalives or keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes)) {
356 tcp_keepalive alive{keepAliveOptions.fEnabled};
// Durations are multiplied by 1000.0 and rounded to ULONG; a missing field falls back
// to 2 * 60 * 60s (idle time) or 1s (probe interval).
358 alive.keepalivetime = Math::Round<ULONG> (keepAliveOptions.fTimeIdleBeforeSendingKeepalives.value_or (2 * 60 * 60s).count () * 1000.0);
359 alive.keepaliveinterval = Math::Round<ULONG> (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes.value_or (1s).count () * 1000.0);
// Apply via WSAIoctl (SIO_KEEPALIVE_VALS); on SOCKET_ERROR throw using WSAGetLastError.
361 if (::WSAIoctl (fSD_, SIO_KEEPALIVE_VALS, &alive,
sizeof (alive), NULL, 0, &dwBytesRet, NULL, NULL) == SOCKET_ERROR) {
362 Execution::ThrowSystemErrNo (::WSAGetLastError ());
// Returns the socket's TCP_NODELAY option (level IPPROTO_TCP) converted to bool.
367 virtual bool GetTCPNoDelay ()
const override
370 return static_cast<bool> (getsockopt<int> (IPPROTO_TCP, TCP_NODELAY));
// Sets the socket's TCP_NODELAY option (level IPPROTO_TCP) from noDelay.
372 virtual void SetTCPNoDelay (
bool noDelay)
override
375 setsockopt<int> (IPPROTO_TCP, TCP_NODELAY, noDelay);
// Optional duration consulted by Close (); read and written by
// Get/SetAutomaticTCPDisconnectOnClose.
377 optional<Time::DurationSeconds> fAutomaticTCPDisconnectOnClose_;
// Assertion-checked builds only: counter that Read () and AvailableToRead () use to
// assert they are not entered while another of those calls is in progress.
// Declared mutable so const member functions can modify it.
378#if qStroika_Foundation_Debug_AssertionsChecked
379 mutable atomic<int> fCurrentPendingReadsCount{};
// Builds a textual description of these keep-alive options by appending to `sb`.
// NOTE(review): the declaration of `sb` and the final return are not visible in this listing.
389Characters::String Network::ConnectionOrientedStreamSocket::KeepAliveOptions::ToString ()
const
// fEnabled is always written.
393 sb <<
"Enabled: "sv << fEnabled;
// Linux/Windows only: each optional field is appended only when it holds a value.
394#if qStroika_Foundation_Common_Platform_Linux or qStroika_Foundation_Common_Platform_Windows
395 if (fMaxProbesSentBeforeDrop) {
396 sb <<
", Max-Probes-Sent-Before-Drop: "sv << fMaxProbesSentBeforeDrop;
398 if (fTimeIdleBeforeSendingKeepalives) {
399 sb <<
", Time-Idle-Before-Sending-Keepalives: "sv << fTimeIdleBeforeSendingKeepalives;
401 if (fTimeBetweenIndividualKeepaliveProbes) {
402 sb <<
", Time-Between-Individual-Keepalive-Probes: "sv << fTimeBetweenIndividualKeepaliveProbes;
// Wrap a Rep_ built around a socket obtained from _Protected::mkLowLevelSocket_
// (family, socketKind, protocol) in a Ptr.
// NOTE(review): the enclosing signature is not visible; presumably this is New (...) - confirm.
416 return Ptr{make_shared<Rep_> (_Protected::mkLowLevelSocket_ (family, socketKind, protocol))};
// Wrap a Rep_ built around the existing native socket `sd` in a Ptr.
// NOTE(review): the enclosing signature is not visible; presumably this is Attach (sd) - confirm.
421 return Ptr{make_shared<Rep_> (sd)};
424#include "ConnectionOrientedMasterSocket.h"
// NewPair: returns a tuple of two Ptr objects.
// Two strategies are present; kLowLevelSocketPairWorks_ (constexpr true) selects the first.
425auto ConnectionOrientedStreamSocket::NewPair (
SocketAddress::FamilyType family, Type socketKind,
const optional<IPPROTO>& protocol) -> tuple<Ptr, Ptr>
427 constexpr bool kLowLevelSocketPairWorks_{
true};
// Strategy 1: obtain two sockets from _Protected::mkLowLevelSocketPair_ and Attach each.
428 if constexpr (kLowLevelSocketPairWorks_) {
429 auto sp = _Protected::mkLowLevelSocketPair_ (family, socketKind, protocol);
430 return make_tuple (Attach (get<0> (sp)), Attach (get<1> (sp)));
// Strategy 2 (not selected while the constant above is true): listen on a master socket
// with Listen (1), open a new connection to its local address, and Accept () the other end.
// NOTE(review): the creation/binding of connectionOrientedMaster is not visible in this listing.
437 connectionOrientedMaster.Listen (1);
440 auto one = ConnectionOrientedStreamSocket::NewConnection (*connectionOrientedMaster.GetLocalAddress ());
441 auto two = connectionOrientedMaster.Accept ();
442 return make_tuple (one, two);
// Read the SO_LINGER option; return l_linger when l_onoff is set, otherwise an empty optional.
// NOTE(review): the enclosing signature is not visible; presumably this is GetLinger () - confirm.
453 linger lr = getsockopt<linger> (SOL_SOCKET, SO_LINGER);
454 return lr.l_onoff ? lr.l_linger : optional<int>{};
// Build a ::linger value (value-initialized) and apply it as SO_LINGER.
// NOTE(review): the enclosing signature is not visible; presumably this is SetLinger (linger) - confirm.
459 ::linger so_linger{};
// NOTE(review): presumably guarded by a check that `linger` has a value (the condition is
// not visible here); l_linger receives *linger cast to u_short.
461 so_linger.l_onoff =
true;
462 so_linger.l_linger =
static_cast<u_short
> (*linger);
464 setsockopt<::linger> (SOL_SOCKET, SO_LINGER, so_linger);
#define AssertNotImplemented()
#define AssertNotReached()
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
static const TimeOutException kThe
nonvirtual void SetLinger(const optional< int > &linger) const
nonvirtual optional< int > GetLinger() const
nonvirtual size_t GetRequiredSize() const
FamilyType
Socket address family - also sometimes referred to as domain (argument to ::socket calls it domain)
Duration is a chrono::duration<double> (=.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
auto Handle_ErrNoResultInterruption(CALL call) -> decltype(call())
Handle UNIX EINTR system call behavior - fairly transparently - just effectively removes them from th...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
INT_TYPE ThrowPOSIXErrNoIfNegative(INT_TYPE returnCode)
Ptr New(SocketAddress::FamilyType family, Type socketKind, const optional< IPPROTO > &protocol={})
constexpr InternetAddress LocalHost(SocketAddress::FamilyType fm)
return V4::kLocalhost or V6::kLocalhost depending on argument address family