4#include "Stroika/Foundation/StroikaPreComp.h"
8#include "Stroika/Foundation/Execution/FeatureNotSupportedException.h"
12#if qStroika_HasComponent_zstd
24using namespace Stroika::Foundation::Debug;
25using namespace Stroika::Foundation::Streams;
30#if !qStroika_HasComponent_zstd
36#if qStroika_HasComponent_zstd
38 inline void ThrowIfZStdErr_ (
size_t rc)
40 if (ZSTD_isError (rc)) {
45 constexpr size_t kSmallSoBlockAllocWorksWellNotInlineAnyhow_ = 1;
49 CompressingByteStreamRep_ () =
delete;
50 CompressingByteStreamRep_ (
const CompressingByteStreamRep_&) =
delete;
52 : fInStreamReader_{make_unique<Streams::
StreamReader<byte>> (in)}
54 Require (not o.fCompressionLevel.has_value () or (0 <= o.fCompressionLevel and o.fCompressionLevel <= 1));
57 if (o.fCompressionLevel.has_value ()) {
59 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_compressionLevel,
60 static_cast<int> ((::ZSTD_maxCLevel () - 1 + 1) * (*o.fCompressionLevel)) + 1));
63 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT));
66 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_checksumFlag, 1));
68 virtual ~CompressingByteStreamRep_ ()
70 if (fCtx_ !=
nullptr) {
71 ::ZSTD_freeCCtx (fCtx_);
82 fInStreamReader_ =
nullptr;
83 Ensure (not IsOpenRead ());
88 return fInStreamReader_ !=
nullptr;
92 Require (IsOpenRead ());
101 if (fOutputBufCache_.size () != 0) {
102 return fOutputBufCache_.size ();
104 FillOutputBufCache_ (NoDataAvailableHandling::eDontBlock);
105 if (fOutputBufCache_.empty ()) {
106 if (
auto ob = fInStreamReader_->IsAtEOF (NoDataAvailableHandling::eDontBlock); ob and *ob) {
119 Require (not intoBuffer.empty ());
128 if (not fOutputBufCache_.empty ()) {
129 size_t nToCopy = min (intoBuffer.size (), fOutputBufCache_.size ());
130 auto r = Memory::CopySpanData (fOutputBufCache_.subspan (0, nToCopy), intoBuffer);
131 fOutputBufCache_ = fOutputBufCache_.subspan (nToCopy);
132 fSeekOffset_ += nToCopy;
135 FillOutputBufCache_ (blockFlag);
136 if (fOutputBufCache_.empty ()) {
138 return fStage_ == Stage_::eDone ? span<byte>{} : optional<span<byte>>{};
144 unique_ptr<Streams::StreamReader<byte>> fInStreamReader_;
146 span<byte> fRawUnprocessedInputBytes_{};
148 Memory::eUninitialized, ::ZSTD_CStreamOutSize ()};
149 span<byte> fOutputBufCache_{};
150 ZSTD_CCtx* fCtx_{
nullptr};
156 Stage_ fStage_{Stage_::eReadingInput};
161 struct CompressResult_ {
162 size_t fConsumedInputBytes_{};
163 size_t fProducedOutputBytes_{};
166 static CompressResult_ DoZStd_Compress_ (ZSTD_CCtx* ctx, span<const byte> fromBytes, ZSTD_EndDirective endFlag, span<byte> intoCompressedBytes)
168 ZSTD_inBuffer input = {fromBytes.data (), fromBytes.size (), 0};
169 ZSTD_outBuffer output = {intoCompressedBytes.data (), intoCompressedBytes.size (), 0};
170 size_t const remaining = ::ZSTD_compressStream2 (ctx, &output, &input, endFlag);
171 ThrowIfZStdErr_ (remaining);
172 return CompressResult_{.fConsumedInputBytes_ = input.pos, .fProducedOutputBytes_ = output.pos, .fRemaining = remaining};
178 Require (fOutputBufCache_.empty ());
185 case Stage_::eReadingInput: {
189 if (fRawUnprocessedInputBytes_.size () < fInputBuf_.size ()) {
190 if (optional<span<byte>> n = fInStreamReader_->Read (span{fInputBuf_}.subspan (fRawUnprocessedInputBytes_.size ()), blockFlag);
191 n and not n->empty ()) {
192 fRawUnprocessedInputBytes_ = span{fInputBuf_}.first (fRawUnprocessedInputBytes_.size () + n->size ());
196 Assert (fOutputBufCache_.empty ());
197 if (fRawUnprocessedInputBytes_.empty ()) {
199 if (optional<bool> isAtEOF = fInStreamReader_->IsAtEOF (blockFlag); isAtEOF and *isAtEOF) {
200 fStage_ = Stage_::eEndOutput;
204 Assert (blockFlag == NoDataAvailableHandling::eDontBlock);
210 CompressResult_ compressResults = DoZStd_Compress_ (fCtx_, fRawUnprocessedInputBytes_, ZSTD_e_continue, span{fOutBuf_});
211 fRawUnprocessedInputBytes_ = fRawUnprocessedInputBytes_.subspan (compressResults.fConsumedInputBytes_);
214 if (compressResults.fProducedOutputBytes_ > 0) {
216 fOutputBufCache_ = span{fOutBuf_}.subspan (0, compressResults.fProducedOutputBytes_);
220 Assert (compressResults.fConsumedInputBytes_ > 0);
228 case Stage_::eEndOutput: {
230 Assert (fOutputBufCache_.empty ());
231 Assert (fRawUnprocessedInputBytes_.empty ());
233 CompressResult_ compressResults = DoZStd_Compress_ (fCtx_, span<const byte>{}, ZSTD_e_end, span{fOutBuf_});
234 fOutputBufCache_ = span{fOutBuf_}.subspan (0, compressResults.fProducedOutputBytes_);
235 if (compressResults.fRemaining == 0) {
236 fStage_ = Stage_::eDone;
240 case Stage_::eDone: {
250 unique_ptr<Streams::StreamReader<byte>> fInStreamReader_;
252 span<byte> fRawUnprocessedInputBytes_{};
254 Memory::eUninitialized, ::ZSTD_DStreamOutSize ()};
255 span<byte> fOutputBufCache_{};
256 ZSTD_DCtx* fCtx_{
nullptr};
261 DecompressingByteStreamRep_ () =
delete;
262 DecompressingByteStreamRep_ (
const DecompressingByteStreamRep_&) =
delete;
264 : fInStreamReader_{make_unique<Streams::
StreamReader<byte>> (in)}
268 virtual ~DecompressingByteStreamRep_ ()
270 if (fCtx_ !=
nullptr) {
271 ::ZSTD_freeDCtx (fCtx_);
282 fInStreamReader_ =
nullptr;
283 Ensure (not IsOpenRead ());
288 return fInStreamReader_ !=
nullptr;
292 Require (IsOpenRead ());
301 if (fOutputBufCache_.size () != 0) {
302 return fOutputBufCache_.size ();
304 FillOutputBufCache_ (NoDataAvailableHandling::eDontBlock);
305 if (fOutputBufCache_.empty ()) {
306 if (
auto ob = fInStreamReader_->IsAtEOF (NoDataAvailableHandling::eDontBlock); ob and *ob) {
319 Require (not intoBuffer.empty ());
328 if (not fOutputBufCache_.empty ()) {
329 size_t nToCopy = min (intoBuffer.size (), fOutputBufCache_.size ());
330 auto r = Memory::CopySpanData (fOutputBufCache_.subspan (0, nToCopy), intoBuffer);
331 fOutputBufCache_ = fOutputBufCache_.subspan (nToCopy);
332 fSeekOffset_ += nToCopy;
335 FillOutputBufCache_ (blockFlag);
336 if (fOutputBufCache_.empty ()) {
338 return blockFlag == NoDataAvailableHandling::eBlockIfNoDataAvailable ? span<byte>{} : optional<span<byte>>{};
344 struct DecompressResult_ {
345 size_t fConsumedInputBytes_{};
346 size_t fProducedOutputBytes_{};
349 static DecompressResult_ DoZStd_Decompress_ (ZSTD_DCtx* ctx, span<const byte> fromBytes, span<byte> intoDecompressedBytes)
351 ZSTD_inBuffer input = {fromBytes.data (), fromBytes.size (), 0};
352 ZSTD_outBuffer output = {intoDecompressedBytes.data (), intoDecompressedBytes.size (), 0};
353 size_t const remaining = ::ZSTD_decompressStream (ctx, &output, &input);
354 ThrowIfZStdErr_ (remaining);
355 return DecompressResult_{.fConsumedInputBytes_ = input.pos, .fProducedOutputBytes_ = output.pos, .fRemaining = remaining};
362 if (fRawUnprocessedInputBytes_.size () < fInputBuf_.size ()) {
363 if (optional<span<byte>> n = fInStreamReader_->Read (span{fInputBuf_}.subspan (fRawUnprocessedInputBytes_.size ()), blockFlag);
364 n and not n->empty ()) {
365 fRawUnprocessedInputBytes_ = span{fInputBuf_}.first (fRawUnprocessedInputBytes_.size () + n->size ());
369 Assert (fOutputBufCache_.empty ());
370 if (fRawUnprocessedInputBytes_.empty ()) {
375 DecompressResult_ decompressResults = DoZStd_Decompress_ (fCtx_, fRawUnprocessedInputBytes_, span{fOutBuf_});
376 fRawUnprocessedInputBytes_ = fRawUnprocessedInputBytes_.subspan (decompressResults.fConsumedInputBytes_);
379 if (decompressResults.fProducedOutputBytes_ > 0) {
380 fOutputBufCache_ = span{fOutBuf_}.subspan (0, decompressResults.fProducedOutputBytes_);
384 Assert (decompressResults.fConsumedInputBytes_ > 0);
395#if qStroika_HasComponent_zstd
398 shared_ptr<CompressingByteStreamRep_> fDelegate2;
405 fDelegate2 = Memory::MakeSharedPtr<CompressingByteStreamRep_> (src, fOptions_);
408 virtual optional<Compression::Stats>
GetStats ()
const
413 return Compression::Ptr{Memory::MakeSharedPtr<MyRep_> (o)};
415 Execution::Throw (kNotSuppExcept_);
420#if qStroika_HasComponent_zstd
422 shared_ptr<DecompressingByteStreamRep_> fDelegate2;
425 fDelegate2 = Memory::MakeSharedPtr<DecompressingByteStreamRep_> (src);
428 virtual optional<Compression::Stats>
GetStats ()
const
433 return Compression::Ptr{Memory::MakeSharedPtr<MyRep_> ()};
435 Execution::Throw (kNotSuppExcept_);
#define AssertNotReached()
conditional_t< qStroika_Foundation_Memory_PreferBlockAllocation and andTrueCheck, BlockAllocationUseHelper< T >, Common::Empty > UseBlockAllocationIfAppropriate
Use this to enable block allocation for a particular class. Beware of subclassing.
NoDataAvailableHandling
If eDontBlock is passed to most Stream APIs, then when the code would do a blocking read,...
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
virtual bool IsSeekable() const =0
void ThrowIfNull(const Private_::ConstVoidStar &p, const HRESULT &hr)
Template specialization for ThrowIfNull (), for thing being thrown HRESULT - really throw HRESULTErro...
virtual InputStream::Ptr< byte > Transform(const InputStream::Ptr< byte > &src)=0
virtual optional< Stats > GetStats() const =0
As of Stroika v3.0d22 - options ignored (and no API to train/generate dictionary)
As of Stroika v3.0d22 - options ignored.
StreamReader is a non-essential Stream utility, adding simplicity of use for a common use case,...