4#include "Stroika/Frameworks/StroikaPreComp.h"
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
21#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
23#include "ConnectionManager.h"
29using namespace Stroika::Foundation::Math;
30using namespace Stroika::Foundation::Memory;
31using namespace Stroika::Foundation::Time;
32using namespace Stroika::Foundation::Traversal;
34using Memory::MakeSharedPtr;
36using namespace Stroika::Frameworks;
// Only contribute the default fault handler when one is actually set; it is
// dereferenced below, so the check guards that dereference.
49 if (defaultFaultHandler) {
50 interceptors += *defaultFaultHandler;
// Assemble the chain in a fixed order: early interceptors, then the "before"
// interceptors, then the router itself, and finally the "after" interceptors.
58 interceptors += earlyInterceptors;
59 interceptors += beforeInterceptors;
60 interceptors += router;
61 interceptors += afterInterceptors;
// Debug-friendly rendering of the thread-pool statistics.
// Returns a String intended for display/tracing (not a stable, parsable format).
71String WebServer::ConnectionManager::Statistics::ThreadPool::ToString ()
const
// Label spelled "count" so it matches the fThreadEntryCount field it reports.
74 sb <<
", thread-entry-count: "sv << fThreadEntryCount;
107 sb <<
"ThreadPool: "sv << fThreadPool;
108 sb <<
", Connections: "sv << fConnections;
// Lower bound for the computed pool size.
127 constexpr unsigned int kMinThreadCnt_{1u};
// Prefer the explicitly configured fMaxConcurrentlyHandledConnections. When it is unset,
// fall back to one fifth (integer division) of fMaxConnections, which itself falls back to
// Options::kDefault_MaxConnections. Math::AtLeast wraps the whole expression, so the lower
// bound is applied to an explicitly configured value as well.
128 return Math::AtLeast (kMinThreadCnt_,
options.fMaxConcurrentlyHandledConnections.value_or (
129 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 5));
134 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
136 Math::AtLeast (kMinDefaultTCPBacklog_,
options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
// Fill in every option the caller left unset. NullCoalesce keeps the left argument when it
// is engaged and otherwise yields the right-hand default.
143 result.fMaxConnections =
NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
// The default here is computed from `result`, after fMaxConnections was assigned above.
144 result.fMaxConcurrentlyHandledConnections =
NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
145 result.fBindFlags =
NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
146 result.fDefaultResponseHeaders =
NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
147 result.fAutoComputeETagResponse =
NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
148 result.fAutomaticTCPDisconnectOnClose =
NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
149 result.fLinger =
NullCoalesce (result.fLinger, Options::kDefault_Linger);
150 result.fTCPNoDelay =
NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
// Like the thread-pool size, the backlog default is derived from the partially
// filled-in `result` rather than from a fixed constant.
152 result.fTCPBacklog =
NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
172 v += HTTP::HeaderName::kOrigin;
174 result.fDefaultResponseHeaders = s;
182 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) ->
Sequence<
Interceptor> {
184 return thisObj->fAfterInterceptors_;
186 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto& afterInterceptors_) {
188 thisObj->fAfterInterceptors_ = afterInterceptors_;
189 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
192 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
194 return thisObj->fBeforeInterceptors_;
196 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
beforeInterceptors) {
199 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
203 return thisObj->fBindings_;
205 ,
connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> ConnectionStatsCollection {
207 scoped_lock critSec{thisObj->fActiveConnections_};
208 Ensure (
Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
209 ConnectionStatsCollection r;
210 for (
auto i : thisObj->fActiveConnections_.load ()) {
211 auto s = i->stats ();
215 for (
auto i : thisObj->GetInactiveConnections_ ()) {
216 auto s = i->stats ();
223 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<Interceptor> {
225 return thisObj->fDefaultErrorHandler_;
227 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
defaultErrorHandler) {
230 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (),
defaultErrorHandler);
235 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
237 return thisObj->fEarlyInterceptors_;
239 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
earlyInterceptors) {
242 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
244 ,
options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Options& {
246 return thisObj->fEffectiveOptions_;
248 ,
statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Statistics {
252 return thisObj->ComputeStatistics_ ();
// Effective options are the caller's options with defaults filled in.
254 , fEffectiveOptions_{FillInDefaults_ (
options)}
255 , fBindings_{bindAddresses}
// Start with the stock fault interceptor as the error handler; the early interceptor
// list is then built from that handler.
256 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
257 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
258 , fBeforeInterceptors_{}
259 , fAfterInterceptors_{}
// NOTE(review): fCORS is dereferenced unconditionally; this presumes FillInDefaults_
// always leaves it engaged - confirm.
260 , fRouter_{routes, *fEffectiveOptions_.
fCORS}
// The polling thread is created here but not started until the end of the constructor body.
268 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
269 Thread::New ([
this] () { WaitForReadyConnectionLoop_ (); },
"WebServer-ConnectionMgr-Wait4IOReady"_k)}
// Build the per-connection default options (including the interceptor chain).
286 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// NOTE(review): the traced backlog is computed from the raw `options` argument, not from
// fEffectiveOptions_ - confirm this is what is intended to be reported.
288 DbgTrace (
"Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
289 static_cast<const void*
> (
this), fActiveConnectionThreads_.
GetPoolSize (), ComputeConnectionBacklog_ (
options),
// Start the polling thread.
291 fWaitForReadyConnectionThread_.
Start ();
// The preprocessor check below precedes the destructor definition, which suggests the
// out-of-line destructor exists only for tracing builds - TODO confirm against the header.
294#if qStroika_Foundation_Debug_DefaultTracingOn
// Destructor: the visible body only traces the start of destruction, identifying this
// instance by address.
295ConnectionManager::~ConnectionManager ()
297 DbgTrace (
"Starting destructor for WebServer::ConnectionManager ({})"_f,
static_cast<const void*
> (
this));
// Rebuilds fUseDefaultConnectionOptions_, the options handed to each new Connection,
// from the manager's current state. Takes no parameters and returns nothing.
301void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
// Tracing builds only: remember the previous chain so a change can be reported below.
303#if qStroika_Foundation_Debug_DefaultTracingOn
304 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
306 fUseDefaultConnectionOptions_ = Connection::Options{
// The chain is recomputed from the router plus the early/before/after interceptor lists.
307 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
315#if qStroika_Foundation_Debug_DefaultTracingOn
// Only trace when the rebuilt chain actually differs from the previous one.
316 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
317 DbgTrace (
"Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
324#if USE_NOISY_TRACE_IN_THIS_MODULE_
330 shared_ptr<Connection> conn = MakeSharedPtr<Connection> (s, fUseDefaultConnectionOptions_);
331 fInactiveSockSetPoller_.Add (conn);
332#if USE_NOISY_TRACE_IN_THIS_MODULE_
334 scoped_lock critSec{fActiveConnections_};
335 DbgTrace (
"In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
336 fActiveConnections_.load (), GetInactiveConnections_ ());
// Body of the "wait for IO ready" thread: waits for inactive connections to become ready,
// moves each ready connection to the active set, and queues a thread-pool task that reads
// and processes one message for it.
341void ConnectionManager::WaitForReadyConnectionLoop_ ()
343#if USE_NOISY_TRACE_IN_THIS_MODULE_
352#if USE_NOISY_TRACE_IN_THIS_MODULE_
// Noisy tracing only: snapshot both connection sets under the active-connections lock.
354 scoped_lock critSec{fActiveConnections_};
// Narrow format literal, consistent with every other DbgTrace call in this file.
355 DbgTrace (
"At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
356 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// Each element yielded by WaitQuietly () is handled as a connection with input ready.
359 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
// Task run on the thread pool; captures the connection by value.
361 auto handleActivatedConnection = [
this, readyConnection] ()
mutable {
362#if USE_NOISY_TRACE_IN_THIS_MODULE_
// keepAlive is true exactly when the connection reports eTryAgainLater.
370 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
// Under the lock: the connection is no longer active.
376 scoped_lock critSec{fActiveConnections_};
377 fActiveConnections_.rwget ().rwref ().Remove (readyConnection);
// Hand the connection back to the poller (the guarding condition is on lines not shown
// here; presumably keepAlive - TODO confirm).
379 fInactiveSockSetPoller_.Add (readyConnection);
// The response is expected to be completed by now; if it is not (and the Assert did not
// stop execution), end it on a best-effort basis, ignoring any exception.
392 Assert (readyConnection->response ().responseCompleted ());
393 if (not readyConnection->response ().responseCompleted ()) {
394 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
399#if USE_NOISY_TRACE_IN_THIS_MODULE_
401 scoped_lock critSec{fActiveConnections_};
402 DbgTrace (
"at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
403 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// Under the lock: move the ready connection from the inactive poller to the active set
// before the task is queued.
409 scoped_lock critSec{fActiveConnections_};
410 fInactiveSockSetPoller_.Remove (readyConnection);
411 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
417 fActiveConnectionThreads_.
AddTask (handleActivatedConnection);
// Thread aborts get their own handler (its body is not shown here).
420 catch (
const Thread::AbortException&) {
// Any other exception is traced and suppressed.
424 DbgTrace (
"Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
// Rebuilds the early-interceptor list with newValue in place of oldValue.
//   oldValue - interceptor to be replaced (may be unset)
//   newValue - replacement interceptor (may be unset)
434void ConnectionManager::ReplaceInEarlyInterceptor_ (
const optional<Interceptor>& oldValue,
const optional<Interceptor>& newValue)
// Obtain read/write access to the early-interceptor list; the rebuilt list is stored
// through this same handle at the end.
437 auto rwLock = this->fEarlyInterceptors_.rwget ();
// Tracks whether newValue has already been placed while copying the existing list.
439 bool addedDefault =
false;
443 newInterceptors += *newValue;
448 newInterceptors += i;
// newValue is set but was never placed during the copy: append it at the end.
451 if (newValue and not addedDefault) {
452 newInterceptors += *newValue;
454 rwLock.store (newInterceptors);
// The interceptor chain is part of the per-connection defaults, so recompute them.
455 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Abort the given connection. The parameter is left unnamed in this definition, so the
// body cannot refer to it by name. NOTE(review): the body is not shown here; this looks
// like a not-yet-implemented operation - confirm.
458void ConnectionManager::AbortConnection (
const shared_ptr<Connection>& )
// Builds the Statistics value for this manager: thread-pool statistics plus aggregate
// statistics over the active and inactive connections.
463auto ConnectionManager::ComputeStatistics_ () const -> Statistics
// kExtraDebugging_ mirrors the noisy-trace setting and gates the extra per-connection
// debugging blocks below via `if constexpr`.
465#if USE_NOISY_TRACE_IN_THIS_MODULE_
466 constexpr bool kExtraDebugging_ =
true;
468 constexpr bool kExtraDebugging_ =
false;
470 ConnectionStatsCollection conns;
// Hold the active-connections lock while walking both sets.
472 scoped_lock critSec{fActiveConnections_};
// Invariant: no connection is in both the active and the inactive set.
473 Assert (
Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
474 for (
auto i : fActiveConnections_.load ()) {
475 auto s = i->stats ();
478 if constexpr (kExtraDebugging_) {
482 for (
auto i : GetInactiveConnections_ ()) {
483 auto s = i->stats ();
486 if constexpr (kExtraDebugging_) {
491 Statistics::ThreadPool threadPoolStats = [&] () {
// This return leaves the first member value-initialized and reports only the pool size.
496 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.
GetPoolSize ()};
499 Statistics::ConnectionStatistics connectionStats;
// Counts only entries whose fActive compares equal to true.
500 connectionStats.fNumberOfActiveConnections = conns.Count ([] (
const Connection::Stats& s) {
return s.
fActive ==
true; });
501 connectionStats.fNumberOfOpenConnections = conns.size ();
// The per-request duration statistics below are only compiled when extra stats tracking
// is enabled.
507#if qStroika_Framework_WebServer_Connection_TrackExtraStats
508 connectionStats.fDurationOfOpenConnectionsRequests =
510 if (cs.fMostRecentMessage) {
// Cap the message range's upper bound at `now` before measuring the span.
511 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
515 connectionStats.fDurationOfActiveConnectionsRequests =
517 if (cs.fActive == false)
519 if (cs.fMostRecentMessage) {
520 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
524 connectionStats.fConnectionsPiningForTheFjords =
527 if (cs.fActive ==
false)
529 if (cs.fMostRecentMessage) {
530 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
536 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
541 switch (relativeTo) {
542 case ePrependsToEarly:
543 fEarlyInterceptors_.rwget ()->Prepend (i);
546 fBeforeInterceptors_.rwget ()->Prepend (i);
549 fAfterInterceptors_.rwget ()->Append (i);
551 case eAfterBeforeInterceptors:
552 fBeforeInterceptors_.rwget ()->Append (i);
555 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Removes interceptor `i` from the manager's interceptor lists and then refreshes the
// per-connection default options.
//   i - the interceptor to remove
558void ConnectionManager::RemoveInterceptor (
const Interceptor& i)
// Look in the "before" list first ...
562 auto b4 = fBeforeInterceptors_.rwget ();
563 if (optional<size_t> idx = b4->IndexOf (i)) {
// ... and in the "after" list, removing by index when found.
569 auto after = fAfterInterceptors_.rwget ();
570 if (optional<size_t> idx = after->IndexOf (i)) {
571 after->Remove (*idx);
// The interceptor chain may have changed, so rebuild the connection defaults.
576 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
#define AssertNotImplemented()
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
#define AssertNotReached()
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual void Start() const
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
nonvirtual void SetLinger(const optional< int > &linger) const
Duration is a chrono::duration<double> (=.
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
void CheckForInterruption()
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< String > fThreadPoolName
optional< CORSOptions > fCORS
optional< Headers > fDefaultResponseHeaders
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Headers > fDefaultGETResponseHeaders
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< unsigned int > fMaxConcurrentlyHandledConnections
optional< Duration > fAutomaticTCPDisconnectOnClose
Duration fConnectionPiningForTheFjordsDelay
optional< unsigned int > fTCPBacklog
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not
optional< Socket::BindFlags > fBindFlags
optional< bool > fTCPNoDelay
CommonStatistics< Duration > fDurationOfOpenConnections
CommonStatistics< Duration > fDurationOfOpenConnectionsRequests
size_t fNumberOfActiveConnections
CommonStatistics< Duration > fDurationOfActiveConnectionsRequests
size_t fNumberOfOpenConnections
nonvirtual Characters::String ToString() const
size_t fConnectionsPiningForTheFjords
nonvirtual Characters::String ToString() const