5#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
8namespace Stroika::Foundation::Streams::BufferedInputStream {
12 [[noreturn]]
void ThrowCannotSeekFromEnd_ ();
15 template <
typename ELEMENT_TYPE>
16 class Rep_Seekable_FromSeekable_ :
public IRep_<ELEMENT_TYPE> {
18 Rep_Seekable_FromSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
22 Require (realIn.IsSeekable ());
24 virtual bool IsSeekable ()
const override
28 virtual void CloseRead ()
override
30 if (fRealIn_ !=
nullptr) {
33 Ensure (not IsOpenRead ());
34 Assert (fRealIn_ ==
nullptr);
36 virtual bool IsOpenRead ()
const override
38 return fRealIn_ !=
nullptr;
42 Require (IsOpenRead ());
43 return fReader_.GetOffset ();
45 virtual optional<size_t> AvailableToRead ()
override
48 Require (IsOpenRead ());
49 return fReader_.AvailableToRead ();
51 virtual optional<SeekOffsetType> RemainingLength ()
override
54 Require (IsOpenRead ());
55 return fReader_.RemainingLength ();
57 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
60 Require (IsOpenRead ());
61 return fReader_.Read (intoBuffer, blockFlag);
65 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
66 StreamReader<ELEMENT_TYPE> fReader_;
67 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
71 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
72 class Rep_Seekable_FromUnSeekable_ :
public IRep_<ELEMENT_TYPE> {
74 Rep_Seekable_FromUnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
78 virtual bool IsSeekable ()
const override
82 virtual void CloseRead ()
override
84 if (fRealIn_ !=
nullptr) {
87 Ensure (not IsOpenRead ());
88 Assert (fRealIn_ ==
nullptr);
90 virtual bool IsOpenRead ()
const override
92 return fRealIn_ !=
nullptr;
96 Require (IsOpenRead ());
99 virtual optional<size_t> AvailableToRead ()
override
102 Require (IsOpenRead ());
103 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
104 return fBufferOfAllReadDataSoFar_.size () -
static_cast<size_t> (fSeekOffset_);
106 return fRealIn_.AvailableToRead ();
108 virtual optional<SeekOffsetType> RemainingLength ()
override
111 Require (IsOpenRead ());
112 if (
auto rl = fRealIn_.RemainingLength ()) {
113 return MapOffsetFromReal2Mine_ (*rl);
117 virtual auto SeekRead (Whence whence, SignedSeekOffsetType offset) ->
SeekOffsetType override
120 Require (IsOpenRead ());
123 case Whence::eFromCurrent:
124 fSeekOffset_ += offset;
126 case Whence::eFromStart:
127 fSeekOffset_ = offset;
129 case Whence::eFromEnd:
130 if (
auto remaining = this->RemainingLength ()) {
135 Private_::ThrowCannotSeekFromEnd_ ();
142 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
145 Require (IsOpenRead ());
146 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
147 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
148 ELEMENT_TYPE buf[1024];
149 if (
auto r = fRealIn_.Read (span{buf}, blockFlag)) {
150 fBufferOfAllReadDataSoFar_.push_back (*r);
156 if (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
157 size_t n2Read = min<size_t> (intoBuffer.size (),
static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
158 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (
static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
159 Assert (result.size () == n2Read);
160 fSeekOffset_ += n2Read;
167 nonvirtual
SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so)
const
174 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
175 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
177 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
181 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
182 class Rep_UnSeekable_ :
public IRep_<ELEMENT_TYPE> {
184 Rep_UnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
188 virtual bool IsSeekable ()
const override
192 virtual void CloseRead ()
override
194 if (fRealIn_ !=
nullptr) {
197 Ensure (not IsOpenRead ());
198 Assert (fRealIn_ ==
nullptr);
200 virtual bool IsOpenRead ()
const override
202 return fRealIn_ !=
nullptr;
206 Require (IsOpenRead ());
207 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
209 virtual optional<size_t> AvailableToRead ()
override
212 Require (IsOpenRead ());
213 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
214 if (
auto o = fRealIn_.AvailableToRead ()) {
223 virtual optional<SeekOffsetType> RemainingLength ()
override
226 Require (IsOpenRead ());
227 if (
auto o = fRealIn_.RemainingLength ()) {
228 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
232 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
235 Require (IsOpenRead ());
236 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
237 if (n == 0) [[unlikely]] {
240 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
243 fReadOffsetIntoIntermediateBuf_ = 0;
245 fIntermediateBuffer_.resize_uninitialized (n);
252 Assert (n != 0 or fRealIn_.IsAtEOF ());
253 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
254 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
255 Assert (t.size () == n2Read);
256 fReadOffsetIntoIntermediateBuf_ += n2Read;
261 nonvirtual
size_t GetNEltsAlreadyBufferedFromUpstream_ ()
const
263 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
264 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
268 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
269 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
270 size_t fReadOffsetIntoIntermediateBuf_{0};
271 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
280 template <
typename ELEMENT_TYPE>
281 inline auto New (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
283 using PTR = Ptr<ELEMENT_TYPE>;
285 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
286 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
287 if (useSeekable == SeekableFlag::eSeekable) {
288 return (srcSeekable == SeekableFlag::eSeekable)
289 ? PTR{make_shared<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
290 : PTR{make_shared<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
293 return PTR{make_shared<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
296 template <
typename ELEMENT_TYPE>
298 optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
300 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
301 switch (internallySynchronized) {
302 case Execution::eInternallySynchronized: {
304 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
305 if (useSeekable == SeekableFlag::eSeekable) {
306 return (srcSeekable == SeekableFlag::eSeekable)
307 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
308 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
311 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
314 case Execution::eNotKnownInternallySynchronized:
315 return New<ELEMENT_TYPE> (realIn, seekable);
327 template <
typename ELEMENT_TYPE>
328 inline Ptr<ELEMENT_TYPE>::Ptr (
const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
333 template <
typename ELEMENT_TYPE>
334 [[deprecated (
"Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE> New (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
335 optional<bool> seekable)
337 optional<SeekableFlag> sf;
339 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
341 return New (realIn, sf);
343 template <
typename ELEMENT_TYPE>
344 [[deprecated (
"Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE>
345 New (Execution::InternallySynchronized internallySynchronized,
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<bool> seekable = {})
347 optional<SeekableFlag> sf;
349 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
351 return New (internallySynchronized, realIn, sf);
#define RequireNotReached()
int64_t SignedSeekOffsetType
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...