6#include "Stroika/Foundation/Streams/InternallySynchronizedInputStream.h"
9namespace Stroika::Foundation::Streams::BufferedInputStream {
// Helper declared [[noreturn]]: control never returns to the caller after invoking it.
// Only the declaration is visible here; it is invoked from the Whence::eFromEnd handling in SeekRead.
// NOTE(review): presumably throws an exception describing that seeking from end is unsupported - confirm against the definition.
13 [[noreturn]]
void ThrowCannotSeekFromEnd_ ();
// Stream rep used when the wrapped (upstream) stream is itself seekable.
// Every visible read/query operation is forwarded to the StreamReader member fReader_.
// NOTE(review): this listing omits several original lines (braces, some method headers and bodies),
// so the comments below describe only the statements that are actually visible.
16 template <
typename ELEMENT_TYPE>
17 class Rep_Seekable_FromSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream by const reference.
19 Rep_Seekable_FromSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
// Precondition: this rep may only wrap a stream that reports itself as seekable.
23 Require (realIn.IsSeekable ());
// IsSeekable override (body not visible in this listing).
25 virtual bool IsSeekable ()
const override
// CloseRead: acts only while fRealIn_ is non-null; on exit the stream must no longer be open for read.
29 virtual void CloseRead ()
override
31 if (fRealIn_ !=
nullptr) {
// Postconditions: closed state is observable through IsOpenRead () and fRealIn_ has been reset to null.
34 Ensure (not IsOpenRead ());
35 Assert (fRealIn_ ==
nullptr);
// "Open for read" is defined as still holding a non-null upstream pointer.
37 virtual bool IsOpenRead ()
const override
39 return fRealIn_ !=
nullptr;
// NOTE(review): header of this method is not visible (presumably GetReadOffset or similar - confirm).
// Requires an open stream and reports the offset tracked by fReader_.
43 Require (IsOpenRead ());
44 return fReader_.GetOffset ();
// AvailableToRead: requires an open stream; result comes straight from fReader_.
46 virtual optional<size_t> AvailableToRead ()
override
49 Require (IsOpenRead ());
50 return fReader_.AvailableToRead ();
// RemainingLength: requires an open stream; result comes straight from fReader_.
52 virtual optional<SeekOffsetType> RemainingLength ()
override
55 Require (IsOpenRead ());
56 return fReader_.RemainingLength ();
// Read: requires an open stream; intoBuffer and blockFlag are passed unchanged to fReader_.Read.
58 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
61 Require (IsOpenRead ());
62 return fReader_.Read (intoBuffer, blockFlag);
// Upstream stream; null once closed (see CloseRead / IsOpenRead).
66 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// Reader that all visible query/read methods delegate to.
67 StreamReader<ELEMENT_TYPE> fReader_;
// NOTE(review): presumably used to assert that callers synchronize access externally - its uses are on lines omitted from this listing.
68 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Stream rep that offers seeking on top of an upstream stream, by retaining everything read so far
// in fBufferOfAllReadDataSoFar_ and tracking a logical position in fSeekOffset_.
// NOTE(review): this listing omits several original lines (braces, some method headers, the fSeekOffset_
// declaration, and parts of SeekRead/Read), so the comments below describe only the visible statements.
72 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
73 class Rep_Seekable_FromUnSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream by const reference (body not visible).
75 Rep_Seekable_FromUnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
// IsSeekable override (body not visible in this listing).
79 virtual bool IsSeekable ()
const override
// CloseRead: acts only while fRealIn_ is non-null; on exit the stream must no longer be open for read.
83 virtual void CloseRead ()
override
85 if (fRealIn_ !=
nullptr) {
88 Ensure (not IsOpenRead ());
89 Assert (fRealIn_ ==
nullptr);
// "Open for read" is defined as still holding a non-null upstream pointer.
91 virtual bool IsOpenRead ()
const override
93 return fRealIn_ !=
nullptr;
// NOTE(review): header and return statement of this method are not visible (presumably the read-offset getter - confirm).
97 Require (IsOpenRead ());
// AvailableToRead: if the logical position is still inside the retained buffer, report the count of
// retained-but-unconsumed elements; otherwise ask the upstream stream.
100 virtual optional<size_t> AvailableToRead ()
override
103 Require (IsOpenRead ());
104 if (fSeekOffset_ < fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
105 return fBufferOfAllReadDataSoFar_.size () -
static_cast<size_t> (fSeekOffset_);
107 return fRealIn_.AvailableToRead ();
// RemainingLength: when upstream reports a remaining length, it is passed through MapOffsetFromReal2Mine_.
// The result for the case where upstream reports nothing is on a line not visible here.
109 virtual optional<SeekOffsetType> RemainingLength ()
override
112 Require (IsOpenRead ());
113 if (
auto rl = fRealIn_.RemainingLength ()) {
114 return MapOffsetFromReal2Mine_ (*rl);
// SeekRead: updates fSeekOffset_ according to whence.
118 virtual auto SeekRead (Whence whence, SignedSeekOffsetType offset) ->
SeekOffsetType override
121 Require (IsOpenRead ());
// Relative seek: shift the logical position by offset.
124 case Whence::eFromCurrent:
125 fSeekOffset_ += offset;
// Absolute seek: the logical position becomes offset.
127 case Whence::eFromStart:
128 fSeekOffset_ = offset;
// Seek from end: depends on RemainingLength ().
// NOTE(review): ThrowCannotSeekFromEnd_ is presumably reached only when no remaining length is available - the branch structure is not visible; confirm.
130 case Whence::eFromEnd:
131 if (
auto remaining = this->RemainingLength ()) {
136 Private_::ThrowCannotSeekFromEnd_ ();
// Read: serves data out of the retained buffer, first extending it from upstream when the
// logical position sits exactly at the end of what has been retained.
143 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
146 Require (IsOpenRead ());
// Invariant: the logical position never lies beyond the retained data.
147 Assert (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ());
148 if (fSeekOffset_ == fBufferOfAllReadDataSoFar_.size ()) [[unlikely]] {
// Pull up to 1024 elements from upstream into a local array, then append whatever was returned to the retained buffer.
149 ELEMENT_TYPE buf[1024];
150 if (
auto r = fRealIn_.Read (span{buf}, blockFlag)) {
151 fBufferOfAllReadDataSoFar_.push_back (*r);
157 if (fSeekOffset_ <= fBufferOfAllReadDataSoFar_.size ()) [[likely]] {
// Copy no more than the caller's buffer holds and no more than remains retained past the logical position.
158 size_t n2Read = min<size_t> (intoBuffer.size (),
static_cast<size_t> (fBufferOfAllReadDataSoFar_.size () - fSeekOffset_));
159 auto result = Memory::CopySpanData (span{fBufferOfAllReadDataSoFar_}.subspan (
static_cast<size_t> (fSeekOffset_), n2Read), intoBuffer);
160 Assert (result.size () == n2Read);
// Advance the logical position past the elements just handed out.
161 fSeekOffset_ += n2Read;
// Helper used by RemainingLength to translate a value reported by upstream (body not visible).
168 nonvirtual
SeekOffsetType MapOffsetFromReal2Mine_ (SeekOffsetType so)
const
// Upstream stream; null once closed (see CloseRead / IsOpenRead).
175 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// All data read from upstream so far; Read appends to it and copies out of it.
176 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fBufferOfAllReadDataSoFar_;
// NOTE(review): presumably used to assert that callers synchronize access externally - its uses are on lines omitted from this listing.
178 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Non-seekable buffered rep: reads from upstream into fIntermediateBuffer_ and hands data out of it,
// tracking how much has been consumed in fReadOffsetIntoIntermediateBuf_.
// NOTE(review): this listing omits several original lines (braces, some method headers and parts of
// AvailableToRead/Read), so the comments below describe only the visible statements.
182 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
183 class Rep_UnSeekable_ :
public IRep_<ELEMENT_TYPE> {
// Constructor: receives the upstream stream by const reference (body not visible).
185 Rep_UnSeekable_ (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn)
// IsSeekable override (body not visible in this listing).
189 virtual bool IsSeekable ()
const override
// CloseRead: acts only while fRealIn_ is non-null; on exit the stream must no longer be open for read.
193 virtual void CloseRead ()
override
195 if (fRealIn_ !=
nullptr) {
198 Ensure (not IsOpenRead ());
199 Assert (fRealIn_ ==
nullptr);
// "Open for read" is defined as still holding a non-null upstream pointer.
201 virtual bool IsOpenRead ()
const override
203 return fRealIn_ !=
nullptr;
// NOTE(review): header of this method is not visible (presumably the read-offset getter - confirm).
// Returns the consumed count within the intermediate buffer plus the upstream offset.
// NOTE(review): if fRealIn_.GetOffset () already counts elements that were pulled into fIntermediateBuffer_,
// this sum would overstate the logical position by the amount buffered - confirm intended semantics.
207 Require (IsOpenRead ());
208 return fReadOffsetIntoIntermediateBuf_ + fRealIn_.GetOffset ();
// AvailableToRead: starts from the count already buffered, then consults upstream
// (how the two are combined is on lines not visible here).
210 virtual optional<size_t> AvailableToRead ()
override
213 Require (IsOpenRead ());
214 size_t n = GetNEltsAlreadyBufferedFromUpstream_ ();
215 if (
auto o = fRealIn_.AvailableToRead ()) {
// RemainingLength: when upstream reports a remaining length, add the buffered-but-unconsumed count to it.
224 virtual optional<SeekOffsetType> RemainingLength ()
override
227 Require (IsOpenRead ());
228 if (
auto o = fRealIn_.RemainingLength ()) {
229 return *o + GetNEltsAlreadyBufferedFromUpstream_ ();
// Read: refills the intermediate buffer from upstream when nothing is buffered, then copies out of it.
233 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
236 Require (IsOpenRead ());
237 auto n = GetNEltsAlreadyBufferedFromUpstream_ ();
238 if (n == 0) [[unlikely]] {
// Refill path: read from upstream directly into the intermediate buffer's storage, restart consumption
// at index 0, and resize the buffer to n (n is presumably reassigned from bufR on an omitted line - confirm).
241 auto bufR = fRealIn_.Read (span{fIntermediateBuffer_}, blockFlag);
244 fReadOffsetIntoIntermediateBuf_ = 0;
246 fIntermediateBuffer_.resize_uninitialized (n);
// Having nothing buffered at this point is only acceptable if upstream is at EOF.
253 Assert (n != 0 or fRealIn_.IsAtEOF ());
// Copy at most n elements, bounded by the caller's buffer size, starting at the consumed position.
254 size_t n2Read = Math::AtMost (n, intoBuffer.size ());
255 auto t = Memory::CopySpanData (span{fIntermediateBuffer_}.subspan (fReadOffsetIntoIntermediateBuf_, n2Read), intoBuffer);
256 Assert (t.size () == n2Read);
// Mark the copied elements as consumed.
257 fReadOffsetIntoIntermediateBuf_ += n2Read;
// Number of elements sitting in the intermediate buffer that have not yet been handed to a caller.
262 nonvirtual
size_t GetNEltsAlreadyBufferedFromUpstream_ ()
const
// Invariant: the consumed position never exceeds the buffer size, so the subtraction below cannot underflow.
264 Assert (fReadOffsetIntoIntermediateBuf_ <= fIntermediateBuffer_.size ());
265 return fIntermediateBuffer_.size () - fReadOffsetIntoIntermediateBuf_;
// Upstream stream; null once closed (see CloseRead / IsOpenRead).
269 typename InputStream::Ptr<ELEMENT_TYPE> fRealIn_;
// Holding area for data read from upstream but not yet fully consumed.
270 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fIntermediateBuffer_;
// Index of the next unconsumed element within fIntermediateBuffer_.
271 size_t fReadOffsetIntoIntermediateBuf_{0};
// NOTE(review): presumably used to assert that callers synchronize access externally - its uses are on lines omitted from this listing.
272 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
// Factory: wraps realIn in one of the three Private_ reps and returns it as a Ptr<ELEMENT_TYPE>.
//   realIn   - upstream stream to wrap.
//   seekable - optional request for seekability of the result; when absent, srcSeekable is used instead.
// NOTE(review): the definition of srcSeekable is on a line omitted from this listing; presumably it reflects
// whether realIn is seekable - confirm.
281 template <
typename ELEMENT_TYPE>
282 inline auto New (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
284 using PTR = Ptr<ELEMENT_TYPE>;
// An explicit caller choice wins; otherwise fall back to srcSeekable.
286 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
// Inline buffer capacity (4 * 1024 elements) passed as a template argument to the buffering reps.
287 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
288 if (useSeekable == SeekableFlag::eSeekable) {
// Seekable result: pick the rep according to whether the source is itself seekable.
289 return (srcSeekable == SeekableFlag::eSeekable)
290 ? PTR{Memory::MakeSharedPtr<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> (realIn)}
291 : PTR{Memory::MakeSharedPtr<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
// Non-seekable result.
294 return PTR{Memory::MakeSharedPtr<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> (realIn)};
// Factory overload that additionally takes an internallySynchronized selector (the start of the
// signature is on a line omitted from this listing).
//   eInternallySynchronized         - builds the same rep choice as the plain overload, but through
//                                     InternallySynchronizedInputStream::New.
//   eNotKnownInternallySynchronized - forwards to New<ELEMENT_TYPE> (realIn, seekable).
// NOTE(review): the definition of srcSeekable is on an omitted line; presumably it reflects whether realIn is seekable - confirm.
297 template <
typename ELEMENT_TYPE>
299 optional<SeekableFlag> seekable) -> Ptr<ELEMENT_TYPE>
// Inline buffer capacity (4 * 1024 elements) passed as a template argument to the buffering reps.
301 constexpr size_t INLINE_BUF_SIZE = 4 * 1024;
302 switch (internallySynchronized) {
303 case Execution::eInternallySynchronized: {
// An explicit caller choice wins; otherwise fall back to srcSeekable.
305 SeekableFlag useSeekable = seekable.value_or (srcSeekable);
306 if (useSeekable == SeekableFlag::eSeekable) {
// Seekable result: pick the rep according to whether the source is itself seekable.
307 return (srcSeekable == SeekableFlag::eSeekable)
308 ? InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromSeekable_<ELEMENT_TYPE>> ({}, realIn)
309 : InternallySynchronizedInputStream::New<Private_::Rep_Seekable_FromUnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
// Non-seekable result.
312 return InternallySynchronizedInputStream::New<Private_::Rep_UnSeekable_<ELEMENT_TYPE, INLINE_BUF_SIZE>> ({}, realIn);
315 case Execution::eNotKnownInternallySynchronized:
316 return New<ELEMENT_TYPE> (realIn, seekable);
// Ptr constructor taking a shared_ptr to the rep interface (initializer list and body are not visible in this listing).
328 template <
typename ELEMENT_TYPE>
329 inline Ptr<ELEMENT_TYPE>::Ptr (
const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
// Deprecated overload: accepts the seekability request as optional<bool> and forwards to the
// optional<SeekableFlag> overload.
334 template <
typename ELEMENT_TYPE>
335 [[deprecated (
"Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE> New (
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn,
336 optional<bool> seekable)
// Starts empty, i.e. "no preference", unless assigned below.
338 optional<SeekableFlag> sf;
// true maps to eSeekable, false to eNotSeekable.
// NOTE(review): the guard that checks seekable has a value before dereferencing is on a line omitted from this listing - confirm.
340 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
342 return New (realIn, sf);
// Deprecated overload: like the optional<SeekableFlag> version taking internallySynchronized, but accepts
// the seekability request as optional<bool> (defaulting to empty) and forwards after converting it.
344 template <
typename ELEMENT_TYPE>
345 [[deprecated (
"Since Stroika v3.0d19 use Seekability overload")]] Ptr<ELEMENT_TYPE>
346 New (Execution::InternallySynchronized internallySynchronized,
const typename InputStream::Ptr<ELEMENT_TYPE>& realIn, optional<bool> seekable = {})
// Starts empty, i.e. "no preference", unless assigned below.
348 optional<SeekableFlag> sf;
// true maps to eSeekable, false to eNotSeekable.
// NOTE(review): the guard that checks seekable has a value before dereferencing is on a line omitted from this listing - confirm.
350 sf = *seekable ? SeekableFlag::eSeekable : SeekableFlag::eNotSeekable;
352 return New (internallySynchronized, realIn, sf);
#define RequireNotReached()
int64_t SignedSeekOffsetType
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...