Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
BufferedInputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
5#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
7
8namespace Stroika::Foundation::Streams::BufferedInputStream {
9
10 namespace Private_ {
11
12 // this case easy, delegate to StreamReader to do all the work
13 template <typename ELEMENT_TYPE>
14 class Rep_Seekable_FromSeekable_ : public IRep_<ELEMENT_TYPE> {
15 public:
16 Rep_Seekable_FromSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
17 : fRealIn_{realIn}
18 , fReader_{realIn}
19 {
20 Require (realIn.IsSeekable ());
21 }
22 virtual bool IsSeekable () const override
23 {
24 return true;
25 }
26 virtual void CloseRead () override
27 {
28 if (fRealIn_ != nullptr) {
29 fRealIn_.Close ();
30 }
31 Ensure (not IsOpenRead ());
32 Assert (fRealIn_ == nullptr);
33 }
34 virtual bool IsOpenRead () const override
35 {
36 return fRealIn_ != nullptr;
37 }
38 virtual SeekOffsetType GetReadOffset () const override
39 {
40 Require (IsOpenRead ());
41 return fReader_.GetOffset ();
42 }
43 virtual optional<size_t> AvailableToRead () override
44 {
45 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
46 Require (IsOpenRead ());
47 return fReader_.AvailableToRead (); // since no actual buffering here yet
48 }
49 virtual optional<SeekOffsetType> RemainingLength () override
50 {
51 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
52 Require (IsOpenRead ());
53 return fReader_.RemainingLength ();
54 }
55 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
56 {
57 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
58 Require (IsOpenRead ());
59 return fReader_.Read (intoBuffer, blockFlag);
60 }
61
62 private:
63 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
64 StreamReader<ELEMENT_TYPE> fReader_;
65 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
66 };
67
68 // read the source into one big buffer. Keep it all around, so seekable
69 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
70 class Rep_Seekable_FromUnSeekable_ : public IRep_<ELEMENT_TYPE> {
71 public:
72 Rep_Seekable_FromUnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
73 : fRealIn_{realIn}
74 {
75 }
76 virtual bool IsSeekable () const override
77 {
78 return true;
79 }
80 virtual void CloseRead () override
81 {
82 if (fRealIn_ != nullptr) {
83 fRealIn_.Close ();
84 }
85 Ensure (not IsOpenRead ());
86 Assert (fRealIn_ == nullptr);
87 }
88 virtual bool IsOpenRead () const override
89 {
90 return fRealIn_ != nullptr;
91 }
92 virtual SeekOffsetType GetReadOffset () const override
93 {
94 Require (IsOpenRead ());
95 return fSeekOffset_;
96 }
97 virtual optional<size_t> AvailableToRead () override
98 {
99 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
100 Require (IsOpenRead ());
101 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
102 return fBufferOfAllReadDataSoFar_.size () - static_cast<size_t> (fSeekOffset_); // don't include what we might get upstream cuz more costly to compute
103 }
104 return fRealIn_.AvailableToRead ();
105 }
106 virtual optional<SeekOffsetType> RemainingLength () override
107 {
108 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
109 Require (IsOpenRead ());
110 if (auto rl = fRealIn_.RemainingLength ()) {
111 return MapOffsetFromReal2Mine_ (*rl);
112 }
113 return nullopt;
114 }
115 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
116 {
117 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
118 Require (IsOpenRead ());
119 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
120 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
121 ELEMENT_TYPE buf[1024];
122 if (auto r = fRealIn_.Read (span{buf}, blockFlag)) {
123 fBufferOfAllReadDataSoFar_.push_back (*r); // continue, and fall through
124 }
125 else {
126 return nullopt; // no data pre-read, and nothing available upstream
127 }
128 }
129 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
130 size_t n2Read = min<size_t> (intoBuffer.size (), static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
131 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
132 Assert (result.size () == n2Read);
133 fSeekOffset_ += n2Read;
134 return result;
135 }
136 return nullopt;
137 }
138
139 private:
140 nonvirtual SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so) const
141 {
142 return static_cast<SeekOffsetType> (static_cast<SignedSeekOffsetType> (so) + static_cast<SignedSeekOffsetType> (fRealIn_.GetOffset ()) -
143 static_cast<SignedSeekOffsetType> (fSeekOffset_));
144 }
145
146 private:
147 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
148 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
149 SeekOffsetType fSeekOffset_{0}; // always inside fBufferOfAllReadDataSoFar_
150 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
151 };
152
153 // pretty easy/efficient case cuz we can throw away data as we go, and since not seekable, not many cases to analyze
154 template <typename ELEMENT_TYPE, size_t INLINE_BUF_SIZE>
155 class Rep_UnSeekable_ : public IRep_<ELEMENT_TYPE> {
156 public:
157 Rep_UnSeekable_ (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
158 : fRealIn_{realIn}
159 {
160 }
161 virtual bool IsSeekable () const override
162 {
163 return false;
164 }
165 virtual void CloseRead () override
166 {
167 if (fRealIn_ != nullptr) {
168 fRealIn_.Close ();
169 }
170 Ensure (not IsOpenRead ());
171 Assert (fRealIn_ == nullptr);
172 }
173 virtual bool IsOpenRead () const override
174 {
175 return fRealIn_ != nullptr;
176 }
177 virtual SeekOffsetType GetReadOffset () const override
178 {
179 Require (IsOpenRead ());
180 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
181 }
182 virtual optional<size_t> AvailableToRead () override
183 {
184 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
185 Require (IsOpenRead ());
186 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
187 if (auto o = fRealIn_.AvailableToRead ()) {
188 // if we KNOW what's available upstream, add to what we've pre-read
189 return *o + n;
190 }
191 else if (n != 0) {
192 return n; // if zero buffered, and nothing KNOWN about upstream, return nullopt
193 }
194 return nullopt; // if nothing upstream available (but not zero/eof)
195 }
196 virtual optional<SeekOffsetType> RemainingLength () override
197 {
198 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
199 Require (IsOpenRead ());
200 if (auto o = fRealIn_.RemainingLength ()) {
201 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
202 }
203 return nullopt; // if nothing upstream available (but not zero/eof)
204 }
205 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
206 {
207 Debug::AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
208 Require (IsOpenRead ());
209 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
210 if (n == 0) [[unlikely]] {
211 // read into fIntermediateBuffer_ (not intoBuffer)- OK to overwrite cuz
212 // no seeking allowed, so we will never re-examine that data/buffer
213 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
214 if (bufR) {
215 // we filled buffer (possibly with zero elements)
216 fReadOffsetIntoIntermediateBuf_ = 0;
217 n = bufR->size ();
218 fIntermediateBuffer_.resize_uninitialized (n);
219 }
220 else {
221 return nullopt; // no new information, don't change state so can Read again
222 }
223 }
224 // if we get here, and n = 0, really EOF, cuz return above on NOT-AVAIL case
225 Assert (n != 0 or fRealIn_.IsAtEOF ());
226 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
227 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
228 Assert (t.size () == n2Read);
229 fReadOffsetIntoIntermediateBuf_ += n2Read;
230 return t;
231 }
232
233 private:
234 nonvirtual size_t GetNEltsAlreadyBufferedFromUpstream_ () const
235 {
236 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
237 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
238 }
239
240 private:
241 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
242 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
243 size_t fReadOffsetIntoIntermediateBuf_{0};
244 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
245 };
246 }
247
248 /*
249 ********************************************************************************
250 ********************* Streams::BufferedInputStream::New ************************
251 ********************************************************************************
252 */
253 template <typename ELEMENT_TYPE>
254 inline auto New (const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<bool> seekable) -> Ptr<ELEMENT_TYPE>
255 {
256 using PTR = Ptr<ELEMENT_TYPE>;
257 bool srcSeekable = realIn.IsSeekable ();
258 bool useSeekable = seekable.value_or (srcSeekable);
259 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
260 if (useSeekable) {
261 return srcSeekable ? PTR{make_shared<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
262 : PTR{make_shared<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
263 }
264 else {
265 return PTR{make_shared<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
266 }
267 }
268 template <typename ELEMENT_TYPE>
269 inline auto New (Execution::InternallySynchronized internallySynchronized, const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
270 optional<bool> seekable) -> Ptr<ELEMENT_TYPE>
271 {
272 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
273 switch (internallySynchronized) {
274 case Execution::eInternallySynchronized: {
275 bool srcSeekable = realIn.IsSeekable ();
276 bool useSeekable = seekable.value_or (srcSeekable);
277 if (useSeekable) {
278 return srcSeekable
279 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
280 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
281 }
282 else {
283 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
284 }
285 }
286 case Execution::eNotKnownInternallySynchronized:
287 return New<ELEMENT_TYPE> (realIn, seekable);
288 default:
290 return nullptr;
291 }
292 }
293
294 /*
295 ********************************************************************************
296 ******************* BufferedInputStream::Ptr<ELEMENT_TYPE> *********************
297 ********************************************************************************
298 */
299 template <typename ELEMENT_TYPE>
300 inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
301 : inherited{from}
302 {
303 }
304
305}
#define RequireNotReached()
Definition Assertions.h:385
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
InputStream<>::Ptr is Smart pointer (with abstract Rep) class defining the interface to reading from ...
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
Definition Stream.h:170
nonvirtual bool IsSeekable() const
Returns true iff this object was constructed with a seekable input stream rep.
Definition Stream.inl:44