Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
ConnectionManager.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#include "Stroika/Frameworks/StroikaPreComp.h"
5
6#include <algorithm>
7#include <cstdlib>
8
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
19
20#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
21
22#include "ConnectionManager.h"
23
24using namespace Stroika::Foundation;
27using namespace Stroika::Foundation::Execution;
28using namespace Stroika::Foundation::Math;
29using namespace Stroika::Foundation::Memory;
30using namespace Stroika::Foundation::Time;
31using namespace Stroika::Foundation::Traversal;
32
33using namespace Stroika::Frameworks;
35using namespace Stroika::Frameworks::WebServer;
36
37// Comment this in to turn on aggressive noisy DbgTrace in this module
38// #define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
39
41
42namespace {
43 Sequence<Interceptor> mkEarlyInterceptors_ (const optional<Interceptor>& defaultFaultHandler)
44 {
45 Sequence<Interceptor> interceptors;
46 if (defaultFaultHandler) {
47 interceptors += *defaultFaultHandler;
48 }
49 return interceptors;
50 }
51 InterceptorChain mkInterceptorChain_ (const Router& router, const Sequence<Interceptor>& earlyInterceptors,
52 const Sequence<Interceptor>& beforeInterceptors, const Sequence<Interceptor>& afterInterceptors)
53 {
54 Sequence<Interceptor> interceptors;
55 interceptors += earlyInterceptors;
56 interceptors += beforeInterceptors;
57 interceptors += router;
58 interceptors += afterInterceptors;
59 return InterceptorChain{interceptors};
60 }
61}
62
63/*
64 ********************************************************************************
65 ************ WebServer::ConnectionManager::Statistics::ThreadPool **************
66 ********************************************************************************
67 */
68String WebServer::ConnectionManager::Statistics::ThreadPool::ToString () const
69{
71 sb << ", thread-entry-cont: "sv << fThreadEntryCount;
72 sb << "}"sv;
73 return sb;
74}
75
76/*
77 ********************************************************************************
78 ******* WebServer::ConnectionManager::Statistics::ConnectionStatistics *********
79 ********************************************************************************
80 */
82{
84 sb << "{"sv;
85 sb << "n-open-connections: "sv << fNumberOfOpenConnections;
86 sb << ", n-active-connections: "sv << fNumberOfActiveConnections;
87 sb << ", duration-open-connections: "sv << fDurationOfOpenConnections;
88 sb << ", duration-active-requests: "sv << fDurationOfOpenConnectionsRequests;
89 sb << ", duration-open-requests: "sv << fDurationOfActiveConnectionsRequests;
90 sb << ", n-connections-pining-for-the-fjords: "sv << fConnectionsPiningForTheFjords;
91 sb << "}"sv;
92 return sb;
93}
94
95/*
96 ********************************************************************************
97 **************** WebServer::ConnectionManager::Statistics **********************
98 ********************************************************************************
99 */
101{
102 StringBuilder sb;
103 sb << "{"sv;
104 sb << "ThreadPool: "sv << fThreadPool;
105 sb << ", Connections: "sv << fConnections;
106 sb << "}"sv;
107 return sb;
108}
109
110/*
111 ********************************************************************************
112 ************************ WebServer::ConnectionManager **************************
113 ********************************************************************************
114 */
115ConnectionManager::ConnectionManager (const SocketAddress& bindAddress, const Sequence<Route>& routes, const Options& options)
116 : ConnectionManager{Sequence<SocketAddress>{bindAddress}, routes, options}
117{
118}
119
120namespace {
121 inline unsigned int ComputeThreadPoolSize_ (const ConnectionManager::Options& options)
122 {
124 constexpr unsigned int kMinThreadCnt_{1u}; // one enough now that we support separate thread doing epoll/select and one read when data avail
125 return Math::AtLeast (kMinThreadCnt_, options.fMaxConcurrentlyHandledConnections.value_or (
126 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 10));
127 }
128 inline unsigned int ComputeConnectionBacklog_ (const ConnectionManager::Options& options)
129 {
131 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
132 return options.fTCPBacklog.value_or (
133 Math::AtLeast (kMinDefaultTCPBacklog_, options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
134 }
136 {
138 Options result{o};
139 result.fCORS = NullCoalesce (result.fCORS, Options::kDefault_CORS);
140 result.fMaxConnections = NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
141 result.fMaxConcurrentlyHandledConnections = NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
142 result.fBindFlags = NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
143 result.fDefaultResponseHeaders = NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
144 result.fAutoComputeETagResponse = NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
145 result.fAutomaticTCPDisconnectOnClose = NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
146 result.fLinger = NullCoalesce (result.fLinger, Options::kDefault_Linger); // for now this is special and can be null/optional
147 result.fTCPNoDelay = NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
148 // result.fThreadPoolName; can remain nullopt
149 result.fTCPBacklog = NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
150
151 {
152 // Not super clear. BUT - it appears that if you combine CORS with Caching, then the value returned from
153 // the cache maybe WRONG due to not having a (correct) Origin header. Seems most logical that the CORS
154 // logic would not check the origin if its coming from the cache, but ... who knows...
155 // According to https://blog.keul.it/chrome-cors-issue-due-to-cache/ Chrome does, and this fits with the
156 // sporadic bug I'm seeing (hard to tell cuz what seems to happen is the web server not called, but Chrome debugger reports
157 // CORS error and Chrome debugger not super clear here - not letting me see what it pulled from cache or even
158 // mentioning - so I'm guessing - that it came from the cache.
159 // Anyhow, piecing things together, it appears that IF you are using CORS, you must set the vary header
160 // to vary on origin.
161 //
162 // Note - we emit it as a default for all responses because https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Vary says:
163 // "The same Vary header value should be used on all responses for a given URL,
164 // including 304 Not Modified responses and the "default" response."
165 //
166 // -- LGP 2022-12-09
167 HTTP::Headers s = NullCoalesce (result.fDefaultResponseHeaders);
168 Set<String> v = NullCoalesce (s.vary ());
169 v += HTTP::HeaderName::kOrigin;
170 s.vary = v;
171 result.fDefaultResponseHeaders = s;
172 }
173 return result;
174 }
175}
176
177ConnectionManager::ConnectionManager (const Iterable<SocketAddress>& bindAddresses, const Sequence<Route>& routes, const Options& options)
179 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
180 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
181 return thisObj->fAfterInterceptors_;
182 },
183 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& afterInterceptors_) {
184 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
185 thisObj->fAfterInterceptors_ = afterInterceptors_;
186 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
187 }}
189 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
190 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
191 return thisObj->fBeforeInterceptors_;
192 },
193 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& beforeInterceptors) {
194 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
195 thisObj->fBeforeInterceptors_ = beforeInterceptors;
196 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
197 }}
198 , bindings{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Iterable<SocketAddress> {
199 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::bindings);
200 return thisObj->fBindings_;
201 }}
202 , connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionStatsCollection {
203 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::connections);
204 scoped_lock critSec{thisObj->fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
205 Ensure (Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
206 ConnectionStatsCollection r;
207 for (auto i : thisObj->fActiveConnections_.load ()) {
208 auto s = i->stats ();
209 s.fActive = true;
210 r += s;
211 }
212 for (auto i : thisObj->GetInactiveConnections_ ()) {
213 auto s = i->stats ();
214 s.fActive = false;
215 r += s;
216 }
217 return r;
218 }}
220 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> optional<Interceptor> {
221 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
222 return thisObj->fDefaultErrorHandler_;
223 },
224 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& defaultErrorHandler) {
225 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
226 if (thisObj->fDefaultErrorHandler_ != defaultErrorHandler) {
227 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (), defaultErrorHandler);
228 thisObj->fDefaultErrorHandler_ = defaultErrorHandler;
229 }
230 }}
232 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
233 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
234 return thisObj->fEarlyInterceptors_;
235 },
236 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& earlyInterceptors) {
237 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
238 thisObj->fEarlyInterceptors_ = earlyInterceptors;
239 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
240 }}
241 , options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> const Options& {
242 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::options);
243 return thisObj->fEffectiveOptions_;
244 }}
245 , statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Statistics {
246 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::statistics);
247 // NOTE - this computation can be expensive, so consider caching so only recomputed at most every 30 seconds or so...???
248 // would need to be controlled via OPTIONS!
249 return thisObj->ComputeStatistics_ ();
250 }}
251 , fEffectiveOptions_{FillInDefaults_ (options)}
252 , fBindings_{bindAddresses}
253 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
254 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
255 , fBeforeInterceptors_{}
256 , fAfterInterceptors_{}
257 , fRouter_{routes, *fEffectiveOptions_.fCORS}
258 // note since Stroika v3.0d5, we set fQMax = so we don't get lots of useless requests that fill the Q. Probably shouldn't happen
259 // anyhow, since we have set 'backlog' - but in case, better failure mode ; arguably could be zero length, but for latency of threads waking up to pickup work;
260 // --LGP 2023-11-27
261 , fActiveConnectionThreads_{ThreadPool::Options{.fThreadCount = *fEffectiveOptions_.fMaxConcurrentlyHandledConnections,
262 .fThreadPoolName = fEffectiveOptions_.fThreadPoolName,
263 .fQMax = ThreadPool::QMax{*fEffectiveOptions_.fMaxConcurrentlyHandledConnections},
264 .fCollectStatistics = fEffectiveOptions_.fCollectStatistics}}
265 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
266 Thread::New ([this] () { WaitForReadyConnectionLoop_ (); }, "WebServer-ConnectionMgr-Wait4IOReady"_k)}
267 , fListener_{bindAddresses, *fEffectiveOptions_.fBindFlags, [this] (const ConnectionOrientedStreamSocket::Ptr& s) { onConnect_ (s); },
268 *fEffectiveOptions_.fTCPBacklog}
269{
270 // validate fDefaultResponseHeaders contains no bad/inappropriate headers (like Content-Length), probably CORS headers worth a warning as well
271 // just a bunch of sanity checks for things you really DONT want to set here for any reason I can think of
272 if (fEffectiveOptions_.fDefaultResponseHeaders) {
273 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->allow () == nullopt); // unsure
274 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->cookie ().cookieDetails ().empty ());
275 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->connection () == nullopt);
276 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->contentLength () == nullopt);
277 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ETag () == nullopt);
278 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->location () == nullopt);
279 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->origin () == nullopt); // request only header
280 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ifNoneMatch () == nullopt); // request only header
281 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->setCookie ().cookieDetails ().empty ());
282 }
283 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
284
285 DbgTrace ("Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
286 static_cast<const void*> (this), fActiveConnectionThreads_.GetPoolSize (), ComputeConnectionBacklog_ (options),
287 Characters::ToString (bindAddresses));
288 fWaitForReadyConnectionThread_.Start (); // start here instead of AutoStart so a guaranteed initialized before thread main starts - see http://stroika-bugs.sophists.com/browse/STK-706
289}
290
291#if qStroika_Foundation_Debug_DefaultTracingOn
292ConnectionManager::~ConnectionManager ()
293{
294 DbgTrace ("Starting destructor for WebServer::ConnectionManager ({})"_f, static_cast<const void*> (this));
295}
296#endif
297
298void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
299{
300#if qStroika_Foundation_Debug_DefaultTracingOn
301 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
302#endif
303 fUseDefaultConnectionOptions_ = Connection::Options{
304 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
305 .fDefaultResponseHeaders = *fEffectiveOptions_.fDefaultResponseHeaders,
306 .fDefaultGETResponseHeaders = fEffectiveOptions_.fDefaultGETResponseHeaders,
307 .fAutoComputeETagResponse = *fEffectiveOptions_.fAutoComputeETagResponse,
308 .fAutomaticTransferChunkSize = fEffectiveOptions_.fAutomaticTransferChunkSize,
309 .fSupportedCompressionEncodings = fEffectiveOptions_.fSupportedCompressionEncodings,
310 };
311 // @todo could add trace messages on other values changing...
312#if qStroika_Foundation_Debug_DefaultTracingOn
313 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
314 DbgTrace ("Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
315 }
316#endif
317}
318
319void ConnectionManager::onConnect_ (const ConnectionOrientedStreamSocket::Ptr& s)
320{
321#if USE_NOISY_TRACE_IN_THIS_MODULE_
322 Debug::TraceContextBumper ctx{"ConnectionManager::onConnect_", "s={}"_f, s};
323#endif
324 s.SetTCPNoDelay (*fEffectiveOptions_.fTCPNoDelay);
326 s.SetLinger (fEffectiveOptions_.fLinger); // 'missing' has meaning (feature disabled) for socket, so allow setting that too - doesn't mean don't pass on/use-default
327 shared_ptr<Connection> conn = make_shared<Connection> (s, fUseDefaultConnectionOptions_);
328 fInactiveSockSetPoller_.Add (conn);
329#if USE_NOISY_TRACE_IN_THIS_MODULE_
330 {
331 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
332 DbgTrace ("In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
333 fActiveConnections_.load (), GetInactiveConnections_ ());
334 }
335#endif
336}
337
338void ConnectionManager::WaitForReadyConnectionLoop_ ()
339{
340#if USE_NOISY_TRACE_IN_THIS_MODULE_
341 Debug::TraceContextBumper ctx{"ConnectionManager::WaitForReadyConnectionLoop_"};
342#endif
343
344 // run til thread aborted
345 while (true) {
346 try {
348
349#if USE_NOISY_TRACE_IN_THIS_MODULE_
350 {
351 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
352 DbgTrace (L"At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
353 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
354 }
355#endif
356 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
357
358 auto handleActivatedConnection = [this, readyConnection] () mutable {
359#if USE_NOISY_TRACE_IN_THIS_MODULE_
361 Stroika_Foundation_Debug_OptionalizeTraceArgs ("ConnectionManager::...processConnectionLoop")};
362#endif
363
364 /*
365 * Process the request
366 */
367 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
368
369 /*
370 * Handle the Connection object, moving it to the appropriate list etc...
371 */
372 try {
373 scoped_lock critSec{fActiveConnections_}; // lock not strictly needed here, but used to assure consistency between the active/inactive lists
374 fActiveConnections_.rwget ().rwref ().Remove (readyConnection); // no matter what, remove from active connections
375 if (keepAlive) {
376 fInactiveSockSetPoller_.Add (readyConnection);
377 }
378 }
379 catch (...) {
380 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
381 ReThrow ();
382 }
383
384// @todo lose this code after a bit of testing that never triggered - LGP 2021-03-02
385// I think logic was corner case and backwards (sb if keep alive)
386//anyhow should never happen except for incomplete header in which case we dont want to END
387#if 0
388 if (not keepAlive) {
389 Assert (readyConnection->response ().responseCompleted ()); // don't think this test is needed - I think we always assure this in Connection - LGP 2021-03-02
390 if (not readyConnection->response ().responseCompleted ()) {
391 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
392 }
393 }
394#endif
395
396#if USE_NOISY_TRACE_IN_THIS_MODULE_
397 {
398 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
399 DbgTrace ("at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
400 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
401 }
402#endif
403 };
404
405 try {
406 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
407 fInactiveSockSetPoller_.Remove (readyConnection);
408 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
409 }
410 catch (...) {
411 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
412 ReThrow ();
413 }
414 fActiveConnectionThreads_.AddTask (handleActivatedConnection);
415 }
416 }
417 catch (const Thread::AbortException&) {
418 ReThrow ();
419 }
420 catch (...) {
421 DbgTrace ("Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
422 }
423 }
424}
425
426Collection<shared_ptr<Connection>> ConnectionManager::GetInactiveConnections_ () const
427{
428 return fInactiveSockSetPoller_.GetDescriptors ().Map<Collection<shared_ptr<Connection>>> ([] (const auto& i) { return i.first; });
429}
430
431void ConnectionManager::ReplaceInEarlyInterceptor_ (const optional<Interceptor>& oldValue, const optional<Interceptor>& newValue)
432{
433 // replace old error handler in the interceptor chain, in the same spot if possible, and otherwise append
434 auto rwLock = this->fEarlyInterceptors_.rwget ();
435 Sequence<Interceptor> newInterceptors;
436 bool addedDefault = false;
437 for (const Interceptor& i : rwLock.load ()) {
438 if (oldValue == i) {
439 if (newValue) {
440 newInterceptors += *newValue;
441 }
442 addedDefault = true;
443 }
444 else {
445 newInterceptors += i;
446 }
447 }
448 if (newValue and not addedDefault) {
449 newInterceptors += *newValue;
450 }
451 rwLock.store (newInterceptors);
452 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
453}
454
455void ConnectionManager::AbortConnection (const shared_ptr<Connection>& /*conn*/)
456{
458}
459
460auto ConnectionManager::ComputeStatistics_ () const -> Statistics
461{
462#if USE_NOISY_TRACE_IN_THIS_MODULE_
463 constexpr bool kExtraDebugging_ = true;
464#else
465 constexpr bool kExtraDebugging_ = false;
466#endif
467 ConnectionStatsCollection conns;
468 {
469 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
470 Assert (Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
471 for (auto i : fActiveConnections_.load ()) {
472 auto s = i->stats ();
473 s.fActive = true;
474 conns += s;
475 if constexpr (kExtraDebugging_) {
476 DbgTrace ("Conn={}"_f, s);
477 }
478 }
479 for (auto i : GetInactiveConnections_ ()) {
480 auto s = i->stats ();
481 s.fActive = false;
482 conns += s;
483 if constexpr (kExtraDebugging_) {
484 DbgTrace ("Conn={}"_f, s);
485 }
486 }
487 }
488 Statistics::ThreadPool threadPoolStats = [&] () {
489 if (fEffectiveOptions_.fCollectStatistics) {
490 return Statistics::ThreadPool{{fActiveConnectionThreads_.GetCurrentStatistics ()}, fActiveConnectionThreads_.GetPoolSize ()};
491 }
492 else {
493 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.GetPoolSize ()};
494 }
495 }();
496 Statistics::ConnectionStatistics connectionStats;
497 connectionStats.fNumberOfActiveConnections = conns.Count ([] (const Connection::Stats& s) { return s.fActive == true; });
498 connectionStats.fNumberOfOpenConnections = conns.size ();
499
500 TimePointSeconds now = Time::GetTickCount ();
501 connectionStats.fDurationOfOpenConnections = ComputeCommonStatistics (
502 conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> { return now - cs.fCreatedAt; }));
503
504#if qStroika_Framework_WebServer_Connection_TrackExtraStats
505 connectionStats.fDurationOfOpenConnectionsRequests =
506 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
507 if (cs.fMostRecentMessage) {
508 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
509 }
510 return nullopt;
511 }));
512 connectionStats.fDurationOfActiveConnectionsRequests =
513 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
514 if (cs.fActive == false)
515 return nullopt;
516 if (cs.fMostRecentMessage) {
517 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
518 }
519 return nullopt;
520 }));
521 connectionStats.fConnectionsPiningForTheFjords =
522 conns
523 .Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
524 if (cs.fActive == false)
525 return nullopt;
526 if (cs.fMostRecentMessage) {
527 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
528 }
529 return nullopt;
530 })
531 .Count ([&] (const Duration& d) { return d > fEffectiveOptions_.fConnectionPiningForTheFjordsDelay; });
532#endif
533 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
534}
535
536void ConnectionManager::AddInterceptor (const Interceptor& i, InterceptorAddRelativeTo relativeTo)
537{
538 switch (relativeTo) {
539 case ePrependsToEarly:
540 fEarlyInterceptors_.rwget ()->Prepend (i);
541 break;
542 case ePrepend:
543 fBeforeInterceptors_.rwget ()->Prepend (i);
544 break;
545 case eAppend:
546 fAfterInterceptors_.rwget ()->Append (i);
547 break;
548 case eAfterBeforeInterceptors:
549 fBeforeInterceptors_.rwget ()->Append (i);
550 break;
551 }
552 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
553}
554
555void ConnectionManager::RemoveInterceptor (const Interceptor& i)
556{
557 bool found = false;
558 {
559 auto b4 = fBeforeInterceptors_.rwget ();
560 if (optional<size_t> idx = b4->IndexOf (i)) {
561 b4->Remove (*idx);
562 found = true;
563 }
564 }
565 if (not found) {
566 auto after = fAfterInterceptors_.rwget ();
567 if (optional<size_t> idx = after->IndexOf (i)) {
568 after->Remove (*idx);
569 found = true;
570 }
571 }
572 Require (found);
573 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
574}
#define AssertNotImplemented()
Definition Assertions.h:401
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
Definition Assertions.h:438
#define AssertNotReached()
Definition Assertions.h:355
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
Definition Optional.inl:134
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
#define DbgTrace
Definition Trace.h:309
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:270
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
Definition Collection.h:102
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Definition Sequence.h:187
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
Definition Set.h:105
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
roughly equivalent to Association<String,String>, except that the class is smart about certain keys a...
Definition Headers.h:129
Common::Property< optional< Containers::Set< String > > > vary
Definition Headers.h:432
Duration is a chrono::duration<double> (=.
Definition Duration.h:96
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Definition ToString.inl:465
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
Definition Thread.cpp:955
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not