4#include "InternallySynchronizedOutputStream.h"
9namespace Stroika::Foundation::Streams::BufferedOutputStream {
13 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
14 class Rep_ :
public IRep_<ELEMENT_TYPE> {
16 Rep_ (
const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut)
23 IgnoreExceptionsForCall (Flush ());
29 virtual size_t GetBufferSize ()
const override
32 return fUnwrittenAppends_.capacity ();
34 virtual void SetBufferSize (
size_t bufSize)
override
37 bufSize = Math::AtLeast (bufSize, INLINE_BUF_SIZE);
38 if (bufSize < fUnwrittenAppends_.size ()) {
41 fUnwrittenAppends_.reserve (bufSize);
46 nonvirtual
void Abort ()
50 fUnwrittenAppends_.clear ();
52 virtual bool IsSeekable ()
const override
56 virtual void CloseWrite ()
override
62 Assert (fRealOut_ ==
nullptr);
63 Ensure (not IsOpenWrite ());
65 virtual bool IsOpenWrite ()
const override
67 return fRealOut_ !=
nullptr;
72 Require (IsOpenWrite ());
75 virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset)
override
78 Require (IsOpenWrite ());
81 virtual void Flush ()
override
84 Require (IsOpenWrite ());
89 virtual void Write (span<const ELEMENT_TYPE> elts)
override
91 Require (not elts.empty ());
92 Require (not fAborted_);
93 Require (IsOpenWrite ());
100 size_t bufSpaceRemaining = fUnwrittenAppends_.capacity () - fUnwrittenAppends_.size ();
101 size_t size2WriteRemaining = elts.size ();
102 size_t copy2Buffer = min (bufSpaceRemaining, size2WriteRemaining);
103#if qStroika_Foundation_Debug_AssertionsChecked
104 size_t oldCap = fUnwrittenAppends_.capacity ();
106 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data (), elts.data () + copy2Buffer);
107#if qStroika_Foundation_Debug_AssertionsChecked
108 Assert (oldCap == fUnwrittenAppends_.capacity ());
111 Assert (size2WriteRemaining >= copy2Buffer);
112 size2WriteRemaining -= copy2Buffer;
117 if (fUnwrittenAppends_.capacity () == fUnwrittenAppends_.size ()) {
119 Assert (fUnwrittenAppends_.empty ());
121#if qStroika_Foundation_Debug_AssertionsChecked
122 Assert (oldCap == fUnwrittenAppends_.capacity ());
127 if (size2WriteRemaining == 0) {
130 else if (size2WriteRemaining < fUnwrittenAppends_.capacity ()) {
131 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data () + copy2Buffer, elts.data () + elts.size ());
134 fRealOut_.Write (elts.subspan (copy2Buffer));
139 nonvirtual
void Flush_ ()
142 fUnwrittenAppends_.clear ();
145 if (not fUnwrittenAppends_.empty ()) {
146 fRealOut_.Write (span{fUnwrittenAppends_});
147 fUnwrittenAppends_.clear ();
151 Ensure (fUnwrittenAppends_.empty ());
155 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fUnwrittenAppends_{};
156 typename OutputStream::Ptr<ELEMENT_TYPE> fRealOut_{};
157 bool fAborted_{
false};
158 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
167 template <
typename ELEMENT_TYPE>
170 if (bufferSize and *bufferSize == 0) {
171 return Ptr<ELEMENT_TYPE>{make_shared<Private_::Rep_<ELEMENT_TYPE, 0>> (realOut)};
173 else if (bufferSize and *bufferSize <= 4 * 1024) {
174 return Ptr<ELEMENT_TYPE>{make_shared<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> (realOut)};
179 p.SetBufferSize (*bufferSize);
184 template <
typename ELEMENT_TYPE>
188 switch (internallySynchronized) {
189 case Execution::eInternallySynchronized: {
190 if (bufferSize and *bufferSize == 0) {
191 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 0>> ({}, realOut)};
193 else if (bufferSize and *bufferSize <= 4 * 1024) {
194 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> ({}, realOut)};
197 Ptr<ELEMENT_TYPE> p{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE>> ({}, realOut)};
199 p.SetBufferSize (*bufferSize);
204 case Execution::eNotKnownInternallySynchronized:
205 return New<ELEMENT_TYPE> (realOut, bufferSize);
217 template <
typename ELEMENT_TYPE>
222 template <
typename ELEMENT_TYPE>
223 inline size_t Ptr<ELEMENT_TYPE>::GetBufferSize ()
const
225 auto rep = this->GetSharedRep_ ();
226 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
228 return r->GetBufferSize ();
230 template <
typename ELEMENT_TYPE>
231 inline void Ptr<ELEMENT_TYPE>::SetBufferSize (
size_t bufSize)
233 auto rep = this->GetSharedRep_ ();
234 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
236 r->SetBufferSize (bufSize);
238 template <
typename ELEMENT_TYPE>
241 auto rep = this->GetSharedRep_ ();
242 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
246 template <
typename ELEMENT_TYPE>
249 return Debug::UncheckedDynamicPointerCast<Private_::IRep_<ELEMENT_TYPE>> (inherited::GetSharedRep ());
#define RequireNotReached()
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
OutputStream<>::Ptr is Smart pointer to a stream-based sink of data.
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
void Abort(const Traversal::Iterable< Ptr > &threads)
foreach Thread t: t.Abort ()