4#include "Stroika/Frameworks/StroikaPreComp.h"
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
21#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
23#include "ConnectionManager.h"
29using namespace Stroika::Foundation::Math;
30using namespace Stroika::Foundation::Memory;
31using namespace Stroika::Foundation::Time;
32using namespace Stroika::Foundation::Traversal;
34using Memory::MakeSharedPtr;
36using namespace Stroika::Frameworks;
// Fragment (presumably of mkEarlyInterceptors_ - the enclosing signature is not visible here):
// the default fault handler is appended to the list being built only when one was supplied.
56 if (defaultFaultHandler) {
57 interceptors += *defaultFaultHandler;
// Fragment (presumably of mkInterceptorChain_): the chain is assembled in a fixed order -
// early interceptors, then the "before" interceptors, then the router, then the "after" interceptors.
65 interceptors += earlyInterceptors;
66 interceptors += beforeInterceptors;
67 interceptors += router;
68 interceptors += afterInterceptors;
74 using TypeOfMonitor = WaitForIOReady_Support::WaitForIOReady_Base::TypeOfMonitor;
// Builds a display string for the thread-pool statistics (only part of the body is visible in this excerpt).
83String WebServer::ConnectionManager::Statistics::ThreadPool::ToString ()
const
// NOTE(review): the label below reads "thread-entry-cont"; possibly a typo for "count".
// Left unchanged here because it is runtime output - confirm before altering.
86 sb <<
", thread-entry-cont: "sv << fThreadEntryCount;
// Fragment of a ToString body (presumably Statistics::ToString - signature not visible here):
// writes the thread-pool statistics first, then the connection statistics, into the string builder.
119 sb <<
"ThreadPool: "sv << fThreadPool;
120 sb <<
", Connections: "sv << fConnections;
// Thread-pool sizing (presumably the body of ComputeThreadPoolSize_ - signature not visible here).
// Uses fMaxConcurrentlyHandledConnections when the caller set it; otherwise one fifth of the
// maximum connection count (the configured value, or Options::kDefault_MaxConnections).
// Math::AtLeast is applied with kMinThreadCnt_ (1) so the result is presumably never below 1.
139 constexpr unsigned int kMinThreadCnt_{1u};
140 return Math::AtLeast (kMinThreadCnt_,
options.fMaxConcurrentlyHandledConnections.value_or (
141 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 5));
// Listen-backlog sizing (presumably the body of ComputeConnectionBacklog_ - signature not visible here).
// The candidate value is three quarters of the maximum connection count (configured or default),
// passed through Math::AtLeast together with kMinDefaultTCPBacklog_ (3) as the lower bound.
// NOTE(review): the statement that consumes this expression (original line 147) is not visible.
146 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
148 Math::AtLeast (kMinDefaultTCPBacklog_,
options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
// Fill in every option the caller left unset (presumably the body of FillInDefaults_).
// NullCoalesce keeps the caller's value when it is engaged and otherwise substitutes
// the value given as its second argument.
155 result.fMaxConnections =
NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
// The fallback here is computed from `result`, i.e. after fMaxConnections was settled above.
156 result.fMaxConcurrentlyHandledConnections =
NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
157 result.fBindFlags =
NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
158 result.fDefaultResponseHeaders =
NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
159 result.fAutoComputeETagResponse =
NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
160 result.fAutomaticTCPDisconnectOnClose =
NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
161 result.fLinger =
NullCoalesce (result.fLinger, Options::kDefault_Linger);
162 result.fTCPNoDelay =
NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
// Likewise, the backlog fallback is derived from the already-updated `result`.
164 result.fTCPBacklog =
NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
// NOTE(review): original lines 165-183 and 185 are not visible in this excerpt. The two statements
// below appear to add the Origin header name to some collection `v` and then store an adjusted
// header set `s` back into fDefaultResponseHeaders - confirm against the full file.
184 v += HTTP::HeaderName::kOrigin;
186 result.fDefaultResponseHeaders = s;
// Constructor fragment: property getter/setter lambdas, then member initializers, then start-up.
// Several original lines (property names, closing braces, stores) are not visible in this excerpt.

// Getter (for the after-interceptors property): returns the stored sequence.
194 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) ->
Sequence<
Interceptor> {
196 return thisObj->fAfterInterceptors_;
// Setter: stores the new sequence, then rebuilds the default connection options so the change takes effect.
198 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto& afterInterceptors_) {
200 thisObj->fAfterInterceptors_ = afterInterceptors_;
201 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Getter (for the before-interceptors property): returns the stored sequence.
204 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
206 return thisObj->fBeforeInterceptors_;
// Setter: the store itself is on a line not visible here; afterwards the default connection options are rebuilt.
208 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
beforeInterceptors) {
211 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Getter returning the stored bindings (presumably for the `bindings` property).
215 return thisObj->fBindings_;
// connections: snapshot of per-connection stats. Holds the fActiveConnections_ lock for the whole
// collection, checks that the active and inactive sets share no connection, then walks both sets.
217 ,
connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> ConnectionStatsCollection {
219 scoped_lock critSec{thisObj->fActiveConnections_};
220 Ensure (
Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
221 ConnectionStatsCollection r;
222 for (
auto i : thisObj->fActiveConnections_.load ()) {
223 auto s = i->stats ();
227 for (
auto i : thisObj->GetInactiveConnections_ ()) {
228 auto s = i->stats ();
// Getter (for the default-error-handler property): returns the stored handler, which may be absent.
235 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<Interceptor> {
237 return thisObj->fDefaultErrorHandler_;
// Setter: hands the current and the new handler to ReplaceInEarlyInterceptor_, which updates the early-interceptor list.
239 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
defaultErrorHandler) {
242 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (),
defaultErrorHandler);
// Getter (for the early-interceptors property): returns the stored sequence.
247 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
249 return thisObj->fEarlyInterceptors_;
// Setter: the store itself is on a line not visible here; afterwards the default connection options are rebuilt.
251 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
earlyInterceptors) {
254 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// options: read-only view of the effective options (returned by const reference).
256 ,
options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Options& {
258 return thisObj->fEffectiveOptions_;
// statistics: computed on each read via ComputeStatistics_.
260 ,
statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Statistics {
264 return thisObj->ComputeStatistics_ ();
// Data members. The effective options are the caller's options passed through FillInDefaults_.
266 , fEffectiveOptions_{FillInDefaults_ (
options)}
267 , fBindings_{bindAddresses}
// A DefaultFaultInterceptor is installed as the initial error handler and seeds the early-interceptor list.
268 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
269 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
270 , fBeforeInterceptors_{}
271 , fAfterInterceptors_{}
// NOTE(review): fCORS is dereferenced unconditionally - this relies on it being engaged in the
// effective options by this point; the line that would guarantee that is not visible here.
272 , fRouter_{routes, *fEffectiveOptions_.
fCORS}
// The wait-for-ready thread object is created here; it is started explicitly further down.
280 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
281 Thread::New ([
this] () { WaitForReadyConnectionLoop_ (); },
"WebServer-ConnectionMgr-Wait4IOReady"_k)}
// Build the default per-connection options (interceptor chain etc.) before any connection is handled.
298 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
300 DbgTrace (
"Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
301 static_cast<const void*
> (
this), fActiveConnectionThreads_.
GetPoolSize (), ComputeConnectionBacklog_ (
options),
// Start the thread that runs WaitForReadyConnectionLoop_.
303 fWaitForReadyConnectionThread_.
Start ();
// Destructor: the visible definition sits under the default-tracing #if and logs that destruction
// has begun. The matching #endif and the rest of the body are not visible in this excerpt.
306#if qStroika_Foundation_Debug_DefaultTracingOn
307ConnectionManager::~ConnectionManager ()
309 DbgTrace (
"Starting destructor for WebServer::ConnectionManager ({})"_f,
static_cast<const void*
> (
this));
// Rebuilds fUseDefaultConnectionOptions_ from the current state. The visible part sets the
// interceptor chain from the router plus the early/before/after interceptor lists; other fields
// of Connection::Options are set on lines not visible in this excerpt.
313void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
// With default tracing on, remember the previous chain so a change can be detected below.
315#if qStroika_Foundation_Debug_DefaultTracingOn
316 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
318 fUseDefaultConnectionOptions_ = Connection::Options{
319 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
// Trace only when the chain actually changed.
327#if qStroika_Foundation_Debug_DefaultTracingOn
328 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
329 DbgTrace (
"Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
// Fragment of the new-connection handler (the trace text below names it onConnect_; the
// signature is not visible here).
336#if USE_NOISY_TRACE_IN_THIS_MODULE_
// Wrap the socket `s` in a Connection using the current default options, and register it with the
// inactive-socket poller, watching for kInactiveSocketMonitorEvents2Watch4_.
342 shared_ptr<Connection> conn = MakeSharedPtr<Connection> (s, fUseDefaultConnectionOptions_);
343 fInactiveSockSetPoller_.Add (conn, kInactiveSocketMonitorEvents2Watch4_);
// Optional extra tracing; takes the fActiveConnections_ lock while dumping both connection sets.
344#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
346 scoped_lock critSec{fActiveConnections_};
347 DbgTrace (
"In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
348 fActiveConnections_.load (), GetInactiveConnections_ ());
// Body of the wait-for-ready thread (the constructor passes this function to Thread::New).
// Connections the inactive-socket poller reports as ready are moved to the active set and handed
// to the thread pool as tasks. Many structural lines (loop header, try, branches) are not visible here.
353void ConnectionManager::WaitForReadyConnectionLoop_ ()
355#if USE_NOISY_TRACE_IN_THIS_MODULE_
364#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
368 scoped_lock critSec{fActiveConnections_};
369 DbgTrace (
"At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
370 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// One iteration per connection returned by the poller's WaitQuietly ().
373 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
// The task captures the connection shared_ptr by value.
375 auto handleActivatedConnection = [
this, readyConnection] ()
mutable {
380#if USE_NOISY_TRACE_IN_THIS_MODULE_
382 "ConnectionManager::...handleActivatedConnection",
"readyConnection={}"_f, readyConnection)};
// keepAlive is true exactly when ReadAndProcessMessage reports eTryAgainLater.
388 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
// Take the connection out of the active set, then put it back under the inactive poller.
// NOTE(review): the condition guarding the re-Add (presumably keepAlive) is on a line not visible here.
394 scoped_lock critSec{fActiveConnections_};
395 fActiveConnections_.rwget ().rwref ().Remove (readyConnection);
397 fInactiveSockSetPoller_.Add (readyConnection, kInactiveSocketMonitorEvents2Watch4_);
399#if USE_NOISY_TRACE_IN_THIS_MODULE_
401 DbgTrace (
"Closing connection {}"_f, readyConnection);
// The response is asserted to be complete; the `if` below re-checks the same condition and ends the
// response while ignoring exceptions - presumably a safety net for builds where Assert is compiled out.
415 Assert (readyConnection->response ().responseCompleted ());
416 if (not readyConnection->response ().responseCompleted ()) {
417 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
422#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
424 scoped_lock critSec{fActiveConnections_};
425 DbgTrace (
"at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
426 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// Under the fActiveConnections_ lock, move the connection from the inactive poller to the active set.
432 scoped_lock critSec{fActiveConnections_};
433 fInactiveSockSetPoller_.Remove (readyConnection);
434 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
// Queue the task on the connection thread pool.
440 fActiveConnectionThreads_.
AddTask (handleActivatedConnection);
// Thread abort is caught separately from other exceptions (its handler body is not visible here).
443 catch (
const Thread::AbortException&) {
// Per the trace text, other exceptions are logged and suppressed.
447 DbgTrace (
"Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
// Rebuilds the early-interceptor list when the default error handler changes.
//   oldValue - the handler being replaced (may be absent)
//   newValue - the replacement handler (may be absent)
// The loop over the existing list is only partly visible in this excerpt.
457void ConnectionManager::ReplaceInEarlyInterceptor_ (
const optional<Interceptor>& oldValue,
const optional<Interceptor>& newValue)
// Writable access to the early-interceptor list; the rebuilt list is stored through it at the end.
460 auto rwLock = this->fEarlyInterceptors_.rwget ();
// Tracks whether newValue has already been placed into the rebuilt list
// (presumably set where it substitutes for oldValue - that line is not visible here).
462 bool addedDefault =
false;
466 newInterceptors += *newValue;
471 newInterceptors += i;
// If a new handler was given but not yet placed, append it at the end.
474 if (newValue and not addedDefault) {
475 newInterceptors += *newValue;
// Publish the rebuilt list, then refresh the default connection options that depend on it.
477 rwLock.store (newInterceptors);
478 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// AbortConnection: only the signature is visible in this excerpt. The connection parameter is
// unnamed, which suggests the body does not use it - TODO confirm against the full file.
481void ConnectionManager::AbortConnection (
const shared_ptr<Connection>& )
// Builds a Statistics value (thread-pool stats plus connection stats) from the current state.
// Many interior lines are not visible in this excerpt.
486auto ConnectionManager::ComputeStatistics_ () const -> Statistics
// kExtraDebugging_ is tied to the noisy-trace switch (the #else between the two definitions is not visible here).
488#if USE_NOISY_TRACE_IN_THIS_MODULE_
489 constexpr bool kExtraDebugging_ =
true;
491 constexpr bool kExtraDebugging_ =
false;
// Gather per-connection stats from the active and inactive sets while holding the
// fActiveConnections_ lock; the two sets are asserted to have no connection in common.
493 ConnectionStatsCollection conns;
495 scoped_lock critSec{fActiveConnections_};
496 Assert (
Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
497 for (
auto i : fActiveConnections_.load ()) {
498 auto s = i->stats ();
501 if constexpr (kExtraDebugging_) {
505 for (
auto i : GetInactiveConnections_ ()) {
506 auto s = i->stats ();
509 if constexpr (kExtraDebugging_) {
// Thread-pool stats come from an immediately-built lambda; the visible return supplies only the pool size
// (another return path may exist on lines not visible here).
514 Statistics::ThreadPool threadPoolStats = [&] () {
519 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.
GetPoolSize ()};
// Active count = collected stats whose fActive compares equal to true; open count = all collected stats.
// NOTE(review): the explicit == true / == false comparisons suggest fActive may not be a plain bool - confirm.
522 Statistics::ConnectionStatistics connectionStats;
523 connectionStats.fNumberOfActiveConnections = conns.Count ([] (
const Connection::Stats& s) {
return s.
fActive ==
true; });
524 connectionStats.fNumberOfOpenConnections = conns.size ();
// Extra statistics, compiled only when connection extra-stat tracking is enabled. In each visible
// branch the most recent message's range has its end replaced by min (upper bound, now) before its span is measured.
530#if qStroika_Framework_WebServer_Connection_TrackExtraStats
531 connectionStats.fDurationOfOpenConnectionsRequests =
533 if (cs.fMostRecentMessage) {
534 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
538 connectionStats.fDurationOfActiveConnectionsRequests =
540 if (cs.fActive == false)
542 if (cs.fMostRecentMessage) {
543 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
547 connectionStats.fConnectionsPiningForTheFjords =
550 if (cs.fActive ==
false)
552 if (cs.fMostRecentMessage) {
553 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
559 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
// Fragment (presumably of AddInterceptor - the signature is not visible here): inserts interceptor `i`
// at a position selected by `relativeTo`, then rebuilds the default connection options.
// Two of the case labels are on lines not visible in this excerpt.
564 switch (relativeTo) {
565 case ePrependsToEarly:
566 fEarlyInterceptors_.rwget ()->Prepend (i);
// (case label not visible) prepend to the "before" interceptors
569 fBeforeInterceptors_.rwget ()->Prepend (i);
// (case label not visible) append to the "after" interceptors
572 fAfterInterceptors_.rwget ()->Append (i);
574 case eAfterBeforeInterceptors:
575 fBeforeInterceptors_.rwget ()->Append (i);
578 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Removes interceptor `i`: the "before" list is searched first, then the "after" list, and finally
// the default connection options are rebuilt. What happens when `i` is found in the "before" list
// is on lines not visible in this excerpt.
581void ConnectionManager::RemoveInterceptor (
const Interceptor& i)
585 auto b4 = fBeforeInterceptors_.rwget ();
586 if (optional<size_t> idx = b4->IndexOf (i)) {
592 auto after = fAfterInterceptors_.rwget ();
593 if (optional<size_t> idx = after->IndexOf (i)) {
594 after->Remove (*idx);
599 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
#define AssertNotImplemented()
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
#define AssertNotReached()
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual void Start() const
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
nonvirtual void SetLinger(const optional< int > &linger) const
Duration is a chrono::duration<double> (=.
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
void CheckForInterruption()
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< String > fThreadPoolName
optional< CORSOptions > fCORS
optional< Headers > fDefaultResponseHeaders
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Headers > fDefaultGETResponseHeaders
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< unsigned int > fMaxConcurrentlyHandledConnections
optional< Duration > fAutomaticTCPDisconnectOnClose
Duration fConnectionPiningForTheFjordsDelay
optional< unsigned int > fTCPBacklog
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not
optional< Socket::BindFlags > fBindFlags
optional< bool > fTCPNoDelay
CommonStatistics< Duration > fDurationOfOpenConnections
CommonStatistics< Duration > fDurationOfOpenConnectionsRequests
size_t fNumberOfActiveConnections
CommonStatistics< Duration > fDurationOfActiveConnectionsRequests
size_t fNumberOfOpenConnections
nonvirtual Characters::String ToString() const
size_t fConnectionsPiningForTheFjords
nonvirtual Characters::String ToString() const