4#include "Stroika/Foundation/StroikaPreComp.h"
8#include "Stroika/Foundation/Containers/Support/ReserveTweaks.h"
10#include "Stroika/Foundation/Execution/Common.h"
11#include "Stroika/Foundation/Execution/OperationNotSupportedException.h"
25using namespace Stroika::Foundation::Streams;
26using namespace Stroika::Foundation::Streams::BinaryToText;
33 const auto kReadPartialCharacterAtEndOfBinaryStreamException_ =
40 , _fCharConverter{charConverter}
59 struct _ReadAheadCache {
63 optional<_ReadAheadCache> _fReadAheadCache;
72 if (_fSource !=
nullptr) {
75 Ensure (not IsOpenRead ());
76 Assert (_fSource ==
nullptr);
80 return _fSource !=
nullptr;
84 Require (IsOpenRead ());
85 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
90 size_t currentByteGoal = 1;
92 if (
auto o = PreReadUpstreamInto_ (span{inByteBuf}, currentByteGoal,
eDontBlock)) {
93 span<const byte> binarySrcSpan{*o};
95 span<Character> targetBuf{&ignoredChar, 1};
96 Assert (_fCharConverter.ComputeTargetCharacterBufferSize (binarySrcSpan.size ()) <= targetBuf.size ());
97 span<Character> convertedCharacters = _fCharConverter.Bytes2Characters (&binarySrcSpan, targetBuf);
98 if (convertedCharacters.empty ()) {
101 Prepend2ReadAheadCache_ (binarySrcSpan);
105 Assert (_fReadAheadCache == nullopt);
106 Assert (not binarySrcSpan.empty () and not convertedCharacters.empty ());
107 Assert (convertedCharacters.size () <= targetBuf.size ());
108 Prepend2ReadAheadCache_ (binarySrcSpan);
116 Require (IsOpenRead ());
121 Require (not intoBuffer.empty ());
122 Require (IsOpenRead ());
123 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
133 size_t readAtMostCharacters = intoBuffer.size ();
134 StackBuffer<byte, 8 * 1024> inByteBuf{Memory::eUninitialized, readAtMostCharacters};
135 size_t currentByteGoal = 1;
137 if (
auto o = PreReadUpstreamInto_ (span{inByteBuf}, currentByteGoal, blockFlag)) {
138 span<const byte> binarySrcSpan{*o};
139 span<Character> targetBuf{intoBuffer};
140 Assert (_fCharConverter.ComputeTargetCharacterBufferSize (inByteBuf.size ()) <= targetBuf.size ());
141 span<Character> convertedCharacters = _fCharConverter.Bytes2Characters (&binarySrcSpan, targetBuf);
142 if (binarySrcSpan.empty ()) {
144 Assert (convertedCharacters.size () <= targetBuf.size ());
145 _fOffset += convertedCharacters.size ();
146 return intoBuffer.subspan (0, convertedCharacters.size ());
148 else if (convertedCharacters.empty ()) {
151 Prepend2ReadAheadCache_ (binarySrcSpan);
155 Assert (not binarySrcSpan.empty () and not convertedCharacters.empty ());
156 Assert (convertedCharacters.size () <= targetBuf.size ());
157 _fOffset += convertedCharacters.size ();
159 Prepend2ReadAheadCache_ (binarySrcSpan);
160 return intoBuffer.subspan (0, convertedCharacters.size ());
165 Assert (blockFlag == eDontBlock);
// ReadFromAndRemoveFromReadAheadCache_: consume intoSpan.size () bytes from the
// front of the read-ahead cache.
//   - Requires a cache holding at least intoSpan.size () bytes whose start
//     (fFrom) equals the current _fOffset.
//   - Afterwards the cache is either cleared (every cached byte consumed) or
//     rebuilt to hold only the unconsumed tail, starting at _fOffset + bytes2Copy.
//   - No line shown here advances _fOffset; presumably the caller does - confirm.
//   - The return statement is not in this listing; presumably it returns
//     (a view of) intoSpan - confirm.
171 template <
size_t EXTENT>
172 span<byte> ReadFromAndRemoveFromReadAheadCache_ (span<byte, EXTENT> intoSpan)
174 size_t bytes2Copy = intoSpan.size ();
175 Require (_fReadAheadCache and _fReadAheadCache->fData.size () >= bytes2Copy);
176 Require (this->_fOffset == _fReadAheadCache->fFrom);
// NOTE(review): the source passed here is the WHOLE cache (fData), not just its
// first bytes2Copy bytes. When bytes2Copy < fData.size () (the partial-consume
// path below), the source is larger than intoSpan. If CopyBytes requires
// target.size () >= src.size (), that path violates the contract. Consider
// span{_fReadAheadCache->fData}.subspan (0, bytes2Copy) - confirm CopyBytes contract.
177 Memory::CopyBytes (span{_fReadAheadCache->fData}, intoSpan);
178 if (bytes2Copy == _fReadAheadCache->fData.size ()) {
179 _fReadAheadCache.reset ();
// Keep only the unconsumed tail. The _ReadAheadCache argument is built before
// emplace () destroys the current value. This is safe only if fData owns (copies)
// its bytes - presumably it does; confirm against _ReadAheadCache's definition.
182 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset + bytes2Copy, .fData = span{_fReadAheadCache->fData}.subspan (bytes2Copy)});
// Prepend2ReadAheadCache_: push bytes back in front of the read-ahead cache, so
// that the cache then begins at the current _fOffset.
//   - If a cache already exists, data2Append must end exactly where that cache
//     begins (_fOffset + data2Append.size () == fFrom). The new cache holds
//     data2Append followed by the previously cached bytes.
//   - With no existing cache, the cache presumably becomes just data2Append (the
//     'else' line is not in this listing).
// Despite the parameter name, the bytes are PREPENDED, not appended.
186 template <
size_t EXTENT>
187 void Prepend2ReadAheadCache_ (span<const byte, EXTENT> data2Append)
189 if (_fReadAheadCache) {
190 Require (_fOffset == _fReadAheadCache->fFrom - data2Append.size ());
// New bytes first, then the old cached bytes.
191 StackBuffer<byte> combined{data2Append};
192 combined.push_back (span{_fReadAheadCache->fData});
193 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset, .fData = combined});
196 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset, .fData = data2Append});
// PreReadUpstreamInto_: gather raw bytes for the current _fOffset into intoBuf,
// aiming for at least goalSizeAtLeast bytes.
//   intoBuf         - scratch buffer; the returned span aliases a prefix of it.
//   goalSizeAtLeast - minimum number of bytes wanted; must be in [1, intoBuf.size ()].
//   blockFlag       - presumably selects ReadBlocking vs ReadNonBlocking below
//                     (the branch line is not in this listing - confirm).
// First drains any read-ahead cache positioned at _fOffset, then reads from
// _fSource until the goal is met. A zero-length upstream read is special-cased
// (its handling is not shown; presumably end-of-stream). Returns an optional;
// presumably nullopt when non-blocking and no data is available - confirm.
209 nonvirtual optional<span<const byte>> PreReadUpstreamInto_ (span<byte> intoBuf,
size_t goalSizeAtLeast,
NoDataAvailableHandling blockFlag)
211 Require (goalSizeAtLeast >= 1);
212 Require (goalSizeAtLeast <= intoBuf.size ());
213 span<const byte> result;
214 if (_fReadAheadCache and _fReadAheadCache->fFrom == this->_fOffset) {
// The Assert says the whole cache fits in intoBuf. The min () only matters if that
// Assert is violated (e.g. in builds where Assert is compiled out).
220 Assert (intoBuf.size () >= _fReadAheadCache->fData.size ());
221 size_t amtToCopy = min (_fReadAheadCache->fData.size (), intoBuf.size ());
222 result = ReadFromAndRemoveFromReadAheadCache_ (intoBuf.subspan (0, amtToCopy));
// Keep reading upstream, appending after what is already in intoBuf.
224 while (result.size () < goalSizeAtLeast) {
227 auto r = _fSource.ReadBlocking (intoBuf.subspan (result.size ()));
228 if (r.size () == 0) {
234 result = intoBuf.subspan (0, result.size () + r.size ());
238 if (
auto o = _fSource.ReadNonBlocking (intoBuf.subspan (result.size ()))) {
239 if (o->size () == 0) {
245 result = intoBuf.subspan (0, result.size () + o->size ());
// Push the gathered bytes back into the read-ahead cache (the surrounding
// condition is not in this listing).
251 Prepend2ReadAheadCache_ (result);
261 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
262 Require (IsOpenRead ());
273 class UnseekableBinaryStreamRep_ final :
public FromBinaryStreamBaseRep_ {
274 using inherited = FromBinaryStreamBaseRep_;
278 : inherited{src, charConverter}
290 class CachingSeekableBinaryStreamRep_ final :
public FromBinaryStreamBaseRep_ {
291 using inherited = FromBinaryStreamBaseRep_;
295 : FromBinaryStreamBaseRep_{src, charConverter}
296 , fReadAheadAllowed_{readAhead == Reader::
ReadAhead::eReadAheadAllowed}
301 virtual bool IsSeekable ()
const override
// CachingSeekableBinaryStreamRep_::Read: fCache_ is indexed by stream offset and
// holds the characters read so far.
//   - If _fOffset lies inside fCache_, characters are served straight from it.
//   - Otherwise, reads of >= kMinCachedReadSize_ (512) characters, or any read when
//     read-ahead is not allowed, go through inherited::Read directly into
//     intoBuffer. What was read is appended to fCache_.
//   - Smaller reads fetch into a local buffer 'buf' (declaration not in this
//     listing; presumably kUseCacheSize_ = 8 * 512 characters). All of it is
//     cached, and only the prefix that fits in intoBuffer is returned.
//   - If the upstream read yields nullopt, this throws EWouldBlock rather than
//     returning nullopt.
305 virtual optional<span<Character>> Read (span<Character> intoBuffer,
NoDataAvailableHandling blockFlag)
override
307 Require (not intoBuffer.empty ());
308 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
309 Require (IsOpenRead ());
// Cache hit: copy min (requested, available) characters starting at _fOffset.
// Any _fOffset update for this path is not in this listing.
313 if (_fOffset < fCache_.size ()) {
315 size_t nToRead = intoBuffer.size ();
316 size_t nInBufAvail = fCache_.size () -
static_cast<size_t> (_fOffset);
317 nToRead = min (nToRead, nInBufAvail);
318 Assert (nToRead > 0);
319 for (
size_t i = 0; i < nToRead; ++i) {
320 intoBuffer[i] = fCache_[i +
static_cast<size_t> (_fOffset)];
323 return intoBuffer.subspan (0, nToRead);
// Presumably the body of the pushIntoCacheBuf helper: append [bufStart, bufEnd) to
// fCache_. It requires the cache to end exactly at origOffset, so the cache stays a
// contiguous prefix of the stream.
329 size_t n = bufEnd - bufStart;
330 size_t newCacheSize =
static_cast<size_t> (origOffset + n);
331 Assert (fCache_.size () ==
static_cast<size_t> (origOffset));
332 Assert (newCacheSize > fCache_.size ());
333 Containers::Support::ReserveTweaks::Reserve4AddN (fCache_, n);
334 fCache_.resize_uninitialized (newCacheSize);
335 for (
size_t i = 0; i < n; ++i) {
336 fCache_[i +
static_cast<size_t> (origOffset)] = bufStart[i];
341 constexpr size_t kMinCachedReadSize_{512};
342 if (intoBuffer.size () >= kMinCachedReadSize_ or not fReadAheadAllowed_) {
343 auto result = inherited::Read (intoBuffer, blockFlag);
344 if (result == nullopt) {
345 Throw (EWouldBlock::kThe);
347 if (result->size () != 0) {
// NOTE(review): origOffset's type is not visible here. If it is an unsigned 64-bit
// type and size_t is also 64 bits, this sum wraps instead of exceeding
// numeric_limits<size_t>::max (), so the check can never fire. A wrap-free form is
// result->size () > numeric_limits<size_t>::max () - origOffset (given origOffset <= max).
348 if (origOffset + result->size () > numeric_limits<size_t>::max ()) [[unlikely]] {
350 Throw (range_error{
"seek past max size for size_t"});
352 pushIntoCacheBuf (intoBuffer.data (), intoBuffer.data () + result->size ());
358 constexpr size_t kUseCacheSize_ = 8 * kMinCachedReadSize_;
361 inherited::Read (span{
reinterpret_cast<Character*
> (std::begin (buf)),
reinterpret_cast<Character*
> (std::end (buf))}, blockFlag);
362 if (result == nullopt) {
363 Throw (EWouldBlock::kThe);
365 if (result->size () != 0) {
// NOTE(review): same possible unsigned wraparound in this overflow check as above.
366 if (origOffset + result->size () > numeric_limits<size_t>::max ()) [[unlikely]] {
368 Throw (range_error{
"seek past max size for size_t"});
// Cache everything that was read, then hand back only what fits in intoBuffer.
370 pushIntoCacheBuf (std::begin (buf), std::begin (buf) + result->size ());
371 result = result->subspan (0, min (intoBuffer.size (), result->size ()));
// GCC's -Wclass-memaccess is silenced because memcpy is applied to Character objects.
372 DISABLE_COMPILER_GCC_WARNING_START (
"GCC diagnostic ignored \"-Wclass-memaccess\"");
373 (void)::memcpy (intoBuffer.data (), std::begin (buf), result->size () *
sizeof (
Character));
374 DISABLE_COMPILER_GCC_WARNING_END (
"GCC diagnostic ignored \"-Wclass-memaccess\"");
// inherited::Read presumably advanced _fOffset past everything read into buf.
// Reset it so it counts only the characters actually returned to the caller.
375 _fOffset = origOffset + result->size ();
382 static const auto kException_ = range_error{
"seek"};
383 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
384 Require (IsOpenRead ());
387 if (offset < 0) [[unlikely]] {
395 if (newOffset < 0) [[unlikely]] {
399 SeekTo_ (
static_cast<size_t> (uNewOffset));
405 for (
auto o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable); o && o->size () == 1;
406 o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable)) {
409 SeekTo_ (_fOffset + offset);
418 static const auto kException_ = range_error{
"seek"};
420 while (_fOffset < offset) {
424 if (
auto o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable); o && o->size () == 0) [[unlikely]] {
428 Ensure (_fOffset == offset);
437 if (offset > _fOffset) {
438 SeekFowardTo_ (offset);
440 else if (offset < _fOffset) {
441 SeekBackwardTo_ (offset);
443 Ensure (_fOffset == offset);
447 bool fReadAheadAllowed_{
false};
463 Ptr p = (seekable == SeekableFlag::eSeekable) ?
Ptr{make_shared<CachingSeekableBinaryStreamRep_> (src, codeConverter, readAhead)}
464 :
Ptr{make_shared<UnseekableBinaryStreamRep_> (src, codeConverter)};
465 Ensure (p.
IsSeekable () == (seekable == SeekableFlag::eSeekable));
473 if (seekable == nullopt) {
474 seekable = src.
IsSeekable () ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
476 using namespace Characters;
478 if (src.IsSeekable ()) {
479 auto savedSeek = src.GetOffset ();
481 byte bomData[Characters::kMaxBOMSize];
482 optional<tuple<Characters::UnicodeExternalEncodings, size_t>> bomInfo;
483 if (src.ReadAll (span{bomData}).size () == Memory::NEltsOf (bomData) and
484 (bomInfo = Characters::ReadByteOrderMark (span{bomData})).has_value ()) {
486 src.Seek (savedSeek + get<size_t> (*bomInfo));
487 return CodeCvt<>{get<UnicodeExternalEncodings> (*bomInfo)};
490 src.Seek (savedSeek);
494 switch (codeCvtFlags.value_or (AutomaticCodeCvtFlags::eDEFAULT)) {
495 case AutomaticCodeCvtFlags::eReadBOMAndIfNotPresentUseUTF8:
496 return CodeCvt<>{UnicodeExternalEncodings::eUTF8};
497 case AutomaticCodeCvtFlags::eReadBOMAndIfNotPresentUseCurrentLocale:
504 return New_ (src, codeConverter, *seekable, readAhead);
510 if (seekable == nullopt) {
511 seekable = src.
IsSeekable () ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
513 return New_ (src, codeConverter, *seekable, readAhead);
#define AssertNotReached()
NoDataAvailableHandling
If eDontBlock passed to most Stream APIs, then when the code would do a blocking read,...
@ eBlockIfNoDataAvailable
int64_t SignedSeekOffsetType
CodeCvt unifies byte <-> unicode conversions, vaguely inspired by (and wraps) std::codecvt,...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
virtual bool IsSeekable() const =0
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
nonvirtual bool IsSeekable() const
Returns true iff this object was constructed with a seekable input stream rep.
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Ptr New(const InputStream::Ptr< byte > &src, optional< AutomaticCodeCvtFlags > codeCvtFlags={}, optional< SeekableFlag > seekable={}, ReadAhead readAhead=eReadAheadAllowed)
Create an InputStream::Ptr<Character> from the arguments (usually binary source) - which can be used ...