Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 112 additions & 16 deletions src/ASDF.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module ASDF

using ChunkCodecLibBlosc: BloscCodec, BloscEncodeOptions
using ChunkCodecLibBzip2: BZ2Codec, BZ2EncodeOptions
using ChunkCodecLibLz4: LZ4FrameCodec, LZ4FrameEncodeOptions
using ChunkCodecLibLz4: LZ4BlockCodec, LZ4FrameCodec, LZ4BlockEncodeOptions, LZ4FrameEncodeOptions
using ChunkCodecLibZlib: ZlibCodec, ZlibEncodeOptions
using ChunkCodecLibZstd: ZstdCodec, ZstdEncodeOptions, decode, encode
using CodecXz: XzCompressor, XzDecompressor
Expand Down Expand Up @@ -51,6 +51,7 @@ struct BlockHeader
used_size::UInt64
data_size::UInt64
checksum::AbstractVector{UInt8} # length 16
validate_checksum::Bool
end

mutable struct LazyBlockHeaders
Expand Down Expand Up @@ -88,6 +89,8 @@ big2native_U8(bytes::AbstractVector{UInt8}) = bytes[1]
big2native_U16(bytes::AbstractVector{UInt8}) = (UInt16(bytes[1]) << 8) | bytes[2]
big2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(big2native_U16(@view bytes[1:2])) << 16) | big2native_U16(@view bytes[3:4])
big2native_U64(bytes::AbstractVector{UInt8}) = (UInt64(big2native_U32(@view bytes[1:4])) << 32) | big2native_U32(@view bytes[5:8])
# Read a 4-byte little-endian UInt32 (used for lz4.block store_size prefix)
little2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(bytes[1])) | (UInt32(bytes[2]) << 8) | (UInt32(bytes[3]) << 16) | (UInt32(bytes[4]) << 24)

native2big_U8(val::UInt8) = UInt8[val]
native2big_U16(val::UInt16) = UInt8[(val >>> 0x08) & 0xff, (val >>> 0x00) & 0xff]
Expand All @@ -109,7 +112,7 @@ native2big_U16(val::Integer) = native2big_U16(UInt16(val))
native2big_U32(val::Integer) = native2big_U32(UInt32(val))
native2big_U64(val::Integer) = native2big_U64(UInt64(val))

function read_block_header(io::IO, position::Int64)
function read_block_header(io::IO, position::Int64; validate_checksum::Bool)
# Read block header
max_header_size = 6 + 48
header = Array{UInt8}(undef, max_header_size)
Expand Down Expand Up @@ -138,14 +141,14 @@ function read_block_header(io::IO, position::Int64)
# TODO: Better error message
@assert allocated_size >= used_size

return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum)
return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum, validate_checksum)
end

function find_all_blocks(io::IO, pos::Int64=Int64(0))
function find_all_blocks(io::IO, pos::Int64=Int64(0); validate_checksum::Bool)
headers = BlockHeader[]
pos = find_next_block(io, pos)
while pos !== nothing
header = read_block_header(io, pos)
header = read_block_header(io, pos; validate_checksum)
push!(headers, header)
pos = Int64(header.position + 6 + header.header_size + header.allocated_size)
pos = find_next_block(io, pos)
Expand All @@ -162,7 +165,7 @@ function read_block(header::BlockHeader)
@assert nb == length(data)

# Check checksum
if any(header.checksum != 0)
if header.validate_checksum && any(!iszero, header.checksum)
actual_checksum = md5(data)
# TODO: Better error message
@assert all(actual_checksum == header.checksum)
Expand All @@ -175,13 +178,13 @@ function read_block(header::BlockHeader)
# do nothing, the block is uncompressed
elseif compression == C_Xz
data = transcode(XzDecompressor, data)
elseif compression == C_Lz4
data = decode_Lz4(data)
else
if compression == C_Blosc
codec = BloscCodec()
elseif compression == C_Bzip2
codec = BZ2Codec()
elseif compression == C_Lz4
codec = LZ4FrameCodec()
elseif compression == C_Zlib
codec = ZlibCodec()
elseif compression == C_Zstd
Expand All @@ -200,6 +203,46 @@ function read_block(header::BlockHeader)
return data
end

function decode_Lz4(data)
if (# LZ4 Frame magic bytes 04 22 4D 18
length(data) >= 4 &&
data[1] == 0x04 && data[2] == 0x22 &&
data[3] == 0x4D && data[4] == 0x18)
return decode(LZ4FrameCodec(), data)
else
# If the data was originally created from Python's ASDF, then it will be in block instead of frame layout,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also produce the block layout? Should we? Can Python handle the frame layout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding me, will add that in. Looks like they can, but as a plug-in https://github.com/asdf-format/asdf-compression

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in e012b21

There is a layout option for NDArrayWrapper now to flip between frame and block. Does that seem like a reasonable place to control things? I guess it doesn't really make sense for other compression schemes, so I just set the default layout as default

I don't love from a maintainability point of view that there is now a hand-rolled Lz4-specific encode and decode path to accommodate Python's asdf scheme. There's also the matter of compatibility with Lz4 frame support on the Python side, but since that's still an experimental plugin, maybe that can be a problem for future us to deal with.

I am now squarely out of my comfort zone and would gladly accept any suggestions for simplifying things, haha. Thanks again for taking a look!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only for lz4 then I would call it lz4_layout, with values :frame and :block. I am sure that other compression schemes will also want to have options in the future, e.g. specifying the compression level.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, just renamed here e5b6bd4

# where each chunk is:
#
# [4 bytes, big-endian] compressed chunk size (the ASDF envelope)
# [4 bytes, little-endian] uncompressed chunk size (lz4.block store_size=True prefix)
# [N bytes] raw LZ4 block payload
#
# lz4.block.compress() defaults to store_size=True, which prepends the
# uncompressed size as a little-endian uint32. LZ4BlockCodec expects only
# the raw block, so both the outer BE envelope and the inner LE prefix must
# be stripped, with the LE value used as the uncompressed_size hint.

out = UInt8[]
pos = 1

while pos <= length(data)
# Outer ASDF envelope: big-endian compressed chunk size
compressed_chunk_size = Int(big2native_U32(@view data[pos:pos+3]))
pos += 4
# Inner lz4.block store_size=True prefix: little-endian uncompressed size
uncompressed_chunk_size = Int(little2native_U32(@view data[pos:pos+3]))
pos += 4
# Raw LZ4 block payload (compressed_chunk_size includes the 4-byte LE prefix)
payload_len = compressed_chunk_size - 4
payload = @view data[pos:pos+payload_len-1]
pos += payload_len
append!(out, decode(LZ4BlockCodec(), payload; max_size = uncompressed_chunk_size, size_hint = uncompressed_chunk_size))
end

return out
end
end

################################################################################

"""
Expand Down Expand Up @@ -511,11 +554,25 @@ end

################################################################################

asdf_constructors = copy(YAML.default_yaml_constructors)
asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]
function load_file(filename::AbstractString; extensions = false, validate_checksum = true)
asdf_constructors = copy(YAML.default_yaml_constructors)
asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/extension_metadata-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]

if extensions
# Use fallbacks for now
asdf_constructors[nothing] = (constructor, node) -> begin
if node isa YAML.MappingNode
return YAML.construct_mapping(constructor, node)
elseif node isa YAML.SequenceNode
return YAML.construct_sequence(constructor, node)
else
return YAML.construct_scalar(constructor, node)
end
end
end

function load_file(filename::AbstractString)
io = open(filename, "r")
lazy_block_headers = LazyBlockHeaders()
construct_yaml_ndarray = make_construct_yaml_ndarray(lazy_block_headers)
Expand All @@ -524,12 +581,13 @@ function load_file(filename::AbstractString)

asdf_constructors′ = copy(asdf_constructors)
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.0.0"] = construct_yaml_ndarray
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.1.0"] = construct_yaml_ndarray
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-chunk-1.0.0"] = construct_yaml_ndarray_chunk
asdf_constructors′["tag:stsci.edu:asdf/core/chunked-ndarray-1.0.0"] = construct_yaml_chunked_ndarray

metadata = YAML.load(io, asdf_constructors′)
# lazy_block_headers.block_headers = find_all_blocks(io, position(io))
lazy_block_headers.block_headers = find_all_blocks(io)
lazy_block_headers.block_headers = find_all_blocks(io; validate_checksum)
return ASDFFile(filename, metadata, lazy_block_headers)
end

Expand All @@ -553,9 +611,10 @@ struct NDArrayWrapper
array::AbstractArray
compression::Compression
inline::Bool
lz4_layout::Symbol
end
function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false)
return NDArrayWrapper(array, compression, inline)
function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false, lz4_layout::Symbol=:block)
return NDArrayWrapper(array, compression, inline, lz4_layout)
end
Base.getindex(val::NDArrayWrapper) = val.array

Expand Down Expand Up @@ -604,6 +663,40 @@ function YAML._print(io::IO, val::NDArrayWrapper, level::Int=0, ignore_level::Bo
YAML._print(io, ndarray, level, ignore_level)
end

function encode_Lz4_block(input::AbstractVector{UInt8}; chunk_size::Int = 1024 * 1024 * 8)
out = UInt8[]
offset = 1
while offset <= length(input)
chunk_end = min(offset + chunk_size - 1, length(input))
chunk = @view input[offset:chunk_end]

# Compress the raw chunk with LZ4 block codec
# LZ4BlockEncodeOptions does NOT prepend the uncompressed size,
# so we must prepend the LE uint32 ourselves to match Python's
# lz4.block.compress(store_size=True) behaviour.
compressed_payload = encode(LZ4BlockEncodeOptions(), chunk)
uncompressed_size = UInt32(length(chunk))
compressed_chunk_size = UInt32(4 + length(compressed_payload)) # LE prefix + raw payload

# Outer ASDF envelope: big-endian compressed chunk size (includes the 4-byte LE prefix)
append!(out, native2big_U32(compressed_chunk_size))

# Inner lz4.block store_size=True prefix: little-endian uncompressed size
append!(out, [
(uncompressed_size >>> 0x00) & 0xff,
(uncompressed_size >>> 0x08) & 0xff,
(uncompressed_size >>> 0x10) & 0xff,
(uncompressed_size >>> 0x18) & 0xff,
])

# Raw LZ4 block payload
append!(out, compressed_payload)
offset = chunk_end + 1
end

return out
end

function write_file(filename::AbstractString, document::Dict{Any,Any})
# Set up block descriptors
global blocks
Expand Down Expand Up @@ -672,12 +765,15 @@ function write_file(filename::AbstractString, document::Dict{Any,Any})
# TODO: Don't copy input
input = input isa Vector ? input : Vector(input)
data = transcode(XzCompressor, input)
elseif array.compression == C_Lz4 && array.lz4_layout == :block
data = encode_Lz4_block(input)
#data = encode(LZ4BlockEncodeOptions(), input) # Not compatible with Python asdf
else
if array.compression == C_Blosc
encode_options = BloscEncodeOptions(; clevel=9, doshuffle=2, typesize=sizeof(eltype(array.array)), compressor="zstd")
elseif array.compression == C_Bzip2
encode_options = BZ2EncodeOptions(; blockSize100k=9)
elseif array.compression == C_Lz4
elseif array.compression == C_Lz4 && array.lz4_layout == :frame
encode_options = LZ4FrameEncodeOptions(; compressionLevel=12, blockSizeID=7)
elseif array.compression == C_Zlib
encode_options = ZlibEncodeOptions(; level=9)
Expand Down
11 changes: 6 additions & 5 deletions test/test-write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
"element1" => ASDF.NDArrayWrapper(array; compression=ASDF.C_None),
"element2" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Blosc),
"element3" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Bzip2),
"element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4),
"element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz),
"element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib),
"element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd),
"element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:block),
"element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:frame),
"element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz),
"element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib),
"element8" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd),
),
)
ASDF.write_file(filename, doc)
Expand All @@ -33,7 +34,7 @@
@test size(data2′) == size(data2)
@test data2′ == data2

for n in 1:7
for n in 1:8
element = doc["group"]["element$n"][]
element′ = doc′.metadata["group"]["element$n"][]
@test eltype(element′) == eltype(element)
Expand Down
Loading