Stroika Library 3.0d16
 
Loading...
Searching...
No Matches
ToSeekableInputStream.inl
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2025. All rights reserved
3 */
6
7namespace Stroika::Foundation::Streams::ToSeekableInputStream {
8
9 /*
10 ********************************************************************************
11 ****************** Streams::ToSeekableInputStream::New *************************
12 ********************************************************************************
13 */
14 template <typename ELEMENT_TYPE>
15 auto New (const Ptr<ELEMENT_TYPE>& in) -> Ptr<ELEMENT_TYPE>
16 {
18 struct seekableWrapper final : InputStreamDelegationHelper<ELEMENT_TYPE> {
19
21 seekableWrapper (const Ptr<ELEMENT_TYPE>& in)
22 : inherited{in}
23 , fOffset_{in.GetOffset ()}
24 , fCacheBaseOffset_{fOffset_}
25 {
26 Assert (not this->fRealIn.IsSeekable ()); // just to document that's why we're here!
27 }
28 virtual bool IsSeekable () const override
29 {
30 return true;
31 }
32 virtual optional<size_t> AvailableToRead () override
33 {
34 SeekOffsetType cacheEnd = fCacheBaseOffset_ + fCachedData_.size ();
35 if (fCacheBaseOffset_ <= fOffset_ and fOffset_ < cacheEnd) [[unlikely]] {
36 Ensure (cacheEnd - fOffset_ > 0);
37 return static_cast<size_t> (cacheEnd - fOffset_);
38 }
39 return this->fRealIn.AvailableToRead ();
40 }
41 virtual optional<SeekOffsetType> RemainingLength () override
42 {
43 auto baseRemaining = this->fRealIn.RemainingLength ();
44 if (baseRemaining) {
45 SeekOffsetType cacheEnd = fCacheBaseOffset_ + fCachedData_.size ();
46 Assert (fOffset_ <= cacheEnd);
47 baseRemaining = *baseRemaining + static_cast<size_t> (cacheEnd - fOffset_); // if we have some cached data past current seek offset, add it too
48 }
49 return baseRemaining;
50 }
51 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, [[maybe_unused]] NoDataAvailableHandling blockFlag) override
52 {
53 Require (not intoBuffer.empty ());
54 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
55 /*
56 * See if the request can be serviced from the cached data. If so, do so.
57 */
58 SeekOffsetType cacheEnd = fCacheBaseOffset_ + fCachedData_.size ();
59 if (fCacheBaseOffset_ <= fOffset_ and fOffset_ < cacheEnd) [[unlikely]] {
60 size_t copyCnt = max<size_t> (static_cast<size_t> (cacheEnd - fOffset_), intoBuffer.size ());
61 auto r = Memory::CopyBytes (span{fCachedData_}.subspan (static_cast<size_t> (fOffset_ - fCacheBaseOffset_), copyCnt), intoBuffer);
62 fOffset_ += copyCnt;
63 return r;
64 }
65 /*
66 * If it cannot, accumulate any read data into the cache so it can be re-read.
67 */
68 Assert (fOffset_ == inherited::fRealIn.GetOffset ()); // could be bug with this code or somebody else playing fast and loose, but use assert
69 auto r = this->fRealIn.ReadOrThrow (intoBuffer, blockFlag);
70 // cache it, and update our data structures; note easy, cuz fRealIn must be at matching seek offset
71 fCachedData_.push_back (r);
72 fOffset_ += r.size ();
73 return r;
74 }
75 virtual SeekOffsetType GetReadOffset () const override
76 {
77 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
78 return fOffset_;
79 }
80 virtual SeekOffsetType SeekRead (Whence whence, SignedSeekOffsetType offset) override
81 {
82 static const auto kException_ = range_error{"seek"};
83 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
84 switch (whence) {
85 case Whence::eFromStart: {
86 if (offset < 0) [[unlikely]] {
87 Execution::Throw (kException_);
88 }
89 SeekOffsetType newOffset = static_cast<SeekOffsetType> (offset);
90 Require (newOffset >= fCacheBaseOffset_); // as documented in New() code that creates this, we cannot seek back past where we started from
91 SeekOffsetType cacheEnd = fCacheBaseOffset_ + fCachedData_.size ();
92 while (newOffset > cacheEnd) {
93 // we must read and buffer/accumulate in the cache; note - because of how this code works, the
94 // fRealIn is always seeked to the end cached data.
95 byte someBuf[1024];
96 auto r = this->fRealIn.ReadBlocking (span{someBuf});
97 fCachedData_.push_back (r); // nb: an exception in this copy would cause fRealIn offset to be out of sync, but not sure what todo about it
98 cacheEnd = fCacheBaseOffset_ + fCachedData_.size ();
99 }
100 Assert (newOffset <= cacheEnd);
101 fOffset_ = newOffset;
102 return newOffset;
103 } break;
104 case eFromCurrent: {
105 return this->SeekRead (eFromStart, fOffset_ + offset);
106 } break;
107 case eFromEnd: {
108 if (auto remainingLength = this->RemainingLength ()) {
109 return this->SeekRead (eFromStart, fOffset_ + *remainingLength + offset);
110 }
111 else {
112 // implies seeking (fRealIn) to the end, and so reading everything, and then performing the desired seek
113 while (true) {
114 byte someBuf[8 * 1024];
115 auto r = this->fRealIn.ReadBlocking (span{someBuf});
116 fCachedData_.push_back (r); // nb: an exception in this copy would cause fRealIn offset to be out of sync, but not sure what todo about it
117 if (r.empty ()) {
118 break;
119 }
120 }
121 SeekOffsetType realEnd = fCacheBaseOffset_ + fCachedData_.size ();
122 Assert (realEnd == this->fRealIn.GetOffset ());
123 return this->SeekRead (eFromStart, realEnd + offset);
124 }
125 } break;
126 default:
128 return 0;
129 }
130 }
131
132 private:
134 SeekOffsetType fOffset_{0}; // this rep's seek offset (as oppsed to that in fRealIn)
135 SeekOffsetType fCacheBaseOffset_{0};
136 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
137 };
138 if (in.IsSeekable ()) {
139 return in;
140 }
141 else {
142 return Ptr<ELEMENT_TYPE>{make_shared<seekableWrapper> (in)};
143 }
144 return in;
145 }
146
147}
#define RequireNotReached()
Definition Assertions.h:385
NoDataAvailableHandling
If eDontBlock passed to most Stream APIs, then when the code would do a blocking read,...
Definition Stream.h:90
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
InputStream<>::Ptr is Smart pointer (with abstract Rep) class defining the interface to reading from ...
nonvirtual SeekOffsetType GetOffset() const