5#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
8namespace Stroika::Foundation::Streams::BufferedInputStream {
// Rep_Seekable_FromSeekable_: input-stream rep (derives from IRep_<ELEMENT_TYPE>) used when the
// wrapped upstream stream is itself seekable. In the statements visible here, offset/availability/
// length/read requests are all forwarded to fReader_, and "open for read" means fRealIn_ is non-null.
//
// NOTE(review): this listing appears to be a damaged extraction - original line numbers are fused
// onto the start of each line and many lines (braces, access specifiers, some signatures and bodies,
// member-initializer lists) are absent. Restore from the upstream header before building.
13 template <
typename ELEMENT_TYPE>
14 class Rep_Seekable_FromSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream; precondition (Require) is that it is seekable.
// How fRealIn_ / fReader_ are initialized from realIn is not visible in this listing.
16 Rep_Seekable_FromSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
20 Require (realIn.IsSeekable ());
// IsSeekable: body not visible here (presumably returns true for this rep - confirm).
22 virtual bool IsSeekable ()
const override
// CloseRead: acts only when fRealIn_ is non-null (the work done inside the if is not visible).
// Postconditions checked afterwards: not open for read, and fRealIn_ is null.
26 virtual void CloseRead ()
override
28 if (fRealIn_ !=
nullptr) {
31 Ensure (not IsOpenRead ());
32 Assert (fRealIn_ ==
nullptr);
// IsOpenRead: the stream counts as open exactly while fRealIn_ is non-null.
34 virtual bool IsOpenRead ()
const override
36 return fRealIn_ !=
nullptr;
// The signature for the next two statements is missing from this listing (presumably the
// read-offset accessor - confirm). Requires the stream to be open; returns fReader_'s offset.
40 Require (IsOpenRead ());
41 return fReader_.GetOffset ();
// AvailableToRead: requires an open stream; forwards to fReader_.
43 virtual optional<size_t> AvailableToRead ()
override
46 Require (IsOpenRead ());
47 return fReader_.AvailableToRead ();
// RemainingLength: requires an open stream; forwards to fReader_.
49 virtual optional<SeekOffsetType> RemainingLength ()
override
52 Require (IsOpenRead ());
53 return fReader_.RemainingLength ();
// Read: requires an open stream; forwards the destination span and blocking flag to fReader_
// and returns its result unchanged.
55 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
58 Require (IsOpenRead ());
59 return fReader_.Read (intoBuffer, blockFlag);
// Data members.
// fRealIn_: the wrapped upstream stream; compared against nullptr to decide open/closed.
63 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// fReader_: the StreamReader that services all read/offset/length queries above
// (presumably constructed over fRealIn_ - the initializer is not visible).
64 StreamReader<ELEMENT_TYPE> fReader_;
// Presumably used to assert that callers synchronize externally; its use sites are not visible here.
65 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Rep_Seekable_FromUnSeekable_: input-stream rep (derives from IRep_<ELEMENT_TYPE>) that keeps the
// data read from upstream in fBufferOfAllReadDataSoFar_ and serves reads from that buffer at
// position fSeekOffset_ (see Read below). INLINE_BUF_SIZE is the inline capacity parameter handed
// to Memory::InlineBuffer.
//
// NOTE(review): this listing appears to be a damaged extraction - original line numbers are fused
// onto the start of each line and many lines (braces, some signatures/bodies, the fSeekOffset_
// declaration, return statements) are absent. Restore from the upstream header before building.
69 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
70 class Rep_Seekable_FromUnSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream; initializer list and body are not visible here.
72 Rep_Seekable_FromUnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
// IsSeekable: body not visible here (presumably returns true for this rep - confirm).
76 virtual bool IsSeekable ()
const override
// CloseRead: acts only when fRealIn_ is non-null (the work done inside the if is not visible).
// Postconditions checked afterwards: not open for read, and fRealIn_ is null.
80 virtual void CloseRead ()
override
82 if (fRealIn_ !=
nullptr) {
85 Ensure (not IsOpenRead ());
86 Assert (fRealIn_ ==
nullptr);
// IsOpenRead: the stream counts as open exactly while fRealIn_ is non-null.
88 virtual bool IsOpenRead ()
const override
90 return fRealIn_ !=
nullptr;
// The signature and return statement around this precondition are missing from the listing
// (presumably the read-offset accessor - confirm).
94 Require (IsOpenRead ());
// AvailableToRead: if the seek position is inside the already-buffered data, reports how many
// buffered elements lie beyond it; otherwise asks the upstream stream.
97 virtual optional<size_t> AvailableToRead ()
override
100 Require (IsOpenRead ());
101 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
102 return fBufferOfAllReadDataSoFar_.size () -
static_cast<size_t> (fSeekOffset_);
104 return fRealIn_.AvailableToRead ();
// RemainingLength: when upstream reports a remaining length, returns it after passing it through
// MapOffsetFromReal2Mine_. The fallback return is not visible.
// NOTE(review): a length is being passed to a helper named for mapping offsets, and that helper's
// body is not visible - confirm this accounts for buffered-but-unread data as intended.
106 virtual optional<SeekOffsetType> RemainingLength ()
override
109 Require (IsOpenRead ());
110 if (
auto rl = fRealIn_.RemainingLength ()) {
111 return MapOffsetFromReal2Mine_ (*rl);
// Read: invariant is fSeekOffset_ <= buffered size.
//  1. If the seek position is exactly at the end of the buffered data, read up to 1024 elements
//     from upstream into a local array and append whatever was returned to the buffer.
//  2. If buffered data now extends past the seek position, copy
//     min(intoBuffer.size (), buffered - fSeekOffset_) elements into intoBuffer and advance
//     fSeekOffset_ by that count.
// Return statements and the handling of "nothing read" are not visible in this listing.
115 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
118 Require (IsOpenRead ());
119 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
120 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
121 ELEMENT_TYPE buf[1024];
122 if (
auto r = fRealIn_.Read (span{buf}, blockFlag)) {
123 fBufferOfAllReadDataSoFar_.push_back (*r);
129 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
130 size_t n2Read = min<size_t> (intoBuffer.size (),
static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
131 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (
static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
132 Assert (result.size () == n2Read);
133 fSeekOffset_ += n2Read;
// MapOffsetFromReal2Mine_: helper used by RemainingLength above; body not visible here.
140 nonvirtual
SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so)
const
// Data members (the declaration of fSeekOffset_ is missing from this listing).
// fRealIn_: the wrapped upstream stream; compared against nullptr to decide open/closed.
147 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// fBufferOfAllReadDataSoFar_: grows by push_back in Read; reads are served from it.
148 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
// Presumably used to assert that callers synchronize externally; its use sites are not visible here.
150 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Rep_UnSeekable_: input-stream rep (derives from IRep_<ELEMENT_TYPE>) that reads from upstream in
// chunks into fIntermediateBuffer_ and hands elements out from index
// fReadOffsetIntoIntermediateBuf_ onward. INLINE_BUF_SIZE is the inline capacity parameter handed
// to Memory::InlineBuffer.
//
// NOTE(review): this listing appears to be a damaged extraction - original line numbers are fused
// onto the start of each line and many lines (braces, some signatures/bodies, return statements)
// are absent. Restore from the upstream header before building.
154 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
155 class Rep_UnSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream; initializer list and body are not visible here.
157 Rep_UnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
// IsSeekable: body not visible here (presumably returns false for this rep - confirm).
161 virtual bool IsSeekable ()
const override
// CloseRead: acts only when fRealIn_ is non-null (the work done inside the if is not visible).
// Postconditions checked afterwards: not open for read, and fRealIn_ is null.
165 virtual void CloseRead ()
override
167 if (fRealIn_ !=
nullptr) {
170 Ensure (not IsOpenRead ());
171 Assert (fRealIn_ ==
nullptr);
// IsOpenRead: the stream counts as open exactly while fRealIn_ is non-null.
173 virtual bool IsOpenRead ()
const override
175 return fRealIn_ !=
nullptr;
// The signature for the next two statements is missing from this listing (presumably the
// read-offset accessor - confirm). Returns buffer read index plus the upstream offset.
// NOTE(review): RemainingLength below counts still-buffered elements as not yet consumed
// (upstream remaining + buffered). If the upstream offset already includes the whole chunk pulled
// into fIntermediateBuffer_, the logical offset would instead be
// fRealIn_.GetOffset () - GetNEltsAlreadyBufferedFromUpstream_ () - confirm against upstream source.
179 Require (IsOpenRead ());
180 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
// AvailableToRead: starts from the count of buffered-but-unread elements, then consults the
// upstream stream; how the two are combined and returned is not visible in this listing.
182 virtual optional<size_t> AvailableToRead ()
override
185 Require (IsOpenRead ());
186 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
187 if (
auto o = fRealIn_.AvailableToRead ()) {
// RemainingLength: when upstream reports a remaining length, adds the buffered-but-unread count
// to it. The fallback return is not visible.
196 virtual optional<SeekOffsetType> RemainingLength ()
override
199 Require (IsOpenRead ());
200 if (
auto o = fRealIn_.RemainingLength ()) {
201 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
// Read:
//  1. n = number of buffered-but-unread elements. When that is zero, refill: read from upstream
//     into fIntermediateBuffer_, reset the read index to 0, and resize the buffer to n
//     (n is presumably reassigned to the count actually read in lines missing here - confirm).
//  2. Assert that either data is available or upstream is at EOF.
//  3. Copy n2Read elements (Math::AtMost (n, intoBuffer.size ()) - presumably the smaller of the
//     two) from the buffer at the read index into intoBuffer, and advance the read index.
// Return statements and handling of a non-blocking "no data" result are not visible.
205 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
208 Require (IsOpenRead ());
209 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
210 if (n == 0) [[unlikely]] {
213 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
216 fReadOffsetIntoIntermediateBuf_ = 0;
218 fIntermediateBuffer_.resize_uninitialized (n);
225 Assert (n != 0 or fRealIn_.IsAtEOF ());
226 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
227 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
228 Assert (t.size () == n2Read);
229 fReadOffsetIntoIntermediateBuf_ += n2Read;
// GetNEltsAlreadyBufferedFromUpstream_: count of elements in fIntermediateBuffer_ at or beyond the
// read index; asserts the read index never exceeds the buffer size.
234 nonvirtual
size_t GetNEltsAlreadyBufferedFromUpstream_ ()
const
236 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
237 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
// Data members.
// fRealIn_: the wrapped upstream stream; compared against nullptr to decide open/closed.
241 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// fIntermediateBuffer_: holds the most recent chunk read from upstream.
242 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
// Index of the next unread element within fIntermediateBuffer_; starts at 0.
243 size_t fReadOffsetIntoIntermediateBuf_{0};
// Presumably used to assert that callers synchronize externally; its use sites are not visible here.
244 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Factory (function signature is missing from this listing): builds a buffered stream over realIn,
// choosing one of the three Private_ rep classes.
//  - useSeekable is the caller-supplied `seekable` if set, otherwise the source's own seekability
//    (srcSeekable; its definition is not visible here).
//  - Seekable result: Rep_Seekable_FromSeekable_ when the source is seekable, else
//    Rep_Seekable_FromUnSeekable_.
//  - Otherwise: Rep_UnSeekable_.
// The condition separating the two return paths is not visible (presumably `if (useSeekable)` - confirm).
// NOTE(review): damaged extraction - original line numbers are fused onto each line and lines are missing.
253 template <
typename ELEMENT_TYPE>
258 bool useSeekable = seekable.value_or (srcSeekable);
// Inline buffer capacity (in elements) passed to the buffering reps.
259 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
261 return srcSeekable ? PTR{make_shared<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
262 : PTR{make_shared<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
265 return PTR{make_shared<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
// Factory overload taking an internal-synchronization choice (function signature is missing from
// this listing).
//  - eInternallySynchronized: builds the same three rep choices as the plain factory, but via
//    InternallySynchronizedInputStream::New<REP> ({}, realIn). The conditions selecting between the
//    seekable and unseekable reps are not visible here (presumably useSeekable / srcSeekable - confirm).
//  - eNotKnownInternallySynchronized: delegates to New<ELEMENT_TYPE> (realIn, seekable).
// NOTE(review): damaged extraction - original line numbers are fused onto each line and lines are missing.
268 template <
typename ELEMENT_TYPE>
// Inline buffer capacity (in elements) passed to the buffering reps.
272 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
273 switch (internallySynchronized) {
274 case Execution::eInternallySynchronized: {
// Caller-supplied `seekable` if set, otherwise the source's own seekability.
276 bool useSeekable = seekable.value_or (srcSeekable);
279 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
280 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
283 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
286 case Execution::eNotKnownInternallySynchronized:
287 return New<ELEMENT_TYPE> (realIn, seekable);
299 template <
typename ELEMENT_TYPE>
#define RequireNotReached()
int64_t SignedSeekOffsetType
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
nonvirtual bool IsSeekable() const
Returns true iff this object was constructed with a seekable input stream rep.