7#include "Stroika/Foundation/Containers/Support/ReserveTweaks.h"
9#include "Stroika/Foundation/Execution/NullMutex.h"
10#include "Stroika/Foundation/Execution/WaitableEvent.h"
12namespace Stroika::Foundation::Streams::SharedMemoryStream {
// IRep_<ELEMENT_TYPE>: private abstract rep interface for SharedMemoryStream.
// Extends the generic InputOutputStream rep with snapshot accessors used by Ptr::As<T>.
// NOTE(review): this text is an extraction of the real header; fused leading numbers
// are the original file's line numbers, and some lines/braces were lost by the
// extractor. Code tokens left byte-identical.
15 template <
typename ELEMENT_TYPE>
16 class IRep_ :
public InputOutputStream::IRep<ELEMENT_TYPE> {
// Options this rep was constructed with (seekability/synchronization) — see concrete reps.
18 virtual Options GetOptions ()
const = 0;
// Snapshot of the stream contents as a vector of elements.
19 virtual vector<ELEMENT_TYPE> AsVector ()
const = 0;
// Snapshot of the stream contents as a std::string (element interpretation
// not visible in this extraction — presumably byte streams; verify in full source).
20 virtual string AsString ()
const = 0;
// SeekableRep_<ELEMENT_TYPE, LOCK_IMPL>: seekable in-memory stream rep backed by a
// vector<ELEMENT_TYPE>, with independent read and write cursors into that vector.
// LOCK_IMPL selects synchronization: recursive_mutex => internally synchronized
// (kLocking_ true, blocking reads supported via a WaitableEvent); any other type
// (e.g. Execution::NullMutex) => no real locking, and the event member collapses
// to Common::Empty via [[no_unique_address]].
// NOTE(review): extraction artifact — several original lines (branch bodies,
// braces, some declarations) are missing here; fused numbers are original line numbers.
22 template <
typename ELEMENT_TYPE,
typename LOCK_IMPL>
23 class SeekableRep_ :
public IRep_<ELEMENT_TYPE> {
25 using ElementType = ELEMENT_TYPE;
// True until CloseRead () is called.
28 bool fIsOpenForRead_{
true};
// Compile-time flag: real synchronization only when LOCK_IMPL is recursive_mutex.
31 static constexpr bool kLocking_ = same_as<LOCK_IMPL, recursive_mutex>;
// Constructor member-init (ctor header elided by extraction): both cursors start
// at the beginning of the (empty) buffer.
35 : fReadCursor_{fData_.begin ()}
36 , fWriteCursor_{fData_.begin ()}
// Non-copyable; default destruction.
39 SeekableRep_ (
const SeekableRep_&) =
delete;
40 ~SeekableRep_ () =
default;
41 nonvirtual SeekableRep_& operator= (
const SeekableRep_&) =
delete;
// Returns true for this rep (body elided by extraction — TODO confirm in full source).
44 virtual bool IsSeekable ()
const override
// Marks the write side closed and wakes any reader blocked in Read ().
48 virtual void CloseWrite ()
override
51 [[maybe_unused]] lock_guard critSec{fMutex_};
52 fClosedForWrites_ =
true;
// Wake blocked readers so they can observe end-of-stream.
54 if constexpr (kLocking_) {
55 fMoreDataWaiter_.Set ();
57 Ensure (not IsOpenWrite ());
59 virtual bool IsOpenWrite ()
const override
61 return not fClosedForWrites_;
// Read side is closed without locking — NOTE(review): presumably safe because it is
// a single bool owned by the reader; verify against full source.
63 virtual void CloseRead ()
override
65 fIsOpenForRead_ =
false;
66 Ensure (not IsOpenRead ());
68 virtual bool IsOpenRead ()
const override
70 return fIsOpenForRead_;
// Number of elements between the read cursor and end of buffer; the nullopt /
// end-of-stream branches that followed were elided by the extraction.
72 virtual optional<size_t> AvailableToRead ()
override
74 Require (IsOpenRead ());
75 [[maybe_unused]] lock_guard critSec{fMutex_};
76 size_t nDefinitelyAvail = distance (fReadCursor_, fData_.cend ());
77 if (nDefinitelyAvail > 0) {
78 return nDefinitelyAvail;
80 else if (fClosedForWrites_) {
// Body elided by extraction; only the precondition survived.
87 virtual optional<SeekOffsetType> RemainingLength ()
override
89 Require (IsOpenRead ());
// Warning suppression brackets the Read () implementation (a goto label was
// presumably elided — hence the unused-label suppression).
92 DISABLE_COMPILER_GCC_WARNING_START (
"GCC diagnostic ignored \"-Wunused-label\"");
93 DISABLE_COMPILER_MSC_WARNING_START (4102)
// Read up to intoBuffer.size () elements; when no data and writes still open,
// the locking variant blocks on fMoreDataWaiter_ (retry logic partially elided).
94 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
96 Require (not intoBuffer.empty ());
97 Require (IsOpenRead ());
98 size_t nRequested = intoBuffer.size ();
// No data currently known available (checked OUTSIDE the lock below).
102 if (this->AvailableToRead () == nullopt) {
107 if constexpr (kLocking_) {
108 fMoreDataWaiter_.Wait ();
113 [[maybe_unused]] lock_guard critSec{fMutex_};
114 Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
115 size_t nAvail = distance (fReadCursor_, fData_.cend ());
// Raced: data consumed between the check and acquiring the lock — reset the
// event so the next Wait () blocks (non-block path returns; elided).
116 if (nAvail == 0 and not fClosedForWrites_) {
117 if constexpr (kLocking_) {
118 fMoreDataWaiter_.Reset ();
122 Require (blockFlag == eDontBlock);
// Copy out min(available, requested) and advance the read cursor.
125 size_t nCopied = min (nAvail, nRequested);
127 copy (fReadCursor_, fReadCursor_ + nCopied, intoBuffer.data ());
128 fReadCursor_ = fReadCursor_ + nCopied;
130 return intoBuffer.subspan (0, nCopied);
132 DISABLE_COMPILER_MSC_WARNING_END (4102)
133 DISABLE_COMPILER_GCC_WARNING_END ("GCC diagnostic ignored \"-Wunused-label\"");
// Append/overwrite elts at the write cursor, growing fData_ as needed.
134 virtual
void Write (span<const ELEMENT_TYPE> elts)
override
136 Require (not elts.empty ());
137 Require (IsOpenWrite ());
138 [[maybe_unused]] lock_guard critSec{fMutex_};
139 size_t roomLeft = distance (fWriteCursor_, fData_.end ());
140 size_t roomRequired = elts.size ();
// Signal data availability before the copy — readers re-check under the lock anyway.
141 if constexpr (kLocking_) {
142 fMoreDataWaiter_.Set ();
144 if (roomLeft < roomRequired) {
// Growing the vector invalidates iterators: save offsets, grow (chunked
// reserve to limit reallocations), then rebuild both cursors.
145 size_t curReadOffset = distance (fData_.cbegin (), fReadCursor_);
146 size_t curWriteOffset = distance (fData_.begin (), fWriteCursor_);
147 const size_t kChunkSize_ = 128;
148 Containers::Support::ReserveTweaks::Reserve4AddN (fData_, roomRequired - roomLeft, kChunkSize_);
149 fData_.resize (curWriteOffset + roomRequired);
150 fReadCursor_ = fData_.begin () + curReadOffset;
151 fWriteCursor_ = fData_.begin () + curWriteOffset;
152 Assert (fWriteCursor_ < fData_.end ());
154 copy (elts.data (), elts.data () + roomRequired, fWriteCursor_);
155 fWriteCursor_ += roomRequired;
156 Assert (fReadCursor_ < fData_.end ());
157 Assert (fWriteCursor_ <= fData_.end ());
// In-memory stream: nothing to flush (body elided by extraction).
159 virtual void Flush ()
override
// GetReadOffset (signature elided): read cursor's offset from the buffer start.
165 Require (IsOpenRead ());
166 [[maybe_unused]] lock_guard critSec{fMutex_};
167 return distance (fData_.begin (), fReadCursor_);
// Reposition the read cursor; out-of-range targets presumably throw
// kSeekException_ (throw bodies elided — TODO confirm). Whence case labels
// (eFromStart / eFromCurrent / eFromEnd) were elided by the extraction.
169 virtual SeekOffsetType SeekRead (Whence whence, SignedSeekOffsetType offset)
override
171 Require (IsOpenRead ());
172 [[maybe_unused]] lock_guard critSec{fMutex_};
173 if constexpr (kLocking_) {
174 fMoreDataWaiter_.Set ();
// eFromStart-style case: negative or past-end offsets rejected.
178 if (offset < 0) [[unlikely]] {
182 if (uOffset > fData_.size ()) [[unlikely]] {
185 fReadCursor_ = fData_.begin () +
static_cast<size_t> (uOffset);
// eFromCurrent-style case: offset relative to current read position.
188 Streams::SeekOffsetType curOffset = distance (fData_.cbegin (), fReadCursor_);
189 Streams::SignedSeekOffsetType newOffset = curOffset + offset;
190 if (newOffset < 0) [[unlikely]] {
194 if (uNewOffset > fData_.size ()) [[unlikely]] {
197 fReadCursor_ = fData_.begin () +
static_cast<size_t> (uNewOffset);
// eFromEnd-style case: offset relative to buffer size.
200 Streams::SignedSeekOffsetType newOffset = fData_.size () + offset;
201 if (newOffset < 0) [[unlikely]] {
205 if (uNewOffset > fData_.size ()) [[unlikely]] {
208 fReadCursor_ = fData_.begin () +
static_cast<size_t> (uNewOffset);
211 Ensure ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
212 return distance (fData_.cbegin (), fReadCursor_);
// GetWriteOffset (signature elided): write cursor's offset from the buffer start.
216 Require (IsOpenWrite ());
217 [[maybe_unused]] lock_guard critSec{fMutex_};
218 return distance (fData_.begin (),
static_cast<typename vector<ElementType>::const_iterator
> (fWriteCursor_));
// Reposition the write cursor; mirrors SeekRead () including the elided
// whence case labels and range-error throws.
220 virtual SeekOffsetType SeekWrite (Whence whence, SignedSeekOffsetType offset)
override
222 Require (IsOpenWrite ());
223 [[maybe_unused]] lock_guard critSec{fMutex_};
224 if constexpr (kLocking_) {
225 fMoreDataWaiter_.Set ();
229 if (offset < 0) [[unlikely]] {
232 if (
static_cast<SeekOffsetType> (offset) > fData_.size ()) [[unlikely]] {
235 fWriteCursor_ = fData_.begin () +
static_cast<size_t> (offset);
238 Streams::SeekOffsetType curOffset = distance (fData_.begin (), fWriteCursor_);
239 Streams::SignedSeekOffsetType newOffset = curOffset + offset;
240 if (newOffset < 0) [[unlikely]] {
243 if (
static_cast<size_t> (newOffset) > fData_.size ()) [[unlikely]] {
246 fWriteCursor_ = fData_.begin () +
static_cast<size_t> (newOffset);
249 Streams::SignedSeekOffsetType newOffset = fData_.size () + offset;
250 if (newOffset < 0) [[unlikely]] {
253 if (
static_cast<size_t> (newOffset) > fData_.size ()) [[unlikely]] {
256 fWriteCursor_ = fData_.begin () +
static_cast<size_t> (newOffset);
259 Ensure ((fData_.begin () <= fWriteCursor_) and (fWriteCursor_ <= fData_.end ()));
260 return distance (fData_.begin (), fWriteCursor_);
// Reports internally-synchronized only for the recursive_mutex instantiation;
// the rest of the Options initializer (fSeekable etc.) was elided.
263 virtual Options GetOptions ()
const override
265 return Options{.fInternallySynchronized = same_as<LOCK_IMPL, recursive_mutex>
266 ? Execution::InternallySynchronized::eInternallySynchronized
267 : Execution::InternallySynchronized::eNotKnownInternallySynchronized,
// Snapshot accessors — return statements elided by extraction; both lock first.
270 virtual vector<ElementType> AsVector ()
const override
272 [[maybe_unused]] lock_guard critSec{fMutex_};
275 virtual string AsString ()
const override
277 [[maybe_unused]] lock_guard critSec{fMutex_};
// Shared exception object thrown for out-of-range seeks (throw sites elided above).
282 static inline const auto kSeekException_ = range_error{
"seek"};
283 mutable LOCK_IMPL fMutex_;
// Event used to block readers awaiting data; zero-size when not locking.
284 [[no_unique_address]] conditional_t<kLocking_, Execution::WaitableEvent, Common::Empty> fMoreDataWaiter_{};
// Backing store plus read/write cursors into it (rebuilt after any reallocation).
285 vector<ElementType> fData_;
286 typename vector<ElementType>::const_iterator fReadCursor_;
287 typename vector<ElementType>::iterator fWriteCursor_;
288 bool fClosedForWrites_{
false};
// UnseekableRep_<ELEMENT_TYPE, LOCK_IMPL>: non-seekable variant of the shared
// memory stream rep. Structure mirrors SeekableRep_, but because callers can
// never seek backwards, already-read data at the head of fData_ can be reclaimed
// (FreeUpSpaceIfNeeded_); fSpaceClearedFromStreamHead_ tracks how many elements
// have been discarded so reported offsets remain monotone stream offsets.
// NOTE(review): extraction artifact — several original lines are missing; fused
// leading numbers are the original file's line numbers.
291 template <
typename ELEMENT_TYPE,
typename LOCK_IMPL>
292 class UnseekableRep_ :
public IRep_<ELEMENT_TYPE> {
294 using ElementType = ELEMENT_TYPE;
// True until CloseRead () is called.
297 bool fIsOpenForRead_{
true};
// Real synchronization only when LOCK_IMPL is recursive_mutex.
300 static constexpr bool kLocking_ = same_as<LOCK_IMPL, recursive_mutex>;
// Constructor member-init (ctor header elided): cursors start at buffer begin.
304 : fReadCursor_{fData_.begin ()}
305 , fWriteCursor_{fData_.begin ()}
// Non-copyable; default destruction.
308 UnseekableRep_ (
const UnseekableRep_&) =
delete;
309 ~UnseekableRep_ () =
default;
310 nonvirtual UnseekableRep_& operator= (
const UnseekableRep_&) =
delete;
// Returns false for this rep (body elided by extraction — TODO confirm in full source).
313 virtual bool IsSeekable ()
const override
// Close the write side and wake any blocked reader so it sees end-of-stream.
317 virtual void CloseWrite ()
override
320 [[maybe_unused]] lock_guard critSec{fMutex_};
321 fClosedForWrites_ =
true;
323 if constexpr (kLocking_) {
324 fMoreDataWaiter_.Set ();
326 Ensure (not IsOpenWrite ());
328 virtual bool IsOpenWrite ()
const override
330 return not fClosedForWrites_;
332 virtual void CloseRead ()
override
334 fIsOpenForRead_ =
false;
335 Ensure (not IsOpenRead ());
337 virtual bool IsOpenRead ()
const override
339 return fIsOpenForRead_;
// Elements between read cursor and end of buffer; nullopt/end-of-stream
// branches elided by the extraction.
341 virtual optional<size_t> AvailableToRead ()
override
343 Require (IsOpenRead ());
344 [[maybe_unused]] lock_guard critSec{fMutex_};
345 size_t nDefinitelyAvail = distance (fReadCursor_, fData_.cend ());
346 if (nDefinitelyAvail > 0) {
347 return nDefinitelyAvail;
349 else if (fClosedForWrites_) {
// Body elided by extraction; only the precondition survived.
356 virtual optional<SeekOffsetType> RemainingLength ()
override
358 Require (IsOpenRead ());
// Warning suppression brackets Read () (a goto label was presumably elided).
361 DISABLE_COMPILER_MSC_WARNING_START (4102)
362 DISABLE_COMPILER_GCC_WARNING_START ("GCC diagnostic ignored \"-Wunused-label\"");
// Same read logic as the seekable rep, plus head-space reclamation after the copy.
363 virtual optional<span<ELEMENT_TYPE>> Read (span<ELEMENT_TYPE> intoBuffer, NoDataAvailableHandling blockFlag)
override
365 Require (not intoBuffer.empty ());
366 Require (IsOpenRead ());
367 size_t nRequested = intoBuffer.size ();
// No data currently known available (checked outside the lock below).
371 if (this->AvailableToRead () == nullopt) {
376 if constexpr (kLocking_) {
377 fMoreDataWaiter_.Wait ();
382 [[maybe_unused]] lock_guard critSec{fMutex_};
383 Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
384 size_t nAvail = distance (fReadCursor_, fData_.cend ());
// Raced: data consumed between the check and acquiring the lock.
385 if (nAvail == 0 and not fClosedForWrites_) {
386 if constexpr (kLocking_) {
387 fMoreDataWaiter_.Reset ();
391 Require (blockFlag == eDontBlock);
394 size_t nCopied = min (nAvail, nRequested);
396 copy (fReadCursor_, fReadCursor_ + nCopied, intoBuffer.data ());
397 fReadCursor_ = fReadCursor_ + nCopied;
// Unseekable streams never revisit consumed data — reclaim it if worthwhile.
399 FreeUpSpaceIfNeeded_ ();
400 return intoBuffer.subspan (0, nCopied);
402 DISABLE_COMPILER_GCC_WARNING_END (
"GCC diagnostic ignored \"-Wunused-label\"");
403 DISABLE_COMPILER_MSC_WARNING_END (4102)
// Append elts at the write cursor, growing fData_ as needed (iterator fixup
// after potential reallocation, as in SeekableRep_::Write).
404 virtual
void Write (span<const ELEMENT_TYPE> elts)
override
406 Require (not elts.empty ());
407 Require (IsOpenWrite ());
408 [[maybe_unused]] lock_guard critSec{fMutex_};
409 size_t roomLeft = distance (fWriteCursor_, fData_.end ());
410 size_t roomRequired = elts.size ();
411 if constexpr (kLocking_) {
412 fMoreDataWaiter_.Set ();
414 if (roomLeft < roomRequired) {
415 size_t curReadOffset = distance (fData_.cbegin (), fReadCursor_);
416 size_t curWriteOffset = distance (fData_.begin (), fWriteCursor_);
417 const size_t kChunkSize_ = 128;
418 Containers::Support::ReserveTweaks::Reserve4AddN (fData_, roomRequired - roomLeft, kChunkSize_);
419 fData_.resize (curWriteOffset + roomRequired);
420 fReadCursor_ = fData_.begin () + curReadOffset;
421 fWriteCursor_ = fData_.begin () + curWriteOffset;
422 Assert (fWriteCursor_ < fData_.end ());
424 copy (elts.data (), elts.data () + roomRequired, fWriteCursor_);
425 fWriteCursor_ += roomRequired;
426 Assert (fReadCursor_ < fData_.end ());
427 Assert (fWriteCursor_ <= fData_.end ());
// In-memory stream: nothing to flush (body elided by extraction).
429 virtual void Flush ()
override
// GetReadOffset (signature elided): logical stream offset = reclaimed head
// elements + current in-buffer position.
435 Require (IsOpenRead ());
436 [[maybe_unused]] lock_guard critSec{fMutex_};
437 return fSpaceClearedFromStreamHead_ + distance (fData_.begin (), fReadCursor_);
// Unseekable: body elided — presumably asserts/throws (params marked unused).
439 virtual SeekOffsetType SeekRead ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset)
override
// GetWriteOffset (signature elided): logical stream offset of the write cursor.
446 Require (IsOpenWrite ());
447 [[maybe_unused]] lock_guard critSec{fMutex_};
448 return fSpaceClearedFromStreamHead_ +
449 std::distance (fData_.begin (),
static_cast<typename vector<ElementType>::const_iterator
> (fWriteCursor_));
// Unseekable: body elided — presumably asserts/throws (params marked unused).
451 virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset)
override
// Reports internally-synchronized only for the recursive_mutex instantiation;
// the rest of the Options initializer was elided.
457 virtual Options GetOptions ()
const override
459 return Options{.fInternallySynchronized = same_as<LOCK_IMPL, recursive_mutex>
460 ? Execution::InternallySynchronized::eInternallySynchronized
461 : Execution::InternallySynchronized::eNotKnownInternallySynchronized,
// Snapshot accessors — bodies elided by extraction.
464 virtual vector<ElementType> AsVector ()
const override
469 virtual string AsString ()
const override
// Discard fully-consumed data at the head of fData_ once it exceeds a size
// threshold, shifting both cursors back and accounting for the removal in
// fSpaceClearedFromStreamHead_ so logical offsets are unchanged.
483 nonvirtual
void FreeUpSpaceIfNeeded_ ()
485 [[maybe_unused]] lock_guard critSec{fMutex_};
486 Assert ((fData_.begin () <= fReadCursor_) and (fReadCursor_ <= fData_.end ()));
487 Assert (fReadCursor_ <= fWriteCursor_);
// Only reclaim when >= 16KB of dead data accumulated (avoids churn on small reads).
488 constexpr size_t kMinData2Reclaim_ = 16 * 1024;
489 size_t elts2Reclaim = distance (fData_.cbegin (), fReadCursor_);
490 if (elts2Reclaim *
sizeof (ELEMENT_TYPE) >= kMinData2Reclaim_ and IsOpenRead () and IsOpenWrite ()) [[unlikely]] {
// NOTE(review): readOffset/writeOffset declarations (saved logical offsets)
// were elided by the extraction — confirm against the full source.
493 fData_.erase (fData_.begin (), fData_.begin () + elts2Reclaim);
494 fSpaceClearedFromStreamHead_ += elts2Reclaim;
495 Assert (readOffset == fSpaceClearedFromStreamHead_);
496 fReadCursor_ = fData_.begin () +
static_cast<size_t> (readOffset - fSpaceClearedFromStreamHead_);
497 fWriteCursor_ = fData_.begin () +
static_cast<size_t> (writeOffset - fSpaceClearedFromStreamHead_);
498 Assert (readOffset == GetReadOffset ());
499 Assert (writeOffset == GetWriteOffset ());
504 [[no_unique_address]]
mutable LOCK_IMPL fMutex_;
// Count of elements discarded from the stream head; added to all reported offsets.
505 size_t fSpaceClearedFromStreamHead_{0};
506 [[no_unique_address]] conditional_t<kLocking_, Execution::WaitableEvent, Common::Empty> fMoreDataWaiter_{};
507 vector<ElementType> fData_;
508 typename vector<ElementType>::const_iterator fReadCursor_;
509 typename vector<ElementType>::iterator fWriteCursor_;
510 bool fClosedForWrites_{
false};
// Factory: constructs the appropriate rep for the requested Options —
// 2x2 choice of {Seekable, Unseekable} x {recursive_mutex, NullMutex} —
// and wraps it in a Ptr<ELEMENT_TYPE>.
519 template <
typename ELEMENT_TYPE>
520 inline auto New (Options options) -> Ptr<ELEMENT_TYPE>
523 if (options.fSeekable) {
524 return options.fInternallySynchronized == Execution::InternallySynchronized::eInternallySynchronized
525 ? Ptr<ELEMENT_TYPE>{make_shared<Private_::SeekableRep_<ELEMENT_TYPE, recursive_mutex>> ()}
526 : Ptr<ELEMENT_TYPE>{make_shared<Private_::SeekableRep_<ELEMENT_TYPE, Execution::NullMutex>> ()};
529 return options.fInternallySynchronized == Execution::InternallySynchronized::eInternallySynchronized
530 ? Ptr<ELEMENT_TYPE>{make_shared<Private_::UnseekableRep_<ELEMENT_TYPE, recursive_mutex>> ()}
531 : Ptr<ELEMENT_TYPE>{make_shared<Private_::UnseekableRep_<ELEMENT_TYPE, Execution::NullMutex>> ()};
// Factory overload: build a stream per options, pre-populated from copyFrom
// (a BLOB or span of bytes — constrained to ELEMENT_TYPE == byte).
// NOTE(review): the body's tail (writing copyFrom into p, returning p) was
// elided by the extraction.
534 template <
typename ELEMENT_TYPE,
typename COPY_FROM>
535 inline auto New (
const COPY_FROM& copyFrom, Options options) -> Ptr<ELEMENT_TYPE>
536 requires (same_as<ELEMENT_TYPE, byte> and Common::IAnyOf<COPY_FROM, Memory::BLOB, span<const ELEMENT_TYPE>>)
538 auto p = New<ELEMENT_TYPE> (options);
// Ptr constructor from a private rep — body elided by the extraction;
// presumably forwards to the inherited smart-pointer constructor.
548 template <
typename ELEMENT_TYPE>
549 inline Ptr<ELEMENT_TYPE>::Ptr (
const shared_ptr<Private_::IRep_<ELEMENT_TYPE>>& from)
// Downcast the inherited rep reference to this stream's private rep type.
// UncheckedDynamicCast: checked in debug builds only — valid because Ptr is
// only ever constructed from a Private_::IRep_.
553 template <
typename ELEMENT_TYPE>
554 inline auto Ptr<ELEMENT_TYPE>::GetRepConstRef_ () const -> const Private_::IRep_<ELEMENT_TYPE>&
556 return *Debug::UncheckedDynamicCast<const Private_::IRep_<ELEMENT_TYPE>*> (&inherited::GetRepConstRef ());
// Forward to the rep's GetOptions () (seekability / synchronization reporting).
558 template <
typename ELEMENT_TYPE>
559 inline Options Ptr<ELEMENT_TYPE>::GetOptions ()
const
561 return GetRepConstRef_ ().GetOptions ();
// Snapshot the stream's current contents as T. Supported conversions (per the
// requires-clause): vector<ELEMENT_TYPE> always; BLOB/string for byte streams;
// Characters::String for Character streams. Dispatch is compile-time.
563 template <
typename ELEMENT_TYPE>
564 template <
typename T>
565 inline T Ptr<ELEMENT_TYPE>::As () const
566 requires (same_as<T, vector<ELEMENT_TYPE>> or (same_as<ELEMENT_TYPE,
byte> and Common::IAnyOf<T, Memory::BLOB,
string>) or
567 (same_as<ELEMENT_TYPE, Characters::Character> and same_as<T, Characters::String>))
569 using Characters::Character;
570 using Characters::String;
572 if constexpr (same_as<T, vector<ELEMENT_TYPE>>) {
573 return GetRepConstRef_ ().AsVector ();
// BLOB constructed implicitly from the returned vector<byte>.
575 else if constexpr (same_as<T, Memory::BLOB>) {
576 return GetRepConstRef_ ().AsVector ();
578 else if constexpr (same_as<T, string>) {
579 return GetRepConstRef_ ().AsString ();
// Character stream: build a String over the snapshot vector's span.
581 else if constexpr (same_as<T, String>) {
582 auto tmp = GetRepConstRef_ ().AsVector ();
583 return String{span{tmp}};
// Deprecated (v3.0d5) pointer-pair overload; forwards to the span overload.
// NOTE(review): span<ELEMENT_TYPE> from const ELEMENT_TYPE* parameters looks
// like it should be span<const ELEMENT_TYPE> (or CTAD `span{start, end}`) —
// verify against the full source before relying on this text.
588 template <
typename ELEMENT_TYPE>
589 [[deprecated (
"Since Stroika v3.0d5 - use span overload")]]
inline auto New (
const ELEMENT_TYPE* start,
const ELEMENT_TYPE* end)
592 return New (span<ELEMENT_TYPE>{start, end});
#define RequireNotReached()
@ eBlockIfNoDataAvailable
CONTAINER::value_type * End(CONTAINER &c)
For a contiguous container (such as a vector or basic_string) - find the pointer to the end of the container's data.
CONTAINER::value_type * Start(CONTAINER &c)
For a contiguous container (such as a vector or basic_string) - find the pointer to the start of the container's data.
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages first.