Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
BufferedOutputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
4#include "InternallySynchronizedOutputStream.h"
8
9namespace Stroika::Foundation::Streams::BufferedOutputStream {
10
11 namespace Private_ {
        // NOTE: not using block-allocation because this class aggregates a kDefaultBufSize_ sized buffer
13 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
14 class Rep_ : public IRep_<ELEMENT_TYPE> {
15 public:
16 Rep_ (const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut)
17 : fRealOut_{realOut}
18 {
19 }
20 ~Rep_ ()
21 {
22 if (IsOpenWrite ()) {
23 IgnoreExceptionsForCall (Flush ());
24 }
25 WeakAssert (fUnwrittenAppends_.size () == 0); // advisory - not quite right - could happen if a flush exception was eaten (@todo clean this up)
26 }
27
28 public:
29 virtual size_t GetBufferSize () const override
30 {
31 Debug::AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
32 return fUnwrittenAppends_.capacity ();
33 }
34 virtual void SetBufferSize (size_t bufSize) override
35 {
36 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
37 bufSize = Math::AtLeast (bufSize, INLINE_BUF_SIZE);
38 if (bufSize < fUnwrittenAppends_.size ()) {
39 Flush_ (); // this logic only write because stream not seekable, and buffer is for unwritten appends
40 }
41 fUnwrittenAppends_.reserve (bufSize);
42 }
43
44 public:
45 // Throws away all data about to be written (buffered). Once this is called, its illegal to call Flush or another write
46 nonvirtual void Abort ()
47 {
48 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
49 fAborted_ = true; // for debug sake track this
50 fUnwrittenAppends_.clear ();
51 }
52 virtual bool IsSeekable () const override
53 {
54 return false; // @todo - COULD be seekable if underlying fRealOut_ was!!!
55 }
56 virtual void CloseWrite () override
57 {
58 if (IsOpenWrite ()) {
59 Flush ();
60 fRealOut_.Close ();
61 }
62 Assert (fRealOut_ == nullptr);
63 Ensure (not IsOpenWrite ());
64 }
65 virtual bool IsOpenWrite () const override
66 {
67 return fRealOut_ != nullptr;
68 }
69 virtual SeekOffsetType GetWriteOffset () const override
70 {
72 Require (IsOpenWrite ());
73 return 0;
74 }
75 virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset) override
76 {
77 RequireNotReached (); // cuz we are not seekable, but could be changed/improved
78 Require (IsOpenWrite ());
79 return 0;
80 }
81 virtual void Flush () override
82 {
83 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
84 Require (IsOpenWrite ());
85 Flush_ ();
86 }
87 // pointer must refer to valid memory at least bufSize long, and cannot be nullptr. BufSize must always be >= 1.
88 // Writes always succeed fully or throw.
89 virtual void Write (span<const ELEMENT_TYPE> elts) override
90 {
91 Require (not elts.empty ()); // for OutputStream<byte> - this function requires non-empty write
92 Require (not fAborted_);
93 Require (IsOpenWrite ());
94 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
95 /*
96 * Minimize the number of writes at the possible cost of extra copying.
97 *
98 * See if there is room in the buffer, and use it up. Only when no more room do we flush.
99 */
100 size_t bufSpaceRemaining = fUnwrittenAppends_.capacity () - fUnwrittenAppends_.size ();
101 size_t size2WriteRemaining = elts.size ();
102 size_t copy2Buffer = min (bufSpaceRemaining, size2WriteRemaining);
103#if qStroika_Foundation_Debug_AssertionsChecked
104 size_t oldCap = fUnwrittenAppends_.capacity ();
105#endif
106 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data (), elts.data () + copy2Buffer);
107#if qStroika_Foundation_Debug_AssertionsChecked
108 Assert (oldCap == fUnwrittenAppends_.capacity ());
109#endif
110
111 Assert (size2WriteRemaining >= copy2Buffer);
112 size2WriteRemaining -= copy2Buffer;
113
114 /*
115 * At this point - either the buffer is full, OR we are done writing. EITHER way - if the buffer is full - we may as well write it now.
116 */
117 if (fUnwrittenAppends_.capacity () == fUnwrittenAppends_.size ()) {
118 Flush_ ();
119 Assert (fUnwrittenAppends_.empty ());
120 }
121#if qStroika_Foundation_Debug_AssertionsChecked
122 Assert (oldCap == fUnwrittenAppends_.capacity ());
123#endif
124
125 // If the remaining will fit in the buffer, then buffer. But if it won't - no point in using the buffer - just write directly to avoid the copy.
126 // And no point - even if equal to buffer size - since it won't save any writes...
127 if (size2WriteRemaining == 0) {
128 // DONE
129 }
130 else if (size2WriteRemaining < fUnwrittenAppends_.capacity ()) {
131 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data () + copy2Buffer, elts.data () + elts.size ());
132 }
133 else {
134 fRealOut_.Write (elts.subspan (copy2Buffer));
135 }
136 }
137
138 private:
139 nonvirtual void Flush_ ()
140 {
141 if (fAborted_) {
142 fUnwrittenAppends_.clear ();
143 }
144 else {
145 if (not fUnwrittenAppends_.empty ()) {
146 fRealOut_.Write (span{fUnwrittenAppends_});
147 fUnwrittenAppends_.clear ();
148 }
149 fRealOut_.Flush ();
150 }
151 Ensure (fUnwrittenAppends_.empty ());
152 }
153
154 private:
155 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fUnwrittenAppends_{};
156 typename OutputStream::Ptr<ELEMENT_TYPE> fRealOut_{};
157 bool fAborted_{false};
158 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
159 };
160 }
161
162 /*
163 ********************************************************************************
164 ********************* Streams::BufferedOutputStream::New ***********************
165 ********************************************************************************
166 */
167 template <typename ELEMENT_TYPE>
168 inline auto New (const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut, const optional<size_t>& bufferSize) -> Ptr<ELEMENT_TYPE>
169 {
170 if (bufferSize and *bufferSize == 0) {
171 return Ptr<ELEMENT_TYPE>{make_shared<Private_::Rep_<ELEMENT_TYPE, 0>> (realOut)};
172 }
173 else if (bufferSize and *bufferSize <= 4 * 1024) {
174 return Ptr<ELEMENT_TYPE>{make_shared<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> (realOut)};
175 }
176 else {
177 Ptr<ELEMENT_TYPE> p{make_shared<Private_::Rep_<ELEMENT_TYPE>> (realOut)};
178 if (bufferSize) {
179 p.SetBufferSize (*bufferSize);
180 }
181 return p;
182 }
183 }
184 template <typename ELEMENT_TYPE>
185 inline auto New (Execution::InternallySynchronized internallySynchronized, const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut,
186 const optional<size_t>& bufferSize) -> Ptr<ELEMENT_TYPE>
187 {
188 switch (internallySynchronized) {
189 case Execution::eInternallySynchronized: {
190 if (bufferSize and *bufferSize == 0) {
191 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 0>> ({}, realOut)};
192 }
193 else if (bufferSize and *bufferSize <= 4 * 1024) {
194 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> ({}, realOut)};
195 }
196 else {
197 Ptr<ELEMENT_TYPE> p{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE>> ({}, realOut)};
198 if (bufferSize) {
199 p.SetBufferSize (*bufferSize);
200 }
201 return p;
202 }
203 }
204 case Execution::eNotKnownInternallySynchronized:
205 return New<ELEMENT_TYPE> (realOut, bufferSize);
206 default:
208 return nullptr;
209 }
210 }
211
212 /*
213 ********************************************************************************
214 ****************** BufferedOutputStream::Ptr<ELEMENT_TYPE> *********************
215 ********************************************************************************
216 */
217 template <typename ELEMENT_TYPE>
218 inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
219 : inherited{from}
220 {
221 }
222 template <typename ELEMENT_TYPE>
223 inline size_t Ptr<ELEMENT_TYPE>::GetBufferSize () const
224 {
225 auto rep = this->GetSharedRep_ ();
226 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
227 AssertNotNull (r);
228 return r->GetBufferSize ();
229 }
230 template <typename ELEMENT_TYPE>
231 inline void Ptr<ELEMENT_TYPE>::SetBufferSize (size_t bufSize)
232 {
233 auto rep = this->GetSharedRep_ ();
234 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
235 AssertNotNull (r);
236 r->SetBufferSize (bufSize);
237 }
238 template <typename ELEMENT_TYPE>
240 {
241 auto rep = this->GetSharedRep_ ();
242 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
243 AssertNotNull (r);
244 r->Abort ();
245 }
246 template <typename ELEMENT_TYPE>
247 inline shared_ptr<Private_::IRep_<ELEMENT_TYPE>> Ptr<ELEMENT_TYPE>::GetSharedRep_ () const
248 {
249 return Debug::UncheckedDynamicPointerCast<Private_::IRep_<ELEMENT_TYPE>> (inherited::GetSharedRep ());
250 }
251
252}
#define AssertNotNull(p)
Definition Assertions.h:333
#define RequireNotReached()
Definition Assertions.h:385
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
Definition Assertions.h:438
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
OutputStream<>::Ptr is Smart pointer to a stream-based sink of data.
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
Definition Stream.h:170
void Abort(const Traversal::Iterable< Ptr > &threads)
foreach Thread t: t.Abort ()
Definition Thread.cpp:987