4#include "Stroika/Foundation/StroikaPreComp.h"
6#include "Stroika/Foundation/Containers/Collection.h"
8#include "Stroika/Foundation/Execution/TimeOutException.h"
10#include "ConnectionPool.h"
16using namespace Stroika::Foundation::IO;
22 using Options = Connection::Options;
// Connection::IRep implementation that forwards calls to a wrapped Ptr
// (fDelegateTo) and, from its destructor, hands that Ptr to the fDeleter callback.
// NOTE(review): this listing omits lines (braces, the fDelegateTo declaration, and
// some member-function headers/bodies), so only the visible statements are described.
28 struct DelegatingConnectionRepWithDeleteHook_ : Connection::IRep {
// Callback invoked with the wrapped Ptr from the destructor.
31 function<void (Ptr)> fDeleter;
// Constructor: takes the Ptr to forward to and the deleter callback. Only the
// fDelegateTo initializer is visible here; presumably fDeleter is initialized from
// `deleter` on a line not shown - confirm.
33 DelegatingConnectionRepWithDeleteHook_ (
const Ptr& delegateTo, function<
void (Ptr)> deleter)
34 : fDelegateTo{delegateTo}
// Copy construction is disabled.
38 DelegatingConnectionRepWithDeleteHook_ (
const DelegatingConnectionRepWithDeleteHook_&) =
delete;
// Destructor: passes the wrapped Ptr to fDeleter.
39 virtual ~DelegatingConnectionRepWithDeleteHook_ ()
41 fDeleter (fDelegateTo);
// Copy assignment is disabled.
45 nonvirtual DelegatingConnectionRepWithDeleteHook_& operator= (
const DelegatingConnectionRepWithDeleteHook_&) =
delete;
// Returns the wrapped connection's options.
48 virtual Options GetOptions ()
const override
50 return fDelegateTo.GetOptions ();
// Forwards GetTimeout to the wrapped connection (enclosing header not visible).
54 return fDelegateTo.GetTimeout ();
// Forwards SetTimeout to the wrapped connection (enclosing header not visible).
58 fDelegateTo.SetTimeout (timeout);
// Body not visible in this listing; presumably forwards to fDelegateTo like its siblings - confirm.
60 virtual URI GetSchemeAndAuthority ()
const override
// Forwards the new scheme/authority to the wrapped connection.
64 virtual void SetSchemeAndAuthority (
const URI& schemeAndAuthority)
override
66 fDelegateTo.SetSchemeAndAuthority (schemeAndAuthority);
// Body not visible in this listing.
68 virtual void Close ()
override
// Forwards Send (request) to the wrapped connection and returns its result
// (enclosing header not visible).
74 return fDelegateTo.Send (request);
// Constructs pool options by storing the two arguments:
//   maxConnections    - optional connection limit, copied into fMaxConnections.
//   connectionFactory - callable returning a Connection::Ptr, copied into fConnectionFactory.
// The constructor body, if any, is not visible in this listing.
84ConnectionPool::Options::Options (
const optional<unsigned int>& maxConnections,
const function<
Connection::Ptr ()>& connectionFactory)
85 : fMaxConnections{maxConnections}
86 , fConnectionFactory{connectionFactory}
// Implementation object behind ConnectionPool. From the visible statements it keeps a
// set of available connections (fAvailableConnections), a count of connections handed
// out (fOutstandingConnections), and fAvailableConnectionsChanged, which supplies both
// the mutex (fMutex) and the wait/notify used below.
// NOTE(review): this listing omits lines (braces, some headers, member declarations,
// else-branches, return statements); comments describe only what is visible.
95class ConnectionPool::Rep_ {
// Constructor taking the pool Options; initializers/body not visible here.
99 Rep_ (
const Options& options)
// Precondition check that no connections are still outstanding. The enclosing function
// header is not visible - presumably this is the destructor; confirm.
105 Require (fOutstandingConnections == 0);
// Obtains a connection from the pool.
//   timeout - optional duration; how it becomes `timeoutAt` is on lines not shown.
//   hint    - optional URI whose scheme/authority is used to prefer a matching entry.
//   allocateGloballyOnTimeoutFlag - when set and no pool entry was obtained, a
//             connection is created directly from the factory instead.
// Returns a Connection::Ptr; the pooled path wraps the entry so that it is handed back
// through AddConnection_ when the wrapper is destroyed.
108 Connection::Ptr New (
const optional<Time::Duration>& timeout, optional<URI> hint, optional<AllocateGloballyIfTimeout> allocateGloballyOnTimeoutFlag)
121 optional<Connection::Ptr> poolEntryResult;
// First choice: an available connection whose scheme/authority matches the hint.
// (hint is dereferenced here; presumably guarded by a check on a line not shown.)
124 poolEntryResult = FindAndAllocateFromAvailableByURIMatch_ (hint->GetSchemeAndAuthority ());
// Second choice: any available connection.
126 if (not poolEntryResult) {
127 poolEntryResult = FindAndAllocateFromAvailable_ ();
// Still nothing: under the mutex, if available + outstanding is below
// fOptions.fMaxConnections, add one factory-made connection to the available set.
// What follows the add (e.g. a retry) is not visible in this listing.
// NOTE(review): the Options constructor takes optional<unsigned int> for the limit;
// if fMaxConnections is itself an optional, confirm this comparison does what is
// intended when no limit is set.
129 if (not poolEntryResult) {
130 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
131 size_t totalAllocated = fAvailableConnections.size () + fOutstandingConnections;
132 if (totalAllocated < fOptions.fMaxConnections) {
133 fAvailableConnections += fOptions.fConnectionFactory ();
// Wait for the available set to change, bounded by timeoutAt.
// NOTE(review): as written, the wait is entered only when the tick count is already
// greater than timeoutAt, i.e. the wait_until deadline has passed. This looks
// inverted (`<` would wait while time remains) - confirm intended behavior.
138 if (not poolEntryResult and Time::GetTickCount () > timeoutAt) {
140 unique_lock lock{fAvailableConnectionsChanged.fMutex};
// no_timeout means the wait ended before the deadline; the action taken is not visible.
141 if (fAvailableConnectionsChanged.wait_until (lock, Time::Pin2SafeSeconds (timeoutAt)) == cv_status::no_timeout) {
// No pool entry obtained: if the caller passed the flag, return a connection straight
// from the factory (not wrapped with the delete hook used below). The alternative
// branch is not visible in this listing.
145 if (not poolEntryResult) {
147 if (allocateGloballyOnTimeoutFlag) {
148 return fOptions.fConnectionFactory ();
// Pooled path: wrap the entry in DelegatingConnectionRepWithDeleteHook_ whose deleter
// calls AddConnection_ on this Rep_. The lambda captures `this`.
156 make_shared<DelegatingConnectionRepWithDeleteHook_> (*poolEntryResult, [
this] (
const Ptr& p) { this->AddConnection_ (p); })};
// Under the mutex, scans the available connections for one whose scheme/authority
// equals the argument; on a match removes it from the available set and increments
// the outstanding count. Return statements are not visible in this listing.
158 optional<Connection::Ptr> FindAndAllocateFromAvailableByURIMatch_ (
const URI& matchScemeAndAuthority)
160 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
161 for (
auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
162 if (i->GetSchemeAndAuthority () == matchScemeAndAuthority) {
163 fAvailableConnections.Remove (i);
164 ++fOutstandingConnections;
// Under the mutex, removes an entry reached by iterating the available set and
// increments the outstanding count. Return statements are not visible in this listing.
170 optional<Connection::Ptr> FindAndAllocateFromAvailable_ ()
172 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
173 for (
auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
175 fAvailableConnections.Remove (i);
176 ++fOutstandingConnections;
// Hands a connection back: under the mutex, adds p to the available set, decrements
// the outstanding count, and notifies all waiters. The enclosing header is not
// visible - presumably AddConnection_, which New's deleter lambda calls; confirm.
183 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
184 fAvailableConnections.Add (p);
185 --fOutstandingConnections;
186 fAvailableConnectionsChanged.notify_all ();
// Number of connections currently handed out; value-initialized.
194 size_t fOutstandingConnections{};
// Public ConnectionPool members; each visible statement delegates to the Rep_ held in fRep_.
// NOTE(review): most function headers for these statements are not visible in this listing.
// Constructor initializer: creates the Rep_ from the supplied options.
205 : fRep_{make_unique<Rep_> (options)}
// Destructor; body not visible.
209ConnectionPool::~ConnectionPool ()
// Forwards to Rep_::New with no timeout and no allocate-on-timeout flag.
215 return fRep_->New (nullopt, hint, nullopt);
// Forwards to Rep_::New with the caller's timeout and no allocate-on-timeout flag.
220 return fRep_->New (timeout, hint, nullopt);
// Forwards to Rep_::New with the caller's timeout and the
// eAllocateGloballyIfTimeout flag set.
225 return fRep_->New (timeout, hint, AllocateGloballyIfTimeout::eAllocateGloballyIfTimeout);
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
static const TimeOutException kThe
nonvirtual Connection::Ptr New(URI hint={})
ConnectionPool(const Options &options={})
nonvirtual URI GetSchemeAndAuthority() const
Duration is a chrono::duration<double> (=.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...