Stroika Library 3.0d23x
 
Loading...
Searching...
No Matches
ZStd.cpp
1/*
2 * Copyright(c) Sophist Solutions, Inc. 1990-2026. All rights reserved
3 */
4#include "Stroika/Foundation/StroikaPreComp.h"
5
8#include "Stroika/Foundation/Execution/FeatureNotSupportedException.h"
11
12#if qStroika_HasComponent_zstd
13#include <zstd.h>
14#endif
15
16#include "ZStd.h"
17
18using std::byte;
19
20using namespace Stroika::Foundation;
24using namespace Stroika::Foundation::Debug;
25using namespace Stroika::Foundation::Streams;
26
27// Comment this in to turn on aggressive noisy DbgTrace in this module
28//#define USE_NOISY_TRACE_IN_THIS_MODULE_ 1
29
30#if !qStroika_HasComponent_zstd
31namespace {
32 const auto kNotSuppExcept_ = Execution::FeatureNotSupportedException{"ZStd"sv};
33}
34#endif
35
36#if qStroika_HasComponent_zstd
37namespace {
38 inline void ThrowIfZStdErr_ (size_t rc) // CHECK_ZSTD
39 {
40 if (ZSTD_isError (rc)) {
41 Execution::Throw (Execution::RuntimeErrorException{"ZStd error: {}"_f(String::FromNarrowSDKString (::ZSTD_getErrorName (rc)))});
42 }
43 }
44
// Inline-capacity template argument for the Memory::InlineBuffer members of the stream reps below.
// NOTE(review): presumably kept tiny so the (large) zstd buffers never live inline in the rep object - confirm.
constexpr size_t kSmallSoBlockAllocWorksWellNotInlineAnyhow_{1};
46
47 struct CompressingByteStreamRep_ final : InputStream::IRep<byte>, Memory::UseBlockAllocationIfAppropriate<CompressingByteStreamRep_> {
48 public:
49 CompressingByteStreamRep_ () = delete;
50 CompressingByteStreamRep_ (const CompressingByteStreamRep_&) = delete;
51 CompressingByteStreamRep_ (const Streams::InputStream::Ptr<byte>& in, Compress::Options o)
52 : fInStreamReader_{make_unique<Streams::StreamReader<byte>> (in)}
53 {
54 Require (not o.fCompressionLevel.has_value () or (0 <= o.fCompressionLevel and o.fCompressionLevel <= 1));
55
56 Execution::ThrowIfNull (fCtx_ = ::ZSTD_createCCtx ());
57 if (o.fCompressionLevel.has_value ()) {
58 // The library supports regular compression levels from 1 up to ZSTD_maxCLevel()
59 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_compressionLevel,
60 static_cast<int> ((::ZSTD_maxCLevel () - 1 + 1) * (*o.fCompressionLevel)) + 1));
61 }
62 else {
63 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT));
64 }
65 // from example - not sure if helpful
66 ThrowIfZStdErr_ (::ZSTD_CCtx_setParameter (fCtx_, ZSTD_c_checksumFlag, 1));
67 }
68 virtual ~CompressingByteStreamRep_ ()
69 {
70 if (fCtx_ != nullptr) {
71 ::ZSTD_freeCCtx (fCtx_);
72 fCtx_ = nullptr;
73 }
74 }
75 virtual bool IsSeekable () const override
76 {
77 return false; // SHOULD allow seekable IFF src is seekable, but tricky because of internal state in compress/decompress library - not sure how to update/manage
78 }
79 virtual void CloseRead () override
80 {
81 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
82 fInStreamReader_ = nullptr;
83 Ensure (not IsOpenRead ());
84 }
85 virtual bool IsOpenRead () const override
86 {
87 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
88 return fInStreamReader_ != nullptr;
89 }
90 virtual SeekOffsetType GetReadOffset () const override
91 {
92 Require (IsOpenRead ());
93 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
94 return fSeekOffset_;
95 }
96 virtual optional<size_t> AvailableToRead () override
97 {
98 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
99 // at most twice through loop
100 while (true) {
101 if (fOutputBufCache_.size () != 0) {
102 return fOutputBufCache_.size ();
103 }
104 FillOutputBufCache_ (NoDataAvailableHandling::eDontBlock); // pull and process what we can without blocking
105 if (fOutputBufCache_.empty ()) {
106 if (auto ob = fInStreamReader_->IsAtEOF (NoDataAvailableHandling::eDontBlock); ob and *ob) {
107 return 0;
108 }
109 return nullopt;
110 }
111 }
112 }
113 virtual optional<SeekOffsetType> RemainingLength () override
114 {
115 return nullopt; // generally cannot tell without side-effects on input stream
116 }
117 virtual optional<span<byte>> Read (span<byte> intoBuffer, NoDataAvailableHandling blockFlag) override
118 {
119 Require (not intoBuffer.empty ()); // API rule for streams
120
121 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
122
123 /*
124 * First try output cache, and then fill if needed. If second time around, and still empty, must be EOF or dont know (cuz not blocking)
125 */
126 while (true) {
127 // See if request can be satisfied from cached output bytes; note we only NEED to return one byte (but can return more)
128 if (not fOutputBufCache_.empty ()) {
129 size_t nToCopy = min (intoBuffer.size (), fOutputBufCache_.size ());
130 auto r = Memory::CopySpanData (fOutputBufCache_.subspan (0, nToCopy), intoBuffer); // intoBuffer large enuf cuz we pinned size with nToCopy
131 fOutputBufCache_ = fOutputBufCache_.subspan (nToCopy); // skip returned bytes
132 fSeekOffset_ += nToCopy;
133 return r;
134 }
135 FillOutputBufCache_ (blockFlag);
136 if (fOutputBufCache_.empty ()) {
137 // if fill failed, either cuz non-blocking, or EOF
138 return fStage_ == Stage_::eDone ? span<byte>{} : optional<span<byte>>{};
139 }
140 }
141 }
142
143 private:
144 unique_ptr<Streams::StreamReader<byte>> fInStreamReader_; // wrapped/buffered provided input stream
145 Memory::InlineBuffer<byte, kSmallSoBlockAllocWorksWellNotInlineAnyhow_> fInputBuf_{Memory::eUninitialized, ::ZSTD_CStreamInSize ()}; // used to cache extra input (uncompressed) bytes not yet proceessed
146 span<byte> fRawUnprocessedInputBytes_{}; // empty or subspan of fInputBuf_
148 Memory::eUninitialized, ::ZSTD_CStreamOutSize ()}; // used to cache extra output (compressed) bytes not yet returned (NOTE - CStreamOutSize maybe wrong to use here)
149 span<byte> fOutputBufCache_{}; // empty or subspan of fOutBuf_
150 ZSTD_CCtx* fCtx_{nullptr};
151 enum class Stage_ {
152 eReadingInput,
153 eEndOutput,
154 eDone
155 };
156 Stage_ fStage_{Stage_::eReadingInput};
157 SeekOffsetType fSeekOffset_{};
158 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
159
160 private:
161 struct CompressResult_ {
162 size_t fConsumedInputBytes_{};
163 size_t fProducedOutputBytes_{};
164 size_t fRemaining;
165 };
166 static CompressResult_ DoZStd_Compress_ (ZSTD_CCtx* ctx, span<const byte> fromBytes, ZSTD_EndDirective endFlag, span<byte> intoCompressedBytes)
167 {
168 ZSTD_inBuffer input = {fromBytes.data (), fromBytes.size (), 0};
169 ZSTD_outBuffer output = {intoCompressedBytes.data (), intoCompressedBytes.size (), 0};
170 size_t const remaining = ::ZSTD_compressStream2 (ctx, &output, &input, endFlag);
171 ThrowIfZStdErr_ (remaining);
172 return CompressResult_{.fConsumedInputBytes_ = input.pos, .fProducedOutputBytes_ = output.pos, .fRemaining = remaining};
173 }
174
175 private:
176 void FillOutputBufCache_ (NoDataAvailableHandling blockFlag)
177 {
178 Require (fOutputBufCache_.empty ());
179 // first read from the input stream, and accumulate until EOF on input stream (using ZSTD_e_continue or ZSTD_e_flush) - (Process chunks)
180 switch (fStage_) {
181 // TODO
182 // Latency vs. Ratio: If you need data to be available immediately
183 // (e.g., for real-time network packets), use ZSTD_e_flush instead of ZSTD_e_continue,
184 // though this may slightly reduce your compression ratio.
185 case Stage_::eReadingInput: {
186 while (true) {
187 // Combine existing fInputBuf_ cached data with a bit more we try to read, so we pass as big a chunk as possible to ZStd lib
188 // Read argument windows into fInputBuf_, just after any bytes already read
189 if (fRawUnprocessedInputBytes_.size () < fInputBuf_.size ()) {
190 if (optional<span<byte>> n = fInStreamReader_->Read (span{fInputBuf_}.subspan (fRawUnprocessedInputBytes_.size ()), blockFlag);
191 n and not n->empty ()) {
192 fRawUnprocessedInputBytes_ = span{fInputBuf_}.first (fRawUnprocessedInputBytes_.size () + n->size ());
193 }
194 }
195
196 Assert (fOutputBufCache_.empty ());
197 if (fRawUnprocessedInputBytes_.empty ()) {
198 // If nothing available to compress, either input at EOF, or must return nullopt and wait for more
199 if (optional<bool> isAtEOF = fInStreamReader_->IsAtEOF (blockFlag); isAtEOF and *isAtEOF) {
200 fStage_ = Stage_::eEndOutput; // if instream reader definitely at EOF, then fRawUnprocessedInputBytes_ contains all that is left
201 goto EndOutput;
202 }
203 else {
204 Assert (blockFlag == NoDataAvailableHandling::eDontBlock); // else we would have blocked getting at least one byte
205 return;
206 }
207 }
208 else {
209 // Now if we have any input bytes to compress (or at EOF, and may need to write more stuff at end), run it through the library
210 CompressResult_ compressResults = DoZStd_Compress_ (fCtx_, fRawUnprocessedInputBytes_, ZSTD_e_continue, span{fOutBuf_});
211 fRawUnprocessedInputBytes_ = fRawUnprocessedInputBytes_.subspan (compressResults.fConsumedInputBytes_);
212
213 // if anything produced, adjust cache(s) and return it
214 if (compressResults.fProducedOutputBytes_ > 0) {
215 // cache excess output bytes, and return those that will fit
216 fOutputBufCache_ = span{fOutBuf_}.subspan (0, compressResults.fProducedOutputBytes_); // skip returned bytes
217 return;
218 }
219 else {
220 Assert (compressResults.fConsumedInputBytes_ > 0); // keep going - making progress
221 }
222 }
223 }
224 // There maybe more to pull from the streamreader, so we cannot assume we are done
225 // this line probably wrong!!!
227 } break;
228 case Stage_::eEndOutput: {
229 EndOutput:
230 Assert (fOutputBufCache_.empty ());
231 Assert (fRawUnprocessedInputBytes_.empty ());
232 // then input has already signaled EOF (this cannot change) - and we just do final fetch of remaining output (Final flush and frame end)
233 CompressResult_ compressResults = DoZStd_Compress_ (fCtx_, span<const byte>{}, ZSTD_e_end, span{fOutBuf_});
234 fOutputBufCache_ = span{fOutBuf_}.subspan (0, compressResults.fProducedOutputBytes_);
235 if (compressResults.fRemaining == 0) {
236 fStage_ = Stage_::eDone; // Frame fully flushed and finished
237 }
238 return;
239 }
240 case Stage_::eDone: {
241 return; // no more data to read
242 }
243 default:
245 }
246 }
247 };
248
249 struct DecompressingByteStreamRep_ final : InputStream::IRep<byte>, Memory::UseBlockAllocationIfAppropriate<DecompressingByteStreamRep_> {
250 unique_ptr<Streams::StreamReader<byte>> fInStreamReader_; // wrapped/buffered provided input stream
251 Memory::InlineBuffer<byte, kSmallSoBlockAllocWorksWellNotInlineAnyhow_> fInputBuf_{Memory::eUninitialized, ::ZSTD_DStreamInSize ()}; // used to cache extra input (compressed) bytes not yet proceessed
252 span<byte> fRawUnprocessedInputBytes_{}; // empty or subspan of fInputBuf_
254 Memory::eUninitialized, ::ZSTD_DStreamOutSize ()}; // used to cache extra output (uncompressed) bytes not yet returned (NOTE - CStreamOutSize maybe wrong to use here)
255 span<byte> fOutputBufCache_{}; // empty or subspan of fOutBuf_
256 ZSTD_DCtx* fCtx_{nullptr};
257 SeekOffsetType fSeekOffset_{};
258 [[no_unique_address]] Debug::AssertExternallySynchronizedMutex fThisAssertExternallySynchronized_;
259
260 public:
261 DecompressingByteStreamRep_ () = delete;
262 DecompressingByteStreamRep_ (const DecompressingByteStreamRep_&) = delete;
263 DecompressingByteStreamRep_ (const Streams::InputStream::Ptr<byte>& in)
264 : fInStreamReader_{make_unique<Streams::StreamReader<byte>> (in)}
265 {
266 Execution::ThrowIfNull (fCtx_ = ::ZSTD_createDCtx ());
267 }
268 virtual ~DecompressingByteStreamRep_ ()
269 {
270 if (fCtx_ != nullptr) {
271 ::ZSTD_freeDCtx (fCtx_);
272 fCtx_ = nullptr;
273 }
274 }
275 virtual bool IsSeekable () const override
276 {
277 return false; // SHOULD allow seekable IFF src is seekable, but tricky because of internal state in compress/decompress library - not sure how to update/manage
278 }
279 virtual void CloseRead () override
280 {
281 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
282 fInStreamReader_ = nullptr;
283 Ensure (not IsOpenRead ());
284 }
285 virtual bool IsOpenRead () const override
286 {
287 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
288 return fInStreamReader_ != nullptr;
289 }
290 virtual SeekOffsetType GetReadOffset () const override
291 {
292 Require (IsOpenRead ());
293 AssertExternallySynchronizedMutex::ReadContext declareContext{fThisAssertExternallySynchronized_};
294 return fSeekOffset_;
295 }
296 virtual optional<size_t> AvailableToRead () override
297 {
298 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
299 // at most twice through loop
300 while (true) {
301 if (fOutputBufCache_.size () != 0) {
302 return fOutputBufCache_.size ();
303 }
304 FillOutputBufCache_ (NoDataAvailableHandling::eDontBlock); // pull and process what we can without blocking
305 if (fOutputBufCache_.empty ()) {
306 if (auto ob = fInStreamReader_->IsAtEOF (NoDataAvailableHandling::eDontBlock); ob and *ob) {
307 return 0;
308 }
309 return nullopt;
310 }
311 }
312 }
313 virtual optional<SeekOffsetType> RemainingLength () override
314 {
315 return nullopt; // generally cannot tell without side-effects on input stream
316 }
317 virtual optional<span<byte>> Read (span<byte> intoBuffer, NoDataAvailableHandling blockFlag) override
318 {
319 Require (not intoBuffer.empty ()); // API rule for streams
320
321 AssertExternallySynchronizedMutex::WriteContext declareContext{fThisAssertExternallySynchronized_};
322
323 /*
324 * First try output cache, and then fill if needed. If second time around, and still empty, must be EOF or dont know (cuz not blocking)
325 */
326 while (true) {
327 // See if request can be satisfied from cached output bytes; note we only NEED to return one byte (but can return more)
328 if (not fOutputBufCache_.empty ()) {
329 size_t nToCopy = min (intoBuffer.size (), fOutputBufCache_.size ());
330 auto r = Memory::CopySpanData (fOutputBufCache_.subspan (0, nToCopy), intoBuffer); // intoBuffer large enuf cuz we pinned size with nToCopy
331 fOutputBufCache_ = fOutputBufCache_.subspan (nToCopy); // skip returned bytes
332 fSeekOffset_ += nToCopy;
333 return r;
334 }
335 FillOutputBufCache_ (blockFlag);
336 if (fOutputBufCache_.empty ()) {
337 // if fill failed, either cuz non-blocking, or EOF
338 return blockFlag == NoDataAvailableHandling::eBlockIfNoDataAvailable ? span<byte>{} : optional<span<byte>>{};
339 }
340 }
341 }
342
343 private:
344 struct DecompressResult_ {
345 size_t fConsumedInputBytes_{};
346 size_t fProducedOutputBytes_{};
347 size_t fRemaining;
348 };
349 static DecompressResult_ DoZStd_Decompress_ (ZSTD_DCtx* ctx, span<const byte> fromBytes, span<byte> intoDecompressedBytes)
350 {
351 ZSTD_inBuffer input = {fromBytes.data (), fromBytes.size (), 0};
352 ZSTD_outBuffer output = {intoDecompressedBytes.data (), intoDecompressedBytes.size (), 0};
353 size_t const remaining = ::ZSTD_decompressStream (ctx, &output, &input);
354 ThrowIfZStdErr_ (remaining);
355 return DecompressResult_{.fConsumedInputBytes_ = input.pos, .fProducedOutputBytes_ = output.pos, .fRemaining = remaining};
356 }
357 void FillOutputBufCache_ (NoDataAvailableHandling blockFlag)
358 {
359 while (true) {
360 // Combine existing fInputBuf_ cached data with a bit more we try to read, so we pass as big a chunk as possible to ZStd lib
361 // Read argument windows into fInputBuf_, just after any bytes already read
362 if (fRawUnprocessedInputBytes_.size () < fInputBuf_.size ()) {
363 if (optional<span<byte>> n = fInStreamReader_->Read (span{fInputBuf_}.subspan (fRawUnprocessedInputBytes_.size ()), blockFlag);
364 n and not n->empty ()) {
365 fRawUnprocessedInputBytes_ = span{fInputBuf_}.first (fRawUnprocessedInputBytes_.size () + n->size ());
366 }
367 }
368
369 Assert (fOutputBufCache_.empty ());
370 if (fRawUnprocessedInputBytes_.empty ()) {
371 return; // all we can do... either at EOF, or no more non-blocking data available
372 }
373 else {
374 // Now if we have any input bytes to decompress
375 DecompressResult_ decompressResults = DoZStd_Decompress_ (fCtx_, fRawUnprocessedInputBytes_, span{fOutBuf_});
376 fRawUnprocessedInputBytes_ = fRawUnprocessedInputBytes_.subspan (decompressResults.fConsumedInputBytes_);
377
378 // if anything produced, adjust cache(s) and return it
379 if (decompressResults.fProducedOutputBytes_ > 0) {
380 fOutputBufCache_ = span{fOutBuf_}.subspan (0, decompressResults.fProducedOutputBytes_);
381 return;
382 }
383 else {
384 Assert (decompressResults.fConsumedInputBytes_ > 0); // keep going - making progress - buffered internally in fCtx_
385 }
386 }
387 }
388 }
389 };
390}
391#endif
392
393Compression::Ptr ZStd::Compress::New (const ZStd::Compress::Options& o)
394{
395#if qStroika_HasComponent_zstd
396 struct MyRep_ final : IRep, public Memory::UseBlockAllocationIfAppropriate<MyRep_> {
397 ZStd::Compress::Options fOptions_;
398 shared_ptr<CompressingByteStreamRep_> fDelegate2;
399 MyRep_ (const ZStd::Compress::Options& o)
400 : fOptions_{o}
401 {
402 }
404 {
405 fDelegate2 = Memory::MakeSharedPtr<CompressingByteStreamRep_> (src, fOptions_);
406 return InputStream::Ptr<byte>{fDelegate2};
407 }
408 virtual optional<Compression::Stats> GetStats () const
409 {
410 return nullopt;
411 }
412 };
413 return Compression::Ptr{Memory::MakeSharedPtr<MyRep_> (o)};
414#else
415 Execution::Throw (kNotSuppExcept_);
416#endif
417}
418Compression::Ptr ZStd::Decompress::New ([[maybe_unused]] const ZStd::Decompress::Options& o)
419{
420#if qStroika_HasComponent_zstd
421 struct MyRep_ final : IRep, public Memory::UseBlockAllocationIfAppropriate<MyRep_> {
422 shared_ptr<DecompressingByteStreamRep_> fDelegate2;
424 {
425 fDelegate2 = Memory::MakeSharedPtr<DecompressingByteStreamRep_> (src);
426 return InputStream::Ptr<byte>{fDelegate2};
427 }
428 virtual optional<Compression::Stats> GetStats () const
429 {
430 return nullopt;
431 }
432 };
433 return Compression::Ptr{Memory::MakeSharedPtr<MyRep_> ()};
434#else
435 Execution::Throw (kNotSuppExcept_);
436#endif
437}
#define AssertNotReached()
Definition Assertions.h:355
conditional_t< qStroika_Foundation_Memory_PreferBlockAllocation and andTrueCheck, BlockAllocationUseHelper< T >, Common::Empty > UseBlockAllocationIfAppropriate
Use this to enable block allocation for a particular class. Beware of subclassing.
NoDataAvailableHandling
If eDontBlock passed to most Stream APIs, then when the code would do a blocking read,...
Definition Stream.h:90
NOT a real mutex - just a debugging infrastructure support tool so in debug builds can be assured thr...
shared_lock< const AssertExternallySynchronizedMutex > ReadContext
Instantiate AssertExternallySynchronizedMutex::ReadContext to designate an area of code where protect...
unique_lock< AssertExternallySynchronizedMutex > WriteContext
Instantiate AssertExternallySynchronizedMutex::WriteContext to designate an area of code where protec...
Logically halfway between std::array and std::vector; Smart 'direct memory array' - which when needed...
virtual bool IsSeekable() const =0
virtual SeekOffsetType GetReadOffset() const =0
virtual optional< span< ElementType > > Read(span< ElementType > intoBuffer, NoDataAvailableHandling blockFlag)=0
virtual optional< SeekOffsetType > RemainingLength()
returns nullopt if not known (typical, and the default) - but sometimes it is known,...
virtual optional< size_t > AvailableToRead()
returns nullopt if nothing known available, zero if known EOF, and any other number of elements (typi...
InputStream<>::Ptr is Smart pointer (with abstract Rep) class defining the interface to reading from ...
void ThrowIfNull(const Private_::ConstVoidStar &p, const HRESULT &hr)
Template specialization for ThrowIfNull (), for thing being thrown HRESULT - really throw HRESULTErro...
virtual InputStream::Ptr< byte > Transform(const InputStream::Ptr< byte > &src)=0
virtual optional< Stats > GetStats() const =0
As of Stroika v3.0d22 - options ignored (and no api to train/generate dictionary)
Definition ZStd.h:48
StreamReader is a non-essential Stream utility, adding simplicity of use for a common use case,...