Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
ConnectionManager.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Frameworks/StroikaPreComp.h"
5
6#include <algorithm>
7#include <cstdlib>
8
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
20
21#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
22
23#include "ConnectionManager.h"
24
25using namespace Stroika::Foundation;
28using namespace Stroika::Foundation::Execution;
29using namespace Stroika::Foundation::Math;
30using namespace Stroika::Foundation::Memory;
31using namespace Stroika::Foundation::Time;
32using namespace Stroika::Foundation::Traversal;
33
34using Memory::MakeSharedPtr;
35
36using namespace Stroika::Frameworks;
38using namespace Stroika::Frameworks::WebServer;
39
40// Comment this in to turn on aggressive noisy DbgTrace in this module
41// #define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
42
43using Options = ConnectionManager::Options;
44
45namespace {
46 Sequence<Interceptor> mkEarlyInterceptors_ (const optional<Interceptor>& defaultFaultHandler)
47 {
48 Sequence<Interceptor> interceptors;
49 if (defaultFaultHandler) {
50 interceptors += *defaultFaultHandler;
51 }
52 return interceptors;
53 }
54 InterceptorChain mkInterceptorChain_ (const Router& router, const Sequence<Interceptor>& earlyInterceptors,
55 const Sequence<Interceptor>& beforeInterceptors, const Sequence<Interceptor>& afterInterceptors)
56 {
57 Sequence<Interceptor> interceptors;
58 interceptors += earlyInterceptors;
59 interceptors += beforeInterceptors;
60 interceptors += router;
61 interceptors += afterInterceptors;
62 return InterceptorChain{interceptors};
63 }
64}
65
66/*
67 ********************************************************************************
68 ************ WebServer::ConnectionManager::Statistics::ThreadPool **************
69 ********************************************************************************
70 */
// Debug rendering of the thread-pool statistics: appends the thread entry count and a closing brace to `sb`, then returns it.
// NOTE(review): the statement that declares/initializes `sb` (original line 73) is not visible in this listing.
// NOTE(review): the label "thread-entry-cont" looks like a typo for "thread-entry-count"; it is runtime output, so confirm before changing.
71String WebServer::ConnectionManager::Statistics::ThreadPool::ToString () const
72{
74 sb << ", thread-entry-cont: "sv << fThreadEntryCount;
75 sb << "}"sv;
76 return sb;
77}
78
79/*
80 ********************************************************************************
81 ******* WebServer::ConnectionManager::Statistics::ConnectionStatistics *********
82 ********************************************************************************
83 */
85{
87 sb << "{"sv;
88 sb << "n-open-connections: "sv << fNumberOfOpenConnections;
89 sb << ", n-active-connections: "sv << fNumberOfActiveConnections;
90 sb << ", duration-open-connections: "sv << fDurationOfOpenConnections;
91 sb << ", duration-active-requests: "sv << fDurationOfOpenConnectionsRequests;
92 sb << ", duration-open-requests: "sv << fDurationOfActiveConnectionsRequests;
93 sb << ", n-connections-pining-for-the-fjords: "sv << fConnectionsPiningForTheFjords;
94 sb << "}"sv;
95 return sb;
96}
97
98/*
99 ********************************************************************************
100 **************** WebServer::ConnectionManager::Statistics **********************
101 ********************************************************************************
102 */
// Builds the text "{ThreadPool: <fThreadPool>, Connections: <fConnections>}" in a StringBuilder and returns it.
// NOTE(review): the signature line (original line 103) is not visible in this listing; the banner comment above
// suggests this is WebServer::ConnectionManager::Statistics::ToString - confirm.
104{
105 StringBuilder sb;
106 sb << "{"sv;
107 sb << "ThreadPool: "sv << fThreadPool;
108 sb << ", Connections: "sv << fConnections;
109 sb << "}"sv;
110 return sb;
111}
112
113/*
114 ********************************************************************************
115 ************************ WebServer::ConnectionManager **************************
116 ********************************************************************************
117 */
118ConnectionManager::ConnectionManager (const SocketAddress& bindAddress, const Sequence<Route>& routes, const Options& options)
119 : ConnectionManager{Sequence<SocketAddress>{bindAddress}, routes, options}
120{
121}
122
123namespace {
124 inline unsigned int ComputeThreadPoolSize_ (const ConnectionManager::Options& options)
125 {
126 using Options = ConnectionManager::Options;
127 constexpr unsigned int kMinThreadCnt_{1u}; // one enough now that we support separate thread doing epoll/select and one read when data avail
128 return Math::AtLeast (kMinThreadCnt_, options.fMaxConcurrentlyHandledConnections.value_or (
129 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 5));
130 }
131 inline unsigned int ComputeConnectionBacklog_ (const ConnectionManager::Options& options)
132 {
133 using Options = ConnectionManager::Options;
134 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
135 return options.fTCPBacklog.value_or (
136 Math::AtLeast (kMinDefaultTCPBacklog_, options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
137 }
// Returns a copy of the supplied options `o` in which each optional setting handled below that was left unset
// is replaced by its default, and in which the default response headers always list Origin in Vary.
// NOTE(review): the signature line (original line 138) is not visible in this listing; the parameter name `o`
// is taken from its use in the copy-construction of `result` below.
139 {
140 using Options = ConnectionManager::Options;
141 Options result{o};
142 result.fCORS = NullCoalesce (result.fCORS, Options::kDefault_CORS);
// fMaxConnections is resolved before the next line because ComputeThreadPoolSize_ (result) reads it
// when fMaxConcurrentlyHandledConnections is unset.
143 result.fMaxConnections = NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
144 result.fMaxConcurrentlyHandledConnections = NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
145 result.fBindFlags = NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
146 result.fDefaultResponseHeaders = NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
147 result.fAutoComputeETagResponse = NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
148 result.fAutomaticTCPDisconnectOnClose = NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
149 result.fLinger = NullCoalesce (result.fLinger, Options::kDefault_Linger); // for now this is special and can be null/optional
150 result.fTCPNoDelay = NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
151 // result.fThreadPoolName; can remain nullopt
// ComputeConnectionBacklog_ (result) also reads result.fMaxConnections, which was filled in above.
152 result.fTCPBacklog = NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
153
154 {
155 // Not super clear. BUT - it appears that if you combine CORS with Caching, then the value returned from
156 // the cache maybe WRONG due to not having a (correct) Origin header. Seems most logical that the CORS
157 // logic would not check the origin if its coming from the cache, but ... who knows...
158 // According to https://blog.keul.it/chrome-cors-issue-due-to-cache/ Chrome does, and this fits with the
159 // sporadic bug I'm seeing (hard to tell cuz what seems to happen is the web server not called, but Chrome debugger reports
160 // CORS error and Chrome debugger not super clear here - not letting me see what it pulled from cache or even
161 // mentioning - so I'm guessing - that it came from the cache.
162 // Anyhow, piecing things together, it appears that IF you are using CORS, you must set the vary header
163 // to vary on origin.
164 //
165 // Note - we emit it as a default for all responses because https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Vary says:
166 // "The same Vary header value should be used on all responses for a given URL,
167 // including 304 Not Modified responses and the "default" response."
168 //
169 // -- LGP 2022-12-09
// Take the current default headers (or an empty set), add Origin to their Vary set, and store them back.
// This is done unconditionally here - it is not gated on any CORS setting in `result`.
170 HTTP::Headers s = NullCoalesce (result.fDefaultResponseHeaders);
171 Set<String> v = NullCoalesce (s.vary ());
172 v += HTTP::HeaderName::kOrigin;
173 s.vary = v;
174 result.fDefaultResponseHeaders = s;
175 }
176 return result;
177 }
178}
179
// Main constructor: wires up the property getter/setter lambdas, fills in option defaults, builds the router,
// thread pool, waiter thread and listener, sanity-checks the default response headers, derives the default
// per-connection options, and finally starts the waiter thread.
180ConnectionManager::ConnectionManager (const Iterable<SocketAddress>& bindAddresses, const Sequence<Route>& routes, const Options& options)
// NOTE(review): the line naming the member initialized by the next getter/setter pair (original line 181) is not
// visible in this listing; both lambdas resolve the owner via &ConnectionManager::afterInterceptors.
// Setter: stores the new list, then rebuilds the default connection options.
182 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
183 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
184 return thisObj->fAfterInterceptors_;
185 },
186 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& afterInterceptors_) {
187 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
188 thisObj->fAfterInterceptors_ = afterInterceptors_;
189 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
190 }}
// NOTE(review): original line 191 (member name for this pair) is not visible; the lambdas use &ConnectionManager::beforeInterceptors.
192 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
193 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
194 return thisObj->fBeforeInterceptors_;
195 },
196 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& beforeInterceptors) {
197 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
198 thisObj->fBeforeInterceptors_ = beforeInterceptors;
199 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
200 }}
// Read-only: returns the stored bind addresses.
201 , bindings{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Iterable<SocketAddress> {
202 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::bindings);
203 return thisObj->fBindings_;
204 }}
// Read-only: collects stats() from every active connection (marked fActive = true) and every inactive one
// (marked fActive = false) while holding the fActiveConnections_ lock.
// NOTE(review): ComputeStatistics_ performs the same disjointness check with Assert where this uses Ensure.
205 , connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionStatsCollection {
206 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::connections);
207 scoped_lock critSec{thisObj->fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
208 Ensure (Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
209 ConnectionStatsCollection r;
210 for (auto i : thisObj->fActiveConnections_.load ()) {
211 auto s = i->stats ();
212 s.fActive = true;
213 r += s;
214 }
215 for (auto i : thisObj->GetInactiveConnections_ ()) {
216 auto s = i->stats ();
217 s.fActive = false;
218 r += s;
219 }
220 return r;
221 }}
// NOTE(review): original line 222 (member name for this pair) is not visible; the lambdas use &ConnectionManager::defaultErrorHandler.
// Setter: only when the value actually changes, swaps the handler inside the early interceptor list
// (ReplaceInEarlyInterceptor_) and then records the new value.
223 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> optional<Interceptor> {
224 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
225 return thisObj->fDefaultErrorHandler_;
226 },
227 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& defaultErrorHandler) {
228 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
229 if (thisObj->fDefaultErrorHandler_ != defaultErrorHandler) {
230 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (), defaultErrorHandler);
231 thisObj->fDefaultErrorHandler_ = defaultErrorHandler;
232 }
233 }}
// NOTE(review): original line 234 (member name for this pair) is not visible; the lambdas use &ConnectionManager::earlyInterceptors.
235 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
236 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
237 return thisObj->fEarlyInterceptors_;
238 },
239 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& earlyInterceptors) {
240 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
241 thisObj->fEarlyInterceptors_ = earlyInterceptors;
242 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
243 }}
// Read-only: exposes the effective (defaults filled in) options by const reference.
244 , options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> const Options& {
245 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::options);
246 return thisObj->fEffectiveOptions_;
247 }}
248 , statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Statistics {
249 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::statistics);
250 // NOTE - this computation can be expensive, so consider caching so only recomputed at most every 30 seconds or so...???
251 // would need to be controlled via OPTIONS!
252 return thisObj->ComputeStatistics_ ();
253 }}
// Data members. Several later initializers dereference optionals of fEffectiveOptions_ (fCORS,
// fMaxConcurrentlyHandledConnections, fBindFlags, fTCPBacklog); FillInDefaults_ assigns each of those a default.
254 , fEffectiveOptions_{FillInDefaults_ (options)}
255 , fBindings_{bindAddresses}
256 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
257 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
258 , fBeforeInterceptors_{}
259 , fAfterInterceptors_{}
260 , fRouter_{routes, *fEffectiveOptions_.fCORS}
261 // note since Stroika v3.0d5, we set fQMax = so we don't get lots of useless requests that fill the Q. Probably shouldn't happen
262 // anyhow, since we have set 'backlog' - but in case, better failure mode ; arguably could be zero length, but for latency of threads waking up to pickup work;
263 // --LGP 2023-11-27
264 , fActiveConnectionThreads_{ThreadPool::Options{.fThreadCount = *fEffectiveOptions_.fMaxConcurrentlyHandledConnections,
265 .fThreadPoolName = fEffectiveOptions_.fThreadPoolName,
266 .fQMax = ThreadPool::QMax{*fEffectiveOptions_.fMaxConcurrentlyHandledConnections},
267 .fCollectStatistics = fEffectiveOptions_.fCollectStatistics}}
// The waiter thread object is created here but only started at the end of the constructor body.
268 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
269 Thread::New ([this] () { WaitForReadyConnectionLoop_ (); }, "WebServer-ConnectionMgr-Wait4IOReady"_k)}
// The listener callback forwards each socket it is given to onConnect_.
270 , fListener_{bindAddresses, *fEffectiveOptions_.fBindFlags, [this] (const ConnectionOrientedStreamSocket::Ptr& s) { onConnect_ (s); },
271 *fEffectiveOptions_.fTCPBacklog}
272{
273 // validate fDefaultResponseHeaders contains no bad/inappropriate headers (like Content-Length), probably CORS headers worth a warning as well
274 // just a bunch of sanity checks for things you really DONT want to set here for any reason I can think of
275 if (fEffectiveOptions_.fDefaultResponseHeaders) {
276 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->allow () == nullopt); // unsure
277 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->cookie ().cookieDetails ().empty ());
278 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->connection () == nullopt);
279 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->contentLength () == nullopt);
280 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ETag () == nullopt);
281 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->location () == nullopt);
282 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->origin () == nullopt); // request only header
283 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ifNoneMatch () == nullopt); // request only header
284 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->setCookie ().cookieDetails ().empty ());
285 }
286 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
287
// The backlog shown in the trace is recomputed from the constructor argument `options`
// rather than read from fEffectiveOptions_.
288 DbgTrace ("Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
289 static_cast<const void*> (this), fActiveConnectionThreads_.GetPoolSize (), ComputeConnectionBacklog_ (options),
290 Characters::ToString (bindAddresses));
291 fWaitForReadyConnectionThread_.Start (); // start here instead of AutoStart so a guaranteed initialized before thread main starts - see http://stroika-bugs.sophists.com/browse/STK-706
292}
293
#if qStroika_Foundation_Debug_DefaultTracingOn
/*
 *  Compiled only when default tracing is on; the body just emits a trace line identifying
 *  this object at the start of destruction.
 */
ConnectionManager::~ConnectionManager ()
{
    DbgTrace (
        "Starting destructor for WebServer::ConnectionManager ({})"_f,
        static_cast<const void*> (this));
}
#endif
300
301void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
302{
303#if qStroika_Foundation_Debug_DefaultTracingOn
304 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
305#endif
306 fUseDefaultConnectionOptions_ = Connection::Options{
307 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
308 .fDefaultResponseHeaders = *fEffectiveOptions_.fDefaultResponseHeaders,
309 .fDefaultGETResponseHeaders = fEffectiveOptions_.fDefaultGETResponseHeaders,
310 .fAutoComputeETagResponse = *fEffectiveOptions_.fAutoComputeETagResponse,
311 .fAutomaticTransferChunkSize = fEffectiveOptions_.fAutomaticTransferChunkSize,
312 .fSupportedCompressionEncodings = fEffectiveOptions_.fSupportedCompressionEncodings,
313 };
314 // @todo could add trace messages on other values changing...
315#if qStroika_Foundation_Debug_DefaultTracingOn
316 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
317 DbgTrace ("Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
318 }
319#endif
320}
321
// Called from the listener callback set up in the constructor, with the socket `s` it was handed.
// Applies the effective TCP no-delay and linger settings to the socket, wraps it in a Connection that uses
// the current default connection options, and registers that Connection with the inactive-socket poller.
// NOTE(review): original line 328 (between the SetTCPNoDelay and SetLinger calls) is not visible in this listing.
322void ConnectionManager::onConnect_ (const ConnectionOrientedStreamSocket::Ptr& s)
323{
324#if USE_NOISY_TRACE_IN_THIS_MODULE_
325 Debug::TraceContextBumper ctx{"ConnectionManager::onConnect_", "s={}"_f, s};
326#endif
327 s.SetTCPNoDelay (*fEffectiveOptions_.fTCPNoDelay);
329 s.SetLinger (fEffectiveOptions_.fLinger); // 'missing' has meaning (feature disabled) for socket, so allow setting that too - doesn't mean don't pass on/use-default
330 shared_ptr<Connection> conn = MakeSharedPtr<Connection> (s, fUseDefaultConnectionOptions_);
331 fInactiveSockSetPoller_.Add (conn);
332#if USE_NOISY_TRACE_IN_THIS_MODULE_
333 {
334 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
335 DbgTrace ("In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
336 fActiveConnections_.load (), GetInactiveConnections_ ());
337 }
338#endif
339}
340
// Body of the thread created in the constructor. Loops forever: for each connection the inactive-socket
// poller reports, it moves the connection from the poller to fActiveConnections_ and queues a thread-pool
// task that reads and processes one message on it. The loop only exits by Thread::AbortException propagating.
341void ConnectionManager::WaitForReadyConnectionLoop_ ()
342{
343#if USE_NOISY_TRACE_IN_THIS_MODULE_
344 Debug::TraceContextBumper ctx{"ConnectionManager::WaitForReadyConnectionLoop_"};
345#endif
346
347 // run til thread aborted
348 while (true) {
349 try {
// NOTE(review): original line 350 (first statement inside this try) is not visible in this listing.
351
352#if USE_NOISY_TRACE_IN_THIS_MODULE_
353 {
354 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists remain consistent
// NOTE(review): this trace uses a wide literal (L"..."_f) while the other DbgTrace calls in this file use narrow literals.
355 DbgTrace (L"At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
356 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
357 }
358#endif
359 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
360
// Task run on a pool thread; it captures its own copy of the shared_ptr.
361 auto handleActivatedConnection = [this, readyConnection] () mutable {
362#if USE_NOISY_TRACE_IN_THIS_MODULE_
// NOTE(review): original line 363 (start of the statement continued on the next line) is not visible in this listing.
364 Stroika_Foundation_Debug_OptionalizeTraceArgs ("ConnectionManager::...processConnectionLoop")};
365#endif
366
367 /*
368 * Process the request
369 */
// keepAlive is true exactly when ReadAndProcessMessage reports eTryAgainLater.
370 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
371
372 /*
373 * Handle the Connection object, moving it to the appropriate list etc...
374 */
375 try {
376 scoped_lock critSec{fActiveConnections_}; // lock not strictly needed here, but used to assure consistency between the active/inactive lists
377 fActiveConnections_.rwget ().rwref ().Remove (readyConnection); // no matter what, remove from active connections
// Only a keep-alive connection goes back to the poller; otherwise it is in neither list after this point.
378 if (keepAlive) {
379 fInactiveSockSetPoller_.Add (readyConnection);
380 }
381 }
382 catch (...) {
383 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
384 ReThrow ();
385 }
386
387// @todo lose this code after a bit of testing that never triggered - LGP 2021-03-02
388// I think logic was corner case and backwards (sb if keep alive)
389//anyhow should never happen except for incomplete header in which case we dont want to END
390#if 0
391 if (not keepAlive) {
392 Assert (readyConnection->response ().responseCompleted ()); // don't think this test is needed - I think we always assure this in Connection - LGP 2021-03-02
393 if (not readyConnection->response ().responseCompleted ()) {
394 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
395 }
396 }
397#endif
398
399#if USE_NOISY_TRACE_IN_THIS_MODULE_
400 {
401 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists remain consistent
402 DbgTrace ("at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
403 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
404 }
405#endif
406 };
407
// Move the ready connection from the poller to the active set under the lock, before the task is queued.
408 try {
409 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists remain consistent
410 fInactiveSockSetPoller_.Remove (readyConnection);
411 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
412 }
413 catch (...) {
414 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
415 ReThrow ();
416 }
// NOTE(review): if AddTask were to throw, the connection has already been added to fActiveConnections_ and the
// task that would remove it never runs; the outer catch (...) below only traces. Confirm this cannot strand a connection.
417 fActiveConnectionThreads_.AddTask (handleActivatedConnection);
418 }
419 }
// Abort requests are rethrown (ending the loop); any other exception is traced and the loop continues.
420 catch (const Thread::AbortException&) {
421 ReThrow ();
422 }
423 catch (...) {
424 DbgTrace ("Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
425 }
426 }
427}
428
429Collection<shared_ptr<Connection>> ConnectionManager::GetInactiveConnections_ () const
430{
431 return fInactiveSockSetPoller_.GetDescriptors ().Map<Collection<shared_ptr<Connection>>> ([] (const auto& i) { return i.first; });
432}
433
434void ConnectionManager::ReplaceInEarlyInterceptor_ (const optional<Interceptor>& oldValue, const optional<Interceptor>& newValue)
435{
436 // replace old error handler in the interceptor chain, in the same spot if possible, and otherwise append
437 auto rwLock = this->fEarlyInterceptors_.rwget ();
438 Sequence<Interceptor> newInterceptors;
439 bool addedDefault = false;
440 for (const Interceptor& i : rwLock.load ()) {
441 if (oldValue == i) {
442 if (newValue) {
443 newInterceptors += *newValue;
444 }
445 addedDefault = true;
446 }
447 else {
448 newInterceptors += i;
449 }
450 }
451 if (newValue and not addedDefault) {
452 newInterceptors += *newValue;
453 }
454 rwLock.store (newInterceptors);
455 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
456}
457
// NOTE(review): the only line of this body (original line 460) is not visible in this listing, so what the
// function does cannot be stated from here. The parameter name is commented out in the signature, so the
// body cannot refer to it by name.
458void ConnectionManager::AbortConnection (const shared_ptr<Connection>& /*conn*/)
459{
461}
462
463auto ConnectionManager::ComputeStatistics_ () const -> Statistics
464{
465#if USE_NOISY_TRACE_IN_THIS_MODULE_
466 constexpr bool kExtraDebugging_ = true;
467#else
468 constexpr bool kExtraDebugging_ = false;
469#endif
470 ConnectionStatsCollection conns;
471 {
472 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
473 Assert (Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
474 for (auto i : fActiveConnections_.load ()) {
475 auto s = i->stats ();
476 s.fActive = true;
477 conns += s;
478 if constexpr (kExtraDebugging_) {
479 DbgTrace ("Conn={}"_f, s);
480 }
481 }
482 for (auto i : GetInactiveConnections_ ()) {
483 auto s = i->stats ();
484 s.fActive = false;
485 conns += s;
486 if constexpr (kExtraDebugging_) {
487 DbgTrace ("Conn={}"_f, s);
488 }
489 }
490 }
491 Statistics::ThreadPool threadPoolStats = [&] () {
492 if (fEffectiveOptions_.fCollectStatistics) {
493 return Statistics::ThreadPool{{fActiveConnectionThreads_.GetCurrentStatistics ()}, fActiveConnectionThreads_.GetPoolSize ()};
494 }
495 else {
496 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.GetPoolSize ()};
497 }
498 }();
499 Statistics::ConnectionStatistics connectionStats;
500 connectionStats.fNumberOfActiveConnections = conns.Count ([] (const Connection::Stats& s) { return s.fActive == true; });
501 connectionStats.fNumberOfOpenConnections = conns.size ();
502
503 TimePointSeconds now = Time::GetTickCount ();
504 connectionStats.fDurationOfOpenConnections = ComputeCommonStatistics (
505 conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> { return now - cs.fCreatedAt; }));
506
507#if qStroika_Framework_WebServer_Connection_TrackExtraStats
508 connectionStats.fDurationOfOpenConnectionsRequests =
509 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
510 if (cs.fMostRecentMessage) {
511 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
512 }
513 return nullopt;
514 }));
515 connectionStats.fDurationOfActiveConnectionsRequests =
516 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
517 if (cs.fActive == false)
518 return nullopt;
519 if (cs.fMostRecentMessage) {
520 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
521 }
522 return nullopt;
523 }));
524 connectionStats.fConnectionsPiningForTheFjords =
525 conns
526 .Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
527 if (cs.fActive == false)
528 return nullopt;
529 if (cs.fMostRecentMessage) {
530 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
531 }
532 return nullopt;
533 })
534 .Count ([&] (const Duration& d) { return d > fEffectiveOptions_.fConnectionPiningForTheFjordsDelay; });
535#endif
536 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
537}
538
539void ConnectionManager::AddInterceptor (const Interceptor& i, InterceptorAddRelativeTo relativeTo)
540{
541 switch (relativeTo) {
542 case ePrependsToEarly:
543 fEarlyInterceptors_.rwget ()->Prepend (i);
544 break;
545 case ePrepend:
546 fBeforeInterceptors_.rwget ()->Prepend (i);
547 break;
548 case eAppend:
549 fAfterInterceptors_.rwget ()->Append (i);
550 break;
551 case eAfterBeforeInterceptors:
552 fBeforeInterceptors_.rwget ()->Append (i);
553 break;
554 }
555 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
556}
557
558void ConnectionManager::RemoveInterceptor (const Interceptor& i)
559{
560 bool found = false;
561 {
562 auto b4 = fBeforeInterceptors_.rwget ();
563 if (optional<size_t> idx = b4->IndexOf (i)) {
564 b4->Remove (*idx);
565 found = true;
566 }
567 }
568 if (not found) {
569 auto after = fAfterInterceptors_.rwget ();
570 if (optional<size_t> idx = after->IndexOf (i)) {
571 after->Remove (*idx);
572 found = true;
573 }
574 }
575 Require (found);
576 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
577}
#define AssertNotImplemented()
Definition Assertions.h:401
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
Definition Assertions.h:438
#define AssertNotReached()
Definition Assertions.h:355
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
Definition Optional.inl:134
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
#define DbgTrace
Definition Trace.h:309
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:270
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
roughly equivalent to Association<String,String>, except that the class is smart about certain keys a...
Definition Headers.h:129
Common::Property< optional< Containers::Set< String > > > vary
Definition Headers.h:432
Duration is a chrono::duration<double> (=.
Definition Duration.h:96
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Definition ToString.inl:465
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
Definition Thread.cpp:960
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not