diff --git a/src/ASDF.jl b/src/ASDF.jl index 1fe903e..7743a07 100644 --- a/src/ASDF.jl +++ b/src/ASDF.jl @@ -2,7 +2,7 @@ module ASDF using ChunkCodecLibBlosc: BloscCodec, BloscEncodeOptions using ChunkCodecLibBzip2: BZ2Codec, BZ2EncodeOptions -using ChunkCodecLibLz4: LZ4FrameCodec, LZ4FrameEncodeOptions +using ChunkCodecLibLz4: LZ4BlockCodec, LZ4FrameCodec, LZ4BlockEncodeOptions, LZ4FrameEncodeOptions using ChunkCodecLibZlib: ZlibCodec, ZlibEncodeOptions using ChunkCodecLibZstd: ZstdCodec, ZstdEncodeOptions, decode, encode using CodecXz: XzCompressor, XzDecompressor @@ -51,6 +51,7 @@ struct BlockHeader used_size::UInt64 data_size::UInt64 checksum::AbstractVector{UInt8} # length 16 + validate_checksum::Bool end mutable struct LazyBlockHeaders @@ -88,6 +89,8 @@ big2native_U8(bytes::AbstractVector{UInt8}) = bytes[1] big2native_U16(bytes::AbstractVector{UInt8}) = (UInt16(bytes[1]) << 8) | bytes[2] big2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(big2native_U16(@view bytes[1:2])) << 16) | big2native_U16(@view bytes[3:4]) big2native_U64(bytes::AbstractVector{UInt8}) = (UInt64(big2native_U32(@view bytes[1:4])) << 32) | big2native_U32(@view bytes[5:8]) +# Read a 4-byte little-endian UInt32 (used for lz4.block store_size prefix) +little2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(bytes[1])) | (UInt32(bytes[2]) << 8) | (UInt32(bytes[3]) << 16) | (UInt32(bytes[4]) << 24) native2big_U8(val::UInt8) = UInt8[val] native2big_U16(val::UInt16) = UInt8[(val >>> 0x08) & 0xff, (val >>> 0x00) & 0xff] @@ -109,7 +112,7 @@ native2big_U16(val::Integer) = native2big_U16(UInt16(val)) native2big_U32(val::Integer) = native2big_U32(UInt32(val)) native2big_U64(val::Integer) = native2big_U64(UInt64(val)) -function read_block_header(io::IO, position::Int64) +function read_block_header(io::IO, position::Int64; validate_checksum::Bool) # Read block header max_header_size = 6 + 48 header = Array{UInt8}(undef, max_header_size) @@ -138,14 +141,14 @@ function read_block_header(io::IO, position::Int64) # TODO: Better error message @assert allocated_size >= used_size - return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum) + return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum, validate_checksum) end -function find_all_blocks(io::IO, pos::Int64=Int64(0)) +function find_all_blocks(io::IO, pos::Int64=Int64(0); validate_checksum::Bool) headers = BlockHeader[] pos = find_next_block(io, pos) while pos !== nothing - header = read_block_header(io, pos) + header = read_block_header(io, pos; validate_checksum) push!(headers, header) pos = Int64(header.position + 6 + header.header_size + header.allocated_size) pos = find_next_block(io, pos) @@ -162,7 +165,7 @@ function read_block(header::BlockHeader) @assert nb == length(data) # Check checksum - if any(header.checksum != 0) + if header.validate_checksum && any(!iszero, header.checksum) actual_checksum = md5(data) # TODO: Better error message @assert all(actual_checksum == header.checksum) @@ -175,13 +178,13 @@ function read_block(header::BlockHeader) # do nothing, the block is uncompressed elseif compression == C_Xz data = transcode(XzDecompressor, data) + elseif compression == C_Lz4 + data = decode_Lz4(data) else if compression == C_Blosc codec = BloscCodec() elseif compression == C_Bzip2 codec = BZ2Codec() - elseif compression == C_Lz4 - codec = LZ4FrameCodec() elseif compression == C_Zlib codec = ZlibCodec() elseif compression == C_Zstd @@ -200,6 +203,46 @@ function read_block(header::BlockHeader) return data end +function decode_Lz4(data) + if (# LZ4 Frame magic bytes 04 22 4D 18 + length(data) >= 4 && + data[1] == 0x04 && data[2] == 0x22 && + data[3] == 0x4D && data[4] == 0x18) + return decode(LZ4FrameCodec(), data) + else + # If the data was originally created from Python's ASDF, then it will be in block instead of frame layout, + # where each chunk is: + # + # [4 bytes, big-endian] compressed chunk size (the ASDF envelope) + # [4 bytes, little-endian] uncompressed chunk size (lz4.block store_size=True prefix) + # [N bytes] raw LZ4 block payload + # + # lz4.block.compress() defaults to store_size=True, which prepends the + # uncompressed size as a little-endian uint32. LZ4BlockCodec expects only + # the raw block, so both the outer BE envelope and the inner LE prefix must + # be stripped, with the LE value used as the uncompressed_size hint. + + out = UInt8[] + pos = 1 + + while pos <= length(data) + # Outer ASDF envelope: big-endian compressed chunk size + compressed_chunk_size = Int(big2native_U32(@view data[pos:pos+3])) + pos += 4 + # Inner lz4.block store_size=True prefix: little-endian uncompressed size + uncompressed_chunk_size = Int(little2native_U32(@view data[pos:pos+3])) + pos += 4 + # Raw LZ4 block payload (compressed_chunk_size includes the 4-byte LE prefix) + payload_len = compressed_chunk_size - 4 + payload = @view data[pos:pos+payload_len-1] + pos += payload_len + append!(out, decode(LZ4BlockCodec(), payload; max_size = uncompressed_chunk_size, size_hint = uncompressed_chunk_size)) + end + + return out + end +end + ################################################################################ """ @@ -511,11 +554,25 @@ end ################################################################################ -asdf_constructors = copy(YAML.default_yaml_constructors) -asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"] -asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] +function load_file(filename::AbstractString; extensions = false, validate_checksum = true) + asdf_constructors = copy(YAML.default_yaml_constructors) + asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"] + asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] + asdf_constructors["tag:stsci.edu:asdf/core/extension_metadata-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] + + if extensions + # Use fallbacks for now + asdf_constructors[nothing] = (constructor, node) -> begin + if node isa YAML.MappingNode + return YAML.construct_mapping(constructor, node) + elseif node isa YAML.SequenceNode + return YAML.construct_sequence(constructor, node) + else + return YAML.construct_scalar(constructor, node) + end + end + end -function load_file(filename::AbstractString) io = open(filename, "r") lazy_block_headers = LazyBlockHeaders() construct_yaml_ndarray = make_construct_yaml_ndarray(lazy_block_headers) @@ -524,12 +581,13 @@ function load_file(filename::AbstractString) asdf_constructors′ = copy(asdf_constructors) asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.0.0"] = construct_yaml_ndarray + asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.1.0"] = construct_yaml_ndarray asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-chunk-1.0.0"] = construct_yaml_ndarray_chunk asdf_constructors′["tag:stsci.edu:asdf/core/chunked-ndarray-1.0.0"] = construct_yaml_chunked_ndarray metadata = YAML.load(io, asdf_constructors′) # lazy_block_headers.block_headers = find_all_blocks(io, position(io)) - lazy_block_headers.block_headers = find_all_blocks(io) + lazy_block_headers.block_headers = find_all_blocks(io; validate_checksum) return ASDFFile(filename, metadata, lazy_block_headers) end @@ -553,9 +611,10 @@ struct NDArrayWrapper array::AbstractArray compression::Compression inline::Bool + lz4_layout::Symbol end -function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false) - return NDArrayWrapper(array, compression, inline) +function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false, lz4_layout::Symbol=:block) + return NDArrayWrapper(array, compression, inline, lz4_layout) end Base.getindex(val::NDArrayWrapper) = val.array @@ -604,6 +663,40 @@ function YAML._print(io::IO, val::NDArrayWrapper, level::Int=0, ignore_level::Bo YAML._print(io, ndarray, level, ignore_level) end +function encode_Lz4_block(input::AbstractVector{UInt8}; chunk_size::Int = 1024 * 1024 * 8) + out = UInt8[] + offset = 1 + while offset <= length(input) + chunk_end = min(offset + chunk_size - 1, length(input)) + chunk = @view input[offset:chunk_end] + + # Compress the raw chunk with LZ4 block codec + # LZ4BlockEncodeOptions does NOT prepend the uncompressed size, + # so we must prepend the LE uint32 ourselves to match Python's + # lz4.block.compress(store_size=True) behaviour. + compressed_payload = encode(LZ4BlockEncodeOptions(), chunk) + uncompressed_size = UInt32(length(chunk)) + compressed_chunk_size = UInt32(4 + length(compressed_payload)) # LE prefix + raw payload + + # Outer ASDF envelope: big-endian compressed chunk size (includes the 4-byte LE prefix) + append!(out, native2big_U32(compressed_chunk_size)) + + # Inner lz4.block store_size=True prefix: little-endian uncompressed size + append!(out, [ + (uncompressed_size >>> 0x00) & 0xff, + (uncompressed_size >>> 0x08) & 0xff, + (uncompressed_size >>> 0x10) & 0xff, + (uncompressed_size >>> 0x18) & 0xff, + ]) + + # Raw LZ4 block payload + append!(out, compressed_payload) + offset = chunk_end + 1 + end + + return out +end + function write_file(filename::AbstractString, document::Dict{Any,Any}) # Set up block descriptors global blocks @@ -672,12 +765,15 @@ function write_file(filename::AbstractString, document::Dict{Any,Any}) # TODO: Don't copy input input = input isa Vector ? input : Vector(input) data = transcode(XzCompressor, input) + elseif array.compression == C_Lz4 && array.lz4_layout == :block + data = encode_Lz4_block(input) + #data = encode(LZ4BlockEncodeOptions(), input) # Not compatible with Python asdf else if array.compression == C_Blosc encode_options = BloscEncodeOptions(; clevel=9, doshuffle=2, typesize=sizeof(eltype(array.array)), compressor="zstd") elseif array.compression == C_Bzip2 encode_options = BZ2EncodeOptions(; blockSize100k=9) - elseif array.compression == C_Lz4 + elseif array.compression == C_Lz4 && array.lz4_layout == :frame encode_options = LZ4FrameEncodeOptions(; compressionLevel=12, blockSizeID=7) elseif array.compression == C_Zlib encode_options = ZlibEncodeOptions(; level=9) diff --git a/test/test-write.jl b/test/test-write.jl index 86b5b81..d3246b0 100644 --- a/test/test-write.jl +++ b/test/test-write.jl @@ -10,10 +10,11 @@ "element1" => ASDF.NDArrayWrapper(array; compression=ASDF.C_None), "element2" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Blosc), "element3" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Bzip2), - "element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4), - "element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz), - "element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib), - "element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd), + "element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:block), + "element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:frame), + "element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz), + "element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib), + "element8" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd), ), ) ASDF.write_file(filename, doc) @@ -33,7 +34,7 @@ @test size(data2′) == size(data2) @test data2′ == data2 - for n in 1:7 + for n in 1:8 element = doc["group"]["element$n"][] element′ = doc′.metadata["group"]["element$n"][] @test eltype(element′) == eltype(element)