Stroika Library 3.0d23
 
Loading...
Searching...
No Matches
ConnectionManager.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Frameworks/StroikaPreComp.h"
5
6#include <algorithm>
7#include <cstdlib>
8
10#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/Execution/Throw.h"
15#include "Stroika/Foundation/IO/Network/HTTP/Exception.h"
16#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
17#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
20
21#include "Stroika/Frameworks/WebServer/DefaultFaultInterceptor.h"
22
23#include "ConnectionManager.h"
24
25using namespace Stroika::Foundation;
28using namespace Stroika::Foundation::Execution;
29using namespace Stroika::Foundation::Math;
30using namespace Stroika::Foundation::Memory;
31using namespace Stroika::Foundation::Time;
32using namespace Stroika::Foundation::Traversal;
33
34using Memory::MakeSharedPtr;
35
36using namespace Stroika::Frameworks;
38using namespace Stroika::Frameworks::WebServer;
39
40// Comment this in to turn on aggressive noisy DbgTrace in this module
41// #define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
42
43// Comment this in to turn on aggressive noisy DbgTrace in this module
44// As the name suggests, this is DANGEROUS because the tracing is called from threads BESIDES the threadpool ones, and so can easily cause assert failures, because Connection
45// objects CHECK that they are not used without external synchronization!
46// The issue is that the DbgTrace functions DEREFERENCE the shared_ptrs (in the print function), and they do so for OTHER connections
47// than their own (GetActiveConnections/GetInactiveConnections).
48// #define USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_ 1
49
50using Options = ConnectionManager::Options;
51
52namespace {
53 Sequence<Interceptor> mkEarlyInterceptors_ (const optional<Interceptor>& defaultFaultHandler)
54 {
55 Sequence<Interceptor> interceptors;
56 if (defaultFaultHandler) {
57 interceptors += *defaultFaultHandler;
58 }
59 return interceptors;
60 }
61 InterceptorChain mkInterceptorChain_ (const Router& router, const Sequence<Interceptor>& earlyInterceptors,
62 const Sequence<Interceptor>& beforeInterceptors, const Sequence<Interceptor>& afterInterceptors)
63 {
64 Sequence<Interceptor> interceptors;
65 interceptors += earlyInterceptors;
66 interceptors += beforeInterceptors;
67 interceptors += router;
68 interceptors += afterInterceptors;
69 return InterceptorChain{interceptors};
70 }
71}
72
// File-local helpers for monitoring the sockets of inactive (idle) connections.
73namespace {
// Shorthand for the monitor-event enumeration used by the socket poller.
74 using TypeOfMonitor = WaitForIOReady_Support::WaitForIOReady_Base::TypeOfMonitor;
// Event set passed whenever a connection is added to fInactiveSockSetPoller_ (see onConnect_ and
// WaitForReadyConnectionLoop_): only read events are watched.
75 const Set<TypeOfMonitor> kInactiveSocketMonitorEvents2Watch4_{TypeOfMonitor::eRead};
76}
77
78/*
79 ********************************************************************************
80 ************ WebServer::ConnectionManager::Statistics::ThreadPool **************
81 ********************************************************************************
82 */
// Debug-friendly rendering of the thread pool statistics: appends the thread-entry count to the
// text already accumulated in `sb` and closes the brace.
// NOTE(review): the statement that declares/initializes `sb` (listing line 85) is not visible in this
// listing - confirm against the real file what `sb` starts out containing.
83String WebServer::ConnectionManager::Statistics::ThreadPool::ToString () const
84{
// NOTE(review): the label "thread-entry-cont" looks like a typo for "thread-entry-count" - confirm before changing,
// since this is emitted text.
86 sb << ", thread-entry-cont: "sv << fThreadEntryCount;
87 sb << "}"sv;
88 return sb;
89}
90
91/*
92 ********************************************************************************
93 ******* WebServer::ConnectionManager::Statistics::ConnectionStatistics *********
94 ********************************************************************************
95 */
97{
99 sb << "{"sv;
100 sb << "n-open-connections: "sv << fNumberOfOpenConnections;
101 sb << ", n-active-connections: "sv << fNumberOfActiveConnections;
102 sb << ", duration-open-connections: "sv << fDurationOfOpenConnections;
103 sb << ", duration-active-requests: "sv << fDurationOfOpenConnectionsRequests;
104 sb << ", duration-open-requests: "sv << fDurationOfActiveConnectionsRequests;
105 sb << ", n-connections-pining-for-the-fjords: "sv << fConnectionsPiningForTheFjords;
106 sb << "}"sv;
107 return sb;
108}
109
110/*
111 ********************************************************************************
112 **************** WebServer::ConnectionManager::Statistics **********************
113 ********************************************************************************
114 */
// Debug-friendly rendering of the aggregate statistics: "{ThreadPool: ..., Connections: ...}".
// NOTE(review): the signature line (listing line 115) is not visible in this listing.
116{
117 StringBuilder sb;
118 sb << "{"sv;
// the two members are streamed directly into the builder
119 sb << "ThreadPool: "sv << fThreadPool;
120 sb << ", Connections: "sv << fConnections;
121 sb << "}"sv;
122 return sb;
123}
124
125/*
126 ********************************************************************************
127 ************************ WebServer::ConnectionManager **************************
128 ********************************************************************************
129 */
// Convenience constructor for the single-bind-address case: wraps bindAddress in a one-element
// Sequence and delegates to the multi-address constructor, passing routes and options unchanged.
130ConnectionManager::ConnectionManager (const SocketAddress& bindAddress, const Sequence<Route>& routes, const Options& options)
131 : ConnectionManager{Sequence<SocketAddress>{bindAddress}, routes, options}
132{
133}
134
135namespace {
136 inline unsigned int ComputeThreadPoolSize_ (const ConnectionManager::Options& options)
137 {
138 using Options = ConnectionManager::Options;
139 constexpr unsigned int kMinThreadCnt_{1u}; // one enough now that we support separate thread doing epoll/select and one read when data avail
140 return Math::AtLeast (kMinThreadCnt_, options.fMaxConcurrentlyHandledConnections.value_or (
141 options.fMaxConnections.value_or (Options::kDefault_MaxConnections) / 5));
142 }
143 inline unsigned int ComputeConnectionBacklog_ (const ConnectionManager::Options& options)
144 {
145 using Options = ConnectionManager::Options;
146 constexpr unsigned int kMinDefaultTCPBacklog_{3u};
147 return options.fTCPBacklog.value_or (
148 Math::AtLeast (kMinDefaultTCPBacklog_, options.fMaxConnections.value_or (Options::kDefault_MaxConnections) * 3 / 4));
149 }
// Returns a copy of the given options in which each optional field handled below is replaced by its default when unset,
// and in which the default response headers always carry "Vary: Origin" (see the long comment in the body).
// NOTE(review): the signature line (listing line 150) is not visible in this listing; `o` is presumably its
// Options parameter - confirm.
151 {
152 using Options = ConnectionManager::Options;
153 Options result{o};
154 result.fCORS = NullCoalesce (result.fCORS, Options::kDefault_CORS);
155 result.fMaxConnections = NullCoalesce (result.fMaxConnections, Options::kDefault_MaxConnections);
// computed from `result` (not `o`), so it sees the fMaxConnections value filled in on the line above
156 result.fMaxConcurrentlyHandledConnections = NullCoalesce (result.fMaxConcurrentlyHandledConnections, ComputeThreadPoolSize_ (result));
157 result.fBindFlags = NullCoalesce (result.fBindFlags, Options::kDefault_BindFlags);
158 result.fDefaultResponseHeaders = NullCoalesce (result.fDefaultResponseHeaders, Options::kDefault_Headers);
159 result.fAutoComputeETagResponse = NullCoalesce (result.fAutoComputeETagResponse, Options::kDefault_AutoComputeETagResponse);
160 result.fAutomaticTCPDisconnectOnClose = NullCoalesce (result.fAutomaticTCPDisconnectOnClose, Options::kDefault_AutomaticTCPDisconnectOnClose);
161 result.fLinger = NullCoalesce (result.fLinger, Options::kDefault_Linger); // for now this is special and can be null/optional
162 result.fTCPNoDelay = NullCoalesce (result.fTCPNoDelay, Options::kDefault_TCPNoDelay);
163 // result.fThreadPoolName; can remain nullopt
// likewise computed from `result`, after fMaxConnections has been defaulted
164 result.fTCPBacklog = NullCoalesce (result.fTCPBacklog, ComputeConnectionBacklog_ (result));
165
166 {
167 // Not super clear. BUT - it appears that if you combine CORS with Caching, then the value returned from
168 // the cache maybe WRONG due to not having a (correct) Origin header. Seems most logical that the CORS
169 // logic would not check the origin if its coming from the cache, but ... who knows...
170 // According to https://blog.keul.it/chrome-cors-issue-due-to-cache/ Chrome does, and this fits with the
171 // sporadic bug I'm seeing (hard to tell cuz what seems to happen is the web server not called, but Chrome debugger reports
172 // CORS error and Chrome debugger not super clear here - not letting me see what it pulled from cache or even
173 // mentioning - so I'm guessing - that it came from the cache.
174 // Anyhow, piecing things together, it appears that IF you are using CORS, you must set the vary header
175 // to vary on origin.
176 //
177 // Note - we emit it as a default for all responses because https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Vary says:
178 // "The same Vary header value should be used on all responses for a given URL,
179 // including 304 Not Modified responses and the "default" response."
180 //
181 // -- LGP 2022-12-09
// Take the current default headers, add the Origin header name to the 'vary' set, and store the headers back.
// Note this is done unconditionally here (there is no check of the CORS settings in this block).
182 HTTP::Headers s = NullCoalesce (result.fDefaultResponseHeaders);
183 Set<String> v = NullCoalesce (s.vary ());
184 v += HTTP::HeaderName::kOrigin;
185 s.vary = v;
186 result.fDefaultResponseHeaders = s;
187 }
188 return result;
189 }
190}
191
// Primary constructor.
//  - Initializes the public properties with getter (and, where writable, setter) lambdas; each lambda recovers the owning
//    ConnectionManager through qStroika_Foundation_Common_Property_OuterObjPtr.
//  - Stores the effective options (the given options with defaults filled in by FillInDefaults_), the bindings, the default
//    fault interceptor, the interceptor lists, the router, the thread pool, the wait-for-ready thread and the listener.
//  - In the body: sanity-checks the default response headers, derives the per-connection default options, and finally starts
//    the wait-for-ready thread.
// NOTE(review): in this listing the lines that name some of the property members being initialized (listing lines 193, 203,
// 234, 246 - immediately before a getter/setter pair) are not visible; from the OuterObjPtr arguments they are presumably
// afterInterceptors, beforeInterceptors, defaultErrorHandler and earlyInterceptors - confirm against the real file.
192ConnectionManager::ConnectionManager (const Iterable<SocketAddress>& bindAddresses, const Sequence<Route>& routes, const Options& options)
// getter/setter for the 'after' interceptors; the setter re-derives the per-connection default options
194 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
195 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
196 return thisObj->fAfterInterceptors_;
197 },
198 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& afterInterceptors_) {
199 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::afterInterceptors);
200 thisObj->fAfterInterceptors_ = afterInterceptors_;
201 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
202 }}
// getter/setter for the 'before' interceptors; the setter re-derives the per-connection default options
204 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
205 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
206 return thisObj->fBeforeInterceptors_;
207 },
208 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& beforeInterceptors) {
209 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::beforeInterceptors);
210 thisObj->fBeforeInterceptors_ = beforeInterceptors;
211 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
212 }}
213 , bindings{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Iterable<SocketAddress> {
214 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::bindings);
215 return thisObj->fBindings_;
216 }}
// 'connections' getter: while holding the fActiveConnections_ lock, collects stats() from every active and every inactive
// connection, tagging each entry with fActive = true/false respectively.
// NOTE(review): this calls stats() on Connection objects from the caller's thread; the comment near the top of this file warns
// that Connection objects check they are not used without external synchronization - confirm stats() is safe to call here.
217 , connections{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionStatsCollection {
218 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::connections);
219 scoped_lock critSec{thisObj->fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
220 Ensure (Set<shared_ptr<Connection>>{thisObj->fActiveConnections_.load ()}.Intersection (thisObj->GetInactiveConnections_ ()).empty ());
221 ConnectionStatsCollection r;
222 for (auto i : thisObj->fActiveConnections_.load ()) {
223 auto s = i->stats ();
224 s.fActive = true;
225 r += s;
226 }
227 for (auto i : thisObj->GetInactiveConnections_ ()) {
228 auto s = i->stats ();
229 s.fActive = false;
230 r += s;
231 }
232 return r;
233 }}
// getter/setter for the default error handler; the setter only acts when the value actually changes, swapping the old handler
// for the new one inside the early interceptor list before recording the new value
235 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> optional<Interceptor> {
236 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
237 return thisObj->fDefaultErrorHandler_;
238 },
239 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& defaultErrorHandler) {
240 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::defaultErrorHandler);
241 if (thisObj->fDefaultErrorHandler_ != defaultErrorHandler) {
242 thisObj->ReplaceInEarlyInterceptor_ (thisObj->fDefaultErrorHandler_.load (), defaultErrorHandler);
243 thisObj->fDefaultErrorHandler_ = defaultErrorHandler;
244 }
245 }}
// getter/setter for the 'early' interceptors; the setter re-derives the per-connection default options
247 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Sequence<Interceptor> {
248 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
249 return thisObj->fEarlyInterceptors_;
250 },
251 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] auto* property, const auto& earlyInterceptors) {
252 ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::earlyInterceptors);
253 thisObj->fEarlyInterceptors_ = earlyInterceptors;
254 thisObj->DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
255 }}
// 'options' getter: returns a reference to the effective options (defaults already filled in)
256 , options{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> const Options& {
257 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::options);
258 return thisObj->fEffectiveOptions_;
259 }}
260 , statistics{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> Statistics {
261 const ConnectionManager* thisObj = qStroika_Foundation_Common_Property_OuterObjPtr (property, &ConnectionManager::statistics);
262 // NOTE - this computation can be expensive, so consider caching so only recomputed at most every 30 seconds or so...???
263 // would need to be controlled via OPTIONS!
264 return thisObj->ComputeStatistics_ ();
265 }}
// From here on `options` (unqualified) in the initializers refers to the constructor parameter.
266 , fEffectiveOptions_{FillInDefaults_ (options)}
267 , fBindings_{bindAddresses}
268 , fDefaultErrorHandler_{DefaultFaultInterceptor{}}
269 , fEarlyInterceptors_{mkEarlyInterceptors_ (fDefaultErrorHandler_)}
270 , fBeforeInterceptors_{}
271 , fAfterInterceptors_{}
272 , fRouter_{routes, *fEffectiveOptions_.fCORS}
273 // note since Stroika v3.0d5, we set fQMax = so we don't get lots of useless requests that fill the Q. Probably shouldn't happen
274 // anyhow, since we have set 'backlog' - but in case, better failure mode ; arguably could be zero length, but for latency of threads waking up to pickup work;
275 // --LGP 2023-11-27
276 , fActiveConnectionThreads_{ThreadPool::Options{.fThreadCount = *fEffectiveOptions_.fMaxConcurrentlyHandledConnections,
277 .fThreadPoolName = fEffectiveOptions_.fThreadPoolName,
278 .fQMax = ThreadPool::QMax{*fEffectiveOptions_.fMaxConcurrentlyHandledConnections},
279 .fCollectStatistics = fEffectiveOptions_.fCollectStatistics}}
// the thread object is created here but only started at the end of the constructor body
280 , fWaitForReadyConnectionThread_{Thread::CleanupPtr::eAbortBeforeWaiting,
281 Thread::New ([this] () { WaitForReadyConnectionLoop_ (); }, "WebServer-ConnectionMgr-Wait4IOReady"_k)}
// NOTE(review): the listener is handed a callback to onConnect_, which reads fUseDefaultConnectionOptions_; that member is only
// derived in the constructor body below. Confirm the listener cannot invoke the callback before the body has run.
282 , fListener_{bindAddresses, *fEffectiveOptions_.fBindFlags, [this] (const ConnectionOrientedStreamSocket::Ptr& s) { onConnect_ (s); },
283 *fEffectiveOptions_.fTCPBacklog}
284{
285 // validate fDefaultResponseHeaders contains no bad/inappropriate headers (like Content-Length), probably CORS headers worth a warning as well
286 // just a bunch of sanity checks for things you really DONT want to set here for any reason I can think of
287 if (fEffectiveOptions_.fDefaultResponseHeaders) {
288 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->allow () == nullopt); // unsure
289 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->cookie ().cookieDetails ().empty ());
290 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->connection () == nullopt);
291 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->contentLength () == nullopt);
292 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ETag () == nullopt);
293 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->location () == nullopt);
294 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->origin () == nullopt); // request only header
295 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->ifNoneMatch () == nullopt); // request only header
296 WeakAssert (fEffectiveOptions_.fDefaultResponseHeaders->setCookie ().cookieDetails ().empty ());
297 }
298 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
299
// the backlog shown in the trace is recomputed from the caller-supplied `options` parameter rather than read from
// fEffectiveOptions_.fTCPBacklog (the value actually given to the listener above)
300 DbgTrace ("Constructing WebServer::ConnectionManager ({}), with threadpoolSize={}, backlog={}, and listening on {}"_f,
301 static_cast<const void*> (this), fActiveConnectionThreads_.GetPoolSize (), ComputeConnectionBacklog_ (options),
302 Characters::ToString (bindAddresses));
303 fWaitForReadyConnectionThread_.Start (); // start here instead of AutoStart so a guaranteed initialized before thread main starts - see http://stroika-bugs.sophists.com/browse/STK-706
304}
305
// The destructor is only defined out-of-line when default tracing is compiled in; its sole statement is a
// trace message marking the start of destruction (identifying the object by address).
306#if qStroika_Foundation_Debug_DefaultTracingOn
307ConnectionManager::~ConnectionManager ()
308{
309 DbgTrace ("Starting destructor for WebServer::ConnectionManager ({})"_f, static_cast<const void*> (this));
310}
311#endif
312
313void ConnectionManager::DeriveConnectionDefaultOptionsFromEffectiveOptions_ ()
314{
315#if qStroika_Foundation_Debug_DefaultTracingOn
316 auto prev = fUseDefaultConnectionOptions_.fInterceptorChain;
317#endif
318 fUseDefaultConnectionOptions_ = Connection::Options{
319 .fInterceptorChain = mkInterceptorChain_ (fRouter_, fEarlyInterceptors_, fBeforeInterceptors_, fAfterInterceptors_),
320 .fDefaultResponseHeaders = *fEffectiveOptions_.fDefaultResponseHeaders,
321 .fDefaultGETResponseHeaders = fEffectiveOptions_.fDefaultGETResponseHeaders,
322 .fAutoComputeETagResponse = *fEffectiveOptions_.fAutoComputeETagResponse,
323 .fAutomaticTransferChunkSize = fEffectiveOptions_.fAutomaticTransferChunkSize,
324 .fSupportedCompressionEncodings = fEffectiveOptions_.fSupportedCompressionEncodings,
325 };
326 // @todo could add trace messages on other values changing...
327#if qStroika_Foundation_Debug_DefaultTracingOn
328 if (prev != fUseDefaultConnectionOptions_.fInterceptorChain) {
329 DbgTrace ("Updated InterceptorChain: {}"_f, fUseDefaultConnectionOptions_.fInterceptorChain);
330 }
331#endif
332}
333
// Callback given to fListener_ for each newly accepted socket: applies per-socket options from the effective options,
// wraps the socket in a Connection (using the current default connection options), and registers that connection with
// the inactive-socket poller so it is picked up once readable.
// NOTE(review): one statement (listing line 340) is not visible in this listing.
334void ConnectionManager::onConnect_ (const ConnectionOrientedStreamSocket::Ptr& s)
335{
336#if USE_NOISY_TRACE_IN_THIS_MODULE_
337 Debug::TraceContextBumper ctx{"ConnectionManager::onConnect_", "s={}"_f, s};
338#endif
339 s.SetTCPNoDelay (*fEffectiveOptions_.fTCPNoDelay);
341 s.SetLinger (fEffectiveOptions_.fLinger); // 'missing' has meaning (feature disabled) for socket, so allow setting that too - doesn't mean don't pass on/use-default
342 shared_ptr<Connection> conn = MakeSharedPtr<Connection> (s, fUseDefaultConnectionOptions_);
// a new connection starts out 'inactive': it only becomes active when the poller reports it readable
343 fInactiveSockSetPoller_.Add (conn, kInactiveSocketMonitorEvents2Watch4_);
344#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
345 {
346 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
347 DbgTrace ("In onConnect_ (after adding connection {}): fActiveConnections_={}, inactiveOpenConnections_={}"_f, conn,
348 fActiveConnections_.load (), GetInactiveConnections_ ());
349 }
350#endif
351}
352
// Main routine of fWaitForReadyConnectionThread_ (see the constructor). Loops until the thread is aborted:
//  - waits on the inactive-socket poller for connections that are ready;
//  - for each ready connection, moves it from the poller to fActiveConnections_ (under the fActiveConnections_ lock) and
//    queues a threadpool task for it;
//  - the task reads and processes one message, then removes the connection from fActiveConnections_ and, if
//    ReadAndProcessMessage returned eTryAgainLater, puts it back in the poller; otherwise the connection is no longer
//    referenced by either list.
// Thread::AbortException is rethrown; any other exception escaping an iteration is traced and suppressed.
// NOTE(review): two lines are not visible in this listing - listing line 362 (just inside the outer try) and listing line 381
// (the start of the statement inside the first '#if USE_NOISY_TRACE_IN_THIS_MODULE_' of the lambda).
353void ConnectionManager::WaitForReadyConnectionLoop_ ()
354{
355#if USE_NOISY_TRACE_IN_THIS_MODULE_
356 Debug::TraceContextBumper ctx{"ConnectionManager::WaitForReadyConnectionLoop_"};
357#endif
358
359 // run til thread aborted
360 while (true) {
361 try {
363
364#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
365 {
366 // DANGEROUS - OK to print out POINTERS, but Connection is NOT re-entrant and could be in use in threadpool or other thread.
367 // So CAREFUL not to call even CONST methods of those Connection objects here!!!
368 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
369 DbgTrace ("At top of WaitForReadyConnectionLoop_: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
370 fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
371 }
372#endif
373 for (shared_ptr<Connection> readyConnection : fInactiveSockSetPoller_.WaitQuietly ()) {
374
// The lambda captures readyConnection by value, so the task holds its own reference to the connection.
375 auto handleActivatedConnection = [this, readyConnection] () mutable {
376 /*
377 * This ENTIRE lambda runs in a single threadpool task, and is the only thing that reads/writes
378 * the readyConnection object, so no locking needed for that object.
379 */
380#if USE_NOISY_TRACE_IN_THIS_MODULE_
382 "ConnectionManager::...handleActivatedConnection", "readyConnection={}"_f, readyConnection)};
383#endif
384
385 /*
386 * Process the request
387 */
// NOTE(review): this call is not inside a try block; if ReadAndProcessMessage can throw, the list bookkeeping below
// is skipped and the connection stays in fActiveConnections_ - confirm it handles its own exceptions.
388 bool keepAlive = (readyConnection->ReadAndProcessMessage () == Connection::eTryAgainLater);
389
390 /*
391 * Handle the Connection object, moving it to the appropriate list etc...
392 */
393 try {
394 scoped_lock critSec{fActiveConnections_}; // lock not strictly needed here, but used to assure consistency between the active/inactive lists
395 fActiveConnections_.rwget ().rwref ().Remove (readyConnection); // no matter what, remove from active connections
396 if (keepAlive) {
397 fInactiveSockSetPoller_.Add (readyConnection, kInactiveSocketMonitorEvents2Watch4_);
398 }
399#if USE_NOISY_TRACE_IN_THIS_MODULE_
400 else {
401 DbgTrace ("Closing connection {}"_f, readyConnection); // cuz it goes out of scope, and is no longer referenced in either list
402 }
403#endif
404 }
405 catch (...) {
406 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
407 ReThrow ();
408 }
409
410// @todo lose this code after a bit of testing that never triggered - LGP 2021-03-02
411// I think logic was corner case and backwards (sb if keep alive)
412//anyhow should never happen except for incomplete header in which case we dont want to END
413#if 0
414 if (not keepAlive) {
415 Assert (readyConnection->response ().responseCompleted ()); // don't think this test is needed - I think we always assure this in Connection - LGP 2021-03-02
416 if (not readyConnection->response ().responseCompleted ()) {
417 IgnoreExceptionsForCall (readyConnection->rwResponse ().End ());
418 }
419 }
420#endif
421
422#if USE_NOISY_TRACE_IN_THIS_MODULE_DANGEROUS_ASSERT_FAILURY_
423 {
424 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
425 DbgTrace ("at end of read&process task (keepAlive={}) for connection {}: fActiveConnections_={}, inactiveOpenConnections_={}"_f,
426 keepAlive, readyConnection, fActiveConnections_.cget ().cref (), GetInactiveConnections_ ());
427 }
428#endif
429 };
430
// Move the connection from 'inactive' (poller) to 'active' before queuing the task, so it is not reported ready again
// while the task is pending/running.
431 try {
432 scoped_lock critSec{fActiveConnections_}; // Any place SWAPPING between active and inactive, hold this lock so both lists reamain consistent
433 fInactiveSockSetPoller_.Remove (readyConnection);
434 fActiveConnections_.rwget ().rwref ().Add (readyConnection);
435 }
436 catch (...) {
437 AssertNotReached (); // these two lists need to be kept in sync, so really assume updating them cannot fail/break
438 ReThrow ();
439 }
// NOTE(review): if AddTask were to throw, the connection has already been moved to fActiveConnections_ and no task would
// ever remove it (the outer catch (...) below just traces and continues) - confirm AddTask cannot fail here.
440 fActiveConnectionThreads_.AddTask (handleActivatedConnection);
441 }
442 }
443 catch (const Thread::AbortException&) {
444 ReThrow ();
445 }
446 catch (...) {
447 DbgTrace ("Internal exception in WaitForReadyConnectionLoop_ loop suppressed: {}"_f, current_exception ());
448 }
449 }
450}
451
452Collection<shared_ptr<Connection>> ConnectionManager::GetInactiveConnections_ () const
453{
454 return fInactiveSockSetPoller_.GetDescriptors ().Map<Collection<shared_ptr<Connection>>> ([] (const auto& i) { return i.first; });
455}
456
457void ConnectionManager::ReplaceInEarlyInterceptor_ (const optional<Interceptor>& oldValue, const optional<Interceptor>& newValue)
458{
459 // replace old error handler in the interceptor chain, in the same spot if possible, and otherwise append
460 auto rwLock = this->fEarlyInterceptors_.rwget ();
461 Sequence<Interceptor> newInterceptors;
462 bool addedDefault = false;
463 for (const Interceptor& i : rwLock.load ()) {
464 if (oldValue == i) {
465 if (newValue) {
466 newInterceptors += *newValue;
467 }
468 addedDefault = true;
469 }
470 else {
471 newInterceptors += i;
472 }
473 }
474 if (newValue and not addedDefault) {
475 newInterceptors += *newValue;
476 }
477 rwLock.store (newInterceptors);
478 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
479}
480
// Abort the given connection. The parameter is deliberately unnamed (commented out), i.e. unused by the body.
// NOTE(review): the body's only line (listing line 483) is not visible in this listing; presumably it is a
// not-implemented assertion - confirm against the real file.
481void ConnectionManager::AbortConnection (const shared_ptr<Connection>& /*conn*/)
482{
484}
485
486auto ConnectionManager::ComputeStatistics_ () const -> Statistics
487{
488#if USE_NOISY_TRACE_IN_THIS_MODULE_
489 constexpr bool kExtraDebugging_ = true;
490#else
491 constexpr bool kExtraDebugging_ = false;
492#endif
493 ConnectionStatsCollection conns;
494 {
495 scoped_lock critSec{fActiveConnections_}; // fActiveConnections_ lock used for inactive connections too (only for exchanges between the two lists)
496 Assert (Set<shared_ptr<Connection>>{fActiveConnections_.load ()}.Intersection (GetInactiveConnections_ ()).empty ());
497 for (auto i : fActiveConnections_.load ()) {
498 auto s = i->stats ();
499 s.fActive = true;
500 conns += s;
501 if constexpr (kExtraDebugging_) {
502 DbgTrace ("Active Conn={}"_f, s);
503 }
504 }
505 for (auto i : GetInactiveConnections_ ()) {
506 auto s = i->stats ();
507 s.fActive = false;
508 conns += s;
509 if constexpr (kExtraDebugging_) {
510 DbgTrace ("Inactive Conn={}"_f, s);
511 }
512 }
513 }
514 Statistics::ThreadPool threadPoolStats = [&] () {
515 if (fEffectiveOptions_.fCollectStatistics) {
516 return Statistics::ThreadPool{{fActiveConnectionThreads_.GetCurrentStatistics ()}, fActiveConnectionThreads_.GetPoolSize ()};
517 }
518 else {
519 return Statistics::ThreadPool{{}, fActiveConnectionThreads_.GetPoolSize ()};
520 }
521 }();
522 Statistics::ConnectionStatistics connectionStats;
523 connectionStats.fNumberOfActiveConnections = conns.Count ([] (const Connection::Stats& s) { return s.fActive == true; });
524 connectionStats.fNumberOfOpenConnections = conns.size ();
525
526 TimePointSeconds now = Time::GetTickCount ();
527 connectionStats.fDurationOfOpenConnections = ComputeCommonStatistics (
528 conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> { return now - cs.fCreatedAt; }));
529
530#if qStroika_Framework_WebServer_Connection_TrackExtraStats
531 connectionStats.fDurationOfOpenConnectionsRequests =
532 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
533 if (cs.fMostRecentMessage) {
534 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
535 }
536 return nullopt;
537 }));
538 connectionStats.fDurationOfActiveConnectionsRequests =
539 ComputeCommonStatistics (conns.Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
540 if (cs.fActive == false)
541 return nullopt;
542 if (cs.fMostRecentMessage) {
543 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
544 }
545 return nullopt;
546 }));
547 connectionStats.fConnectionsPiningForTheFjords =
548 conns
549 .Map<Iterable<Duration>> ([&] (const Connection::Stats& cs) -> optional<Duration> {
550 if (cs.fActive == false)
551 return nullopt;
552 if (cs.fMostRecentMessage) {
553 return cs.fMostRecentMessage->ReplaceEnd (min (cs.fMostRecentMessage->GetUpperBound (), now)).GetDistanceSpanned ();
554 }
555 return nullopt;
556 })
557 .Count ([&] (const Duration& d) { return d > fEffectiveOptions_.fConnectionPiningForTheFjordsDelay; });
558#endif
559 return Statistics{.fThreadPool = threadPoolStats, .fConnections = connectionStats};
560}
561
562void ConnectionManager::AddInterceptor (const Interceptor& i, InterceptorAddRelativeTo relativeTo)
563{
564 switch (relativeTo) {
565 case ePrependsToEarly:
566 fEarlyInterceptors_.rwget ()->Prepend (i);
567 break;
568 case ePrepend:
569 fBeforeInterceptors_.rwget ()->Prepend (i);
570 break;
571 case eAppend:
572 fAfterInterceptors_.rwget ()->Append (i);
573 break;
574 case eAfterBeforeInterceptors:
575 fBeforeInterceptors_.rwget ()->Append (i);
576 break;
577 }
578 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
579}
580
581void ConnectionManager::RemoveInterceptor (const Interceptor& i)
582{
583 bool found = false;
584 {
585 auto b4 = fBeforeInterceptors_.rwget ();
586 if (optional<size_t> idx = b4->IndexOf (i)) {
587 b4->Remove (*idx);
588 found = true;
589 }
590 }
591 if (not found) {
592 auto after = fAfterInterceptors_.rwget ();
593 if (optional<size_t> idx = after->IndexOf (i)) {
594 after->Remove (*idx);
595 found = true;
596 }
597 }
598 Require (found);
599 DeriveConnectionDefaultOptionsFromEffectiveOptions_ ();
600}
#define AssertNotImplemented()
Definition Assertions.h:402
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
Definition Assertions.h:439
#define AssertNotReached()
Definition Assertions.h:356
CommonStatistics< T > ComputeCommonStatistics(const ITERATOR_OF_T &start, ITERATOR_OF_T2 &&end)
handy aggregation of several common random-variable statistics/measurements.
const OT & NullCoalesce(const OT &l, const OT &r)
return one of l, or r, with first preference for which is engaged, and second preference for left-to-...
Definition Optional.inl:134
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
#define DbgTrace
Definition Trace.h:317
#define Stroika_Foundation_Debug_OptionalizeTraceArgs(...)
Definition Trace.h:278
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
Definition String.h:201
nonvirtual String SubString(SZ from) const
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
A generalization of a vector: a container whose elements are keyed by the natural numbers.
Set<T> is a container of T, where once an item is added, additionally adds () do nothing.
nonvirtual Statistics GetCurrentStatistics() const
nonvirtual TaskType AddTask(const TaskType &task, const optional< Characters::String > &name=nullopt)
nonvirtual unsigned int GetPoolSize() const
nonvirtual void SetAutomaticTCPDisconnectOnClose(const optional< Time::DurationSeconds > &waitFor) const
roughly equivalent to Association<String,String>, except that the class is smart about certain keys a...
Definition Headers.h:129
Common::Property< optional< Containers::Set< String > > > vary
Definition Headers.h:432
Duration is a chrono::duration<double> (=.
Definition Duration.h:96
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
Definition Iterable.h:237
Common::ReadOnlyProperty< Statistics > statistics
Common::ReadOnlyProperty< ConnectionStatsCollection > connections
Common::ReadOnlyProperty< Traversal::Iterable< SocketAddress > > bindings
Return the socket addresses the webserver (connection manager) is listening on (to serve web content)...
Common::Property< Sequence< Interceptor > > earlyInterceptors
Common::Property< Sequence< Interceptor > > beforeInterceptors
Common::Property< optional< Interceptor > > defaultErrorHandler
Common::ReadOnlyProperty< const Options & > options
Common::Property< Sequence< Interceptor > > afterInterceptors
String ToString(T &&t, ARGS... args)
Return a debug-friendly, display version of the argument: not guaranteed parsable or usable except fo...
Definition ToString.inl:465
Ptr New(const function< void()> &fun2CallOnce, const optional< Characters::String > &name, const optional< Configuration > &configuration)
Definition Thread.cpp:961
nonvirtual Characters::String ToString() const
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...
optional< bool > fAutoComputeETagResponse
sets the initial value for each Response. Harmless except for the slight performance cost (wont alway...
optional< Containers::Set< HTTP::ContentEncoding > > fSupportedCompressionEncodings
override the set of compression encodings the WebServer supports (default is all the Stroika implemen...
optional< size_t > fAutomaticTransferChunkSize
controls whether to use Transfer-Coding: chunked or not