Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
ConnectionPool.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
6#include "Stroika/Foundation/Containers/Collection.h"
8#include "Stroika/Foundation/Execution/TimeOutException.h"
9
10#include "ConnectionPool.h"
11
12using namespace Stroika::Foundation;
15using namespace Stroika::Foundation::Execution;
16using namespace Stroika::Foundation::IO;
19
20namespace {
21 using Ptr = Connection::Ptr;
22 using Options = Connection::Options;
23
24 /**
25 * Dynamically wrap this around a connection pool entry, so that when its destroyed, it returns
26 * the underlying entry to the pool
27 */
28 struct DelegatingConnectionRepWithDeleteHook_ : Connection::IRep {
29
30 Ptr fDelegateTo;
31 function<void (Ptr)> fDeleter; // call on delete
32
33 DelegatingConnectionRepWithDeleteHook_ (const Ptr& delegateTo, function<void (Ptr)> deleter)
34 : fDelegateTo{delegateTo}
35 , fDeleter{deleter}
36 {
37 }
38 DelegatingConnectionRepWithDeleteHook_ (const DelegatingConnectionRepWithDeleteHook_&) = delete;
39 virtual ~DelegatingConnectionRepWithDeleteHook_ ()
40 {
41 fDeleter (fDelegateTo);
42 }
43
44 public:
45 nonvirtual DelegatingConnectionRepWithDeleteHook_& operator= (const DelegatingConnectionRepWithDeleteHook_&) = delete;
46
47 public:
48 virtual Options GetOptions () const override
49 {
50 return fDelegateTo.GetOptions ();
51 }
52 virtual Time::DurationSeconds GetTimeout () const override
53 {
54 return fDelegateTo.GetTimeout ();
55 }
56 virtual void SetTimeout (Time::DurationSeconds timeout) override
57 {
58 fDelegateTo.SetTimeout (timeout);
59 }
60 virtual URI GetSchemeAndAuthority () const override
61 {
62 return fDelegateTo.GetSchemeAndAuthority ();
63 }
64 virtual void SetSchemeAndAuthority (const URI& schemeAndAuthority) override
65 {
66 fDelegateTo.SetSchemeAndAuthority (schemeAndAuthority);
67 }
68 virtual void Close () override
69 {
70 fDelegateTo.Close ();
71 }
72 virtual Response Send (const Request& request) override
73 {
74 return fDelegateTo.Send (request);
75 }
76 };
77}
78
79/*
80 ********************************************************************************
81 ********************* Transfer::ConnectionPool::Options ************************
82 ********************************************************************************
83 */
84ConnectionPool::Options::Options (const optional<unsigned int>& maxConnections, const function<Connection::Ptr ()>& connectionFactory)
85 : fMaxConnections{maxConnections}
86 , fConnectionFactory{connectionFactory}
87{
88}
89
90/*
91 ********************************************************************************
92 ************************ Transfer::ConnectionPool::Rep_ ************************
93 ********************************************************************************
94 */
95class ConnectionPool::Rep_ {
96public:
97 Options fOptions;
98
99 Rep_ (const Options& options)
100 : fOptions{options}
101 {
102 }
103 ~Rep_ ()
104 {
105 Require (fOutstandingConnections == 0); // otherwise when they are destroyed, they will try to add to a list that
106 // no longer exists...
107 }
108 Connection::Ptr New (const optional<Time::Duration>& timeout, optional<URI> hint, optional<AllocateGloballyIfTimeout> allocateGloballyOnTimeoutFlag)
109 {
110 /*
111 * Maintain an LRU(like) list. Not strictly LRU, because we want to walk/pick not arbitrarily, but by URI matching.
112 * This is why we cannot use the Stroika LRUCache class.
113 *
114 * Add items dynamically to the list, but never more than the max.
115 *
116 * If failed to find a match, wait (if argument given).
117 *
118 * If still failed, either throw or allocate new connection (again depending on function argument).
119 */
120 Time::TimePointSeconds timeoutAt = Time::GetTickCount () + timeout.value_or (0s);
121 optional<Connection::Ptr> poolEntryResult;
122 again:
123 if (hint) {
124 poolEntryResult = FindAndAllocateFromAvailableByURIMatch_ (hint->GetSchemeAndAuthority ());
125 }
126 if (not poolEntryResult) {
127 poolEntryResult = FindAndAllocateFromAvailable_ ();
128 }
129 if (not poolEntryResult) {
130 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
131 size_t totalAllocated = fAvailableConnections.size () + fOutstandingConnections;
132 if (totalAllocated < fOptions.fMaxConnections) {
133 fAvailableConnections += fOptions.fConnectionFactory ();
134 goto again; // multithreaded, someone else could allocate, or return a better match
135 // no need to notify_all () since we will try again anyhow
136 }
137 }
138 if (not poolEntryResult and Time::GetTickCount () > timeoutAt) {
139 // Let's see if we can wait a little
140 unique_lock lock{fAvailableConnectionsChanged.fMutex};
141 if (fAvailableConnectionsChanged.wait_until (lock, Time::Pin2SafeSeconds (timeoutAt)) == cv_status::no_timeout) {
142 goto again; // a new one maybe available
143 }
144 }
145 if (not poolEntryResult) {
146 // Here the rubber hits the road. We didn't find a free entry so we allocate, or throw
147 if (allocateGloballyOnTimeoutFlag) {
148 return fOptions.fConnectionFactory ();
149 }
150 else {
152 }
153 }
154 // wrap the connection-ptr in an envelope that will restore the connection to the pool
155 return Connection::Ptr{
156 make_shared<DelegatingConnectionRepWithDeleteHook_> (*poolEntryResult, [this] (const Ptr& p) { this->AddConnection_ (p); })};
157 }
158 optional<Connection::Ptr> FindAndAllocateFromAvailableByURIMatch_ (const URI& matchScemeAndAuthority)
159 {
160 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
161 for (auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
162 if (i->GetSchemeAndAuthority () == matchScemeAndAuthority) {
163 fAvailableConnections.Remove (i);
164 ++fOutstandingConnections;
165 return *i;
166 }
167 }
168 return nullopt;
169 }
170 optional<Connection::Ptr> FindAndAllocateFromAvailable_ ()
171 {
172 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
173 for (auto i = fAvailableConnections.begin (); i != fAvailableConnections.end (); ++i) {
174 auto result = *i;
175 fAvailableConnections.Remove (i);
176 ++fOutstandingConnections;
177 return result;
178 }
179 return nullopt;
180 }
181 void AddConnection_ (const Connection::Ptr p)
182 {
183 lock_guard critSec{fAvailableConnectionsChanged.fMutex};
184 fAvailableConnections.Add (p);
185 --fOutstandingConnections;
186 fAvailableConnectionsChanged.notify_all ();
187 }
188
189 // Use a collection instead of an LRUCache, because the 'key' can change as the connection is used, and this
190 // isn't handled well by LRUCache code (moving buckets etc automatically). And it doesn't handle the case
191 // where there is no URL/hint
192
193 Collection<Connection::Ptr> fAvailableConnections;
194 size_t fOutstandingConnections{}; // # connections handed out : this number + fAvailableConnections.size () must be less_or_equal to fOptions.GetMaxConnections - but
195 // then don't actually allocate extra connections until/unless needed
196 ConditionVariable<> fAvailableConnectionsChanged;
197};
198
199/*
200 ********************************************************************************
201 ************************** Transfer::ConnectionPool ****************************
202 ********************************************************************************
203 */
204ConnectionPool::ConnectionPool (const Options& options)
205 : fRep_{make_unique<Rep_> (options)}
206{
207}
208
209ConnectionPool::~ConnectionPool ()
210{
211}
212
214{
215 return fRep_->New (nullopt, hint, nullopt);
216}
217
219{
220 return fRep_->New (timeout, hint, nullopt);
221}
222
223Connection::Ptr ConnectionPool::New (AllocateGloballyIfTimeout, const Time::Duration& timeout, URI hint)
224{
225 return fRep_->New (timeout, hint, AllocateGloballyIfTimeout::eAllocateGloballyIfTimeout);
226}
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Definition Realtime.h:82
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Definition Realtime.h:57
A Collection<T> is a container to manage an un-ordered collection of items, without equality defined ...
Definition Collection.h:102
nonvirtual URI GetSchemeAndAuthority() const
Definition URI.inl:90
Duration is a chrono::duration<double> (=.
Definition Duration.h:96
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Definition Throw.inl:43