#include "Stroika/Foundation/Containers/Support/ReserveTweaks.h"
#include "Stroika/Foundation/Execution/NullMutex.h"
#include "Stroika/Foundation/Execution/WaitableEvent.h"
namespace Stroika::Foundation::Streams::SharedMemoryStream {
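    // Inline/template implementation details for SharedMemoryStream: the private rep interface (IRep_),
    // the two concrete reps (SeekableRep_, UnseekableRep_), the New () factories, and the Ptr member
    // implementations.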
    template <typename ELEMENT_TYPE>
    class IRep_ : public InputOutputStream::IRep<ELEMENT_TYPE> {
        virtual Options GetOptions () const = 0;
        virtual vector<ELEMENT_TYPE> AsVector () const = 0;
        virtual string AsString () const = 0;
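    // The two concrete reps below implement this interface; New () picks between them based on
    // Options::fSeekable, and between the recursive_mutex and Execution::NullMutex LOCK_IMPL variants
    // based on Options::fInternallySynchronized.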
    template <typename ELEMENT_TYPE, typename LOCK_IMPL>
    class SeekableRep_ : public IRep_<ELEMENT_TYPE> {
        using ElementType = ELEMENT_TYPE;
        bool fIsOpenForRead_{true};
        static constexpr bool kLocking_ = same_as<LOCK_IMPL, recursive_mutex>;
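        // kLocking_ selects between the two LOCK_IMPL instantiations used by New (): with recursive_mutex
        // the rep is internally synchronized and fMoreDataWaiter_ is a real Execution::WaitableEvent
        // (so Read can block waiting for a writer); with Execution::NullMutex it degenerates to
        // Common::Empty via [[no_unique_address]] and no cross-thread blocking is supported.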
            : fReadCursor_{fData_.begin ()}
            , fWriteCursor_{fData_.begin ()}
        SeekableRep_ (const SeekableRep_&) = delete;
        ~SeekableRep_ () = default;
        nonvirtual SeekableRep_& operator= (const SeekableRep_&) = delete;
        virtual bool IsSeekable () const override
            return true; // a SeekableRep_ supports SeekRead/SeekWrite
        virtual void CloseWrite () override
            [[maybe_unused]] lock_guard critSec{fMutex_};
            fClosedForWrites_ = true;
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            Ensure (not IsOpenWrite ());
        virtual bool IsOpenWrite () const override
            return not fClosedForWrites_;
        virtual void CloseRead () override
            fIsOpenForRead_ = false;
            Ensure (not IsOpenRead ());
        virtual bool IsOpenRead () const override
            return fIsOpenForRead_;
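        // AvailableToRead: a value > 0 means that many elements can be read without blocking; when
        // nothing is buffered the answer depends on whether the write side has already been closed
        // (known EOF) or not (nullopt, meaning "cannot tell without blocking" - Read uses that to
        // decide whether to wait on fMoreDataWaiter_).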
        virtual optional<size_t> AvailableToRead () override
            Require (IsOpenRead ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            size_t nDefinitelyAvail = distance (fReadCursor_, fData_.cend ());
            if (nDefinitelyAvail > 0) {
                return nDefinitelyAvail;
            else if (fClosedForWrites_) {
        virtual optional<SeekOffsetType> RemainingLength () override
            Require (IsOpenRead ());
        DISABLE_COMPILER_GCC_WARNING_START ("GCC diagnostic ignored \"-Wunused-label\"");
        DISABLE_COMPILER_MSC_WARNING_START (4102)
        virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
            Require (not intoBuffer.empty ());
            Require (IsOpenRead ());
            size_t nRequested = intoBuffer.size ();
            if (this->AvailableToRead () == nullopt) {
                if constexpr (kLocking_) {
                    fMoreDataWaiter_.Wait ();
            [[maybe_unused]] lock_guard critSec{fMutex_};
            Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
            size_t nAvail = distance (fReadCursor_, fData_.cend ());
            if (nAvail == 0 and not fClosedForWrites_) {
                if constexpr (kLocking_) {
                    fMoreDataWaiter_.Reset ();
                Require (blockFlag == eDontBlock);
            size_t nCopied = min (nAvail, nRequested);
            copy (fReadCursor_, fReadCursor_ + nCopied, intoBuffer.data ());
            fReadCursor_ = fReadCursor_ + nCopied;
            return intoBuffer.subspan (0, nCopied);
        DISABLE_COMPILER_MSC_WARNING_END (4102)
        DISABLE_COMPILER_GCC_WARNING_END ("GCC diagnostic ignored \"-Wunused-label\"");
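        // Write appends at fWriteCursor_, growing fData_ when needed: Reserve4AddN reserves in
        // kChunkSize_ steps to amortize reallocation, and both cursors are re-derived from saved
        // offsets because resize () can invalidate vector iterators. Any reader blocked on
        // fMoreDataWaiter_ is woken first.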
        virtual void Write (span<const ELEMENT_TYPE> elts) override
            Require (not elts.empty ());
            Require (IsOpenWrite ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            size_t roomLeft = distance (fWriteCursor_, fData_.end ());
            size_t roomRequired = elts.size ();
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            if (roomLeft < roomRequired) {
                size_t curReadOffset = distance (fData_.cbegin (), fReadCursor_);
                size_t curWriteOffset = distance (fData_.begin (), fWriteCursor_);
                const size_t kChunkSize_ = 128;
                Containers::Support::ReserveTweaks::Reserve4AddN (fData_, roomRequired - roomLeft, kChunkSize_);
                fData_.resize (curWriteOffset + roomRequired);
                fReadCursor_ = fData_.begin () + curReadOffset;
                fWriteCursor_ = fData_.begin () + curWriteOffset;
                Assert (fWriteCursor_ < fData_.end ());
            copy (elts.data (), elts.data () + roomRequired, fWriteCursor_);
            fWriteCursor_ += roomRequired;
            Assert (fReadCursor_ < fData_.end ());
            Assert (fWriteCursor_ <= fData_.end ());
        virtual void Flush () override
        virtual SeekOffsetType GetReadOffset () const override
            Require (IsOpenRead ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            return distance (fData_.begin (), fReadCursor_);
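        // SeekRead/SeekWrite handle the three Whence cases the same way: compute the target offset,
        // range-check it against [0, fData_.size ()] (out-of-range offsets being range errors - note
        // kSeekException_ below), then reposition the cursor and return the new offset.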
        virtual SeekOffsetType SeekRead (Whence whence, SignedSeekOffsetType offset) override
            Require (IsOpenRead ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            if (offset < 0) [[unlikely]] {
            if (uOffset > fData_.size ()) [[unlikely]] {
            fReadCursor_ = fData_.begin () + static_cast<size_t> (uOffset);
            Streams::SeekOffsetType curOffset = distance (fData_.cbegin (), fReadCursor_);
            Streams::SignedSeekOffsetType newOffset = curOffset + offset;
            if (newOffset < 0) [[unlikely]] {
            if (uNewOffset > fData_.size ()) [[unlikely]] {
            fReadCursor_ = fData_.begin () + static_cast<size_t> (uNewOffset);
            Streams::SignedSeekOffsetType newOffset = fData_.size () + offset;
            if (newOffset < 0) [[unlikely]] {
            if (uNewOffset > fData_.size ()) [[unlikely]] {
            fReadCursor_ = fData_.begin () + static_cast<size_t> (uNewOffset);
            Ensure ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
            return distance (fData_.cbegin (), fReadCursor_);
        virtual SeekOffsetType GetWriteOffset () const override
            Require (IsOpenWrite ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            return distance (fData_.begin (), static_cast<typename vector<ElementType>::const_iterator> (fWriteCursor_));
        virtual SeekOffsetType SeekWrite (Whence whence, SignedSeekOffsetType offset) override
            Require (IsOpenWrite ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            if (offset < 0) [[unlikely]] {
            if (static_cast<SeekOffsetType> (offset) > fData_.size ()) [[unlikely]] {
            fWriteCursor_ = fData_.begin () + static_cast<size_t> (offset);
            Streams::SeekOffsetType curOffset = distance (fData_.begin (), fWriteCursor_);
            Streams::SignedSeekOffsetType newOffset = curOffset + offset;
            if (newOffset < 0) [[unlikely]] {
            if (static_cast<size_t> (newOffset) > fData_.size ()) [[unlikely]] {
            fWriteCursor_ = fData_.begin () + static_cast<size_t> (newOffset);
            Streams::SignedSeekOffsetType newOffset = fData_.size () + offset;
            if (newOffset < 0) [[unlikely]] {
            if (static_cast<size_t> (newOffset) > fData_.size ()) [[unlikely]] {
            fWriteCursor_ = fData_.begin () + static_cast<size_t> (newOffset);
            Ensure ((fData_.begin () <= fWriteCursor_) and (fWriteCursor_ <= fData_.end ()));
            return distance (fData_.begin (), fWriteCursor_);
        virtual Options GetOptions () const override
            return Options{.fInternallySynchronized = same_as<LOCK_IMPL, recursive_mutex>
                               ? Execution::InternallySynchronized::eInternallySynchronized
                               : Execution::InternallySynchronized::eNotKnownInternallySynchronized,
        virtual vector<ElementType> AsVector () const override
            [[maybe_unused]] lock_guard critSec{fMutex_};
        virtual string AsString () const override
            [[maybe_unused]] lock_guard critSec{fMutex_};
        static inline const auto kSeekException_ = range_error{"seek"};
        mutable LOCK_IMPL fMutex_;
        [[no_unique_address]] conditional_t<kLocking_, Execution::WaitableEvent, Common::Empty> fMoreDataWaiter_{};
        vector<ElementType> fData_;
        typename vector<ElementType>::const_iterator fReadCursor_;
        typename vector<ElementType>::iterator fWriteCursor_;
        bool fClosedForWrites_{false};
    template <typename ELEMENT_TYPE, typename LOCK_IMPL>
    class UnseekableRep_ : public IRep_<ELEMENT_TYPE> {
        using ElementType = ELEMENT_TYPE;
        bool fIsOpenForRead_{true};
        static constexpr bool kLocking_ = same_as<LOCK_IMPL, recursive_mutex>;
            : fReadCursor_{fData_.begin ()}
            , fWriteCursor_{fData_.begin ()}
        UnseekableRep_ (const UnseekableRep_&) = delete;
        ~UnseekableRep_ () = default;
        nonvirtual UnseekableRep_& operator= (const UnseekableRep_&) = delete;
        virtual bool IsSeekable () const override
            return false; // an UnseekableRep_ is, by definition, not seekable
        virtual void CloseWrite () override
            [[maybe_unused]] lock_guard critSec{fMutex_};
            fClosedForWrites_ = true;
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            Ensure (not IsOpenWrite ());
        virtual bool IsOpenWrite () const override
            return not fClosedForWrites_;
        virtual void CloseRead () override
            fIsOpenForRead_ = false;
            Ensure (not IsOpenRead ());
        virtual bool IsOpenRead () const override
            return fIsOpenForRead_;
        virtual optional<size_t> AvailableToRead () override
            Require (IsOpenRead ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            size_t nDefinitelyAvail = distance (fReadCursor_, fData_.cend ());
            if (nDefinitelyAvail > 0) {
                return nDefinitelyAvail;
            else if (fClosedForWrites_) {
        virtual optional<SeekOffsetType> RemainingLength () override
            Require (IsOpenRead ());
        DISABLE_COMPILER_MSC_WARNING_START (4102)
        DISABLE_COMPILER_GCC_WARNING_START ("GCC diagnostic ignored \"-Wunused-label\"");
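        // Read mirrors SeekableRep_::Read, except that once data has been copied out it calls
        // FreeUpSpaceIfNeeded_ () so already-consumed elements can be discarded from the buffer.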
        virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag) override
            Require (not intoBuffer.empty ());
            Require (IsOpenRead ());
            size_t nRequested = intoBuffer.size ();
            if (this->AvailableToRead () == nullopt) {
                if constexpr (kLocking_) {
                    fMoreDataWaiter_.Wait ();
            [[maybe_unused]] lock_guard critSec{fMutex_};
            Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
            size_t nAvail = distance (fReadCursor_, fData_.cend ());
            if (nAvail == 0 and not fClosedForWrites_) {
                if constexpr (kLocking_) {
                    fMoreDataWaiter_.Reset ();
                Require (blockFlag == eDontBlock);
            size_t nCopied = min (nAvail, nRequested);
            copy (fReadCursor_, fReadCursor_ + nCopied, intoBuffer.data ());
            fReadCursor_ = fReadCursor_ + nCopied;
            FreeUpSpaceIfNeeded_ ();
            return intoBuffer.subspan (0, nCopied);
        DISABLE_COMPILER_GCC_WARNING_END ("GCC diagnostic ignored \"-Wunused-label\"");
        DISABLE_COMPILER_MSC_WARNING_END (4102)
        virtual void Write (span<const ELEMENT_TYPE> elts) override
            Require (not elts.empty ());
            Require (IsOpenWrite ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            size_t roomLeft = distance (fWriteCursor_, fData_.end ());
            size_t roomRequired = elts.size ();
            if constexpr (kLocking_) {
                fMoreDataWaiter_.Set ();
            if (roomLeft < roomRequired) {
                size_t curReadOffset = distance (fData_.cbegin (), fReadCursor_);
                size_t curWriteOffset = distance (fData_.begin (), fWriteCursor_);
                const size_t kChunkSize_ = 128;
                Containers::Support::ReserveTweaks::Reserve4AddN (fData_, roomRequired - roomLeft, kChunkSize_);
                fData_.resize (curWriteOffset + roomRequired);
                fReadCursor_ = fData_.begin () + curReadOffset;
                fWriteCursor_ = fData_.begin () + curWriteOffset;
                Assert (fWriteCursor_ < fData_.end ());
            copy (elts.data (), elts.data () + roomRequired, fWriteCursor_);
            fWriteCursor_ += roomRequired;
            Assert (fReadCursor_ < fData_.end ());
            Assert (fWriteCursor_ <= fData_.end ());
        virtual void Flush () override
        virtual SeekOffsetType GetReadOffset () const override
            Require (IsOpenRead ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            return fSpaceClearedFromStreamHead_ + distance (fData_.begin (), fReadCursor_);
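        // SeekRead/SeekWrite: this rep reports IsSeekable () == false, so seeking is not supported
        // here and the whence/offset parameters are deliberately unused.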
        virtual SeekOffsetType SeekRead ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset) override
        virtual SeekOffsetType GetWriteOffset () const override
            Require (IsOpenWrite ());
            [[maybe_unused]] lock_guard critSec{fMutex_};
            return fSpaceClearedFromStreamHead_ + std::distance (fData_.begin (), static_cast<typename vector<ElementType>::const_iterator> (fWriteCursor_));
        virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset) override
        virtual Options GetOptions () const override
            return Options{.fInternallySynchronized = same_as<LOCK_IMPL, recursive_mutex>
                               ? Execution::InternallySynchronized::eInternallySynchronized
                               : Execution::InternallySynchronized::eNotKnownInternallySynchronized,
        virtual vector<ElementType> AsVector () const override
        virtual string AsString () const override
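        // FreeUpSpaceIfNeeded_: because this rep is unseekable, data before fReadCursor_ can never be
        // read again, so once at least kMinData2Reclaim_ bytes have been consumed they are erased from
        // the front of fData_ and fSpaceClearedFromStreamHead_ is bumped so GetReadOffset ()/
        // GetWriteOffset () keep reporting stable, monotonically increasing stream offsets
        // (readOffset/writeOffset below are those offsets, captured via GetReadOffset ()/GetWriteOffset ()
        // before the erase).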
        nonvirtual void FreeUpSpaceIfNeeded_ ()
            [[maybe_unused]] lock_guard critSec{fMutex_};
            Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
            Assert (fReadCursor_ <= fWriteCursor_);
            constexpr size_t kMinData2Reclaim_ = 16 * 1024;
            size_t elts2Reclaim = distance (fData_.cbegin (), fReadCursor_);
            if (elts2Reclaim * sizeof (ELEMENT_TYPE) >= kMinData2Reclaim_ and IsOpenRead () and IsOpenWrite ()) [[unlikely]] {
                fData_.erase (fData_.begin (), fData_.begin () + elts2Reclaim);
                fSpaceClearedFromStreamHead_ += elts2Reclaim;
                Assert (readOffset == fSpaceClearedFromStreamHead_);
                fReadCursor_ = fData_.begin () + static_cast<size_t> (readOffset - fSpaceClearedFromStreamHead_);
                fWriteCursor_ = fData_.begin () + static_cast<size_t> (writeOffset - fSpaceClearedFromStreamHead_);
                Assert (readOffset == GetReadOffset ());
                Assert (writeOffset == GetWriteOffset ());
        [[no_unique_address]] mutable LOCK_IMPL fMutex_;
        size_t fSpaceClearedFromStreamHead_{0};
        [[no_unique_address]] conditional_t<kLocking_, Execution::WaitableEvent, Common::Empty> fMoreDataWaiter_{};
        vector<ElementType> fData_;
        typename vector<ElementType>::const_iterator fReadCursor_;
        typename vector<ElementType>::iterator fWriteCursor_;
        bool fClosedForWrites_{false};
    template <typename ELEMENT_TYPE>
    inline auto New (Options options) -> Ptr<ELEMENT_TYPE>
        if (options.fSeekable) {
            return options.fInternallySynchronized == Execution::InternallySynchronized::eInternallySynchronized
                       ? Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::SeekableRep_<ELEMENT_TYPE, recursive_mutex>> ()}
                       : Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::SeekableRep_<ELEMENT_TYPE, Execution::NullMutex>> ()};
        return options.fInternallySynchronized == Execution::InternallySynchronized::eInternallySynchronized
                   ? Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::UnseekableRep_<ELEMENT_TYPE, recursive_mutex>> ()}
                   : Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::UnseekableRep_<ELEMENT_TYPE, Execution::NullMutex>> ()};
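    // Example usage (a sketch, not part of this header; assumes the usual InputOutputStream::Ptr API
    // for Write/CloseWrite plus the As<> helper defined below): one thread produces bytes into an
    // internally synchronized SharedMemoryStream while another consumes them.
    //
    //      using namespace Stroika::Foundation;
    //      using namespace Stroika::Foundation::Streams;
    //
    //      auto strm = SharedMemoryStream::New<byte> (
    //          SharedMemoryStream::Options{.fInternallySynchronized = Execution::InternallySynchronized::eInternallySynchronized});
    //      const byte kData_[] = {byte{1}, byte{2}, byte{3}};
    //      strm.Write (span<const byte>{kData_});
    //      strm.CloseWrite ();                              // lets readers see EOF instead of blocking
    //      vector<byte> all = strm.As<vector<byte>> ();     // {1, 2, 3}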
    template <typename ELEMENT_TYPE, typename COPY_FROM>
    inline auto New (const COPY_FROM& copyFrom, Options options) -> Ptr<ELEMENT_TYPE>
        requires (same_as<ELEMENT_TYPE, byte> and Common::IAnyOf<COPY_FROM, Memory::BLOB, span<const ELEMENT_TYPE>>)
        auto p = New<ELEMENT_TYPE> (options);
    template <typename ELEMENT_TYPE>
    inline Ptr<ELEMENT_TYPE>::Ptr (const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
    template <typename ELEMENT_TYPE>
    inline auto Ptr<ELEMENT_TYPE>::GetRepConstRef_ () const -> const Private_::IRep_<ELEMENT_TYPE>&
        return *Debug::UncheckedDynamicCast<const Private_::IRep_<ELEMENT_TYPE>*> (&inherited::GetRepConstRef ());
    template <typename ELEMENT_TYPE>
    inline Options Ptr<ELEMENT_TYPE>::GetOptions () const
        return GetRepConstRef_ ().GetOptions ();
    template <typename ELEMENT_TYPE>
    template <typename T>
    inline T Ptr<ELEMENT_TYPE>::As () const
        requires (same_as<T, vector<ELEMENT_TYPE>> or (same_as<ELEMENT_TYPE, byte> and Common::IAnyOf<T, Memory::BLOB, string>) or
                  (same_as<ELEMENT_TYPE, Characters::Character> and same_as<T, Characters::String>))
        using Characters::Character;
        using Characters::String;
        if constexpr (same_as<T, vector<ELEMENT_TYPE>>) {
            return GetRepConstRef_ ().AsVector ();
        else if constexpr (same_as<T, Memory::BLOB>) {
            return GetRepConstRef_ ().AsVector ();
        else if constexpr (same_as<T, string>) {
            return GetRepConstRef_ ().AsString ();
        else if constexpr (same_as<T, String>) {
            auto tmp = GetRepConstRef_ ().AsVector ();
            return String{span{tmp}};
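        // Usage sketch (names assumed): given a SharedMemoryStream::Ptr<byte> strm, strm.As<Memory::BLOB> (),
        // strm.As<string> (), and strm.As<vector<byte>> () are the byte conversions permitted by the
        // requires clause above.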
    template <typename ELEMENT_TYPE>
    [[deprecated ("Since Stroika v3.0d5 - use span overload")]] inline auto New (const ELEMENT_TYPE* start, const ELEMENT_TYPE* end)
        return New (span<ELEMENT_TYPE>{start, end});