Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
BufferedInputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
6#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
8
9namespace Stroika::Foundation::Streams::BufferedInputStream {
10
11 namespace Private_ {
12
13 [[noreturn]] void ThrowCannotSeekFromEnd_ ();
14
15 // this case easy, delegate to StreamReader to do all the work
16 template <typename ELEMENT_TYPE>
17 class Rep_Seekable_FromSeekable_ : public IRep_<ELEMENT_TYPE> {
18 public:
19 Rep_Seekable_FromSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
20 : fRealIn_{realIn}
21 , fReader_{realIn}
22 {
23 Require (realIn.IsSeekable ());
24 }
25 virtual bool IsSeekable () const override
26 {
27 return true;
28 }
29 virtual void CloseRead () override
30 {
31 if (fRealIn_ != nullptr) {
32 fRealIn_.Close ();
33 }
34 Ensure (not IsOpenRead ());
35 Assert (fRealIn_ == nullptr);
36 }
37 virtual bool IsOpenRead () const override
38 {
39 return fRealIn_ != nullptr;
40 }
41 virtual SeekOffsetType GetReadOffset () const override
42 {
43 Require (IsOpenRead ());
44 return fReader_.GetOffset ();
45 }
46 virtual optional<size_t> AvailableToRead () override
47 {
48 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
49 Require (IsOpenRead ());
50 return fReader_.AvailableToRead (); // since no actual buffering here yet
51 }
52 virtual optional<SeekOffsetType> RemainingLength () override
53 {
54 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
55 Require (IsOpenRead ());
56 return fReader_.RemainingLength ();
57 }
58 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
59 {
60 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
61 Require (IsOpenRead ());
62 return fReader_.Read (intoBuffer, blockFlag);
63 }
64
65 private:
66 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
67 StreamReader<ELEMENT_TYPE> fReader_;
68 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
69 };
70
71 // read the source into one big buffer. Keep it all around, so seekable
72 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
73 class Rep_Seekable_FromUnSeekable_ : public IRep_<ELEMENT_TYPE> {
74 public:
75 Rep_Seekable_FromUnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
76 : fRealIn_{realIn}
77 {
78 }
79 virtual bool IsSeekable () const override
80 {
81 return true;
82 }
83 virtual void CloseRead () override
84 {
85 if (fRealIn_ != nullptr) {
86 fRealIn_.Close ();
87 }
88 Ensure (not IsOpenRead ());
89 Assert (fRealIn_ == nullptr);
90 }
91 virtual bool IsOpenRead () const override
92 {
93 return fRealIn_ != nullptr;
94 }
95 virtual SeekOffsetType GetReadOffset () const override
96 {
97 Require (IsOpenRead ());
98 return fSeekOffset_;
99 }
100 virtual optional<size_t> AvailableToRead () override
101 {
102 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
103 Require (IsOpenRead ());
104 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
105 return fBufferOfAllReadDataSoFar_.size () - static_cast<size_t> (fSeekOffset_); // don't include what we might get upstream cuz more costly to compute
106 }
107 return fRealIn_.AvailableToRead ();
108 }
109 virtual optional<SeekOffsetType> RemainingLength () override
110 {
111 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
112 Require (IsOpenRead ());
113 if (auto rl = fRealIn_.RemainingLength ()) {
114 return MapOffsetFromReal2Mine_ (*rl);
115 }
116 return nullopt;
117 }
118 virtual auto SeekRead (Whence whence, SignedSeekOffsetType offset) -> SeekOffsetType override
119 {
120 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
121 Require (IsOpenRead ());
122 // @todo - allow seek forward past fBufferOfAllReadDataSoFar_?
123 switch (whence) {
124 case Whence::eFromCurrent:
125 fSeekOffset_ += offset;
126 break;
127 case Whence::eFromStart:
128 fSeekOffset_ = offset;
129 break;
130 case Whence::eFromEnd:
131 if (auto remaining = this->RemainingLength ()) {
132 fSeekOffset_ += static_cast<SignedSeekOffsetType> (*remaining) - offset;
133 break;
134 }
135 else {
136 Private_::ThrowCannotSeekFromEnd_ ();
137 }
138 default:
140 }
141 return fSeekOffset_;
142 }
143 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
144 {
145 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
146 Require (IsOpenRead ());
147 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
148 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
149 ELEMENT_TYPE buf[1024];
150 if (auto r = fRealIn_.Read (span{buf}, blockFlag)) {
151 fBufferOfAllReadDataSoFar_.push_back (*r); // continue, and fall through
152 }
153 else {
154 return nullopt; // no data pre-read, and nothing available upstream
155 }
156 }
157 if (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
158 size_t n2Read = min<size_t> (intoBuffer.size (), static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
159 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
160 Assert (result.size () == n2Read);
161 fSeekOffset_ += n2Read;
162 return result;
163 }
164 return nullopt;
165 }
166
167 private:
168 nonvirtual SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so) const
169 {
170 return static_cast<SeekOffsetType> (static_cast<SignedSeekOffsetType> (so) + static_cast<SignedSeekOffsetType> (fRealIn_.GetOffset ()) -
171 static_cast<SignedSeekOffsetType> (fSeekOffset_));
172 }
173
174 private:
175 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
176 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
177 SeekOffsetType fSeekOffset_{0}; // always inside fBufferOfAllReadDataSoFar_
178 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
179 };
180
181 // pretty easy/efficient case cuz we can throw away data as we go, and since not seekable, not many cases to analyze
182 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
183 class Rep_UnSeekable_ : public IRep_<ELEMENT_TYPE> {
184 public:
185 Rep_UnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
186 : fRealIn_{realIn}
187 {
188 }
189 virtual bool IsSeekable () const override
190 {
191 return false;
192 }
193 virtual void CloseRead () override
194 {
195 if (fRealIn_ != nullptr) {
196 fRealIn_.Close ();
197 }
198 Ensure (not IsOpenRead ());
199 Assert (fRealIn_ == nullptr);
200 }
201 virtual bool IsOpenRead () const override
202 {
203 return fRealIn_ != nullptr;
204 }
205 virtual SeekOffsetType GetReadOffset () const override
206 {
207 Require (IsOpenRead ());
208 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
209 }
210 virtual optional<size_t> AvailableToRead () override
211 {
212 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
213 Require (IsOpenRead ());
214 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
215 if (auto o = fRealIn_.AvailableToRead ()) {
216 // if we KNOW what's available upstream, add to what we've pre-read
217 return *o + n;
218 }
219 else if (n != 0) {
220 return n; // if zero buffered, and nothing KNOWN about upstream, return nullopt
221 }
222 return nullopt; // if nothing upstream available (but not zero/eof)
223 }
224 virtual optional<SeekOffsetType> RemainingLength () override
225 {
226 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
227 Require (IsOpenRead ());
228 if (auto o = fRealIn_.RemainingLength ()) {
229 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
230 }
231 return nullopt; // if nothing upstream available (but not zero/eof)
232 }
233 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
234 {
235 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
236 Require (IsOpenRead ());
237 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
238 if (n == 0) [[unlikely]] {
239 // read into fIntermediateBuffer_ (not intoBuffer)- OK to overwrite cuz
240 // no seeking allowed, so we will never re-examine that data/buffer
241 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
242 if (bufR) {
243 // we filled buffer (possibly with zero elements)
244 fReadOffsetIntoIntermediateBuf_ = 0;
245 n = bufR->size ();
246 fIntermediateBuffer_.resize_uninitialized (n);
247 }
248 else {
249 return nullopt; // no new information, don't change state so can Read again
250 }
251 }
252 // if we get here, and n = 0, really EOF, cuz return above on NOT-AVAIL case
253 Assert (n != 0 or fRealIn_.IsAtEOF ());
254 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
255 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
256 Assert (t.size () == n2Read);
257 fReadOffsetIntoIntermediateBuf_ += n2Read;
258 return t;
259 }
260
261 private:
262 nonvirtual size_t GetNEltsAlreadyBufferedFromUpstream_ () const
263 {
264 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
265 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
266 }
267
268 private:
269 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
270 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
271 size_t fReadOffsetIntoIntermediateBuf_{0};
272 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
273 };
274 }
275
276 /*
277 ********************************************************************************
278 ********************* Streams::BufferedInputStream::New ************************
279 ********************************************************************************
280 */
281 template <typename ELEMENT_TYPE>
282 inline auto New (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
283 {
284 using PTR = Ptr<ELEMENT_TYPE>;
285 SeekableFlag srcSeekable = realIn.GetSeekability ();
286 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
287 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
288 if (useSeekable == SeekableFlag::eSeekable) {
289 return (srcSeekable == SeekableFlag::eSeekable)
290 ? PTR{Memory::MakeSharedPtr<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
291 : PTR{Memory::MakeSharedPtr<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
292 }
293 else {
294 return PTR{Memory::MakeSharedPtr<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
295 }
296 }
297 template <typename ELEMENT_TYPE>
298 inline auto New (Execution::InternallySynchronized internallySynchronized, const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
299 optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
300 {
301 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
302 switch (internallySynchronized) {
303 case Execution::eInternallySynchronized: {
304 SeekableFlag srcSeekable = realIn.GetSeekability ();
305 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
306 if (useSeekable == SeekableFlag::eSeekable) {
307 return (srcSeekable == SeekableFlag::eSeekable)
308 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
309 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
310 }
311 else {
312 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
313 }
314 }
315 case Execution::eNotKnownInternallySynchronized:
316 return New<ELEMENT_TYPE> (realIn, seekable);
317 default:
319 return nullptr;
320 }
321 }
322
323 /*
324 ********************************************************************************
325 ******************* BufferedInputStream::Ptr<ELEMENT_TYPE> *********************
326 ********************************************************************************
327 */
328 template <typename ELEMENT_TYPE>
329 inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
330 : inherited{from}
331 {
332 }
333
334 template <typename ELEMENT_TYPE>
335 [[deprecated ("Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE> New (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
336 optional<bool> seekable)
337 {
338 optional<SeekableFlag> sf;
339 if (seekable) {
340 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
341 }
342 return New (realIn, sf);
343 }
344 template <typename ELEMENT_TYPE>
345 [[deprecated ("Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE>
346 New (Execution::InternallySynchronized internallySynchronized, const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<bool> seekable = {})
347 {
348 optional<SeekableFlag> sf;
349 if (seekable) {
350 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
351 }
352 return New (internallySynchronized, realIn, sf);
353 }
354}
#define RequireNotReached()
Definition Assertions.h:385
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...