4#include "InternallySynchronizedOutputStream.h"
10namespace Stroika::Foundation::Streams::BufferedOutputStream {
14 template <
typename ELEMENT_TYPE,
size_t INLINE_BUF_SIZE>
15 class Rep_ :
public IRep_<ELEMENT_TYPE> {
17 Rep_ (
const typename OutputStream::Ptr<ELEMENT_TYPE>& realOut)
24 IgnoreExceptionsForCall (Flush ());
30 virtual size_t GetBufferSize ()
const override
33 return fUnwrittenAppends_.capacity ();
35 virtual void SetBufferSize (
size_t bufSize)
override
38 bufSize = Math::AtLeast (bufSize, INLINE_BUF_SIZE);
39 if (bufSize < fUnwrittenAppends_.size ()) {
42 fUnwrittenAppends_.reserve (bufSize);
47 nonvirtual
void Abort ()
51 fUnwrittenAppends_.clear ();
53 virtual bool IsSeekable ()
const override
57 virtual void CloseWrite ()
override
63 Assert (fRealOut_ ==
nullptr);
64 Ensure (not IsOpenWrite ());
66 virtual bool IsOpenWrite ()
const override
68 return fRealOut_ !=
nullptr;
73 Require (IsOpenWrite ());
76 virtual SeekOffsetType SeekWrite ([[maybe_unused]] Whence whence, [[maybe_unused]] SignedSeekOffsetType offset)
override
79 Require (IsOpenWrite ());
82 virtual void Flush ()
override
85 Require (IsOpenWrite ());
90 virtual void Write (span<const ELEMENT_TYPE> elts)
override
92 Require (not elts.empty ());
93 Require (not fAborted_);
94 Require (IsOpenWrite ());
101 size_t bufSpaceRemaining = fUnwrittenAppends_.capacity () - fUnwrittenAppends_.size ();
102 size_t size2WriteRemaining = elts.size ();
103 size_t copy2Buffer = min (bufSpaceRemaining, size2WriteRemaining);
104#if qStroika_Foundation_Debug_AssertionsChecked
105 size_t oldCap = fUnwrittenAppends_.capacity ();
107 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data (), elts.data () + copy2Buffer);
108#if qStroika_Foundation_Debug_AssertionsChecked
109 Assert (oldCap == fUnwrittenAppends_.capacity ());
112 Assert (size2WriteRemaining >= copy2Buffer);
113 size2WriteRemaining -= copy2Buffer;
118 if (fUnwrittenAppends_.capacity () == fUnwrittenAppends_.size ()) {
120 Assert (fUnwrittenAppends_.empty ());
122#if qStroika_Foundation_Debug_AssertionsChecked
123 Assert (oldCap == fUnwrittenAppends_.capacity ());
128 if (size2WriteRemaining == 0) {
131 else if (size2WriteRemaining < fUnwrittenAppends_.capacity ()) {
132 fUnwrittenAppends_.insert (fUnwrittenAppends_.end (), elts.data () + copy2Buffer, elts.data () + elts.size ());
135 fRealOut_.Write (elts.subspan (copy2Buffer));
140 nonvirtual
void Flush_ ()
143 fUnwrittenAppends_.clear ();
146 if (not fUnwrittenAppends_.empty ()) {
147 fRealOut_.Write (span{fUnwrittenAppends_});
148 fUnwrittenAppends_.clear ();
152 Ensure (fUnwrittenAppends_.empty ());
156 Memory::InlineBuffer<ELEMENT_TYPE, INLINE_BUF_SIZE> fUnwrittenAppends_{};
157 typename OutputStream::Ptr<ELEMENT_TYPE> fRealOut_{};
158 bool fAborted_{
false};
159 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
168 template <
typename ELEMENT_TYPE>
171 if (bufferSize and *bufferSize == 0) {
172 return Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE, 0>> (realOut)};
174 else if (bufferSize and *bufferSize <= 4 * 1024) {
175 return Ptr<ELEMENT_TYPE>{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> (realOut)};
178 Ptr<ELEMENT_TYPE> p{Memory::MakeSharedPtr<Private_::Rep_<ELEMENT_TYPE>> (realOut)};
180 p.SetBufferSize (*bufferSize);
185 template <
typename ELEMENT_TYPE>
189 switch (internallySynchronized) {
190 case Execution::eInternallySynchronized: {
191 if (bufferSize and *bufferSize == 0) {
192 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 0>> ({}, realOut)};
194 else if (bufferSize and *bufferSize <= 4 * 1024) {
195 return Ptr<ELEMENT_TYPE>{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE, 4 * 1024>> ({}, realOut)};
198 Ptr<ELEMENT_TYPE> p{InternallySynchronizedOutputStream::New<Private_::Rep_<ELEMENT_TYPE>> ({}, realOut)};
200 p.SetBufferSize (*bufferSize);
205 case Execution::eNotKnownInternallySynchronized:
206 return New<ELEMENT_TYPE> (realOut, bufferSize);
218 template <
typename ELEMENT_TYPE>
223 template <
typename ELEMENT_TYPE>
224 inline size_t Ptr<ELEMENT_TYPE>::GetBufferSize ()
const
226 auto rep = this->GetSharedRep_ ();
227 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
229 return r->GetBufferSize ();
231 template <
typename ELEMENT_TYPE>
232 inline void Ptr<ELEMENT_TYPE>::SetBufferSize (
size_t bufSize)
234 auto rep = this->GetSharedRep_ ();
235 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
237 r->SetBufferSize (bufSize);
239 template <
typename ELEMENT_TYPE>
242 auto rep = this->GetSharedRep_ ();
243 Private_::Rep_<ELEMENT_TYPE>* r = Debug::UncheckedDynamicCast<Private_::Rep_<ELEMENT_TYPE>*> (rep.get ());
247 template <
typename ELEMENT_TYPE>
250 return Debug::UncheckedDynamicPointerCast<Private_::IRep_<ELEMENT_TYPE>> (inherited::GetSharedRep ());
#define RequireNotReached()
#define WeakAssert(c)
A WeakAssert() is for things that aren't guaranteed to be true, but are overwhelmingly likely to be t...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
OutputStream<>::Ptr is Smart pointer to a stream-based sink of data.
A Streams::Ptr<ELEMENT_TYPE> is a smart-pointer to a stream of elements of type T.
void Abort(const Traversal::Iterable< Ptr > &threads)
foreach Thread t: t.Abort ()