Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
BufferedOutputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "InternallySynchronizedOutputStream.h"
9
10namespace Stroika::Foundation::Streams::BufferedOutputStream {
11
12 namespace Private_ {
13 // note note using block-allocation cuz aggregates kDefaultBufSize_ sized buffer
14 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
15 class Rep_ : public IRep_<ELEMENT_TYPE> {
16 public:
17 Rep_ (const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut)
18 : fRealOut_{realOut}
19 {
20 }
21 ~Rep_ ()
22 {
23 if (IsOpenWrite ()) {
24 IgnoreExceptionsForCall (Flush ());
25 }
26 WeakAssert (fUnwrittenAppends_.size () == 0); // advisory - not quite right - could happen if a flush exception was eaten (@todo clean this up)
27 }
28
29 public:
30 virtual size_t GetBufferSize () const override
31 {
32 Debug::AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
33 return fUnwrittenAppends_.capacity ();
34 }
35 virtual void SetBufferSize (size_t bufSize) override
36 {
37 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
38 bufSize = Math::AtLeast (bufSize, INLINE_BUF_SIZE);
39 if (bufSize < fUnwrittenAppends_.size ()) {
40 Flush_ (); // this logic only write because stream not seekable, and buffer is for unwritten appends
41 }
42 fUnwrittenAppends_.reserve (bufSize);
43 }
44
45 public:
46 // Throws away all data about to be written (buffered). Once this is called, its illegal to call Flush or another write
47 nonvirtual void Abort ()
48 {
49 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
50 fAborted_ = true; // for debug sake track this
51 fUnwrittenAppends_.clear ();
52 }
53 virtual bool IsSeekable () const override
54 {
55 return false; // @todo - COULD be seekable if underlying fRealOut_ was!!!
56 }
57 virtual void CloseWrite () override
58 {
59 if (IsOpenWrite ()) {
60 Flush ();
61 fRealOut_.Close ();
62 }
63 Assert (fRealOut_ == nullptr);
64 Ensure (not IsOpenWrite ());
65 }
66 virtual bool IsOpenWrite () const override
67 {
68 return fRealOut_ != nullptr;
69 }
70 virtual SeekOffsetType GetWriteOffset () const override
71 {
73 Require (IsOpenWrite ());
74 return 0;
75 }
76 virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset) override
77 {
78 RequireNotReached (); // cuz we are not seekable, but could be changed/improved
79 Require (IsOpenWrite ());
80 return 0;
81 }
82 virtual void Flush () override
83 {
84 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
85 Require (IsOpenWrite ());
86 Flush_ ();
87 }
88 // pointer must refer to valid memory at least bufSize long, and cannot be nullptr. BufSize must always be >= 1.
89 // Writes always succeed fully or throw.
90 virtual void Write (span<const ELEMENT_TYPE> elts) override
91 {
92 Require (not elts.empty ()); // for OutputStream<byte> - this function requires non-empty write
93 Require (not fAborted_);
94 Require (IsOpenWrite ());
95 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
96 /*
97 * Minimize the number of writes at the possible cost of extra copying.
98 *
99 * See if there is room in the buffer, and use it up. Only when no more room do we flush.
100 */
101 size_t bufSpaceRemaining = fUnwrittenAppends_.capacity () - fUnwrittenAppends_.size ();
102 size_t size2WriteRemaining = elts.size ();
103 size_t copy2Buffer = min (bufSpaceRemaining, size2WriteRemaining);
104#if qStroika_Foundation_Debug_AssertionsChecked
105 size_t oldCap = fUnwrittenAppends_.capacity ();
106#endif
107 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data (), elts.data () + copy2Buffer);
108#if qStroika_Foundation_Debug_AssertionsChecked
109 Assert (oldCap == fUnwrittenAppends_.capacity ());
110#endif
111
112 Assert (size2WriteRemaining >= copy2Buffer);
113 size2WriteRemaining -= copy2Buffer;
114
115 /*
116 * At this point - either the buffer is full, OR we are done writing. EITHER way - if the buffer is full - we may as well write it now.
117 */
118 if (fUnwrittenAppends_.capacity () == fUnwrittenAppends_.size ()) {
119 Flush_ ();
120 Assert (fUnwrittenAppends_.empty ());
121 }
122#if qStroika_Foundation_Debug_AssertionsChecked
123 Assert (oldCap == fUnwrittenAppends_.capacity ());
124#endif
125
126 // If the remaining will fit in the buffer, then buffer. But if it won't - no point in using the buffer - just write directly to avoid the copy.
127 // And no point - even if equal to buffer size - since it won't save any writes...
128 if (size2WriteRemaining == 0) {
129 // DONE
130 }
131 else if (size2WriteRemaining < fUnwrittenAppends_.capacity ()) {
132 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data () + copy2Buffer, elts.data () + elts.size ());
133 }
134 else {
135 fRealOut_.Write (elts.subspan (copy2Buffer));
136 }
137 }
138
139 private:
140 nonvirtual void Flush_ ()
141 {
142 if (fAborted_) {
143 fUnwrittenAppends_.clear ();
144 }
145 else {
146 if (not fUnwrittenAppends_.empty ()) {
147 fRealOut_.Write (span{fUnwrittenAppends_});
148 fUnwrittenAppends_.clear ();
149 }
150 fRealOut_.Flush ();
151 }
152 Ensure (fUnwrittenAppends_.empty ());
153 }
154
155 private:
156 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fUnwrittenAppends_{};
157 typename OutputStream::Ptr<ELEMENT_TYPE> fRealOut_{};
158 bool fAborted_{false};
159 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
160 };
161 }
162
163 /*
164 ********************************************************************************
165 ********************* Streams::BufferedOutputStream::New ***********************
166 ********************************************************************************
167 */
168 template <typename ELEMENT_TYPE>
169 inline auto New (const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut, const optional<size_t>& bufferSize) -> Ptr<ELEMENT_TYPE>
170 {
171 if (bufferSize and *bufferSize == 0) {
172 return Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE, 0>> (realOut)};
173 }
174 else if (bufferSize and *bufferSize <= 4 * 1024) {
175 return Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> (realOut)};
176 }
177 else {
178 Ptr<ELEMENT_TYPE> p{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE>> (realOut)};
179 if (bufferSize) {
180 p.SetBufferSize (*bufferSize);
181 }
182 return p;
183 }
184 }
185 template <typename ELEMENT_TYPE>
186 inline auto New (Execution::InternallySynchronized internallySynchronized, const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut,
187 const optional<size_t>& bufferSize) -> Ptr<ELEMENT_TYPE>
188 {
189 switch (internallySynchronized) {
190 case Execution::eInternallySynchronized: {
191 if (bufferSize and *bufferSize == 0) {
192 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 0>> ({}, realOut)};
193 }
194 else if (bufferSize and *bufferSize <= 4 * 1024) {
195 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> ({}, realOut)};
196 }
197 else {
198 Ptr<ELEMENT_TYPE> p{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE>> ({}, realOut)};
199 if (bufferSize) {
200 p.SetBufferSize (*bufferSize);
201 }
202 return p;
203 }
204 }
205 case Execution::eNotKnownInternallySynchronized:
206 return New<ELEMENT_TYPE> (realOut, bufferSize);
207 default:
209 return nullptr;
210 }
211 }
212
213 /*
214 ********************************************************************************
215 ****************** BufferedOutputStream::Ptr<ELEMENT_TYPE> *********************
216 ********************************************************************************
217 */
218 template <typename ELEMENT_TYPE>
219 inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
220 : inherited{from}
221 {
222 }
223 template <typename ELEMENT_TYPE>
224 inline size_t Ptr<ELEMENT_TYPE>::GetBufferSize () const
225 {
226 auto rep = this->GetSharedRep_ ();
227 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
228 AssertNotNull (r);
229 return r->GetBufferSize ();
230 }
231 template <typename ELEMENT_TYPE>
232 inline void Ptr<ELEMENT_TYPE>::SetBufferSize (size_t bufSize)
233 {
234 auto rep = this->GetSharedRep_ ();
235 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
236 AssertNotNull (r);
237 r->SetBufferSize (bufSize);
238 }
239 template <typename ELEMENT_TYPE>
241 {
242 auto rep = this->GetSharedRep_ ();
243 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
244 AssertNotNull (r);
245 r->Abort ();
246 }
247 template <typename ELEMENT_TYPE>
248 inline shared_ptr<Private_::IRep_<ELEMENT_TYPE>> Ptr<ELEMENT_TYPE>::GetSharedRep_ () const
249 {
250 return Debug::UncheckedDynamicPointerCast<Private_::IRep_<ELEMENT_TYPE>> (inherited::GetSharedRep ());
251 }
252
253}
#define AssertNotNull(p)
Definition Assertions.h:333
#define RequireNotReached()
Definition Assertions.h:385
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
Definition Assertions.h:438
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
OutputStream<>::Ptr is Smart pointer to a stream-based sink of data.
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
Definition Stream.h:170
void Abort(const Traversal::Iterable< Ptr > &threads)
foreach Thread t: t.Abort ()
Definition Thread.cpp:992