4#include "Stroika/Frameworks/StroikaPreComp.h"
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
20#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
22#include "ConnectionManager.h"
28using namespace Stroika::Foundation::Math;
29using namespace Stroika::Foundation::Memory;
30using namespace Stroika::Foundation::Time;
31using namespace Stroika::Foundation::Traversal;
33using namespace Stroika::Frameworks;
// The fault handler is optional: it is dereferenced and appended to the list only when engaged.
46 if (defaultFaultHandler) {
47 interceptors += *defaultFaultHandler;
// Interceptors are appended in a fixed order: early, then "before", then the router,
// then "after".
55 interceptors += earlyInterceptors;
56 interceptors += beforeInterceptors;
57 interceptors += router;
58 interceptors += afterInterceptors;
// Builds the textual form of the thread-pool statistics by streaming labelled fields into sb.
68String WebServer::ConnectionManager::Statistics::ThreadPool::ToString ()
const
// NOTE(review): the label below reads "thread-entry-cont" while the field is
// fThreadEntryCount -- this looks like a typo for "count". Confirm before changing,
// because the text is part of the runtime output.
71 sb <<
", thread-entry-cont: "sv << fThreadEntryCount;
104 sb <<
"ThreadPool: "sv << fThreadPool;
105 sb <<
", Connections: "sv << fConnections;
// Lower bound applied to the returned value (presumably this is the body of
// ComputeThreadPoolSize_ -- the signature is not visible here).
124 constexpr unsigned int kMinThreadCnt_{1u};
// Prefer the explicitly configured fMaxConcurrentlyHandledConnections; when it is unset,
// fall back to one tenth of fMaxConnections (itself defaulted to kDefault_MaxConnections
// when unset).
// NOTE(review): Math::AtLeast presumably returns the larger of its two arguments -- confirm.
125 return Math::AtLeast (kMinThreadCnt_,
options.fMaxConcurrentlyHandledConnections.value_or (
126 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 10));
// Lower bound applied to the default backlog computed below.
131 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
// Default backlog: three quarters of fMaxConnections (or of kDefault_MaxConnections when
// unset), floored at kMinDefaultTCPBacklog_. The enclosing expression that this value is
// passed to starts on a line that is not visible here.
133 Math::AtLeast (kMinDefaultTCPBacklog_,
options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
// Fill every unset optional in result with its default. NullCoalesce keeps the left
// operand when it is engaged and otherwise yields the right operand, so caller-supplied
// values are never overwritten.
140 result.fMaxConnections =
NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
// Derived default: computed from result itself, after fMaxConnections was filled in above.
141 result.fMaxConcurrentlyHandledConnections =
NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
142 result.fBindFlags =
NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
143 result.fDefaultResponseHeaders =
NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
144 result.fAutoComputeETagResponse =
NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
145 result.fAutomaticTCPDisconnectOnClose =
NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
146 result.fLinger =
NullCoalesce (result.fLinger, Options::kDefault_Linger);
147 result.fTCPNoDelay =
NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
// Derived default: like the thread-pool size, the backlog is computed from result.
149 result.fTCPBacklog =
NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
// Appends the Origin header name to v; v and s are declared on lines not visible here.
169 v += HTTP::HeaderName::kOrigin;
// Replaces the default response headers with s.
171 result.fDefaultResponseHeaders = s;
// Property accessor lambdas. Each one reaches the owning object through thisObj
// (presumably provided by the capture macro -- not visible here, confirm).
// Getter: returns the current "after" interceptor list.
179 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) ->
Sequence<
Interceptor> {
181 return thisObj->fAfterInterceptors_;
// Setter: stores the new "after" list, then rebuilds the default connection options.
183 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto& afterInterceptors_) {
185 thisObj->fAfterInterceptors_ = afterInterceptors_;
186 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Getter: returns the current "before" interceptor list.
189 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
191 return thisObj->fBeforeInterceptors_;
// Setter for the "before" list (the assignment itself is on a line not visible here);
// it ends by rebuilding the default connection options.
193 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
beforeInterceptors) {
196 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Getter: returns the stored bindings.
200 return thisObj->fBindings_;
202 ,
// connections: collects stats for active and inactive connections.
connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> ConnectionStatsCollection {
// Hold the active-connections lock while reading both sets.
204 scoped_lock critSec{thisObj->fActiveConnections_};
// Invariant check: no connection is in both the active and the inactive set.
205 Ensure (
Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
206 ConnectionStatsCollection r;
207 for (
auto i : thisObj->fActiveConnections_.load ()) {
208 auto s = i->stats ();
212 for (
auto i : thisObj->GetInactiveConnections_ ()) {
213 auto s = i->stats ();
// Getter: returns the current default error handler (may be empty).
220 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<Interceptor> {
222 return thisObj->fDefaultErrorHandler_;
// Setter: passes the currently stored handler and the new one to
// ReplaceInEarlyInterceptor_ so the early interceptor list is updated to match.
224 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
defaultErrorHandler) {
227 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (),
defaultErrorHandler);
// Getter: returns the current early interceptor list.
232 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
Sequence<Interceptor> {
234 return thisObj->fEarlyInterceptors_;
// Setter for the early list (the assignment itself is on a line not visible here);
// it ends by rebuilding the default connection options.
236 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
earlyInterceptors) {
239 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
241 ,
// options: read-only reference to the effective (defaults-filled) options.
options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Options& {
243 return thisObj->fEffectiveOptions_;
245 ,
// statistics: recomputed on every read via ComputeStatistics_.
statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Statistics {
249 return thisObj->ComputeStatistics_ ();
// Data members. The effective options are the caller's options passed through FillInDefaults_.
251 , fEffectiveOptions_{FillInDefaults_ (
options)}
252 , fBindings_{bindAddresses}
// The default error handler starts out as a DefaultFaultInterceptor and seeds the early
// interceptor list.
253 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
254 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
255 , fBeforeInterceptors_{}
256 , fAfterInterceptors_{}
// NOTE(review): fCORS is dereferenced unconditionally; presumably FillInDefaults_ guarantees
// it is engaged, but the visible lines do not show that -- confirm.
257 , fRouter_{routes, *fEffectiveOptions_.
fCORS}
// The wait thread is created here with eAbortBeforeWaiting cleanup and runs
// WaitForReadyConnectionLoop_; it is only started at the end of the constructor body.
265 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
266 Thread::New ([
this] () { WaitForReadyConnectionLoop_ (); },
"WebServer-ConnectionMgr-Wait4IOReady"_k)}
// Build the default per-connection options before the wait thread is started.
283 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
285 DbgTrace (
"Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
286 static_cast<const void*
> (
this), fActiveConnectionThreads_.
GetPoolSize (), ComputeConnectionBacklog_ (
options),
// Start the wait thread last, after the setup above.
288 fWaitForReadyConnectionThread_.
Start ();
// The destructor definition sits under this tracing #if (the matching #endif is not visible
// here); the visible body only emits a trace message identifying this object.
291#if qStroika_Foundation_Debug_DefaultTracingOn
292ConnectionManager::~ConnectionManager ()
294 DbgTrace (
"Starting destructor for WebServer::ConnectionManager ({})"_f,
static_cast<const void*
> (
this));
// Rebuilds fUseDefaultConnectionOptions_ (the options handed to each new Connection) from
// the current router and interceptor lists. Callers invoke it after any interceptor change.
298void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
300#if qStroika_Foundation_Debug_DefaultTracingOn
// Remember the previous chain so a change can be reported below (tracing builds only).
301 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
303 fUseDefaultConnectionOptions_ = Connection::Options{
// The chain is assembled from the router plus the early / before / after lists; the remaining
// Connection::Options fields are initialised on lines not visible here.
304 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
312#if qStroika_Foundation_Debug_DefaultTracingOn
// Trace only when the chain actually changed.
313 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
314 DbgTrace (
"Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
// Fragment of the new-connection handler (named onConnect_ in the trace text below; its
// signature is not visible here).
321#if USE_NOISY_TRACE_IN_THIS_MODULE_
// Wrap the accepted socket s in a Connection configured with the current default options...
327 shared_ptr<Connection> conn = make_shared<Connection> (s, fUseDefaultConnectionOptions_);
// ...and add it to the inactive poller; it is not placed in fActiveConnections_ here.
328 fInactiveSockSetPoller_.Add (conn);
329#if USE_NOISY_TRACE_IN_THIS_MODULE_
// Noisy tracing only: take the lock and log both connection sets.
331 scoped_lock critSec{fActiveConnections_};
332 DbgTrace (
"In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
333 fActiveConnections_.load (), GetInactiveConnections_ ());
// Body of the thread stored in fWaitForReadyConnectionThread_. It takes connections the
// inactive poller reports as ready, marks each one active, and queues a task on
// fActiveConnectionThreads_ that reads and processes a message on it.
338void ConnectionManager::WaitForReadyConnectionLoop_ ()
340#if USE_NOISY_TRACE_IN_THIS_MODULE_
349#if USE_NOISY_TRACE_IN_THIS_MODULE_
351 scoped_lock critSec{fActiveConnections_};
// NOTE(review): this is the only trace call in view whose format literal carries a
// wide-literal L prefix; the others use narrow literals. Check that it compiles when
// noisy tracing is enabled.
352 DbgTrace (L
"At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
353 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// Iterate over the connections returned by the poller's WaitQuietly ().
356 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
// The task captures the shared_ptr by value, so it holds its own reference to the connection.
358 auto handleActivatedConnection = [
this, readyConnection] ()
mutable {
359#if USE_NOISY_TRACE_IN_THIS_MODULE_
// keepAlive is true exactly when processing reports eTryAgainLater.
367 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
// Under the lock, take the connection out of the active set.
373 scoped_lock critSec{fActiveConnections_};
374 fActiveConnections_.rwget ().rwref ().Remove (readyConnection);
// Return the connection to the inactive poller (presumably only when keepAlive is true --
// the guarding condition is on a line not visible here).
376 fInactiveSockSetPoller_.Add (readyConnection);
// The response is expected to be complete at this point; if it is not, End () is still
// called, with any exception it throws ignored.
389 Assert (readyConnection->response ().responseCompleted ());
390 if (not readyConnection->response ().responseCompleted ()) {
391 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
396#if USE_NOISY_TRACE_IN_THIS_MODULE_
398 scoped_lock critSec{fActiveConnections_};
399 DbgTrace (
"at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
400 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
// Before queuing the task: under the lock, move the connection from the inactive poller
// to the active set.
406 scoped_lock critSec{fActiveConnections_};
407 fInactiveSockSetPoller_.Remove (readyConnection);
408 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
// Hand the work to the thread pool.
414 fActiveConnectionThreads_.
AddTask (handleActivatedConnection);
// Thread abort has its own handler (body not visible here); the trace below logs an
// exception the loop suppresses.
417 catch (
const Thread::AbortException&) {
421 DbgTrace (
"Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
// Builds a replacement early-interceptor list in newInterceptors, appending newValue when
// it is set (the use of oldValue is on lines not visible here), stores the list back, and
// then rebuilds the default connection options.
431void ConnectionManager::ReplaceInEarlyInterceptor_ (
const optional<Interceptor>& oldValue,
const optional<Interceptor>& newValue)
// Writable access to the early interceptor list.
434 auto rwLock = this->fEarlyInterceptors_.rwget ();
// Starts false; presumably set on a line not visible here once newValue has been placed.
436 bool addedDefault =
false;
440 newInterceptors += *newValue;
445 newInterceptors += i;
// newValue is appended here only when it is set and addedDefault is still false.
448 if (newValue and not addedDefault) {
449 newInterceptors += *newValue;
451 rwLock.store (newInterceptors);
// NOTE(review): rwLock appears to still be in scope here, and the callee reads
// fEarlyInterceptors_; this relies on the lock being re-entrant -- confirm.
452 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// The connection parameter is unnamed, so the body (not visible here) cannot refer to it.
// NOTE(review): presumably this is not implemented yet -- confirm against the full file.
455void ConnectionManager::AbortConnection (
const shared_ptr<Connection>& )
// Produces a Statistics value from two parts: thread-pool figures and figures aggregated over
// the per-connection stats of all active and inactive connections.
460auto ConnectionManager::ComputeStatistics_ () const -> Statistics
// kExtraDebugging_ follows the noisy-trace switch (the #else between the two definitions is
// not visible here).
462#if USE_NOISY_TRACE_IN_THIS_MODULE_
463 constexpr bool kExtraDebugging_ =
true;
465 constexpr bool kExtraDebugging_ =
false;
467 ConnectionStatsCollection conns;
// Hold the active-connections lock while both sets are read.
469 scoped_lock critSec{fActiveConnections_};
// Invariant check: no connection is in both the active and the inactive set.
470 Assert (
Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
471 for (
auto i : fActiveConnections_.load ()) {
472 auto s = i->stats ();
475 if constexpr (kExtraDebugging_) {
479 for (
auto i : GetInactiveConnections_ ()) {
480 auto s = i->stats ();
483 if constexpr (kExtraDebugging_) {
// Thread-pool figures are computed by an immediately-built lambda; the visible return
// leaves the first member value-initialised and supplies the pool size (other branches, if
// any, are not visible here).
488 Statistics::ThreadPool threadPoolStats = [&] () {
493 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.
GetPoolSize ()};
496 Statistics::ConnectionStatistics connectionStats;
// Active = entries whose fActive compares equal to true; open = every collected entry.
497 connectionStats.fNumberOfActiveConnections = conns.Count ([] (
const Connection::Stats& s) {
return s.
fActive ==
true; });
498 connectionStats.fNumberOfOpenConnections = conns.size ();
// The remaining figures are only compiled when extra per-connection stats are tracked.
504#if qStroika_Framework_WebServer_Connection_TrackExtraStats
505 connectionStats.fDurationOfOpenConnectionsRequests =
// The most recent message range has its end clamped to at most now before its span is measured.
507 if (cs.fMostRecentMessage) {
508 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
512 connectionStats.fDurationOfActiveConnectionsRequests =
// Entries with fActive == false are handled separately (on a line not visible here).
514 if (cs.fActive == false)
516 if (cs.fMostRecentMessage) {
517 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
521 connectionStats.fConnectionsPiningForTheFjords =
524 if (cs.fActive ==
false)
526 if (cs.fMostRecentMessage) {
527 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
533 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
// Insert interceptor i into one of the lists, at the position selected by relativeTo.
// Several case labels are on lines not visible here, so only the visible pairings are described.
538 switch (relativeTo) {
539 case ePrependsToEarly:
// Goes to the front of the early list.
540 fEarlyInterceptors_.rwget ()->Prepend (i);
// Goes to the front of the "before" list (case label not visible).
543 fBeforeInterceptors_.rwget ()->Prepend (i);
// Goes to the end of the "after" list (case label not visible).
546 fAfterInterceptors_.rwget ()->Append (i);
548 case eAfterBeforeInterceptors:
// Goes to the end of the "before" list.
549 fBeforeInterceptors_.rwget ()->Append (i);
// Rebuild the default connection options after the list change.
552 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
// Looks for interceptor i in the "before" list and in the "after" list, removes it from the
// "after" list when found there, and rebuilds the default connection options.
555void ConnectionManager::RemoveInterceptor (
const Interceptor& i)
// Search the "before" list (what happens on a hit is on lines not visible here).
559 auto b4 = fBeforeInterceptors_.rwget ();
560 if (optional<size_t> idx = b4->IndexOf (i)) {
// Search the "after" list and remove i at the index found.
566 auto after = fAfterInterceptors_.rwget ();
567 if (optional<size_t> idx = after->IndexOf (i)) {
568 after->Remove (*idx);
// Rebuild the default connection options after the search/removal above.
573 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
#define AssertNotImplemented()
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
#define AssertNotReached()
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual void Start() const
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
nonvirtual void SetLinger(const optional< int > &linger) const
Duration is a chrono::duration<double> (=.
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
void CheckForInterruption()
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< String > fThreadPoolName
optional< CORSOptions > fCORS
optional< Headers > fDefaultResponseHeaders
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Headers > fDefaultGETResponseHeaders
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< unsigned int > fMaxConcurrentlyHandledConnections
optional< Duration > fAutomaticTCPDisconnectOnClose
Duration fConnectionPiningForTheFjordsDelay
optional< unsigned int > fTCPBacklog
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not
optional< Socket::BindFlags > fBindFlags
optional< bool > fTCPNoDelay
CommonStatistics< Duration > fDurationOfOpenConnections
CommonStatistics< Duration > fDurationOfOpenConnectionsRequests
size_t fNumberOfActiveConnections
CommonStatistics< Duration > fDurationOfActiveConnectionsRequests
size_t fNumberOfOpenConnections
nonvirtual Characters::String ToString() const
size_t fConnectionsPiningForTheFjords
nonvirtual Characters::String ToString() const