Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Readme.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# netchdf
_last updated: 7/9/2025_
_last updated: 7/13/2025_

This is a rewrite in Kotlin of parts of the devcdm and netcdf-java libraries.

Expand Down Expand Up @@ -73,7 +73,7 @@
contents of the file.

Currently, the Netcdf-4 and HDF5 libraries are not thread safe, not even for read-only applications.
This is a serious limitation for high-performance, scalable applications, and it is disappointing that it hasn't been fixed.

Check failure on line 76 in Readme.md

View workflow job for this annotation

GitHub Actions / Check for spelling errors

hasnt ==> hasn't
See [Toward Multi-Threaded Concurrency in HDF5](https://www.hdfgroup.org/wp-content/uploads/2022/05/Toward-MT-HDF5.pdf),
and [RFC:Multi-Thread HDF5](https://support.hdfgroup.org/releases/hdf5/documentation/rfc/RFC_multi_thread.pdf) for more information.

Expand Down
5 changes: 5 additions & 0 deletions cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
// Dependencies of the cli module.
dependencies {
// The core module is exposed transitively to consumers of cli.
api(project(":core"))

// Compression codecs. The filter sources under cli/src/main import com.ning.compress.lzf (LzfFilter)
// and net.jpountz.lz4 (Lz4Filter, BitShuffleFilter) — presumably supplied by these two catalog
// entries; confirm against the version catalog.
implementation(libs.lzf)
implementation(libs.lz4)
implementation(libs.kotlinx.cli)
implementation(libs.oshai.logging)
implementation(libs.logback.classic)

// Test-only dependencies.
testImplementation(kotlin("test"))
testImplementation(libs.junit.jupiter.params)
}

kotlin {
Expand Down
159 changes: 159 additions & 0 deletions cli/src/main/kotlin/com/sunya/netchdf/hdf5/BitShuffleFilter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.sunya.netchdf.hdf5

import net.jpountz.lz4.LZ4Factory

/**
 * HDF5 filter 32008 ("bitshuffle"): reverses the bitshuffle transform, optionally after LZ4 block
 * decompression.
 *
 * Client values read by this class:
 *  - `[2]` element size in bytes
 *  - `[3]` block size in elements (0 selects the default computed by [getDefaultBlockSize])
 *  - `[4]` compression type: [NO_COMPRESSION], [LZ4_COMPRESSION] or [ZSTD_COMPRESSION] (not implemented)
 */
class BitShuffleFilter : H5filterIF {
    override fun id() = 32008
    override fun name() = "bitshuffle"
    val lz4Decompressor = LZ4Factory.fastestJavaInstance().safeDecompressor()

    /**
     * Decodes one chunk.
     *
     * @param encodedData the filtered chunk bytes
     * @param clientValues the filter's client data, see the class documentation for the entries used
     * @return the decoded chunk bytes
     * @throws IllegalArgumentException if the element size is not positive
     * @throws RuntimeException for zstd (not implemented) or an unknown compression type
     */
    override fun apply(encodedData: ByteArray, clientValues: IntArray): ByteArray {
        val elementSize = clientValues[2]
        require(elementSize > 0) { "Bitshuffle element size must be positive, was $elementSize" }

        return when (val compression = clientValues[4]) {
            NO_COMPRESSION -> {
                val blockSize = if (clientValues[3] == 0) getDefaultBlockSize(elementSize) else clientValues[3]
                noCompression(encodedData, clientValues, blockSize * elementSize)
            }
            // The LZ4 stream carries its own block size in its header, so clientValues[3] is not needed.
            LZ4_COMPRESSION -> lz4Compression(encodedData, clientValues)
            ZSTD_COMPRESSION -> throw RuntimeException("Bitshuffle zstd not implemented")
            else -> throw RuntimeException("Unknown compression type: $compression")
        }
    }

    /**
     * Unshuffles data that was bit-shuffled block by block without compression.
     * Every block of [blockSizeBytes] bytes (the last one may be shorter) is unshuffled independently
     * into the same position of the result.
     */
    private fun noCompression(encodedData: ByteArray, clientValues: IntArray, blockSizeBytes: Int): ByteArray {
        require(blockSizeBytes > 0) { "Bitshuffle block size must be positive, was $blockSizeBytes bytes" }
        val elementSize = clientValues[2]
        val unshuffled = ByteArray(encodedData.size)

        var offset = 0
        while (offset < encodedData.size) {
            val length = minOf(blockSizeBytes, encodedData.size - offset)
            // unshuffle() reads its input from index 0, so hand it a copy of just this block.
            val blockData = encodedData.copyOfRange(offset, offset + length)
            unshuffle(blockData, length, unshuffled, offset, elementSize)
            offset += length
        }
        return unshuffled
    }

    /**
     * Decompresses and unshuffles a bitshuffle+LZ4 chunk.
     *
     * Layout read here: 8-byte big-endian total decompressed size, 4-byte big-endian decompressed
     * block size in bytes, then one record per block consisting of a 4-byte big-endian compressed
     * length followed by the LZ4 data.
     *
     * Following the bitshuffle reference implementation, the stream holds all full blocks, then (if
     * at least BSHUF_BLOCKED_MULT elements remain) one shorter block covering the remaining elements
     * rounded down to a multiple of BSHUF_BLOCKED_MULT, then any leftover bytes stored raw.
     */
    private fun lz4Compression(encodedData: ByteArray, clientValues: IntArray): ByteArray {
        val elementSize = clientValues[2]
        val totalDecompressedSize = Math.toIntExact(makeLongFromBEBytes(encodedData, 0, 8))
        val decompressed = ByteArray(totalDecompressedSize)

        val decompressedBlockSize: Int = makeIntFromBEBytes(encodedData, 8, 4)
        val decompressedBuffer = ByteArray(decompressedBlockSize)

        val fullBlocks = totalDecompressedSize / decompressedBlockSize
        val trailingElements = (totalDecompressedSize % decompressedBlockSize) / elementSize
        val hasPartialBlock = trailingElements >= BSHUF_BLOCKED_MULT
        val nblocks = if (hasPartialBlock) fullBlocks + 1 else fullBlocks

        var srcOffset = 12
        var dstOffset = 0

        repeat(nblocks) {
            val compressedBlockLength = makeIntFromBEBytes(encodedData, srcOffset, 4)
            srcOffset += 4

            val decompressedBytes =
                lz4Decompressor.decompress(encodedData, srcOffset, compressedBlockLength, decompressedBuffer, 0)
            unshuffle(decompressedBuffer, decompressedBytes, decompressed, dstOffset, elementSize)

            srcOffset += compressedBlockLength
            // Advance by what was actually produced: the partial block is shorter than the header block size.
            dstOffset += decompressedBytes
        }

        // Leftover elements are stored raw after the last block; never copy past either buffer.
        val leftover = minOf(totalDecompressedSize - dstOffset, encodedData.size - srcOffset)
        if (leftover > 0) {
            encodedData.copyInto(
                decompressed,
                destinationOffset = dstOffset,
                startIndex = srcOffset,
                endIndex = srcOffset + leftover,
            )
        }

        return decompressed
    }

    /**
     * Reverses the bit shuffle of the first [shuffledLength] bytes of [shuffledBuffer], writing the
     * result into [unshuffledBuffer] starting at byte [unshuffledOffset].
     *
     * Only whole groups of 8 elements are unshuffled; trailing elements are copied unchanged, and
     * input holding fewer than 8 elements is copied as is. Bits are only ever set, never cleared, so
     * the destination region must be zero-filled on entry.
     */
    private fun unshuffle(
        shuffledBuffer: ByteArray,
        shuffledLength: Int,
        unshuffledBuffer: ByteArray,
        unshuffledOffset: Int,
        elementSize: Int,
    ) {
        val elements = shuffledLength / elementSize

        if (elements < 8) {
            // https://github.com/xerial/snappy-java/issues/296#issuecomment-964469607
            System.arraycopy(shuffledBuffer, 0, unshuffledBuffer, unshuffledOffset, shuffledLength)
            return
        }

        val elementSizeBits = elementSize * 8
        val unshuffledOffsetBits = unshuffledOffset * 8
        val elementsToShuffle = elements - elements % 8
        val elementsToCopy = elements - elementsToShuffle

        // The shuffled input stores bit i of every element contiguously; scatter each back to its element.
        var pos = 0
        for (i in 0..<elementSizeBits) {
            for (j in 0..<elementsToShuffle) {
                if (getBit(shuffledBuffer, pos)) {
                    setBit(unshuffledBuffer, unshuffledOffsetBits + j * elementSizeBits + i, true)
                }
                pos++ // step through the input array
            }
        }

        val tailStart = elementsToShuffle * elementSize
        System.arraycopy(
            shuffledBuffer,
            tailStart,
            unshuffledBuffer,
            unshuffledOffset + tailStart,
            elementsToCopy * elementSize,
        )
    }

    // https://github.com/kiyo-masui/bitshuffle/blob/master/src/bitshuffle_core.c#L1830
    // See method bshuf_default_block_size
    /** Returns the default block size, in elements, for elements of [elementSize] bytes. */
    private fun getDefaultBlockSize(elementSize: Int): Int {
        // Round the target down to the required multiple, but never go below the recommended minimum.
        val rounded = (BSHUF_TARGET_BLOCK_SIZE_B / elementSize) / BSHUF_BLOCKED_MULT * BSHUF_BLOCKED_MULT
        return maxOf(rounded, BSHUF_MIN_RECOMMEND_BLOCK)
    }

    companion object {
        // Constants see https://github.com/kiyo-masui/bitshuffle/blob/master/src/bitshuffle_internals.h#L32
        private const val BSHUF_MIN_RECOMMEND_BLOCK = 128
        private const val BSHUF_BLOCKED_MULT = 8 // Block sizes must be multiple of this.
        private const val BSHUF_TARGET_BLOCK_SIZE_B = 8192

        const val NO_COMPRESSION: Int = 0

        // https://github.com/kiyo-masui/bitshuffle/blob/master/src/bshuf_h5filter.h#L46
        const val LZ4_COMPRESSION: Int = 2
        const val ZSTD_COMPRESSION: Int = 3
    }
}

/**
 * Tests a single bit of [bytes].
 *
 * @param bit bit index; bit 0 is the least-significant bit of `bytes[0]`, bit 8 that of `bytes[1]`, ...
 * @return true when the addressed bit is 1
 */
fun getBit(bytes: ByteArray, bit: Int): Boolean {
    val mask = 1 shl (bit % 8)
    return (bytes[bit / 8].toInt() and mask) != 0
}

// See https://stackoverflow.com/a/4674035
/**
 * Sets or clears a single bit of [bytes] in place.
 *
 * @param bit bit index; bit 0 is the least-significant bit of `bytes[0]`
 * @param value true to set the bit, false to clear it
 * @throws IllegalArgumentException if [bit] is negative or not below `bytes.size * 8`
 */
fun setBit(bytes: ByteArray, bit: Int, value: Boolean) {
    require(bit >= 0 && bit < bytes.size * 8) { "bit index out of range. index=$bit" }

    val index = bit / 8
    val mask = 1 shl (bit % 8) // one of 1, 2, 4, .. 128
    val current = bytes[index].toInt()

    val updated = if (value) current or mask else current and mask.inv()
    bytes[index] = updated.toByte()
}
44 changes: 44 additions & 0 deletions cli/src/main/kotlin/com/sunya/netchdf/hdf5/Lz4Filter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.sunya.netchdf.hdf5

import net.jpountz.lz4.LZ4Factory
import kotlin.math.min

/**
 * HDF5 filter 32004 ("lz4"): decompresses a chunk stored as a sequence of LZ4 blocks.
 *
 * Layout as read here (see https://docs.hdfgroup.org/archive/support/services/filters/HDF5_LZ4.pdf):
 * an 8-byte total decompressed size, a 4-byte decompressed block size, then for every block a
 * 4-byte stored size followed by the block data. All header fields are decoded with the
 * big-endian helpers.
 */
class Lz4Filter : H5filterIF {
    override fun id() = 32004
    override fun name() = "lz4"

    /**
     * Decodes one chunk.
     *
     * @param encodedData the filtered chunk bytes
     * @param clientValues the filter's client data; not used by this filter
     * @return the decompressed chunk
     * @throws ArithmeticException if the total size stored in the header does not fit in an Int
     */
    override fun apply(encodedData: ByteArray, clientValues: IntArray): ByteArray {
        val decompressor = LZ4Factory.fastestJavaInstance().fastDecompressor()

        // A ByteArray is indexed by Int, so the 8-byte size must fit; toIntExact throws otherwise.
        val totalSize = Math.toIntExact(makeLongFromBEBytes(encodedData, 0, 8))
        val output = ByteArray(totalSize)

        val blockSize: Int = makeIntFromBEBytes(encodedData, 8, 4)
        val blockCount = (totalSize + blockSize - 1) / blockSize // ceiling: the last block may be short

        var readPos = 12
        for (blockIndex in 0 until blockCount) {
            val writePos = blockIndex * blockSize
            val storedSize = makeIntFromBEBytes(encodedData, readPos, 4)
            readPos += 4
            val expectedSize = min(output.size - writePos, blockSize)

            if (storedSize != expectedSize) {
                // decompress(src, srcOff, dest, destOff, destLen)
                decompressor.decompress(encodedData, readPos, output, writePos, expectedSize)
            } else {
                // A stored size equal to the decompressed size is treated as an uncompressed block.
                encodedData.copyInto(
                    output,
                    destinationOffset = writePos,
                    startIndex = readPos,
                    endIndex = readPos + expectedSize,
                )
            }
            readPos += storedSize
        }

        return output
    }

}
28 changes: 28 additions & 0 deletions cli/src/main/kotlin/com/sunya/netchdf/hdf5/LzfFilter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.sunya.netchdf.hdf5

import com.ning.compress.lzf.LZFException
import com.ning.compress.lzf.util.ChunkDecoderFactory

/**
 * HDF5 filter 32000 ("lzf"): decompresses an LZF-compressed chunk.
 * The uncompressed chunk length is taken from `clientValues[2]`.
 */
class LzfFilter : H5filterIF {
    override fun id() = 32000
    override fun name() = "lzf"

    /**
     * Decodes one chunk.
     *
     * @param encodedData the filtered chunk bytes
     * @param clientValues the filter's client data; entry 2 is used as the uncompressed length
     * @return [encodedData] itself when its size already equals the uncompressed length, otherwise a
     *   new array holding the decoded bytes
     * @throws RuntimeException wrapping the LZFException when decoding fails
     */
    override fun apply(encodedData: ByteArray, clientValues: IntArray): ByteArray {
        val uncompressedLength = clientValues[2]

        // Same size in and out: the data is returned untouched.
        if (encodedData.size == uncompressedLength) return encodedData

        return ByteArray(uncompressedLength).also { output ->
            try {
                ChunkDecoderFactory.safeInstance().decodeChunk(encodedData, 0, output, 0, uncompressedLength)
            } catch (e: LZFException) {
                throw RuntimeException("Inflating failed", e)
            }
        }
    }

}
82 changes: 82 additions & 0 deletions cli/src/test/kotlin/com/sunya/netchdf/hdf5/TestFilters.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.sunya.netchdf.hdf5

import com.sunya.cdm.api.computeSize
import kotlin.test.Test
import com.sunya.netchdf.openNetchdfFile

/**
 * Reads every variable of a few HDF5 test files that use the lzf, lz4 and bitshuffle filters.
 * The filters are registered with FilterRegistrar when the test class is instantiated.
 *
 * NOTE(review): the files are addressed by an absolute path on the author's machine; the tests
 * cannot find them elsewhere.
 */
class TestFilters {
    // Directory holding the jhdf test resources.
    private val testDir = "/home/stormy/dev/github/netcdf/jhdf/jhdf/src/test/resources/hdf5"

    init {
        FilterRegistrar.registerFilter(Lz4Filter())
        FilterRegistrar.registerFilter(LzfFilter())
        FilterRegistrar.registerFilter(BitShuffleFilter())
    }

    // hdf5 test_compressed_chunked_datasets_earliest.hdf5 {
    //   group: float {
    //     variables:
    //       float float32(7, 5) ;
    //       float float32lzf(7, 5) ;
    //       double float64(7, 5) ;
    //       double float64lzf(7, 5) ;
    //   }
    //   group: int {
    //     variables:
    //       short int16(7, 5) ;
    //       short int16lzf(7, 5) ;
    //       int int32(7, 5) ;
    //       int int32lzf(7, 5) ;
    //       byte int8(7, 5) ;
    //       byte int8lzf(7, 5) ;
    //   }
    // }
    // NOTE(review): a recorded run logged
    //   "failed to read /float/float64lzf, java.lang.RuntimeException: Unimplemented filter type= lzf name = lzf"
    // presumably from before LzfFilter was registered — confirm it no longer occurs.
    @Test
    fun testLzfFilter() {
        readAndPrint("test_compressed_chunked_datasets_earliest.hdf5")
    }

    // HDF5 ".../lz4_datasets.hdf5" {
    //   GROUP "/" {
    //     DATASET "float32_bs0" {
    //       DATATYPE H5T_IEEE_F32LE
    //       DATASPACE SIMPLE { ( 20 ) / ( 20 ) }
    //     }
    //     ..
    @Test
    fun testLz4Filter() {
        readAndPrint("lz4_datasets.hdf5")
    }

    @Test
    fun testBitShuffleFilter() {
        readAndPrint("bitshuffle_datasets.hdf5")
    }

    /** Prints the full path of the test file [name], then reads it with CDL and data output on. */
    private fun readAndPrint(name: String) {
        val filename = "$testDir/$name"
        println(filename)
        readNetchdfData(filename, true, true)
    }

    /**
     * Opens [filename] and reads the data of every variable reachable from the root group.
     *
     * @param showCdl when true, prints each variable's datatype, name and shape next to the shape
     *   and element count of the data that was read
     * @param showData when true, prints the data that was read
     */
    fun readNetchdfData(filename: String, showCdl : Boolean = false, showData : Boolean = false) {
        openNetchdfFile(filename).use { myfile ->
            if (myfile == null) {
                println("*** not a netchdf file = $filename")
                return
            }
            println("--- ${myfile.type()} $filename ")
            for (myvar in myfile.rootGroup().allVariables()) {
                val mydata = myfile.readArrayData(myvar)
                if (showCdl) {
                    val declared = " ${myvar.datatype} ${myvar.fullname()}${myvar.shape.contentToString()}"
                    val actual = "${mydata.shape.contentToString()} ${mydata.shape.computeSize()} elems"
                    println("$declared = $actual")
                }
                if (showData) {
                    println(mydata)
                }
            }
        }
    }
}
Loading
Loading