Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ interface Netchdf : AutoCloseable {
/**
 * Entry point for reading the data of [v2] chunk by chunk with multiple workers.
 *
 * This default implementation is only a stub: it calls [TODO] and therefore always throws
 * [NotImplementedError]. Implementations that support concurrent chunk reading are expected
 * to override it.
 *
 * @param v2 the variable whose data is requested.
 * @param lamda callback receiving an [ArraySection]; presumably invoked once per chunk that is
 *   read — TODO confirm against the overriding implementation.
 * @param done callback taking no arguments; presumably invoked once when reading has finished —
 *   TODO confirm.
 * @param wantSection optional section of the variable to read; defaults to null
 *   (presumably meaning the entire variable — TODO confirm).
 * @param nthreads optional number of workers; defaults to null
 *   (presumably lets the implementation choose — TODO confirm).
 */
fun <T> readChunksConcurrent(
    v2: Variable<T>,
    lamda: (ArraySection<*>) -> Unit,
    done: () -> Unit,
    wantSection: SectionPartial? = null,
    nthreads: Int? = null,
) {
    TODO()
}
Expand Down
1 change: 1 addition & 0 deletions core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sunya.cdm.api

/** A filled section of multidimensional array indices, plus the variable shape. */
// TODO: would IndexSpace be a better representation?
data class Section(val ranges : List<LongProgression>, val varShape : LongArray) {
val rank = ranges.size
val shape : LongArray // or IntArray ??
Expand Down
7 changes: 6 additions & 1 deletion core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sunya.cdm.layout

import com.sunya.cdm.api.computeSize
import kotlin.math.max
import kotlin.math.min

Expand All @@ -20,6 +21,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
val tileShape : LongArray // overall shape of the dataset's tile space
val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
val tileStrider : LongArray // for computing tile index
val nelems: Int

init {
// convenient to allow tileSize to have (an) extra dimension at the end
Expand All @@ -30,10 +32,13 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
for (i in 0 until rank) {
this.indexShape[i] = max(varShape[i], chunk[i])
}

this.tileShape = LongArray(rank)
for (i in 0 until rank) {
tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i]
}
nelems = tileShape.computeSize().toInt()

tileStrider = LongArray(rank)
var accumStride = 1L
for (k in rank - 1 downTo 0) {
Expand Down Expand Up @@ -88,7 +93,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) {
tile[k] = rem / tileStrider[k]
rem = rem - (tile[k] * tileStrider[k])
}
print("tile $order = ${tile.contentToString()}")
// print("tile $order = ${tile.contentToString()}")

// convert to index
return index(tile)
Expand Down
14 changes: 5 additions & 9 deletions core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal class BTree1(

// here both internal and leaf are the same structure
// Btree nodes Level 1A1 - Version 1 B-trees
inner class Node(val address: Long, val parent: Node?) : BTreeNodeIF {
inner class Node(val address: Long, val parent: Node?) {
val level: Int
val nentries: Int
private val leftAddress: Long
Expand Down Expand Up @@ -93,11 +93,11 @@ internal class BTree1(
// but most nodes will point to less than that number of children""
}

override fun isLeaf() = (level == 0)
fun isLeaf() = (level == 0)

override fun nentries() = nentries
fun nentries() = nentries

override fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx]
fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx]
}

/** @param key the byte offset into the local heap for the first object name in the subtree which that key describes. */
Expand Down Expand Up @@ -129,14 +129,10 @@ internal class BTree1(
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx"
}
}

/** Minimal read-only view of a B-tree node: whether it is a leaf, how many entries it has, and access to its data chunk entries. */
interface BTreeNodeIF {
/** @return true when this node is a leaf. */
fun isLeaf(): Boolean
/** @return the number of entries in this node. */
fun nentries(): Int
/** @return the data chunk entry at position [idx]. Only meaningful when [isLeaf] is true. */
fun dataChunkEntryAt(idx: Int) : DataChunkIF // only if isLeaf
}


interface DataChunkIF {
fun childAddress(): Long
fun offsets(): LongArray
Expand Down
117 changes: 66 additions & 51 deletions core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,58 +10,65 @@ import kotlin.collections.mutableListOf
/** a BTree1 that uses OpenFileExtended and tracks its own tiling. */
internal class BTree1data(
val raf: OpenFileExtended,
val rootNodeAddress: Long,
rootNodeAddress: Long,
varShape: LongArray,
chunkShape: LongArray,
) {
val tiling = Tiling(varShape, chunkShape)
val ndimStorage = chunkShape.size
val rootNode: BTreeNode

fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null)
init {
rootNode = BTreeNode(rootNodeAddress, null)
}

/**
 * Lazily walks every tile order from 0 up to (but not including) `tiling.nelems`.
 *
 * Each element pairs the tile order (widened to Long) with the chunk found for that order by
 * [findDataChunk]; when no chunk is found, the placeholder produced by [missingDataChunk] is
 * yielded instead, so the sequence always has exactly `tiling.nelems` elements.
 */
fun asSequence(): Sequence<Pair<Long, DataChunk>> = sequence {
    for (order in 0 until tiling.nelems) {
        val chunk = findDataChunk(order) ?: missingDataChunk(order)
        yield(order.toLong() to chunk)
    }
}

/**
 * Looks up the data chunk stored for tile [order], starting the search at the root node.
 *
 * @return the matching chunk, or null when the tree search finds none.
 */
internal fun findDataChunk(order: Int): DataChunk? = rootNode.findDataChunk(order)

// here both internal and leaf are the same structure
// Btree nodes Level 1A1 - Version 1 B-trees
inner class BTreeNode(val address: Long, val parent: BTreeNode?) {
val level: Int
val nentries: Int
private val leftAddress: Long
private val rightAddress: Long
var level: Int = 0
var nentries: Int = 0

val keys = mutableListOf<LongArray>()
val values = mutableListOf<DataChunkIF>()
val keyValues = mutableListOf<Pair<Int, DataChunk>>() // tile order to DataChunk
val children = mutableListOf<BTreeNode>()

var lastOrder : Int = 0

init {
val state = OpenFileState(raf.getFileOffset(address), false)
val magic: String = raf.readString(state, 4)
check(magic == "TREE") { "DataBTree doesnt start with TREE" }

val type: Int = raf.readByte(state).toInt()
check(type == 1) { "DataBTree must be type 1" }

level = raf.readByte(state).toInt() // leaf nodes are level 0
nentries = raf.readShort(state).toInt() // number of children to which this node points
leftAddress = raf.readOffset(state)
rightAddress = raf.readOffset(state)

if (nentries == 0) {
val chunkSize = raf.readInt(state)
val filterMask = raf.readInt(state)
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
val key = DataChunkKey(chunkSize, filterMask, inner)
val childPointer = raf.readAddress(state)
keys.add(inner)
values.add(DataChunkEntry1(this, key, childPointer))
} else {
if (address > 0) {
val state = OpenFileState(raf.getFileOffset(address), false)
val magic: String = raf.readString(state, 4)
check(magic == "TREE") { "DataBTree doesnt start with TREE" }

val type: Int = raf.readByte(state).toInt()
check(type == 1) { "DataBTree must be type 1" }

level = raf.readByte(state).toInt() // leaf nodes are level 0
nentries = raf.readShort(state).toInt() // number of children to which this node points
val leftAddress = raf.readOffset(state)
val rightAddress = raf.readOffset(state)

repeat(nentries) {
val chunkSize = raf.readInt(state)
val filterMask = raf.readInt(state)
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
val key = DataChunkKey(chunkSize, filterMask, inner)
val order = tiling.order(inner).toInt()
val key = DataChunkKey(order, chunkSize, filterMask)
val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset
if (level == 0) {
keys.add(inner)
values.add(DataChunkEntry1( this, key, childPointer))
keyValues.add(Pair(order, DataChunk(key, childPointer)))
lastOrder = order
} else {
children.add(BTreeNode(childPointer, this))
}
Expand All @@ -72,44 +79,52 @@ internal class BTree1data(
// but most nodes will point to less than that number of children""
}

// return only the leaf nodes, in any order
fun asSequence(): Sequence<Pair<Long, DataChunkIF>> = sequence {
// this does not have missing data. Use iterator on the Btree1data class
// return only the leaf nodes, in depth-first order
fun asSequence(): Sequence<Pair<Int, DataChunkIF>> = sequence {
// Handle child nodes recursively (in-order traversal)
if (children.isNotEmpty()) {
children.forEachIndexed { index, childNode ->
yieldAll(childNode.asSequence()) // Yield all elements from the child
}
} else { // If it's a leaf node (no children)
keys.forEachIndexed { index, key ->
yield(tiling.order(key) to values[index]) // Yield all key-value pairs
}
keyValues.forEach { yield(it) }
}
}
}

data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is DataChunkKey) return false
if (!offsets.contentEquals(other.offsets)) return false
return true
/**
 * Searches this subtree for the chunk whose tile order equals [wantOrder].
 *
 * A node without children is treated as a leaf and its own key/value pairs are scanned for an
 * exact match. Otherwise the search descends into the first child whose `lastOrder` is at
 * least [wantOrder]; this relies on children (and their chunks) being stored in increasing
 * tile order.
 *
 * NOTE(review): in the visible init code `lastOrder` is assigned only while reading level-0
 * entries, so for non-leaf children it appears to stay at its initial 0 — verify that lookups
 * through trees deeper than two levels behave as intended.
 *
 * @return the matching chunk, or null when this subtree does not contain one.
 */
fun findDataChunk(wantOrder: Int): DataChunk? {
    if (children.isEmpty()) {
        // leaf node: exact match on the stored tile order
        return keyValues.firstOrNull { (order, _) -> order == wantOrder }?.second
    }
    // interior node: only the first child that can contain wantOrder is searched
    val candidate = children.firstOrNull { child -> wantOrder <= child.lastOrder }
    return candidate?.findDataChunk(wantOrder)
}

override fun hashCode(): Int {
return offsets.contentHashCode()
}
}

/**
 * Key describing one data chunk.
 *
 * @property order the tile order of the chunk.
 * @property chunkSize the chunk size value read from the btree entry (presumably the stored size in bytes — TODO confirm).
 * @property filterMask the filter mask value read from the btree entry.
 */
data class DataChunkKey(
    val order: Int,
    val chunkSize: Int,
    val filterMask: Int,
)

// childAddress = data chunk (level 1) else a child node
data class DataChunkEntry1(val parent : BTreeNode, val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
inner class DataChunk(val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
override fun childAddress() = childAddress
override fun offsets() = key.offsets
override fun offsets() = tiling.orderToIndex(key.order.toLong())
override fun isMissing() = (childAddress <= 0L) // may be 0 or -1
override fun chunkSize() = key.chunkSize
override fun filterMask() = key.filterMask

override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
", tile= ${tiling.tile(key.offsets).contentToString()}"
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${offsets().contentToString()}" +
", tile= ${tiling.tile(offsets() ).contentToString()}"

fun show() = show(tiling)
}

/**
 * Builds the placeholder chunk used for a tile that has no stored data.
 *
 * The placeholder carries the given tile [order], a chunk size and filter mask of 0, and a
 * child address of -1.
 */
fun missingDataChunk(order: Int) : DataChunk =
    DataChunk(
        key = DataChunkKey(order = order, chunkSize = 0, filterMask = 0),
        childAddress = -1L,
    )
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ package com.sunya.netchdf.hdf5

import com.sunya.cdm.api.ArraySection
import com.sunya.cdm.api.Datatype
import com.sunya.cdm.api.Section
import com.sunya.cdm.api.SectionPartial
import com.sunya.cdm.api.Variable
import com.sunya.cdm.api.computeSize
import com.sunya.cdm.api.toIntArray
import com.sunya.cdm.api.toLongArray
import com.sunya.cdm.iosp.OpenFileState
import com.sunya.cdm.layout.Chunker
import com.sunya.cdm.layout.IndexSpace
import com.sunya.cdm.layout.Tiling
import com.sunya.cdm.layout.transferMissingNelems
Expand All @@ -23,18 +26,25 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>, wantSection: SectionPartial?) {
val h5 = h5file.header
val rafext: OpenFileExtended = h5.openFileExtended()
internal val bTree: BTree1data

val varShape = v2.shape
val chunkShape: IntArray
val tiling: Tiling
val nchunks: Long
internal val rootNode: BTree1data.BTreeNode
val rootAddress: Long
// internal val rootNode: BTree1data.BTreeNode
// val rootAddress: Long
val wantSpace: IndexSpace
val allData : Boolean

init {
val useSection = SectionPartial.fill(wantSection, v2.shape)
wantSpace = IndexSpace(useSection)
allData = (wantSection == null) || (useSection == Section(varShape))

require(v2.spObject is DataContainerVariable)
val vinfo = v2.spObject
require(vinfo.mdl is DataLayoutBTreeVer1)
Expand All @@ -44,17 +54,16 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
nchunks = tiling.tileShape.computeSize()

// it's not obvious that a separate raf is actually needed here
val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray())
rootNode = bTreeExt.rootNode()
rootAddress = mdl.btreeAddress
bTree = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray())
// rootAddress = mdl.btreeAddress
}

// TODO section: SectionPartial
fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) {

runBlocking {
val jobs = mutableListOf<Job>()
val workers = mutableListOf<Worker>()
val chunkProducer = produceChunks(rootNode.asSequence())
val chunkProducer = produceChunks(bTree.asSequence())
repeat(nthreads) {
val worker = Worker()
jobs.add( launchJob(worker, chunkProducer, lamda))
Expand Down Expand Up @@ -95,9 +104,6 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
private inner class Worker() {
val rafext: OpenFileExtended = h5.openFileExtended() // here we need a seperate raf

// a thread-safe accessor of the btree
// private val btree1 = BTree1data(rafext, rootAddress, varShape, chunkShape.toLongArray())

val vinfo: DataContainerVariable = v2.spObject as DataContainerVariable
val h5type: H5TypeInfo
val elemSize: Int
Expand All @@ -116,31 +122,41 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) {
}

fun work(dataChunk : DataChunkIF) : ArraySection<*>? {
// TODO check if intersect with wantSection before reading in data

val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims)
if (!allData && !wantSpace.intersects(dataSpace)) {
return null
}
val useEntireChunk = wantSpace.contains(dataSpace)
val intersectSpace = if (useEntireChunk) dataSpace else wantSpace.intersect(dataSpace)

val ba = if (dataChunk.isMissing()) {
if (debugChunking) println(" missing ${dataChunk.show(tiling)}")
val sizeBytes = dataSpace.totalElements * elemSize
val sizeBytes = intersectSpace.totalElements * elemSize
val bbmissing = ByteArray(sizeBytes.toInt())
transferMissingNelems(vinfo.fillValue, dataSpace.totalElements.toInt(), bbmissing, 0)
if (debugChunking) println(" missing transfer ${dataSpace.totalElements} fillValue=${vinfo.fillValue}")
transferMissingNelems(vinfo.fillValue, intersectSpace.totalElements.toInt(), bbmissing, 0)
if (debugChunking) println(" missing transfer ${intersectSpace.totalElements} fillValue=${vinfo.fillValue}")
bbmissing
} else {
if (debugChunking) println(" chunkIterator=${dataChunk.show(tiling)}")
state.pos = dataChunk.childAddress()
val rawdata = rafext.readByteArray(state, dataChunk.chunkSize())
if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!)
val filteredData = if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!)
if (useEntireChunk) {
filteredData
} else {
val chunker = Chunker(dataSpace, wantSpace) // each DataChunkEntry has its own Chunker iteration
chunker.copyOut(filteredData, 0, elemSize, intersectSpace.totalElements.toInt())
}
}

val array = if (h5type.datatype5 == Datatype5.Vlen) {
// internal fun <T> H5builder.processVlenIntoArray(h5type: H5TypeInfo, shape: IntArray, ba: ByteArray, nelems: Int, elemSize : Int): ArrayTyped<T> {
h5.processVlenIntoArray(h5type, dataSpace.shape.toIntArray(), ba, dataSpace.totalElements.toInt(), elemSize)
h5.processVlenIntoArray(h5type, intersectSpace.shape.toIntArray(), ba, intersectSpace.totalElements.toInt(), elemSize)
} else {
h5.processDataIntoArray(ba, h5type.isBE, datatype, dataSpace.shape.toIntArray(), h5type, elemSize)
h5.processDataIntoArray(ba, h5type.isBE, datatype, intersectSpace.shape.toIntArray(), h5type, elemSize)
}

return ArraySection(array, dataSpace.section(v2.shape))
return ArraySection(array, intersectSpace.section(v2.shape))
}
}
val debugChunking = false
Expand Down
Loading