Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,44 @@ public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] co
return mb;
}

/**
 * Converts a SciPy CSR (Compressed Sparse Row) matrix, passed as raw
 * native-order byte arrays, into a sparse {@code MatrixBlock}.
 *
 * @param data    non-zero values, encoded as native-order 8-byte doubles
 * @param indices column index of each non-zero, encoded as native-order 4-byte ints
 * @param indptr  row-pointer array of {@code rlen + 1} native-order ints; entry i is
 *                the offset into data/indices at which row i begins
 * @param rlen    number of rows
 * @param clen    number of columns
 * @param nnz     expected number of non-zeros; must equal {@code indptr[rlen]}
 * @return a sparse MatrixBlock populated with the CSR contents
 * @throws IllegalArgumentException if indptr is inconsistent with nnz
 */
public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) {
	MatrixBlock mb = new MatrixBlock(rlen, clen, true);
	mb.allocateSparseRowsBlock(false);
	ByteBuffer dataBuf = ByteBuffer.wrap(data);
	dataBuf.order(ByteOrder.nativeOrder());
	ByteBuffer indicesBuf = ByteBuffer.wrap(indices);
	indicesBuf.order(ByteOrder.nativeOrder());
	ByteBuffer indptrBuf = ByteBuffer.wrap(indptr);
	indptrBuf.order(ByteOrder.nativeOrder());

	// Read the indptr array: row i spans [rowPtrs[i], rowPtrs[i+1]) in data/indices.
	int[] rowPtrs = new int[rlen + 1];
	for(int i = 0; i <= rlen; i++) {
		rowPtrs[i] = indptrBuf.getInt();
	}

	// Fail fast on corrupt or truncated input: for a well-formed CSR layout the
	// last row pointer equals the total non-zero count.
	if(rowPtrs[rlen] != nnz) {
		throw new IllegalArgumentException("Inconsistent CSR input: indptr[" + rlen
			+ "]=" + rowPtrs[rlen] + " does not match nnz=" + nnz);
	}

	// Iterate through each row
	for(int row = 0; row < rlen; row++) {
		int startIdx = rowPtrs[row];
		int endIdx = rowPtrs[row + 1];

		// Position the buffers at the start of this row (robust even if rows
		// do not begin at offset 0 or indptr contains gaps).
		dataBuf.position(startIdx * Double.BYTES);
		indicesBuf.position(startIdx * Integer.BYTES);

		// Process all non-zeros in this row sequentially
		for(int idx = startIdx; idx < endIdx; idx++) {
			double val = dataBuf.getDouble();
			int colIndex = indicesBuf.getInt();
			mb.set(row, colIndex, val);
		}
	}

	// Recompute the exact non-zero count and let the block decide whether a
	// dense representation would be more appropriate.
	mb.recomputeNonZeros();
	mb.examSparsity();
	return mb;
}

public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
ret.allocateBlock();
Expand Down
24 changes: 24 additions & 0 deletions src/main/python/make_jar_and_package.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

# Build the SystemDS jar and then assemble the Python distribution.
# Abort immediately if any step fails (-e), treat unset variables as
# errors (-u), and propagate failures through pipelines (pipefail) so
# the Python package is never built against a stale or missing jar.
set -euo pipefail

# NOTE(review): both commands assume the caller's working directory is
# correct (pom.xml for mvn, create_python_dist.py alongside this script)
# — confirm against intended invocation.
mvn package -P distribution
python create_python_dist.py
12 changes: 11 additions & 1 deletion src/main/python/systemds/context/systemds_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import sys
import struct
import traceback
import warnings
from contextlib import contextmanager
from glob import glob
from queue import Queue
Expand Down Expand Up @@ -103,9 +104,18 @@ def __init__(
The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL.
:param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this,
it can be verbose if not set high.
:param data_transfer_mode: default 0,
:param data_transfer_mode: default 0, 0 for py4j, 1 for using pipes (on unix systems)
:param multi_pipe_enabled: default False, if True, use multiple pipes for data transfer
only used if data_transfer_mode is 1.
.. experimental:: This parameter is experimental and may be removed in a future version.
"""

if multi_pipe_enabled:
warnings.warn(
"The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
self.__setup_logging(logging_level, py4j_logging_level)
self.__start(port, capture_stdout)
self.capture_stats(capture_statistics)
Expand Down
Loading
Loading