Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
StreamReader.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
5
6namespace Stroika::Foundation::Streams {
7
8 /*
9 ********************************************************************************
10 ******************** Streams::StreamReader::CacheBlock_ ************************
11 ********************************************************************************
12 */
13 template <typename ELEMENT_TYPE>
14 inline size_t StreamReader<ELEMENT_TYPE>::CacheBlock_::GetSize () const
15 {
16 return fCacheWindowBuf_.GetSize ();
17 }
18 template <typename ELEMENT_TYPE>
19 inline SeekOffsetType StreamReader<ELEMENT_TYPE>::CacheBlock_::GetStart () const
20 {
21 return fCacheWindowBufStart_;
22 }
23 template <typename ELEMENT_TYPE>
24 inline SeekOffsetType StreamReader<ELEMENT_TYPE>::CacheBlock_::GetEnd () const
25 {
26 return fCacheWindowBufStart_ + fCacheWindowBuf_.GetSize ();
27 }
28 template <typename ELEMENT_TYPE>
29 inline auto StreamReader<ELEMENT_TYPE>::CacheBlock_::Peek1FromCache (SeekOffsetType actualOffset) const -> optional<ElementType>
30 {
31 size_t cacheWindowSize = fCacheWindowBuf_.size ();
32 if (fCacheWindowBufStart_ <= actualOffset and actualOffset < fCacheWindowBufStart_ + cacheWindowSize) [[likely]] {
33 return fCacheWindowBuf_[static_cast<size_t> (actualOffset - fCacheWindowBufStart_)];
34 }
35 return nullopt;
36 }
37 template <typename ELEMENT_TYPE>
38 inline auto StreamReader<ELEMENT_TYPE>::CacheBlock_::Read1FromCache (SeekOffsetType* actualOffset) -> optional<ElementType>
39 {
40 RequireNotNull (actualOffset);
41 auto result = Peek1FromCache (*actualOffset);
42 if (result) [[likely]] {
43 ++(*actualOffset);
44 }
45 return result;
46 }
47 template <typename ELEMENT_TYPE>
48 optional<size_t> StreamReader<ELEMENT_TYPE>::CacheBlock_::ReadFromCache (SeekOffsetType* actualOffset, span<ElementType> into)
49 {
50 using namespace Traversal;
51 size_t cacheWindowSize = fCacheWindowBuf_.size ();
52 if (cacheWindowSize != 0) [[likely]] {
53 Range<SignedSeekOffsetType> cacheWindow{
54 static_cast<SignedSeekOffsetType> (fCacheWindowBufStart_),
55 static_cast<SignedSeekOffsetType> (fCacheWindowBufStart_ + cacheWindowSize),
56 Openness::eClosed,
57 Openness::eOpen,
58 };
59 if (cacheWindow.Contains (*actualOffset)) [[likely]] {
60 // then we can return at least some data from the cache - do that now
61 size_t nToRead = into.size ();
62 if (nToRead != 1) {
63 size_t nInBufAvail = static_cast<size_t> (cacheWindow.GetUpperBound () - *actualOffset);
64 nToRead = min (nToRead, nInBufAvail);
65 }
66 Assert (nToRead > 0); // because contained _fOffset
67 size_t curSeekPosOffsetIntoCache = static_cast<size_t> (*actualOffset - cacheWindow.GetLowerBound ());
68 Assert (0 <= curSeekPosOffsetIntoCache and curSeekPosOffsetIntoCache < fCacheWindowBuf_.size ());
69 std::copy (fCacheWindowBuf_.data () + curSeekPosOffsetIntoCache,
70 fCacheWindowBuf_.data () + curSeekPosOffsetIntoCache + nToRead, into.data ());
71 *actualOffset += nToRead;
72 return nToRead;
73 }
74 }
75 return nullopt;
76 }
77 template <typename ELEMENT_TYPE>
78 void StreamReader<ELEMENT_TYPE>::CacheBlock_::FillCacheWith (SeekOffsetType s, span<InlineBufferElementType_> into)
79 {
80 // adjust so smarter to not make cache too big...
81 size_t oldCacheSize = fCacheWindowBuf_.GetSize ();
82 SeekOffsetType currentEnd = fCacheWindowBufStart_ + oldCacheSize;
83 size_t nToWrite = into.size ();
84 Require (nToWrite > 0);
85 if (currentEnd == s) {
86 // extend the cache
87
88 // resize_uninitialized showed up a lot in windows profile running 'large-xxx' so figured
89 // if we are going to actually allocate memory anyhow, then do it once, by grabbing largest chunk we
90 // (are ever likely to) ask for.
91 if (oldCacheSize + nToWrite > fCacheWindowBuf_.kMinCapacity) {
92 fCacheWindowBuf_.reserve (kMaxBufferedChunkSize_);
93 }
94 fCacheWindowBuf_.resize_uninitialized (oldCacheSize + nToWrite);
95 std::copy (into.begin (), into.end (), fCacheWindowBuf_.begin () + oldCacheSize);
96 }
97 else {
98 fCacheWindowBuf_.resize_uninitialized (nToWrite); // CAN shrink
99 fCacheWindowBufStart_ = s;
100 std::copy (into.begin (), into.end (), fCacheWindowBuf_.begin ());
101 }
102 }
103
104 /*
105 ********************************************************************************
106 *************************** Streams::StreamReader ******************************
107 ********************************************************************************
108 */
109 template <typename ELEMENT_TYPE>
111 : fStrm_{underlyingReadFromStreamAdopted}
112 , fOffset_{underlyingReadFromStreamAdopted.GetOffset ()}
113 {
114 Require (underlyingReadFromStreamAdopted.IsSeekable ());
115 }
116 template <typename ELEMENT_TYPE>
118 {
119 // @todo perhaps CTOR flag to control if we do this or not
120 IgnoreExceptionsForCall (this->SynchronizeToUnderlyingStream ())
121 }
122 template <typename ELEMENT_TYPE>
123 inline optional<span<ELEMENT_TYPE>> StreamReader<ELEMENT_TYPE>::Read (span<ElementType> intoBuffer, NoDataAvailableHandling blockFlag)
124 {
125 Require (not intoBuffer.empty ());
126 // if already cached, return from cache. Note - even if only one element is in the Cache, that's enough to return
127 // and not say 'eof'
128 if (optional<size_t> o = ReadFromCache_ (intoBuffer)) {
129 return intoBuffer.subspan (0, *o);
130 }
131 if (auto osz = Read_Slow_Case_ (intoBuffer, blockFlag)) {
132 return intoBuffer.subspan (0, *osz);
133 }
134 Assert (blockFlag == NoDataAvailableHandling::eDontBlock); // Read_Slow_Case_ only returns nullopt in this case - if no data available
135 return nullopt; //
136 }
137 template <typename ELEMENT_TYPE>
138 inline auto StreamReader<ELEMENT_TYPE>::ReadBlocking () -> optional<ElementType>
139 {
140 ElementType e{};
141 span<ElementType> r = ReadBlocking (span{&e, 1});
142 return r.empty () ? optional<ElementType>{} : e;
143 }
144 template <typename ELEMENT_TYPE>
145 inline auto StreamReader<ELEMENT_TYPE>::ReadBlocking (span<ElementType> intoBuffer) -> span<ElementType>
146 {
147 return Memory::ValueOf (Read (intoBuffer, NoDataAvailableHandling::eBlockIfNoDataAvailable));
148 }
149 template <typename ELEMENT_TYPE>
150 auto StreamReader<ELEMENT_TYPE>::ReadBlocking (Memory::InlineBuffer<ElementType>* intoBuffer, ElementType upToSentinel) -> span<ElementType>
151 {
152 Require (intoBuffer->size () == 0);
153 while (auto oe = ReadBlocking ()) {
154 intoBuffer->push_back (*oe); // include the sentinel
155 if (*oe == upToSentinel) {
156 return span{intoBuffer->data (), intoBuffer->size () - 1}; // dont include the sentinel
157 }
158 }
159 return span{intoBuffer->data (), intoBuffer->size ()};
160 }
161 template <typename ELEMENT_TYPE>
162 inline auto StreamReader<ELEMENT_TYPE>::ReadNonBlocking (span<ElementType> intoBuffer) -> optional<span<ElementType>>
163 {
164 // Debug::AssertExternallySynchronizedMutex::ReadContext declareContext{this->_fThisAssertExternallySynchronized};
165 // Require (IsOpen ()); // note - its OK for Write() side of input stream to be closed
166 Require (not intoBuffer.empty ());
167 return Read (intoBuffer, NoDataAvailableHandling::eDontBlock);
168 }
169 template <typename ELEMENT_TYPE>
170 inline auto StreamReader<ELEMENT_TYPE>::ReadOrThrow (span<ElementType> intoBuffer, NoDataAvailableHandling blockFlag) -> span<ElementType>
171 {
172 if (auto o = Read (intoBuffer, blockFlag)) [[likely]] {
173 return *o;
174 }
175 Execution::Throw (EWouldBlock::kThe);
176 }
177 template <typename ELEMENT_TYPE>
178 inline auto StreamReader<ELEMENT_TYPE>::Peek () -> optional<ElementType>
179 {
180 if (auto p = Peek1FromCache_ ()) [[likely]] { // usually will get hit - else default to standard algorithm
181 return p;
182 }
183 SeekOffsetType saved = fOffset_;
184 auto result = this->ReadBlocking ();
185 fOffset_ = saved;
186 return result;
187 }
188 template <typename ELEMENT_TYPE>
189 inline auto StreamReader<ELEMENT_TYPE>::Peek (span<ElementType> intoBuffer) -> span<ElementType>
190 {
191 // @todo maybe able to better optimize this with peeks, and avoid seek
192 SeekOffsetType saved = fOffset_;
193 auto result = this->Read (intoBuffer);
194 fOffset_ = saved;
195 return result;
196 }
197 template <typename ELEMENT_TYPE>
199 {
200 return fOffset_;
201 }
202 template <typename ELEMENT_TYPE>
204 {
205 Require (offset < static_cast<SeekOffsetType> (numeric_limits<SignedSeekOffsetType>::max ()));
206 return Seek (Whence::eFromStart, static_cast<SignedSeekOffsetType> (offset));
207 }
208 template <typename ELEMENT_TYPE>
210 {
211 switch (whence) {
212 case eFromCurrent:
213 fOffset_ += offset;
214 break;
215 case eFromStart:
216 fOffset_ = offset;
217 break;
218 case eFromEnd:
219 fStrm_.Seek (eFromEnd, offset);
220 fOffset_ = fStrm_.GetOffset ();
221 break;
222 }
223 return fOffset_;
224 }
225 template <typename ELEMENT_TYPE>
226 size_t StreamReader<ELEMENT_TYPE>::ReadAll (ElementType* intoStart, ElementType* intoEnd)
227 {
228 size_t elementsRead{};
229 for (ElementType* readCursor = intoStart; readCursor < intoEnd;) {
230 size_t eltsReadThisTime = ReadBlocking (span{readCursor, intoEnd}).size ();
231 Assert (eltsReadThisTime <= static_cast<size_t> (intoEnd - readCursor));
232 if (eltsReadThisTime == 0) {
233 // irrevocable EOF
234 break;
235 }
236 elementsRead += eltsReadThisTime;
237 readCursor += eltsReadThisTime;
238 }
239 return elementsRead;
240 }
241 template <typename ELEMENT_TYPE>
242 inline optional<size_t> StreamReader<ELEMENT_TYPE>::AvailableToRead () const
243 {
244 if (fFarthestReadInUnderlyingStream_ > fOffset_) {
245 return static_cast<size_t> (fFarthestReadInUnderlyingStream_ - fOffset_);
246 }
247 return fStrm_.AvailableToRead ();
248 }
249 template <typename ELEMENT_TYPE>
250 inline optional<SeekOffsetType> StreamReader<ELEMENT_TYPE>::RemainingLength () const
251 {
252 if (auto underlyingRemaining = fStrm_.RemainingLength ()) {
253 Assert (fOffset_ <= fFarthestReadInUnderlyingStream_);
254 return *underlyingRemaining + (fFarthestReadInUnderlyingStream_ - fOffset_);
255 }
256 return nullopt;
257 }
258 template <typename ELEMENT_TYPE>
260 {
261 fStrm_.Seek (GetOffset ());
262 }
263 template <typename ELEMENT_TYPE>
265 {
266 fOffset_ = fStrm_.GetOffset ();
267 }
268 template <typename ELEMENT_TYPE>
270 {
271 if (fOffset_ < fFarthestReadInUnderlyingStream_) [[likely]] {
272 return false; // not logically needed, but optimization
273 }
274 return not Peek ().has_value ();
275 }
276 template <typename ELEMENT_TYPE>
277 inline auto StreamReader<ELEMENT_TYPE>::Peek1FromCache_ () const -> optional<ElementType>
278 {
279 // first try last filled - generally will be the right one
280 for (size_t i = fCacheBlockLastFilled_; i < Memory::NEltsOf (fCacheBlocks_); ++i) {
281 if (auto r = fCacheBlocks_[i].Peek1FromCache (this->fOffset_)) [[likely]] {
282 return r;
283 }
284 }
285 for (size_t i = 0; i < fCacheBlockLastFilled_; ++i) {
286 if (auto r = fCacheBlocks_[i].Peek1FromCache (this->fOffset_)) {
287 return r;
288 }
289 }
290 return nullopt;
291 }
292 template <typename ELEMENT_TYPE>
293 inline auto StreamReader<ELEMENT_TYPE>::Read1FromCache_ () -> optional<ElementType>
294 {
295 // first try last filled - generally will be the right one
296 for (size_t i = fCacheBlockLastFilled_; i < Memory::NEltsOf (fCacheBlocks_); ++i) {
297 if (auto r = fCacheBlocks_[i].Read1FromCache (&this->fOffset_)) [[likely]] {
298 return r;
299 }
300 }
301 for (size_t i = 0; i < fCacheBlockLastFilled_; ++i) {
302 if (auto r = fCacheBlocks_[i].Read1FromCache (&this->fOffset_)) {
303 return r;
304 }
305 }
306 return nullopt;
307 }
308 template <typename ELEMENT_TYPE>
309 optional<size_t> StreamReader<ELEMENT_TYPE>::ReadFromCache_ (span<ElementType> into)
310 {
311 // first try last filled - generally will be the right one
312 for (size_t i = fCacheBlockLastFilled_; i < Memory::NEltsOf (fCacheBlocks_); ++i) {
313 if (auto r = fCacheBlocks_[i].ReadFromCache (&this->fOffset_, into)) {
314 return r;
315 }
316 }
317 for (size_t i = 0; i < fCacheBlockLastFilled_; ++i) {
318 if (auto r = fCacheBlocks_[i].ReadFromCache (&this->fOffset_, into)) {
319 return r;
320 }
321 }
322 return nullopt;
323 }
324 template <typename ELEMENT_TYPE>
325 void StreamReader<ELEMENT_TYPE>::FillCacheWith_ (SeekOffsetType s, span<InlineBufferElementType_> into)
326 {
327 // pingpong buffers
328 // try not to overfill any one cache block, but if the amount being read will fit, append to the current cache block
329 // dont start a new one
330 size_t thisFillSize = into.size ();
331 if (fCacheBlocks_[fCacheBlockLastFilled_].GetEnd () != this->fOffset_ or
332 fCacheBlocks_[fCacheBlockLastFilled_].GetSize () + thisFillSize > kMaxBufferedChunkSize_) {
333 ++fCacheBlockLastFilled_;
334 if (fCacheBlockLastFilled_ >= Memory::NEltsOf (fCacheBlocks_)) {
335 fCacheBlockLastFilled_ = 0;
336 }
337 }
338 fCacheBlocks_[fCacheBlockLastFilled_].FillCacheWith (s, into);
339 }
340 template <typename ELEMENT_TYPE>
341 optional<size_t> StreamReader<ELEMENT_TYPE>::Read_Slow_Case_ (span<ElementType> into, NoDataAvailableHandling blockFlag)
342 {
343 ElementType buf[kDefaultReadBufferSize_];
344 fStrm_.Seek (fOffset_); // check if get_offset not same in case not seekable) - or handle not seekable case
345 if (optional<span<ElementType>> o = fStrm_.Read (buf, blockFlag)) {
346 size_t nRecordsRead = o->size ();
347 if (nRecordsRead == 0) {
348 // not much point in caching - at eof
349 return 0;
350 }
351 fFarthestReadInUnderlyingStream_ = max (fFarthestReadInUnderlyingStream_, fStrm_.GetOffset ());
352 FillCacheWith_ (fOffset_, Memory::SpanBytesCast<span<InlineBufferElementType_>> (span{buf, nRecordsRead}));
353 return Memory::ValueOf (ReadFromCache_ (into)); // we just cached bytes a the right offset so this must succeed
354 }
355 // if upstream read returned nullopt, implies would-block
356 Assert (blockFlag == NoDataAvailableHandling::eDontBlock);
357 return nullopt;
358 }
359
360}
#define RequireNotNull(p)
Definition Assertions.h:347
NoDataAvailableHandling
If eDontBlock passed to most Stream APIs, then when the code would do a blocking read,...
Definition Stream.h:90
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
InputStream<>::Ptr is Smart pointer (with abstract Rep) class defining the interface to reading from ...
nonvirtual bool IsSeekable() const
Returns true iff this object was constructed with a seekable input stream rep.
Definition Stream.inl:44
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Definition Throw.inl:43
StreamReader is a non-essential Stream utility, adding simplicity of use for a common use case,...
nonvirtual optional< size_t > AvailableToRead() const
returns nullopt if nothing known available, zero if known EOF, and any other number of elements (typi...
nonvirtual SeekOffsetType Seek(SeekOffsetType offset)
Logically the same as InputStream::Ptr<ELEMENT_TYPE>::Seek () - but without being 'synchronized' it m...
nonvirtual optional< ElementType > Peek()
Logically the same as InputStream::Ptr<ELEMENT_TYPE>::Peek () but reading cached data.
nonvirtual optional< span< ElementType > > Read(span< ElementType > intoBuffer, NoDataAvailableHandling blockFlag)
Read into data referenced by span argument - and using argument blocking strategy (default blocking)
nonvirtual SeekOffsetType GetOffset() const
Logically the same as InputStream::Ptr<ELEMENT_TYPE>::GetOffset () - but without being 'synchronized'...
nonvirtual size_t ReadAll(ElementType *intoStart, ElementType *intoEnd)
Logically the same as InputStream::Ptr<ELEMENT_TYPE>::ReadAll ()
nonvirtual optional< SeekOffsetType > RemainingLength() const
returns nullopt if not known (typical, and the default) - but sometimes it is known,...
nonvirtual optional< ElementType > ReadBlocking()
ReadBlocking () reads either a single element, or fills in argument intoBuffer - but never blocks (no...
nonvirtual optional< span< ElementType > > ReadNonBlocking(span< ElementType > intoBuffer)
read into intoBuffer - returning nullopt if would block, and else returning subspan of input with rea...
nonvirtual span< ElementType > ReadOrThrow(span< ElementType > intoBuffer, NoDataAvailableHandling blockFlag)
Read (either one or into argument span) and taking NoDataAvailableHandling blockFlag),...