4#include "Stroika/Foundation/StroikaPreComp.h"
6#include "Stroika/Foundation/Containers/Collection.h"
8#include "Stroika/Foundation/Execution/TimeOutException.h"
11#include "ConnectionPool.h"
17using namespace Stroika::Foundation::IO;
23 using Options = Connection::Options;
29 struct DelegatingConnectionRepWithDeleteHook_ : Connection::IRep {
32 function<void (Ptr)> fDeleter;
34 DelegatingConnectionRepWithDeleteHook_ (
const Ptr& delegateTo, function<
void (Ptr)> deleter)
35 : fDelegateTo{delegateTo}
39 DelegatingConnectionRepWithDeleteHook_ (
const DelegatingConnectionRepWithDeleteHook_&) =
delete;
40 virtual ~DelegatingConnectionRepWithDeleteHook_ ()
42 fDeleter (fDelegateTo);
46 nonvirtual DelegatingConnectionRepWithDeleteHook_& operator= (
const DelegatingConnectionRepWithDeleteHook_&) =
delete;
49 virtual Options GetOptions ()
const override
51 return fDelegateTo.GetOptions ();
55 return fDelegateTo.GetTimeout ();
59 fDelegateTo.SetTimeout (timeout);
61 virtual URI GetSchemeAndAuthority ()
const override
65 virtual void SetSchemeAndAuthority (
const URI& schemeAndAuthority)
override
67 fDelegateTo.SetSchemeAndAuthority (schemeAndAuthority);
69 virtual void Close ()
override
75 return fDelegateTo.Send (request);
85ConnectionPool::Options::Options (
const optional<unsigned int>& maxConnections,
const function<
Connection::Ptr ()>& connectionFactory)
86 : fMaxConnections{maxConnections}
87 , fConnectionFactory{connectionFactory}
96class ConnectionPool::Rep_ {
100 Rep_ (
const Options& options)
106 Require (fOutstandingConnections == 0);
109 Connection::Ptr New (
const optional<Time::Duration>& timeout, optional<URI> hint, optional<AllocateGloballyIfTimeout> allocateGloballyOnTimeoutFlag)
122 optional<Connection::Ptr> poolEntryResult;
125 poolEntryResult = FindAndAllocateFromAvailableByURIMatch_ (hint->GetSchemeAndAuthority ());
127 if (not poolEntryResult) {
128 poolEntryResult = FindAndAllocateFromAvailable_ ();
130 if (not poolEntryResult) {
131 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
132 size_t totalAllocated = fAvailableConnections.size () + fOutstandingConnections;
133 if (totalAllocated < fOptions.fMaxConnections) {
134 fAvailableConnections += fOptions.fConnectionFactory ();
139 if (not poolEntryResult and Time::GetTickCount () > timeoutAt) {
141 unique_lock lock{fAvailableConnectionsChanged.fMutex};
142 if (fAvailableConnectionsChanged.wait_until (lock, Time::Pin2SafeSeconds (timeoutAt)) == cv_status::no_timeout) {
146 if (not poolEntryResult) {
148 if (allocateGloballyOnTimeoutFlag) {
149 return fOptions.fConnectionFactory ();
156 return Connection::Ptr{Memory::MakeSharedPtr<DelegatingConnectionRepWithDeleteHook_> (
157 *poolEntryResult, [
this] (
const Ptr& p) { this->AddConnection_ (p); })};
159 optional<Connection::Ptr> FindAndAllocateFromAvailableByURIMatch_ (
const URI& matchScemeAndAuthority)
161 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
162 for (
auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
163 if (i->GetSchemeAndAuthority () == matchScemeAndAuthority) {
164 fAvailableConnections.Remove (i);
165 ++fOutstandingConnections;
171 optional<Connection::Ptr> FindAndAllocateFromAvailable_ ()
173 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
174 for (
auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
176 fAvailableConnections.Remove (i);
177 ++fOutstandingConnections;
184 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
185 fAvailableConnections.Add (p);
186 --fOutstandingConnections;
187 fAvailableConnectionsChanged.notify_all ();
195 size_t fOutstandingConnections{};
206 : fRep_{make_unique<Rep_> (options)}
210ConnectionPool::~ConnectionPool ()
216 return fRep_->New (nullopt, hint, nullopt);
221 return fRep_->New (timeout, hint, nullopt);
226 return fRep_->New (timeout, hint, AllocateGloballyIfTimeout::eAllocateGloballyIfTimeout);
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
static const TimeOutException kThe
nonvirtual Connection::Ptr New(URI hint={})
ConnectionPool(const Options &options={})
nonvirtual URI GetSchemeAndAuthority() const
Duration is a chrono::duration<double>, i.e. a length of time measured in seconds.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...