Stroika Library 3.0d20
 
Loading...
Searching...
No Matches
BufferedInputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
5#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
7
8namespace Stroika::Foundation::Streams::BufferedInputStream {
9
10 namespace Private_ {
11
12 [[noreturn]] void ThrowCannotSeekFromEnd_ ();
13
14 // this case easy, delegate to StreamReader to do all the work
15 template <typename ELEMENT_TYPE>
16 class Rep_Seekable_FromSeekable_ : public IRep_<ELEMENT_TYPE> {
17 public:
18 Rep_Seekable_FromSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
19 : fRealIn_{realIn}
20 , fReader_{realIn}
21 {
22 Require (realIn.IsSeekable ());
23 }
24 virtual bool IsSeekable () const override
25 {
26 return true;
27 }
28 virtual void CloseRead () override
29 {
30 if (fRealIn_ != nullptr) {
31 fRealIn_.Close ();
32 }
33 Ensure (not IsOpenRead ());
34 Assert (fRealIn_ == nullptr);
35 }
36 virtual bool IsOpenRead () const override
37 {
38 return fRealIn_ != nullptr;
39 }
40 virtual SeekOffsetType GetReadOffset () const override
41 {
42 Require (IsOpenRead ());
43 return fReader_.GetOffset ();
44 }
45 virtual optional<size_t> AvailableToRead () override
46 {
47 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
48 Require (IsOpenRead ());
49 return fReader_.AvailableToRead (); // since no actual buffering here yet
50 }
51 virtual optional<SeekOffsetType> RemainingLength () override
52 {
53 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
54 Require (IsOpenRead ());
55 return fReader_.RemainingLength ();
56 }
57 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
58 {
59 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
60 Require (IsOpenRead ());
61 return fReader_.Read (intoBuffer, blockFlag);
62 }
63
64 private:
65 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
66 StreamReader<ELEMENT_TYPE> fReader_;
67 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
68 };
69
70 // read the source into one big buffer. Keep it all around, so seekable
71 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
72 class Rep_Seekable_FromUnSeekable_ : public IRep_<ELEMENT_TYPE> {
73 public:
74 Rep_Seekable_FromUnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
75 : fRealIn_{realIn}
76 {
77 }
78 virtual bool IsSeekable () const override
79 {
80 return true;
81 }
82 virtual void CloseRead () override
83 {
84 if (fRealIn_ != nullptr) {
85 fRealIn_.Close ();
86 }
87 Ensure (not IsOpenRead ());
88 Assert (fRealIn_ == nullptr);
89 }
90 virtual bool IsOpenRead () const override
91 {
92 return fRealIn_ != nullptr;
93 }
94 virtual SeekOffsetType GetReadOffset () const override
95 {
96 Require (IsOpenRead ());
97 return fSeekOffset_;
98 }
99 virtual optional<size_t> AvailableToRead () override
100 {
101 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
102 Require (IsOpenRead ());
103 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
104 return fBufferOfAllReadDataSoFar_.size () - static_cast<size_t> (fSeekOffset_); // don't include what we might get upstream cuz more costly to compute
105 }
106 return fRealIn_.AvailableToRead ();
107 }
108 virtual optional<SeekOffsetType> RemainingLength () override
109 {
110 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
111 Require (IsOpenRead ());
112 if (auto rl = fRealIn_.RemainingLength ()) {
113 return MapOffsetFromReal2Mine_ (*rl);
114 }
115 return nullopt;
116 }
117 virtual auto SeekRead (Whence whence, SignedSeekOffsetType offset) -> SeekOffsetType override
118 {
119 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
120 Require (IsOpenRead ());
121 // @todo - allow seek forward past fBufferOfAllReadDataSoFar_?
122 switch (whence) {
123 case Whence::eFromCurrent:
124 fSeekOffset_ += offset;
125 break;
126 case Whence::eFromStart:
127 fSeekOffset_ = offset;
128 break;
129 case Whence::eFromEnd:
130 if (auto remaining = this->RemainingLength ()) {
131 fSeekOffset_ += static_cast<SignedSeekOffsetType> (*remaining) - offset;
132 break;
133 }
134 else {
135 Private_::ThrowCannotSeekFromEnd_ ();
136 }
137 default:
139 }
140 return fSeekOffset_;
141 }
142 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
143 {
144 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
145 Require (IsOpenRead ());
146 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
147 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
148 ELEMENT_TYPE buf[1024];
149 if (auto r = fRealIn_.Read (span{buf}, blockFlag)) {
150 fBufferOfAllReadDataSoFar_.push_back (*r); // continue, and fall through
151 }
152 else {
153 return nullopt; // no data pre-read, and nothing available upstream
154 }
155 }
156 if (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
157 size_t n2Read = min<size_t> (intoBuffer.size (), static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
158 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
159 Assert (result.size () == n2Read);
160 fSeekOffset_ += n2Read;
161 return result;
162 }
163 return nullopt;
164 }
165
166 private:
167 nonvirtual SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so) const
168 {
169 return static_cast<SeekOffsetType> (static_cast<SignedSeekOffsetType> (so) + static_cast<SignedSeekOffsetType> (fRealIn_.GetOffset ()) -
170 static_cast<SignedSeekOffsetType> (fSeekOffset_));
171 }
172
173 private:
174 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
175 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
176 SeekOffsetType fSeekOffset_{0}; // always inside fBufferOfAllReadDataSoFar_
177 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
178 };
179
180 // pretty easy/efficient case cuz we can throw away data as we go, and since not seekable, not many cases to analyze
181 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
182 class Rep_UnSeekable_ : public IRep_<ELEMENT_TYPE> {
183 public:
184 Rep_UnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
185 : fRealIn_{realIn}
186 {
187 }
188 virtual bool IsSeekable () const override
189 {
190 return false;
191 }
192 virtual void CloseRead () override
193 {
194 if (fRealIn_ != nullptr) {
195 fRealIn_.Close ();
196 }
197 Ensure (not IsOpenRead ());
198 Assert (fRealIn_ == nullptr);
199 }
200 virtual bool IsOpenRead () const override
201 {
202 return fRealIn_ != nullptr;
203 }
204 virtual SeekOffsetType GetReadOffset () const override
205 {
206 Require (IsOpenRead ());
207 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
208 }
209 virtual optional<size_t> AvailableToRead () override
210 {
211 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
212 Require (IsOpenRead ());
213 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
214 if (auto o = fRealIn_.AvailableToRead ()) {
215 // if we KNOW what's available upstream, add to what we've pre-read
216 return *o + n;
217 }
218 else if (n != 0) {
219 return n; // if zero buffered, and nothing KNOWN about upstream, return nullopt
220 }
221 return nullopt; // if nothing upstream available (but not zero/eof)
222 }
223 virtual optional<SeekOffsetType> RemainingLength () override
224 {
225 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
226 Require (IsOpenRead ());
227 if (auto o = fRealIn_.RemainingLength ()) {
228 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
229 }
230 return nullopt; // if nothing upstream available (but not zero/eof)
231 }
232 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
233 {
234 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
235 Require (IsOpenRead ());
236 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
237 if (n == 0) [[unlikely]] {
238 // read into fIntermediateBuffer_ (not intoBuffer)- OK to overwrite cuz
239 // no seeking allowed, so we will never re-examine that data/buffer
240 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
241 if (bufR) {
242 // we filled buffer (possibly with zero elements)
243 fReadOffsetIntoIntermediateBuf_ = 0;
244 n = bufR->size ();
245 fIntermediateBuffer_.resize_uninitialized (n);
246 }
247 else {
248 return nullopt; // no new information, don't change state so can Read again
249 }
250 }
251 // if we get here, and n = 0, really EOF, cuz return above on NOT-AVAIL case
252 Assert (n != 0 or fRealIn_.IsAtEOF ());
253 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
254 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
255 Assert (t.size () == n2Read);
256 fReadOffsetIntoIntermediateBuf_ += n2Read;
257 return t;
258 }
259
260 private:
261 nonvirtual size_t GetNEltsAlreadyBufferedFromUpstream_ () const
262 {
263 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
264 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
265 }
266
267 private:
268 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
269 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
270 size_t fReadOffsetIntoIntermediateBuf_{0};
271 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
272 };
273 }
274
275 /*
276 ********************************************************************************
277 ********************* Streams::BufferedInputStream::New ************************
278 ********************************************************************************
279 */
280 template <typename ELEMENT_TYPE>
281 inline auto New (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
282 {
283 using PTR = Ptr<ELEMENT_TYPE>;
284 SeekableFlag srcSeekable = realIn.GetSeekability ();
285 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
286 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
287 if (useSeekable == SeekableFlag::eSeekable) {
288 return (srcSeekable == SeekableFlag::eSeekable)
289 ? PTR{make_shared<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
290 : PTR{make_shared<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
291 }
292 else {
293 return PTR{make_shared<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
294 }
295 }
296 template <typename ELEMENT_TYPE>
297 inline auto New (Execution::InternallySynchronized internallySynchronized, const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
298 optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
299 {
300 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
301 switch (internallySynchronized) {
302 case Execution::eInternallySynchronized: {
303 SeekableFlag srcSeekable = realIn.GetSeekability ();
304 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
305 if (useSeekable == SeekableFlag::eSeekable) {
306 return (srcSeekable == SeekableFlag::eSeekable)
307 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
308 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
309 }
310 else {
311 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
312 }
313 }
314 case Execution::eNotKnownInternallySynchronized:
315 return New<ELEMENT_TYPE> (realIn, seekable);
316 default:
318 return nullptr;
319 }
320 }
321
322 /*
323 ********************************************************************************
324 ******************* BufferedInputStream::Ptr<ELEMENT_TYPE> *********************
325 ********************************************************************************
326 */
327 template <typename ELEMENT_TYPE>
328 inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
329 : inherited{from}
330 {
331 }
332
333 template <typename ELEMENT_TYPE>
334 [[deprecated ("Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE> New (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
335 optional<bool> seekable)
336 {
337 optional<SeekableFlag> sf;
338 if (seekable) {
339 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
340 }
341 return New (realIn, sf);
342 }
343 template <typename ELEMENT_TYPE>
344 [[deprecated ("Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE>
345 New (Execution::InternallySynchronized internallySynchronized, const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<bool> seekable = {})
346 {
347 optional<SeekableFlag> sf;
348 if (seekable) {
349 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
350 }
351 return New (internallySynchronized, realIn, sf);
352 }
353}
#define RequireNotReached()
Definition Assertions.h:385
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...