Skip to content

Commit 71f1093

Browse files
committed
moves cli
1 parent 021490d commit 71f1093

11 files changed

Lines changed: 990 additions & 884 deletions

File tree

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Simple caller-allocated output example:
3+
*
4+
* - compressInto(): writes compressed bytes into a user-owned device buffer
5+
* - decompressInto(): writes decompressed bytes into a user-owned device buffer
6+
*
7+
* Build:
8+
* cmake -S . -B build -DBUILD_EXAMPLES=ON
9+
* cmake --build build -j --target caller_allocated_output
10+
*
11+
* Run:
12+
* ./build/bin/caller_allocated_output
13+
*/
14+
15+
#include "cuda_check.h"
16+
#include "fzgpumodules.h"
17+
18+
#include <algorithm>
19+
#include <cmath>
20+
#include <cstdlib>
21+
#include <iomanip>
22+
#include <iostream>
23+
#include <vector>
24+
25+
using namespace fz;
26+
27+
/**
 * Builds a smooth synthetic signal of n float samples.
 *
 * Sample i is sin(0.01 * i) * 50 + cos(0.003 * i) * 20, evaluated in single
 * precision.
 *
 * @param n Number of samples to generate.
 * @return Host vector holding the n samples (empty when n is 0).
 */
static std::vector<float> make_smooth_data(size_t n) {
    std::vector<float> samples;
    samples.reserve(n);
    for (size_t idx = 0; idx != n; ++idx) {
        const float x = static_cast<float>(idx);
        // Kept as one expression so the floating-point evaluation is unchanged.
        samples.push_back(std::sin(x * 0.01f) * 50.0f + std::cos(x * 0.003f) * 20.0f);
    }
    return samples;
}
35+
36+
int main() {
37+
constexpr size_t N = 1 << 16;
38+
constexpr float EB = 1e-2f;
39+
const size_t input_bytes = N * sizeof(float);
40+
41+
auto h_input = make_smooth_data(N);
42+
43+
float* d_input = nullptr;
44+
FZ_CUDA_CHECK(cudaMalloc(&d_input, input_bytes));
45+
FZ_CUDA_CHECK(cudaMemcpy(d_input, h_input.data(), input_bytes, cudaMemcpyHostToDevice));
46+
47+
Pipeline pipeline(input_bytes, MemoryStrategy::MINIMAL);
48+
auto* lorenzo = pipeline.addStage<LorenzoStage<float, uint16_t>>();
49+
lorenzo->setErrorBound(EB);
50+
lorenzo->setQuantRadius(512);
51+
pipeline.finalize();
52+
53+
// Ask the pipeline for a max compressed output size before allocating.
54+
const size_t compressed_capacity = pipeline.getMaxCompressedOutputSize();
55+
56+
void* d_compressed_user = nullptr;
57+
FZ_CUDA_CHECK(cudaMalloc(&d_compressed_user, compressed_capacity));
58+
59+
size_t compressed_size = 0;
60+
pipeline.compressInto(
61+
d_input,
62+
input_bytes,
63+
d_compressed_user,
64+
compressed_capacity,
65+
&compressed_size,
66+
0);
67+
FZ_CUDA_CHECK(cudaDeviceSynchronize());
68+
69+
// Ask the pipeline for a safe upper bound before allocating output.
70+
const size_t decompressed_capacity = pipeline.getMaxDecompressedOutputSize();
71+
72+
// User-owned decompressed output buffer.
73+
void* d_decompressed_user = nullptr;
74+
FZ_CUDA_CHECK(cudaMalloc(&d_decompressed_user, decompressed_capacity));
75+
76+
size_t decompressed_size = 0;
77+
pipeline.decompressInto(
78+
d_compressed_user,
79+
compressed_size,
80+
d_decompressed_user,
81+
decompressed_capacity,
82+
&decompressed_size,
83+
0);
84+
FZ_CUDA_CHECK(cudaDeviceSynchronize());
85+
86+
if (decompressed_size != input_bytes) {
87+
std::cerr << "Unexpected decompressed size: " << decompressed_size
88+
<< " (expected " << input_bytes << ")\n";
89+
cudaFree(d_decompressed_user);
90+
cudaFree(d_compressed_user);
91+
cudaFree(d_input);
92+
return 1;
93+
}
94+
95+
std::vector<float> h_recon(N);
96+
FZ_CUDA_CHECK(cudaMemcpy(
97+
h_recon.data(), d_decompressed_user, input_bytes, cudaMemcpyDeviceToHost));
98+
99+
float max_abs_error = 0.0f;
100+
for (size_t i = 0; i < N; ++i) {
101+
max_abs_error = std::max(max_abs_error, std::abs(h_recon[i] - h_input[i]));
102+
}
103+
104+
std::cout << std::fixed << std::setprecision(3);
105+
std::cout << "Caller-allocated output API example\n";
106+
std::cout << " input bytes: " << input_bytes << "\n";
107+
std::cout << " compressed size: " << compressed_size << "\n";
108+
std::cout << " compressed cap: " << compressed_capacity << "\n";
109+
std::cout << " decompressed cap: " << decompressed_capacity << "\n";
110+
std::cout << " decompressed size: " << decompressed_size << "\n";
111+
std::cout << " max abs error: " << max_abs_error << "\n";
112+
113+
cudaFree(d_decompressed_user);
114+
cudaFree(d_compressed_user);
115+
cudaFree(d_input);
116+
return 0;
117+
}

include/pipeline/compressor.h

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,40 @@ class Pipeline {
134134
cudaStream_t stream = 0
135135
);
136136

137+
/**
138+
* Compress into a caller-allocated device buffer.
139+
*
140+
* @param d_input Device pointer to raw input.
141+
* @param input_size Input size in bytes.
142+
* @param d_output Caller-allocated device buffer for compressed bytes.
143+
* @param output_capacity Capacity of d_output in bytes.
144+
* @param output_size Receives the compressed size in bytes.
145+
* @param stream CUDA stream for all GPU operations.
*
* getMaxCompressedOutputSize() returns an upper bound suitable for sizing
* d_output before this call.
146+
*/
147+
void compressInto(
148+
const void* d_input,
149+
size_t input_size,
150+
void* d_output,
151+
size_t output_capacity,
152+
size_t* output_size,
153+
cudaStream_t stream = 0
154+
);
155+
156+
/**
157+
* Compress multi-source input into a caller-allocated device buffer.
158+
*
159+
* Output format matches compress(const std::vector<InputSpec>&):
160+
* single output buffer for single-source pipelines; concat format for
161+
* multi-source pipelines.
*
* @param inputs Source inputs. NOTE(review): presumably one InputSpec per
*               source stage, as for compress() - confirm.
* @param d_output Caller-allocated device buffer for compressed bytes.
* @param output_capacity Capacity of d_output in bytes.
* @param output_size NOTE(review): presumably receives the compressed size in
*                    bytes, like the single-source overload - confirm.
* @param stream CUDA stream for all GPU operations.
162+
*/
163+
void compressInto(
164+
const std::vector<InputSpec>& inputs,
165+
void* d_output,
166+
size_t output_capacity,
167+
size_t* output_size,
168+
cudaStream_t stream = 0
169+
);
170+
137171
/**
138172
* Compress (multi-source). One InputSpec per source stage; order does not matter.
139173
* *d_output is pool-owned — do NOT cudaFree.
@@ -177,6 +211,45 @@ class Pipeline {
177211
cudaStream_t stream = 0
178212
);
179213

214+
/**
215+
* Decompress into a caller-allocated device buffer (single-source only).
216+
*
217+
* @param d_input nullptr to read from live forward DAG buffers, or
218+
* a device pointer to external compressed bytes.
219+
* @param input_size Byte size of d_input (ignored when d_input is nullptr).
220+
* @param d_output Caller-allocated device buffer for decompressed bytes.
221+
* @param output_capacity Capacity of d_output in bytes.
222+
* @param output_size Receives the exact decompressed size in bytes.
223+
* @param stream CUDA stream for all GPU operations.
*
* getMaxDecompressedOutputSize() returns an upper bound suitable for sizing
* d_output before this call.
224+
*/
225+
void decompressInto(
226+
const void* d_input,
227+
size_t input_size,
228+
void* d_output,
229+
size_t output_capacity,
230+
size_t* output_size,
231+
cudaStream_t stream = 0
232+
);
233+
234+
/**
235+
* Decompress multi-source data into caller-allocated output buffers.
236+
*
237+
* @param d_input nullptr to read from live forward DAG buffers,
238+
* or a device pointer to external compressed bytes.
239+
* @param input_size Byte size of d_input (ignored when d_input is nullptr).
240+
* @param d_outputs One device pointer per source output.
241+
* @param output_capacities Capacity (bytes) for each output pointer.
242+
* @param stream CUDA stream for all GPU operations.
243+
* @return Exact decompressed size (bytes) for each source output.
*
* getMaxDecompressedOutputSizes() returns per-source upper bounds, in the same
* source order, that can be used to size each entry of d_outputs.
244+
*/
245+
std::vector<size_t> decompressMultiInto(
246+
const void* d_input,
247+
size_t input_size,
248+
const std::vector<void*>& d_outputs,
249+
const std::vector<size_t>& output_capacities,
250+
cudaStream_t stream = 0
251+
);
252+
180253
/**
181254
* Decompress (multi-source). Returns one {device_ptr, size} pair per source,
182255
* in the same order as forward source discovery. Ownership follows
@@ -188,6 +261,32 @@ class Pipeline {
188261
cudaStream_t stream = 0
189262
);
190263

264+
/**
265+
* Maximum compressed output size for the current finalized pipeline.
266+
*
267+
* Returned value is an upper bound suitable for caller allocation before
268+
* compressInto(...). For multi-source pipelines this corresponds to the
269+
* single concat output format returned by compress().
270+
*
* @return Capacity in bytes. Without concatenation this is the DAG-reported
*         size of the first output buffer; with concatenation it is a
*         16-byte-aligned header plus every output buffer size, each rounded
*         up to 16 bytes.
* @throws std::runtime_error if finalize() has not been called, the DAG is
*         not initialized, or no output buffers were detected.
*/
271+
size_t getMaxCompressedOutputSize() const;
272+
273+
/**
273+
* Maximum decompressed output size (single-source pipelines only).
274+
*
275+
* Value is derived from the most recent compress() input size when available,
276+
* otherwise from finalize-time size hints. Returns an upper bound suitable
277+
* for caller allocation before decompressInto().
278+
*
* @return Size in bytes; 0 when no size information is available for the
*         source.
* @throws std::runtime_error if finalize() has not been called, or if the
*         pipeline does not have exactly one source.
*/
279+
size_t getMaxDecompressedOutputSize() const;
281+
282+
/**
283+
* Maximum decompressed output size per source (multi-source aware).
284+
*
285+
* Order matches decompressMulti()/decompressMultiInto() source order.
286+
* Values are upper bounds suitable for caller allocation.
287+
*
* Each value is taken from the first available of: the source size recorded
* by the most recent compress(), the per-source finalize-time hint, the
* constructor size hint, and the finalized DAG input buffer size (when larger
* than the 1-byte placeholder). A non-zero value is rounded up to the input
* alignment. An entry is 0 when none of these is available.
*
* @return One size in bytes per source.
* @throws std::runtime_error if finalize() has not been called.
*/
288+
std::vector<size_t> getMaxDecompressedOutputSizes() const;
289+
191290
/** Free non-persistent buffers and reset execution state for re-use. */
192291
void reset(cudaStream_t stream = 0);
193292

@@ -270,6 +369,12 @@ class Pipeline {
270369
/** Parse the FZM header from a file without decompressing the payload. */
271370
static FZMFileHeader readHeader(const std::string& filename);
272371

372+
/** Exact decompressed output size from an FZM file (single-source convenience). */
373+
static size_t getDecompressedOutputSizeFromFile(const std::string& filename);
374+
375+
/** Exact decompressed output sizes (one per source) from an FZM file header. */
376+
static std::vector<size_t> getDecompressedOutputSizesFromFile(const std::string& filename);
377+
273378
/** Build the FZM header from current pipeline state. Requires a prior compress(). */
274379
FZMFileHeader buildHeader() const;
275380

@@ -332,6 +437,14 @@ class Pipeline {
332437
std::vector<Stage*> getSourceStages() const;
333438
std::vector<Stage*> getSinkStages() const;
334439

440+
// NOTE(review): presumably the shared implementation behind decompressMulti()
// and decompressMultiInto(); the definition is not visible here - confirm.
// caller_outputs / caller_capacities are passed by pointer, so they look
// optional (nullptr when outputs are not caller-allocated?) - TODO confirm.
std::vector<std::pair<void*, size_t>> decompressMultiImpl(
441+
const void* d_input,
442+
size_t input_size,
443+
cudaStream_t stream,
444+
const std::vector<void*>* caller_outputs,
445+
const std::vector<size_t>* caller_capacities
446+
);
447+
335448
// ── Inverse DAG helpers ───────────────────────────────────────────────────
336449

337450
/** Compact description of one forward stage used by buildInverseDAG(). */

src/pipeline/compressor.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,95 @@ size_t Pipeline::getPoolThreshold() const {
584584
return mem_pool_ ? mem_pool_->getConfiguredSize() : 0;
585585
}
586586

587+
size_t Pipeline::getMaxCompressedOutputSize() const {
588+
if (!is_finalized_) {
589+
throw std::runtime_error(
590+
"getMaxCompressedOutputSize() requires finalize() to be called first");
591+
}
592+
if (!dag_) {
593+
throw std::runtime_error("getMaxCompressedOutputSize(): DAG is not initialized");
594+
}
595+
if (output_buffer_ids_.empty()) {
596+
throw std::runtime_error(
597+
"getMaxCompressedOutputSize(): pipeline has no detected output buffers");
598+
}
599+
600+
if (!needs_concat_) {
601+
return dag_->getBufferSize(output_buffer_ids_[0]);
602+
}
603+
604+
// Mirror concat format capacity computation using buffer capacities rather
605+
// than run-time actual sizes.
606+
auto align16_local = [](size_t x) -> size_t { return (x + 15u) & ~15u; };
607+
608+
const size_t n = output_buffer_ids_.size();
609+
size_t total = align16_local(sizeof(uint32_t) + n * sizeof(uint64_t));
610+
for (int buf_id : output_buffer_ids_) {
611+
total += align16_local(dag_->getBufferSize(buf_id));
612+
}
613+
return total;
614+
}
615+
616+
std::vector<size_t> Pipeline::getMaxDecompressedOutputSizes() const {
617+
if (!is_finalized_) {
618+
throw std::runtime_error(
619+
"getMaxDecompressedOutputSizes() requires finalize() to be called first");
620+
}
621+
622+
std::vector<size_t> out;
623+
out.reserve(input_nodes_.size());
624+
625+
for (size_t i = 0; i < input_nodes_.size(); ++i) {
626+
size_t max_size = 0;
627+
628+
// Most accurate value: actual source size from the most recent compress().
629+
if (i < source_input_sizes_.size() && source_input_sizes_[i] > 0) {
630+
max_size = source_input_sizes_[i];
631+
}
632+
633+
// Fallback: per-source finalize-time hint.
634+
if (max_size == 0) {
635+
Stage* src_stage = input_nodes_[i]->stage;
636+
auto hint_it = per_source_hints_.find(src_stage);
637+
if (hint_it != per_source_hints_.end() && hint_it->second > 0) {
638+
max_size = hint_it->second;
639+
}
640+
}
641+
642+
// Fallback: constructor hint (single-source common case).
643+
if (max_size == 0 && input_size_hint_ > 0) {
644+
max_size = input_size_hint_;
645+
}
646+
647+
// Final fallback: finalized DAG input buffer size if it is not the
648+
// placeholder 1-byte value used when no hint is available.
649+
if (max_size == 0 && i < input_buffer_ids_.size() && dag_) {
650+
const size_t dag_size = dag_->getBufferSize(input_buffer_ids_[i]);
651+
if (dag_size > 1) max_size = dag_size;
652+
}
653+
654+
// Returned value is a max allocation size; preserve alignment rounding.
655+
if (max_size > 0 && input_alignment_bytes_ > 1) {
656+
max_size = ((max_size + input_alignment_bytes_ - 1) / input_alignment_bytes_)
657+
* input_alignment_bytes_;
658+
}
659+
660+
out.push_back(max_size);
661+
}
662+
663+
return out;
664+
}
665+
666+
size_t Pipeline::getMaxDecompressedOutputSize() const {
667+
auto sizes = getMaxDecompressedOutputSizes();
668+
if (sizes.size() != 1) {
669+
throw std::runtime_error(
670+
"getMaxDecompressedOutputSize() is single-source only; use getMaxDecompressedOutputSizes() for " +
671+
std::to_string(sizes.size()) + " source outputs");
672+
}
673+
return sizes[0];
674+
}
675+
587676
size_t Pipeline::getCurrentMemoryUsage() const {
588677
return dag_ ? dag_->getCurrentMemoryUsage() : 0;
589678
}

0 commit comments

Comments
 (0)