4#include "Stroika/Foundation/StroikaPreComp.h"
6#if qStroika_Foundation_Common_Platform_POSIX
9#include <netinet/tcp.h>
10#include <sys/socket.h>
18#include "Stroika/Foundation/IO/Network/ConnectionOrientedStreamSocket.h"
19#include "Stroika/Foundation/IO/Network/Socket-Private_.h"
28using namespace Stroika::Foundation::IO::Network::PRIVATE_;
30using namespace ConnectionOrientedStreamSocket;
35 struct Rep_ : BackSocketImpl_<ConnectionOrientedStreamSocket::_IRep> {
36 using inherited = BackSocketImpl_<ConnectionOrientedStreamSocket::_IRep>;
40#if USE_NOISY_TRACE_IN_THIS_MODULE_
41 DbgTrace (
"Constructed BackSocketImpl_<ConnectionOrientedStreamSocket>::Rep_ with sd={:x}"_f, (
int)sd);
45 Rep_ (
const Rep_&) =
delete;
49 if (fSD_ != kINVALID_NATIVE_HANDLE_) {
53 virtual void Close ()
override
56 if (fSD_ != kINVALID_NATIVE_HANDLE_ and fAutomaticTCPDisconnectOnClose_) {
57 Shutdown (Socket::ShutdownTarget::eWrites);
62 (void)ioReady.WaitUntil (timeOutAt);
64#if qStroika_Foundation_Common_Platform_POSIX
65 int nb = ::read (fSD_, data, std::size (data));
66#elif qStroika_Foundation_Common_Platform_Windows
68 int nb = ::recv (fSD_, data, (
int)std::size (data), flags);
71 DbgTrace (
"Warning: {} unread bytes to be read on socket when it was closed."_f,
77#if USE_NOISY_TRACE_IN_THIS_MODULE_
78 DbgTrace (L
"timeout closing down socket - not serious - just means client didn't send close ACK quickly enough");
84 nonvirtual
void Connect_Sync_ (
const SocketAddress& sockAddr)
const
87 sockaddr_storage useSockAddr = sockAddr.
As<sockaddr_storage> ();
88#if qStroika_Foundation_Common_Platform_POSIX
90#elif qStroika_Foundation_Common_Platform_Windows
91 ThrowWSASystemErrorIfSOCKET_ERROR (::connect (fSD_, (sockaddr*)&useSockAddr,
static_cast<int> (sockAddr.
GetRequiredSize ())));
99 sockaddr_storage useSockAddr = sockAddr.
As<sockaddr_storage> ();
100#if qStroika_Foundation_Common_Platform_POSIX
106 [[maybe_unused]]
auto&& cleanup =
Finally ([
this, savedFlags] ()
noexcept {
108 if (::fcntl (fSD_, F_SETFL, savedFlags) < 0) {
112 while (::connect (fSD_, (sockaddr*)&useSockAddr, sockAddr.
GetRequiredSize ()) < 0) {
119 FD_SET (fSD_, &myset);
120 timeval time_out = timeout.
As<timeval> ();
122 auto r = ::select (fSD_ + 1, NULL, &myset,
nullptr, &time_out);
130 if (
auto err = getsockopt<int> (SOL_SOCKET, SO_ERROR)) {
131 Execution::ThrowSystemErrNo (err);
136 Execution::ThrowSystemErrNo ();
140#elif qStroika_Foundation_Common_Platform_Windows
144 if (::ioctlsocket (fSD_, FIONBIO, &block) == SOCKET_ERROR) {
145 Execution::ThrowSystemErrNo (::WSAGetLastError ());
148 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept {
150 if (::ioctlsocket (fSD_, FIONBIO, &block) == SOCKET_ERROR) {
155 if (::connect (fSD_, (sockaddr*)&useSockAddr,
static_cast<int> (sockAddr.
GetRequiredSize ())) == SOCKET_ERROR) {
156 if (::WSAGetLastError () != WSAEWOULDBLOCK) {
157 Execution::ThrowSystemErrNo (::WSAGetLastError ());
162 FD_SET (fSD_, &setW);
165 FD_SET (fSD_, &setE);
166 timeval time_out = timeout.
As<timeval> ();
167 int ret = ::select (0, NULL, &setW, &setE, &time_out);
171 WSASetLastError (WSAETIMEDOUT);
173 Execution::ThrowSystemErrNo (::WSAGetLastError ());
176 if (
auto err = getsockopt<int> (SOL_SOCKET, SO_ERROR)) {
177 Execution::ThrowSystemErrNo (err);
185 virtual void Connect (
const SocketAddress& sockAddr,
const optional<Time::Duration>& timeout)
const override
187 Debug::TraceContextBumper ctx{
"ConnectionOrientedStreamSocket_IMPL_::Connect",
"sockAddr={}, timeout={}"_f, sockAddr, timeout};
189 Connect_AsyncWTimeout_ (sockAddr, *timeout);
192 Connect_Sync_ (sockAddr);
195 virtual span<byte> Read (span<byte> into)
const override
199#if qStroika_Foundation_Debug_AssertionsChecked
200 Assert (fCurrentPendingReadsCount++ == 0);
201 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept { Assert (--fCurrentPendingReadsCount == 0); });
204#if qStroika_Foundation_Common_Platform_POSIX
205 return into.subspan (
207#elif qStroika_Foundation_Common_Platform_Windows
209 int nBytesToRead =
static_cast<int> (min<size_t> (into.size (), numeric_limits<int>::max ()));
210 return into.subspan (0,
static_cast<size_t> (ThrowWSASystemErrorIfSOCKET_ERROR (
211 ::recv (fSD_,
reinterpret_cast<char*
> (into.data ()), nBytesToRead, flags))));
216 virtual optional<span<byte>> ReadNonBlocking (span<byte> into)
const override
219 if (AvailableToRead ()) {
224 virtual optional<size_t> AvailableToRead ()
const override
227#if qStroika_Foundation_Debug_AssertionsChecked
228 Assert (fCurrentPendingReadsCount++ == 0);
229 [[maybe_unused]]
auto&& cleanup =
Finally ([
this] ()
noexcept { Assert (--fCurrentPendingReadsCount == 0); });
231#if qStroika_Foundation_Common_Platform_POSIX or qStroika_Foundation_Common_Platform_Windows
235 FD_SET (fSD_, &input);
236 struct timeval timeout{};
237 if (::select (
static_cast<int> (fSD_) + 1, &input, NULL, NULL, &timeout) == 1) {
241#if qStroika_Foundation_Common_Platform_POSIX
243#elif qStroika_Foundation_Common_Platform_Windows
244 int tmp = ThrowWSASystemErrorIfSOCKET_ERROR (::recv (fSD_, buf,
static_cast<int> (std::size (buf)), MSG_PEEK));
259 virtual void Write (span<const byte> data)
const override
262#if USE_NOISY_TRACE_IN_THIS_MODULE_
266#if qStroika_Foundation_Common_Platform_POSIX
272 BreakWriteIntoParts_<byte> (data, numeric_limits<int>::max (), [
this] (span<const byte> data) ->
size_t {
273 Assert (data.size () < numeric_limits<int>::max ());
277 Assert (0 <= n and n <= data.size ());
278 return static_cast<size_t> (n);
280#elif qStroika_Foundation_Common_Platform_Windows
287 size_t maxSendAtATime = getsockopt<unsigned int> (SOL_SOCKET, SO_MAX_MSG_SIZE);
288 BreakWriteIntoParts_<byte> (data, maxSendAtATime, [
this, maxSendAtATime] (span<const byte> data) ->
size_t {
289 Require (data.size () <= maxSendAtATime);
290 Assert (data.size () <
static_cast<size_t> (numeric_limits<int>::max ()));
291 int len =
static_cast<int> (data.size ());
293 int n = ThrowWSASystemErrorIfSOCKET_ERROR (::send (fSD_,
reinterpret_cast<const char*
> (data.data ()), len, flags));
294 Assert (0 <= n and
static_cast<size_t> (n) <= data.size ());
295 return static_cast<size_t> (n);
301 virtual optional<IO::Network::SocketAddress> GetPeerAddress ()
const override
304 struct sockaddr_storage radr;
305 socklen_t len =
sizeof (radr);
306 if (::getpeername (
static_cast<int> (fSD_), (
struct sockaddr*)&radr, &len) == 0) {
312 virtual optional<Time::DurationSeconds> GetAutomaticTCPDisconnectOnClose ()
const override
315 return fAutomaticTCPDisconnectOnClose_;
317 virtual void SetAutomaticTCPDisconnectOnClose (
const optional<Time::DurationSeconds>& waitFor)
override
320 fAutomaticTCPDisconnectOnClose_ = waitFor;
322 virtual KeepAliveOptions GetKeepAlives ()
const override
325 KeepAliveOptions result;
326 result.fEnabled = !!getsockopt<int> (SOL_SOCKET, SO_KEEPALIVE);
327#if qStroika_Foundation_Common_Platform_Linux
329 result.fMaxProbesSentBeforeDrop = getsockopt<int> (SOL_TCP, TCP_KEEPCNT);
330 result.fTimeIdleBeforeSendingKeepalives =
Time::DurationSeconds{getsockopt<int> (SOL_TCP, TCP_KEEPIDLE)};
331 result.fTimeBetweenIndividualKeepaliveProbes =
Time::DurationSeconds{getsockopt<int> (SOL_TCP, TCP_KEEPINTVL)};
332#elif qStroika_Foundation_Common_Platform_Windows
338 virtual void SetKeepAlives (
const KeepAliveOptions& keepAliveOptions)
override
341 setsockopt<int> (SOL_SOCKET, SO_KEEPALIVE, keepAliveOptions.fEnabled);
342#if qStroika_Foundation_Common_Platform_Linux
344 if (keepAliveOptions.fMaxProbesSentBeforeDrop) {
345 setsockopt<int> (SOL_TCP, TCP_KEEPCNT, *keepAliveOptions.fMaxProbesSentBeforeDrop);
347 if (keepAliveOptions.fTimeIdleBeforeSendingKeepalives) {
348 setsockopt<int> (SOL_TCP, TCP_KEEPIDLE,
static_cast<int> (keepAliveOptions.fTimeIdleBeforeSendingKeepalives->count ()));
350 if (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes) {
351 setsockopt<int> (SOL_TCP, TCP_KEEPINTVL,
static_cast<int> (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes->count ()));
353#elif qStroika_Foundation_Common_Platform_Windows
355 if (keepAliveOptions.fEnabled and
356 (keepAliveOptions.fTimeIdleBeforeSendingKeepalives or keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes)) {
357 tcp_keepalive alive{keepAliveOptions.fEnabled};
359 alive.keepalivetime = Math::Round<ULONG> (keepAliveOptions.fTimeIdleBeforeSendingKeepalives.value_or (2 * 60 * 60s).count () * 1000.0);
360 alive.keepaliveinterval = Math::Round<ULONG> (keepAliveOptions.fTimeBetweenIndividualKeepaliveProbes.value_or (1s).count () * 1000.0);
362 if (::WSAIoctl (fSD_, SIO_KEEPALIVE_VALS, &alive,
sizeof (alive), NULL, 0, &dwBytesRet, NULL, NULL) == SOCKET_ERROR) {
363 Execution::ThrowSystemErrNo (::WSAGetLastError ());
368 virtual bool GetTCPNoDelay ()
const override
371 return static_cast<bool> (getsockopt<int> (IPPROTO_TCP, TCP_NODELAY));
373 virtual void SetTCPNoDelay (
bool noDelay)
override
376 setsockopt<int> (IPPROTO_TCP, TCP_NODELAY, noDelay);
378 optional<Time::DurationSeconds> fAutomaticTCPDisconnectOnClose_;
379#if qStroika_Foundation_Debug_AssertionsChecked
380 mutable atomic<int> fCurrentPendingReadsCount{};
390Characters::String Network::ConnectionOrientedStreamSocket::KeepAliveOptions::ToString ()
const
394 sb <<
"Enabled: "sv << fEnabled;
395#if qStroika_Foundation_Common_Platform_Linux or qStroika_Foundation_Common_Platform_Windows
396 if (fMaxProbesSentBeforeDrop) {
397 sb <<
", Max-Probes-Sent-Before-Drop: "sv << fMaxProbesSentBeforeDrop;
399 if (fTimeIdleBeforeSendingKeepalives) {
400 sb <<
", Time-Idle-Before-Sending-Keepalives: "sv << fTimeIdleBeforeSendingKeepalives;
402 if (fTimeBetweenIndividualKeepaliveProbes) {
403 sb <<
", Time-Between-Individual-Keepalive-Probes: "sv << fTimeBetweenIndividualKeepaliveProbes;
417 return Ptr{Memory::MakeSharedPtr<Rep_> (_Protected::mkLowLevelSocket_ (family, socketKind, protocol))};
422 return Ptr{Memory::MakeSharedPtr<Rep_> (sd)};
425#include "ConnectionOrientedMasterSocket.h"
426auto ConnectionOrientedStreamSocket::NewPair (
SocketAddress::FamilyType family, Type socketKind,
const optional<IPPROTO>& protocol) -> tuple<Ptr, Ptr>
428 constexpr bool kLowLevelSocketPairWorks_{
true};
429 if constexpr (kLowLevelSocketPairWorks_) {
430 auto sp = _Protected::mkLowLevelSocketPair_ (family, socketKind, protocol);
431 return make_tuple (Attach (get<0> (sp)), Attach (get<1> (sp)));
438 connectionOrientedMaster.Listen (1);
441 auto one = ConnectionOrientedStreamSocket::NewConnection (*connectionOrientedMaster.GetLocalAddress ());
442 auto two = connectionOrientedMaster.Accept ();
443 return make_tuple (one, two);
454 linger lr = getsockopt<linger> (SOL_SOCKET, SO_LINGER);
455 return lr.l_onoff ? lr.l_linger : optional<int>{};
460 ::linger so_linger{};
462 so_linger.l_onoff =
true;
463 so_linger.l_linger =
static_cast<u_short
> (*linger);
465 setsockopt<::linger> (SOL_SOCKET, SO_LINGER, so_linger);
#define AssertNotImplemented()
#define AssertNotReached()
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
static const TimeOutException kThe
nonvirtual void SetLinger(const optional< int > &linger) const
nonvirtual optional< int > GetLinger() const
nonvirtual size_t GetRequiredSize() const
FamilyType
Socket address family - also sometimes referred to as domain (argument to ::socket calls it domain)
Duration is a chrono::duration<double> (=.
auto Handle_ErrNoResultInterruption(CALL call) -> decltype(call())
Handle UNIX EINTR system call behavior - fairly transparently - just effectively removes them from th...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
INT_TYPE ThrowPOSIXErrNoIfNegative(INT_TYPE returnCode)
Ptr New(SocketAddress::FamilyType family, Type socketKind, const optional< IPPROTO > &protocol={})
constexpr InternetAddress LocalHost(SocketAddress::FamilyType fm)
return V4::kLocalhost or V6::kLocalhost depending on argument address family