Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
ConnectionPool.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
6#include "Stroika/Foundation/Containers/Collection.h"
8#include "Stroika/Foundation/Execution/TimeOutException.h"
10
11#include "ConnectionPool.h"
12
13using namespace Stroika::Foundation;
16using namespace Stroika::Foundation::Execution;
17using namespace Stroika::Foundation::IO;
20
21namespace {
22 using Ptr = Connection::Ptr;
23 using Options = Connection::Options;
24
25 /**
26 * Dynamically wrap this around a connection pool entry, so that when its destroyed, it returns
27 * the underlying entry to the pool
28 */
29 struct DelegatingConnectionRepWithDeleteHook_ : Connection::IRep {
30
31 Ptr fDelegateTo;
32 function<void (Ptr)> fDeleter; // call on delete
33
34 DelegatingConnectionRepWithDeleteHook_ (const Ptr& delegateTo, function<void (Ptr)> deleter)
35 : fDelegateTo{delegateTo}
36 , fDeleter{deleter}
37 {
38 }
39 DelegatingConnectionRepWithDeleteHook_ (const DelegatingConnectionRepWithDeleteHook_&) = delete;
40 virtual ~DelegatingConnectionRepWithDeleteHook_ ()
41 {
42 fDeleter (fDelegateTo);
43 }
44
45 public:
46 nonvirtual DelegatingConnectionRepWithDeleteHook_& operator= (const DelegatingConnectionRepWithDeleteHook_&) = delete;
47
48 public:
49 virtual Options GetOptions () const override
50 {
51 return fDelegateTo.GetOptions ();
52 }
53 virtual Time::DurationSeconds GetTimeout () const override
54 {
55 return fDelegateTo.GetTimeout ();
56 }
57 virtual void SetTimeout (Time::DurationSeconds timeout) override
58 {
59 fDelegateTo.SetTimeout (timeout);
60 }
61 virtual URI GetSchemeAndAuthority () const override
62 {
63 return fDelegateTo.GetSchemeAndAuthority ();
64 }
65 virtual void SetSchemeAndAuthority (const URI& schemeAndAuthority) override
66 {
67 fDelegateTo.SetSchemeAndAuthority (schemeAndAuthority);
68 }
69 virtual void Close () override
70 {
71 fDelegateTo.Close ();
72 }
73 virtual Response Send (const Request& request) override
74 {
75 return fDelegateTo.Send (request);
76 }
77 };
78}
79
80/*
81 ********************************************************************************
82 ********************* Transfer::ConnectionPool::Options ************************
83 ********************************************************************************
84 */
85ConnectionPool::Options::Options (const optional<unsigned int>& maxConnections, const function<Connection::Ptr ()>& connectionFactory)
86 : fMaxConnections{maxConnections}
87 , fConnectionFactory{connectionFactory}
88{
89}
90
91/*
92 ********************************************************************************
93 ************************ Transfer::ConnectionPool::Rep_ ************************
94 ********************************************************************************
95 */
96class ConnectionPool::Rep_ {
97public:
98 Options fOptions;
99
100 Rep_ (const Options& options)
101 : fOptions{options}
102 {
103 }
104 ~Rep_ ()
105 {
106 Require (fOutstandingConnections == 0); // otherwise when they are destroyed, they will try to add to a list that
107 // no longer exists...
108 }
109 Connection::Ptr New (const optional<Time::Duration>& timeout, optional<URI> hint, optional<AllocateGloballyIfTimeout> allocateGloballyOnTimeoutFlag)
110 {
111 /*
112 * Maintain an LRU(like) list. Not strictly LRU, because we want to walk/pick not arbitrarily, but by URI matching.
113 * This is why we cannot use the Stroika LRUCache class.
114 *
115 * Add items dynamically to the list, but never more than the max.
116 *
117 * If failed to find a match, wait (if argument given).
118 *
119 * If still failed, either throw or allocate new connection (again depending on function argument).
120 */
121 Time::TimePointSeconds timeoutAt = Time::GetTickCount () + timeout.value_or (0s);
122 optional<Connection::Ptr> poolEntryResult;
123 again:
124 if (hint) {
125 poolEntryResult = FindAndAllocateFromAvailableByURIMatch_ (hint->GetSchemeAndAuthority ());
126 }
127 if (not poolEntryResult) {
128 poolEntryResult = FindAndAllocateFromAvailable_ ();
129 }
130 if (not poolEntryResult) {
131 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
132 size_t totalAllocated = fAvailableConnections.size () + fOutstandingConnections;
133 if (totalAllocated < fOptions.fMaxConnections) {
134 fAvailableConnections += fOptions.fConnectionFactory ();
135 goto again; // multithreaded, someone else could allocate, or return a better match
136 // no need to notify_all () since we will try again anyhow
137 }
138 }
139 if (not poolEntryResult and Time::GetTickCount () > timeoutAt) {
140 // Let's see if we can wait a little
141 unique_lock lock{fAvailableConnectionsChanged.fMutex};
142 if (fAvailableConnectionsChanged.wait_until (lock, Time::Pin2SafeSeconds (timeoutAt)) == cv_status::no_timeout) {
143 goto again; // a new one maybe available
144 }
145 }
146 if (not poolEntryResult) {
147 // Here the rubber hits the road. We didn't find a free entry so we allocate, or throw
148 if (allocateGloballyOnTimeoutFlag) {
149 return fOptions.fConnectionFactory ();
150 }
151 else {
153 }
154 }
155 // wrap the connection-ptr in an envelope that will restore the connection to the pool
156 return Connection::Ptr{Memory::MakeSharedPtr<DelegatingConnectionRepWithDeleteHook_> (
157 *poolEntryResult, [this] (const Ptr& p) { this->AddConnection_ (p); })};
158 }
159 optional<Connection::Ptr> FindAndAllocateFromAvailableByURIMatch_ (const URI& matchScemeAndAuthority)
160 {
161 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
162 for (auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
163 if (i->GetSchemeAndAuthority () == matchScemeAndAuthority) {
164 fAvailableConnections.Remove (i);
165 ++fOutstandingConnections;
166 return *i;
167 }
168 }
169 return nullopt;
170 }
171 optional<Connection::Ptr> FindAndAllocateFromAvailable_ ()
172 {
173 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
174 for (auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
175 auto result = *i;
176 fAvailableConnections.Remove (i);
177 ++fOutstandingConnections;
178 return result;
179 }
180 return nullopt;
181 }
182 void AddConnection_ (const Connection::Ptr p)
183 {
184 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
185 fAvailableConnections.Add (p);
186 --fOutstandingConnections;
187 fAvailableConnectionsChanged.notify_all ();
188 }
189
190 // Use a collection instead of an LRUCache, because the 'key' can change as the connection is used, and this
191 // isn't handled well by LRUCache code (moving buckets etc automatically). And it doesn't handle the case
192 // where there is no URL/hint
193
194 Collection<Connection::Ptr> fAvailableConnections;
195 size_t fOutstandingConnections{}; // # connections handed out : this number + fAvailableConnections.size () must be less_or_equal to fOptions.GetMaxConnections - but
196 // then don't actually allocate extra connections until/unless needed
197 ConditionVariable<> fAvailableConnectionsChanged;
198};
199
200/*
201 ********************************************************************************
202 ************************** Transfer::ConnectionPool ****************************
203 ********************************************************************************
204 */
205ConnectionPool::ConnectionPool (const Options& options)
206 : fRep_{make_unique<Rep_> (options)}
207{
208}
209
210ConnectionPool::~ConnectionPool ()
211{
212}
213
215{
216 return fRep_->New (nullopt, hint, nullopt);
217}
218
220{
221 return fRep_->New (timeout, hint, nullopt);
222}
223
224Connection::Ptr ConnectionPool::New (AllocateGloballyIfTimeout, const Time::Duration& timeout, URI hint)
225{
226 return fRep_->New (timeout, hint, AllocateGloballyIfTimeout::eAllocateGloballyIfTimeout);
227}
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
nonvirtual URI GetSchemeAndAuthority() const
Definition URI.inl:90
Duration is a chrono::duration<double> (=.
Definition Duration.h:96
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Definition Throw.inl:43