Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: CyteTypeR
Title: CyteType for R
Version: 0.3.2
Version: 0.4.0
Description: CyteTypeR is the R version of the CyteType Python package.
Authors@R:
person("Nygen Analytics AB", , ,"contact@nygen.io", role = c("aut", "cre"))
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by roxygen2: do not edit by hand

S3method(print,cytetype_api_error)
export(CleanUpArtifacts)
export(CyteTypeR)
export(GetResults)
export(PrepareCyteTypeR)
Expand Down
27 changes: 13 additions & 14 deletions R/api.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,19 @@
# Process based on job status
if (job_status == "completed") {
# Try to get results
tryCatch({
results_resp <- .api_response_helper(job_id, api_url, 'results', auth_token)
}, error = function(e) {
# If results fetch fails, treat as failed job
return(make_response(
"failed",
message = paste("Job completed but results unavailable:", e$message),
raw = status_data
))
})

# Check if we got an error response above
if (is.list(results_resp) && (results_resp$status == "failed")) {
return(results_resp) # Return the error response
results_resp <- tryCatch(
.api_response_helper(job_id, api_url, 'results', auth_token),
error = function(e) {
make_response(
"failed",
message = paste("Job completed but results unavailable:", e$message),
raw = status_data
)
}
)

if (!is.null(results_resp$status) && results_resp$status == "failed") {
return(results_resp)
}

if (results_resp$status_code == 404) {
Expand Down
163 changes: 95 additions & 68 deletions R/artifacts.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@
return(y)
}

# Map R class to a pandas-compatible dtype string for the source_dtype attribute.
# Map an R vector class to the pandas-compatible dtype string recorded in the
# source_dtype HDF5 attribute. Factors map to "category", logicals to "bool",
# integers to "int32", doubles to "float64"; anything else is "object".
.r_to_source_dtype <- function(vec) {
  if (is.factor(vec)) {
    "category"
  } else if (is.logical(vec)) {
    "bool"
  } else if (is.integer(vec)) {
    "int32"
  } else if (is.numeric(vec)) {
    "float64"
  } else {
    "object"
  }
}

# Write optional var (feature) metadata under info/var (mirrors Python _write_var_metadata).
.write_var_metadata <- function(fid, n_cols, feature_df, feature_names) {
if (nrow(feature_df) != n_cols) {
Expand All @@ -43,6 +52,7 @@
existing <- c(existing, dataset_name)
col_path <- paste0("info/var/columns/", dataset_name)
vec <- feature_df[[i]]
source_dtype <- .r_to_source_dtype(vec)
if (is.factor(vec)) vec <- as.character(vec)
if (is.logical(vec)) {
storage.mode(vec) <- "integer"
Expand All @@ -53,15 +63,75 @@
} else {
rhdf5::h5writeDataset(.as_string_values(vec), h5loc = fid, name = col_path)
}
did <- rhdf5::H5Dopen(fid, col_path)
rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name")
rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype")
rhdf5::H5Dclose(did)
}
invisible(NULL)
}

# Write a sparse matrix under a named HDF5 group in compressed-sparse form,
# streaming column batches to bound peak memory.
#
# Layouts:
#   csr = FALSE (default): CSC — indptr runs over columns (genes), indices are
#     row (cell) indices. Input m must be cells x genes (n_obs x n_vars).
#   csr = TRUE: CSR — indptr runs over rows (cells), indices are column (gene)
#     indices. Input m must be genes x cells (n_vars x n_obs), as returned by
#     Seurat GetAssayData. Stored via CSC(genes x cells) == CSR(cells x genes),
#     so no transpose is needed.
#
# Args:
#   fid:         open rhdf5 file handle.
#   group:       name of the HDF5 group to create (e.g. "vars", "raw").
#   m:           sparse matrix coercible to Matrix::CsparseMatrix.
#   n_obs:       number of observations (cells); written as a group attribute.
#   col_batch:   number of columns converted/written per iteration.
#   chunk_size:  HDF5 chunk size for the extensible indices/data datasets.
#   csr:         layout switch, see above.
#   data_h5type: HDF5 storage type for the values; with "H5T_NATIVE_INT32"
#                values are coerced via as.integer(), otherwise as.numeric().
#
# Side effects: creates group/{indices,data,indptr} datasets (BLOSC_LZ4
# compressed) inside fid. Returns NULL invisibly.
.write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size,
                                csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") {
  if (csr) {
    # CSR input arrives genes x cells; coerce up front so nrow() is reliable.
    m <- as(m, "CsparseMatrix")
    n_vars <- nrow(m)
  } else {
    n_vars <- ncol(m)
  }
  n_cols <- ncol(m)
  rhdf5::h5createGroup(fid, group)
  gid <- rhdf5::H5Gopen(fid, group)
  on.exit(rhdf5::H5Gclose(gid), add = TRUE)
  rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs")
  rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars")

  # Extensible datasets (maxdims unlimited) so each batch can be appended.
  rhdf5::h5createDataset(fid, paste0(group, "/indices"), dims = 0L,
                         maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size,
                         H5type = "H5T_NATIVE_INT32", filter = "BLOSC_LZ4")
  rhdf5::h5createDataset(fid, paste0(group, "/data"), dims = 0L,
                         maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size,
                         H5type = data_h5type, filter = "BLOSC_LZ4")

  # indptr accumulates as double (not integer) so totals past ~2^31 nonzeros
  # do not overflow; it is written as INT64 below.
  indptr <- 0
  current_size <- 0L
  starts <- seq(1L, n_cols, by = col_batch)
  for (start in starts) {
    end <- min(start + col_batch - 1L, n_cols)
    chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix")
    chunk_indices <- as.integer(chunk@i)
    chunk_data <- if (data_h5type == "H5T_NATIVE_INT32") as.integer(chunk@x) else as.numeric(chunk@x)
    chunk_nnz <- length(chunk_indices)
    if (chunk_nnz > 0L) {
      # Grow each dataset, then write only the newly appended tail region.
      rhdf5::h5set_extent(fid, paste0(group, "/indices"), current_size + chunk_nnz)
      rhdf5::h5writeDataset(chunk_indices, h5loc = fid, name = paste0(group, "/indices"),
                            index = list((current_size + 1L):(current_size + chunk_nnz)))
      rhdf5::h5set_extent(fid, paste0(group, "/data"), current_size + chunk_nnz)
      rhdf5::h5writeDataset(chunk_data, h5loc = fid, name = paste0(group, "/data"),
                            index = list((current_size + 1L):(current_size + chunk_nnz)))
      current_size <- current_size + chunk_nnz
    }
    # chunk@p is local to this batch: offset by the running total and drop the
    # leading 0 so the concatenated indptr stays cumulative and monotone.
    new_indptr <- as.numeric(chunk@p[-1L]) + indptr[length(indptr)]
    indptr <- c(indptr, new_indptr)
  }

  rhdf5::h5createDataset(fid, paste0(group, "/indptr"), dims = length(indptr),
                         H5type = "H5T_NATIVE_INT64", chunk = min(chunk_size, length(indptr)),
                         filter = "BLOSC_LZ4")
  rhdf5::h5writeDataset(indptr, h5loc = fid, name = paste0(group, "/indptr"))
  invisible(NULL)
}

.save_vars_h5 <- function(out_file, mat, feature_df = NULL, feature_names = NULL,
.save_vars_h5 <- function(out_file, mat, raw_mat = NULL, feature_df = NULL, feature_names = NULL,
col_batch = NULL, min_chunk_size = 10000L) {
m <- as(mat, "CsparseMatrix")
n_obs <- nrow(m)
n_vars <- ncol(m)
n_obs <- nrow(mat)
n_vars <- ncol(mat)

if (!requireNamespace("rhdf5filters", quietly = TRUE)) {
stop("Package 'rhdf5filters' is required to write vars.h5 with LZ4 compression.")
Expand All @@ -80,90 +150,47 @@
fid <- rhdf5::H5Fopen(out_file, flags = "H5F_ACC_RDWR")
on.exit(rhdf5::H5Fclose(fid), add = TRUE)

rhdf5::h5createGroup(fid, "vars")
rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = out_file, name = "n_obs", h5loc = "vars", asScalar = TRUE)
rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = out_file, name = "n_vars", h5loc = "vars", asScalar = TRUE)

# Create extensible datasets (equivalent to maxshape=(None,) in h5py)
max_nnz <- n_obs * n_vars # upper bound
rhdf5::h5createDataset(
fid, "vars/indices",
dims = 0L,
maxdims = rhdf5::H5Sunlimited(),
chunk = chunk_size,
H5type = "H5T_NATIVE_INT32",
filter = "BLOSC_LZ4"
)
rhdf5::h5createDataset(
fid, "vars/data",
dims = 0L,
maxdims = rhdf5::H5Sunlimited(),
chunk = chunk_size,
H5type = "H5T_NATIVE_FLOAT",
filter = "BLOSC_LZ4"
)

indptr <- 0L
current_size <- 0L
.write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size)

starts <- seq(1L, n_vars, by = col_batch)
for (start in starts) {
end <- min(start + col_batch - 1L, n_vars)
chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix")

chunk_indices <- as.integer(chunk@i)
chunk_data <- as.numeric(chunk@x)
chunk_nnz <- length(chunk_indices)

if (chunk_nnz > 0L) {
# Extend and write indices
rhdf5::h5set_extent(fid, "vars/indices", current_size + chunk_nnz)
rhdf5::h5writeDataset(
chunk_indices, h5loc = fid, name = "vars/indices",
index = list((current_size + 1L):(current_size + chunk_nnz))
)
# Extend and write data
rhdf5::h5set_extent(fid, "vars/data", current_size + chunk_nnz)
rhdf5::h5writeDataset(
chunk_data, h5loc = fid, name = "vars/data",
index = list((current_size + 1L):(current_size + chunk_nnz))
)
current_size <- current_size + chunk_nnz
}

# Accumulate indptr (skip first element after first chunk)
new_indptr <- as.integer(chunk@p[-1L] + indptr[length(indptr)])
indptr <- c(indptr, new_indptr)
if (!is.null(raw_mat)) {
raw_col_batch <- max(1L, as.integer(100000000 / max(nrow(raw_mat), 1)))
.write_sparse_group(fid, "raw", raw_mat, n_obs, raw_col_batch, chunk_size,
csr = TRUE, data_h5type = "H5T_NATIVE_INT32")
}

rhdf5::h5createDataset(
fid, "vars/indptr",
dims = length(indptr),
H5type = "H5T_NATIVE_INT32",
chunk = min(chunk_size, length(indptr)),
filter = "BLOSC_LZ4"
)
rhdf5::h5writeDataset(as.integer(indptr), h5loc = fid, name = "vars/indptr")

if (!is.null(feature_df)) {
.write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, feature_names = feature_names)
}

invisible(out_file)
}

# Persist cell (obs) metadata — plus optional 2-D visualization coordinates —
# into a single-table DuckDB database file.
#
# Args:
#   out_file:        path of the .duckdb file to (re)create; an existing file
#                    at this path is removed first.
#   obs_df:          per-cell metadata, coercible to data.frame.
#   coordinates:     optional matrix-like of embedding coordinates with at
#                    least 2 columns and one row per row of obs_df.
#   coordinates_key: label used to name the coordinate columns
#                    ("__vis_coordinates_<key>_1"/"_2"). Both coordinates and
#                    coordinates_key must be supplied for coordinates to be
#                    written.
#   table_name:      DuckDB table name (letters, digits, underscores only).
#   threads, memory_limit, temp_directory: DuckDB resource configuration.
#
# Returns out_file invisibly. Raises if the duckdb package is missing or
# table_name is not a safe identifier.
.save_obs_duckdb <- function(out_file, obs_df, coordinates = NULL, coordinates_key = NULL,
                             table_name = "obs",
                             threads = "4", memory_limit = "4GB", temp_directory = "tmp/duckdb") {
  if (!requireNamespace("duckdb", quietly = TRUE)) {
    stop("Package 'duckdb' is required to build obs.duckdb. Install with: install.packages('duckdb')")
  }
  # Guard against SQL injection / invalid identifiers in the table name.
  if (!grepl("^[A-Za-z_][A-Za-z0-9_]*$", table_name)) {
    stop("Invalid table_name. Use letters, numbers, and underscores only.")
  }

  df <- as.data.frame(obs_df)

  if (!is.null(coordinates) && !is.null(coordinates_key)) {
    coords <- as.matrix(coordinates)
    if (ncol(coords) >= 2 && nrow(coords) == nrow(df)) {
      # Column naming mirrors the Python client so downstream readers agree.
      col1 <- paste0("__vis_coordinates_", coordinates_key, "_1")
      col2 <- paste0("__vis_coordinates_", coordinates_key, "_2")
      df[[col1]] <- as.numeric(coords[, 1])
      df[[col2]] <- as.numeric(coords[, 2])
    } else {
      # Previously a silent no-op; surface the problem instead of dropping data.
      warning("Ignoring 'coordinates': need >= 2 columns and one row per row of obs_df (got ",
              nrow(coords), " x ", ncol(coords), " for ", nrow(df), " rows).", call. = FALSE)
    }
  }

  if (file.exists(out_file)) file.remove(out_file)
  config <- list(threads = as.character(threads), memory_limit = memory_limit, temp_directory = temp_directory)
  con <- duckdb::dbConnect(duckdb::duckdb(), out_file, config = config)
  on.exit(duckdb::dbDisconnect(con, shutdown = TRUE), add = TRUE)
  duckdb::dbWriteTable(con, table_name, df, overwrite = TRUE)
  invisible(out_file)
}
52 changes: 32 additions & 20 deletions R/client.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Upload size limits (bytes): match Python API (vars_h5 10GB uses numeric to avoid integer overflow)
.MAX_UPLOAD_BYTES <- list(obs_duckdb = 100L * 1024L * 1024L, vars_h5 = 10 * 1024 * 1024 * 1024)
# Upload size limits: 100MB and 50GB respectively (vars_h5 uses numeric to avoid integer overflow)
.MAX_UPLOAD_BYTES <- list(obs_duckdb = 100L * 1024L * 1024L, vars_h5 = 50 * 1024 * 1024 * 1024)

# Chunked upload retry: delays (sec) after 1st, 2nd, 3rd failure; status codes treated as transient (incl. network/gateway)
.CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L)
Expand Down Expand Up @@ -38,7 +38,7 @@
stop(file_kind, " exceeds upload limit: ", size, " bytes (max ", max_bytes, ")")
}

connection_timeout <- 30L
connection_timeout <- 72L

# Step 1 – Initiate (empty POST; explicit empty body for compatibility)
init_resp <- tryCatch(
Expand All @@ -52,7 +52,7 @@
)

upload_id <- init_resp$upload_id
chunk_size <- as.integer(init_resp$chunk_size_bytes %||% (5L * 1024L * 1024L))
chunk_size <- as.integer(init_resp$chunk_size_bytes %||% (50L * 1024L * 1024L)) ## default 50MB
server_max <- init_resp$max_size_bytes
if (!is.null(server_max) && length(server_max) > 0) {
server_max_n <- as.numeric(server_max)[1]
Expand Down Expand Up @@ -145,7 +145,7 @@
req_method("POST") |>
req_body_json(payload, na = "string") |>
req_headers("Content-Type" = "application/json") |>
req_timeout(60)
req_timeout(180)

# Add auth token if provided
if (!is.null(auth_token)) {
Expand Down Expand Up @@ -211,6 +211,8 @@
last_cluster_status <- list()
spinner_frame = 0
consecutive_not_found <- 0
consecutive_errors <- 0
max_consecutive_errors <- 5

# Main polling loop
repeat {
Expand All @@ -225,8 +227,9 @@
status_response <- .make_results_request(job_id, api_url, auth_token)
status <- status_response$status

# Reset not found counter on valid response
if (status != "not_found"){ consecutive_not_found <- 0 }
# Reset counters on valid server response
if (status != "not_found") consecutive_not_found <- 0
if (status != "error") consecutive_errors <- 0

# Extract cluster status
current_cluster_status <- status_response$raw_response$clusterStatus %||% list()
Expand Down Expand Up @@ -274,31 +277,40 @@
"pending" = {
log_debug("Job {job_id} status: {status}. Waiting {poll_interval}s...")
if (show_progress && length(current_cluster_status) > 0){

# Sleep with spinner updates
.sleep_with_spinner(poll_interval,current_cluster_status,show_progress)
.sleep_with_spinner(poll_interval, current_cluster_status, show_progress)
} else {
Sys.sleep(poll_interval)
}
last_cluster_status <- current_cluster_status
},

"not_found" = {
consecutive_not_found <- consecutive_not_found + 1
log_debug("Job {job_id} not found (404, attempt {consecutive_not_found}). Waiting {poll_interval}s...")

if (!is.null(auth_token) && consecutive_not_found >= 3) {
log_warn("\u26a0\ufe0f Getting consecutive 404 responses with auth token. This might indicate authentication issues.")
log_warn("Please verify your auth_token is valid and has proper permissions")
log_warn("If you're using a shared server, contact your administrator.")
consecutive_not_found <- 0 # Reset to avoid spam
if (consecutive_not_found >= 5) {
cat("\n")
stop(paste0("Job '", job_id, "' not found after ", consecutive_not_found,
" attempts. Verify the job_id is correct."))
}

log_debug("Results endpoint not ready yet for job {job_id} (404). Waiting {poll_interval}s...")
.sleep_with_spinner(poll_interval,current_cluster_status,show_progress)
.sleep_with_spinner(poll_interval, current_cluster_status, show_progress)
last_cluster_status <- current_cluster_status
},

"error" = {
consecutive_errors <- consecutive_errors + 1
error_msg <- status_response$message %||% "Unknown error"
log_warn("Error checking job {job_id} ({consecutive_errors}/{max_consecutive_errors}): {error_msg}")
if (consecutive_errors >= max_consecutive_errors) {
cat("\n")
stop(paste("Stopping after", max_consecutive_errors, "consecutive errors:", error_msg))
}
.sleep_with_spinner(poll_interval, current_cluster_status, show_progress)
},

{
log_warn("Job {job_id} has unknown status: '{status}'. Continuing...")
.sleep_with_spinner(poll_interval,current_cluster_status,show_progress)
cat("\n")
stop(paste("Job", job_id, "returned unexpected status:", status))
}
)

Expand Down
Loading
Loading