4#include "Stroika/Foundation/StroikaPreComp.h"
8#include "Stroika/Foundation/Containers/Support/ReserveTweaks.h"
10#include "Stroika/Foundation/Execution/Common.h"
11#include "Stroika/Foundation/Execution/OperationNotSupportedException.h"
24using namespace Stroika::Foundation::Streams;
25using namespace Stroika::Foundation::Streams::BinaryToText;
32 const auto kReadPartialCharacterAtEndOfBinaryStreamException_ =
39 , _fCharConverter{charConverter}
58 struct _ReadAheadCache {
62 optional<_ReadAheadCache> _fReadAheadCache;
71 if (_fSource !=
nullptr) {
74 Ensure (not IsOpenRead ());
75 Assert (_fSource ==
nullptr);
79 return _fSource !=
nullptr;
83 Require (IsOpenRead ());
84 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
89 size_t currentByteGoal = 1;
91 if (
auto o = PreReadUpstreamInto_ (span{inByteBuf}, currentByteGoal,
eDontBlock)) {
92 span<const byte> binarySrcSpan{*o};
94 span<Character> targetBuf{&ignoredChar, 1};
95 Assert (_fCharConverter.ComputeTargetCharacterBufferSize (binarySrcSpan.size ()) <= targetBuf.size ());
96 span<Character> convertedCharacters = _fCharConverter.Bytes2Characters (&binarySrcSpan, targetBuf);
97 if (convertedCharacters.empty ()) {
100 Prepend2ReadAheadCache_ (binarySrcSpan);
104 Assert (_fReadAheadCache == nullopt);
105 Assert (not binarySrcSpan.empty () and not convertedCharacters.empty ());
106 Assert (convertedCharacters.size () <= targetBuf.size ());
107 Prepend2ReadAheadCache_ (binarySrcSpan);
115 Require (IsOpenRead ());
120 Require (not intoBuffer.empty ());
121 Require (IsOpenRead ());
122 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
132 size_t readAtMostCharacters = intoBuffer.size ();
133 StackBuffer<byte, 8 * 1024> inByteBuf{Memory::eUninitialized, readAtMostCharacters};
134 size_t currentByteGoal = 1;
136 if (
auto o = PreReadUpstreamInto_ (span{inByteBuf}, currentByteGoal, blockFlag)) {
137 span<const byte> binarySrcSpan{*o};
138 span<Character> targetBuf{intoBuffer};
139 Assert (_fCharConverter.ComputeTargetCharacterBufferSize (inByteBuf.size ()) <= targetBuf.size ());
140 span<Character> convertedCharacters = _fCharConverter.Bytes2Characters (&binarySrcSpan, targetBuf);
141 if (binarySrcSpan.empty ()) {
143 Assert (convertedCharacters.size () <= targetBuf.size ());
144 _fOffset += convertedCharacters.size ();
145 return intoBuffer.subspan (0, convertedCharacters.size ());
147 else if (convertedCharacters.empty ()) {
150 Prepend2ReadAheadCache_ (binarySrcSpan);
154 Assert (not binarySrcSpan.empty () and not convertedCharacters.empty ());
155 Assert (convertedCharacters.size () <= targetBuf.size ());
156 _fOffset += convertedCharacters.size ();
158 Prepend2ReadAheadCache_ (binarySrcSpan);
159 return intoBuffer.subspan (0, convertedCharacters.size ());
164 Assert (blockFlag == eDontBlock);
170 template <
size_t EXTENT>
171 span<byte> ReadFromAndRemoveFromReadAheadCache_ (span<byte, EXTENT> intoSpan)
173 size_t bytes2Copy = intoSpan.size ();
174 Require (_fReadAheadCache and _fReadAheadCache->fData.size () >= bytes2Copy);
175 Require (this->_fOffset == _fReadAheadCache->fFrom);
176 Memory::CopyBytes (span{_fReadAheadCache->fData}, intoSpan);
177 if (bytes2Copy == _fReadAheadCache->fData.size ()) {
178 _fReadAheadCache.reset ();
181 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset + bytes2Copy, .fData = span{_fReadAheadCache->fData}.subspan (bytes2Copy)});
185 template <
size_t EXTENT>
186 void Prepend2ReadAheadCache_ (span<const byte, EXTENT> data2Append)
188 if (_fReadAheadCache) {
189 Require (_fOffset == _fReadAheadCache->fFrom - data2Append.size ());
190 StackBuffer<byte> combined{data2Append};
191 combined.push_back (span{_fReadAheadCache->fData});
192 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset, .fData = combined});
195 _fReadAheadCache.emplace (_ReadAheadCache{.fFrom = _fOffset, .fData = data2Append});
208 nonvirtual optional<span<const byte>> PreReadUpstreamInto_ (span<byte> intoBuf,
size_t goalSizeAtLeast,
NoDataAvailableHandling blockFlag)
210 Require (goalSizeAtLeast >= 1);
211 Require (goalSizeAtLeast <= intoBuf.size ());
212 span<const byte> result;
213 if (_fReadAheadCache and _fReadAheadCache->fFrom == this->_fOffset) {
219 Assert (intoBuf.size () >= _fReadAheadCache->fData.size ());
220 size_t amtToCopy = min (_fReadAheadCache->fData.size (), intoBuf.size ());
221 result = ReadFromAndRemoveFromReadAheadCache_ (intoBuf.subspan (0, amtToCopy));
223 while (result.size () < goalSizeAtLeast) {
226 auto r = _fSource.ReadBlocking (intoBuf.subspan (result.size ()));
227 if (r.size () == 0) {
233 result = intoBuf.subspan (0, result.size () + r.size ());
237 if (
auto o = _fSource.ReadNonBlocking (intoBuf.subspan (result.size ()))) {
238 if (o->size () == 0) {
244 result = intoBuf.subspan (0, result.size () + o->size ());
250 Prepend2ReadAheadCache_ (result);
260 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
261 Require (IsOpenRead ());
272 class UnseekableBinaryStreamRep_ final :
public FromBinaryStreamBaseRep_ {
273 using inherited = FromBinaryStreamBaseRep_;
277 : inherited{src, charConverter}
289 class CachingSeekableBinaryStreamRep_ final :
public FromBinaryStreamBaseRep_ {
290 using inherited = FromBinaryStreamBaseRep_;
294 : FromBinaryStreamBaseRep_{src, charConverter}
295 , fReadAheadAllowed_{readAhead == Reader::
ReadAhead::eReadAheadAllowed}
300 virtual bool IsSeekable ()
const override
304 virtual optional<span<Character>> Read (span<Character> intoBuffer,
NoDataAvailableHandling blockFlag)
override
306 Require (not intoBuffer.empty ());
307 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
308 Require (IsOpenRead ());
312 if (_fOffset < fCache_.size ()) {
314 size_t nToRead = intoBuffer.size ();
315 size_t nInBufAvail = fCache_.size () -
static_cast<size_t> (_fOffset);
316 nToRead = min (nToRead, nInBufAvail);
317 Assert (nToRead > 0);
318 for (
size_t i = 0; i < nToRead; ++i) {
319 intoBuffer[i] = fCache_[i +
static_cast<size_t> (_fOffset)];
322 return intoBuffer.subspan (0, nToRead);
328 size_t n = bufEnd - bufStart;
329 size_t newCacheSize =
static_cast<size_t> (origOffset + n);
330 Assert (fCache_.size () ==
static_cast<size_t> (origOffset));
331 Assert (newCacheSize > fCache_.size ());
332 Containers::Support::ReserveTweaks::Reserve4AddN (fCache_, n);
333 fCache_.resize_uninitialized (newCacheSize);
334 for (
size_t i = 0; i < n; ++i) {
335 fCache_[i +
static_cast<size_t> (origOffset)] = bufStart[i];
340 constexpr size_t kMinCachedReadSize_{512};
341 if (intoBuffer.size () >= kMinCachedReadSize_ or not fReadAheadAllowed_) {
342 auto result = inherited::Read (intoBuffer, blockFlag);
343 if (result == nullopt) {
344 Throw (EWouldBlock::kThe);
346 if (result->size () != 0) {
347 if (origOffset + result->size () > numeric_limits<size_t>::max ()) [[unlikely]] {
349 Throw (range_error{
"seek past max size for size_t"});
351 pushIntoCacheBuf (intoBuffer.data (), intoBuffer.data () + result->size ());
357 constexpr size_t kUseCacheSize_ = 8 * kMinCachedReadSize_;
360 inherited::Read (span{
reinterpret_cast<Character*
> (std::begin (buf)),
reinterpret_cast<Character*
> (std::end (buf))}, blockFlag);
361 if (result == nullopt) {
362 Throw (EWouldBlock::kThe);
364 if (result->size () != 0) {
365 if (origOffset + result->size () > numeric_limits<size_t>::max ()) [[unlikely]] {
367 Throw (range_error{
"seek past max size for size_t"});
369 pushIntoCacheBuf (std::begin (buf), std::begin (buf) + result->size ());
370 result = result->subspan (0, min (intoBuffer.size (), result->size ()));
371 DISABLE_COMPILER_GCC_WARNING_START (
"GCC diagnostic ignored \"-Wclass-memaccess\"");
372 (void)::memcpy (intoBuffer.data (), std::begin (buf), result->size () *
sizeof (
Character));
373 DISABLE_COMPILER_GCC_WARNING_END (
"GCC diagnostic ignored \"-Wclass-memaccess\"");
374 _fOffset = origOffset + result->size ();
381 static const auto kException_ = range_error{
"seek"};
382 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
383 Require (IsOpenRead ());
386 if (offset < 0) [[unlikely]] {
394 if (newOffset < 0) [[unlikely]] {
398 SeekTo_ (
static_cast<size_t> (uNewOffset));
404 for (
auto o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable); o && o->size () == 1;
405 o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable)) {
408 SeekTo_ (_fOffset + offset);
417 static const auto kException_ = range_error{
"seek"};
419 while (_fOffset < offset) {
423 if (
auto o = Read (span{&c, 1}, NoDataAvailableHandling::eBlockIfNoDataAvailable); o && o->size () == 0) [[unlikely]] {
427 Ensure (_fOffset == offset);
436 if (offset > _fOffset) {
437 SeekFowardTo_ (offset);
439 else if (offset < _fOffset) {
440 SeekBackwardTo_ (offset);
442 Ensure (_fOffset == offset);
446 bool fReadAheadAllowed_{
false};
462 Ptr p = (seekable == SeekableFlag::eSeekable) ?
Ptr{make_shared<CachingSeekableBinaryStreamRep_> (src, codeConverter, readAhead)}
463 :
Ptr{make_shared<UnseekableBinaryStreamRep_> (src, codeConverter)};
464 Ensure (p.
IsSeekable () == (seekable == SeekableFlag::eSeekable));
472 if (seekable == nullopt) {
473 seekable = src.
IsSeekable () ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
475 using namespace Characters;
477 if (src.IsSeekable ()) {
478 auto savedSeek = src.GetOffset ();
480 byte bomData[Characters::kMaxBOMSize];
481 optional<tuple<Characters::UnicodeExternalEncodings, size_t>> bomInfo;
482 if (src.ReadAll (span{bomData}).size () == Memory::NEltsOf (bomData) and
483 (bomInfo = Characters::ReadByteOrderMark (span{bomData})).has_value ()) {
485 src.Seek (savedSeek + get<size_t> (*bomInfo));
486 return CodeCvt<>{get<UnicodeExternalEncodings> (*bomInfo)};
489 src.Seek (savedSeek);
493 switch (codeCvtFlags.value_or (AutomaticCodeCvtFlags::eDEFAULT)) {
494 case AutomaticCodeCvtFlags::eReadBOMAndIfNotPresentUseUTF8:
495 return CodeCvt<>{UnicodeExternalEncodings::eUTF8};
496 case AutomaticCodeCvtFlags::eReadBOMAndIfNotPresentUseCurrentLocale:
503 return New_ (src, codeConverter, *seekable, readAhead);
509 if (seekable == nullopt) {
510 seekable = src.
IsSeekable () ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
512 return New_ (src, codeConverter, *seekable, readAhead);
#define AssertNotReached()
NoDataAvailableHandling
If eDontBlock passed to most Stream APIs, then when the code would do a blocking read,...
@ eBlockIfNoDataAvailable
int64_t SignedSeekOffsetType
CodeCvt unifies byte <-> unicode conversions, vaguely inspired by (and wraps) std::codecvt,...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
virtual bool IsSeekable() const =0
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
nonvirtual bool IsSeekable() const
Returns true iff this object was constructed with a seekable input stream rep.
Iterable<T> is a base class for containers which easily produce an Iterator<T> to traverse them.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
Ptr New(const InputStream::Ptr< byte > &src, optional< AutomaticCodeCvtFlags > codeCvtFlags={}, optional< SeekableFlag > seekable={}, ReadAhead readAhead=eReadAheadAllowed)
Create an InputStream::Ptr<Character> from the arguments (usually binary source) - which can be used ...