Skip to content
Merged
5 changes: 4 additions & 1 deletion .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples
# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help

on:
push:
branches: [main, master]
pull_request:
types: [opened, synchronize, reopened, ready_for_review]

name: R-CMD-check.yaml

Expand All @@ -12,6 +14,7 @@ permissions: read-all
jobs:
R-CMD-check:
runs-on: ${{ matrix.config.os }}
if: github.event_name != 'pull_request' || github.event.pull_request.draft == false

name: ${{ matrix.config.os }} (${{ matrix.config.r }})

Expand All @@ -27,7 +30,7 @@ jobs:
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
R_KEEP_PKG_SOURCE: yes

steps:
- uses: actions/checkout@v4

Expand Down
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: CyteTypeR
Title: CyteType for R
Version: 0.4.0
Version: 0.4.1
Description: CyteTypeR is the R version of CyteType python package.
Authors@R:
person("Nygen Analytics AB", , ,"contact@nygen.io", role = c("aut", "cre"))
Expand All @@ -26,8 +26,10 @@ Imports:
tidyr
Suggests:
duckdb,
future,
furrr,
knitr,
progressr,
rhdf5filters,
rmarkdown,
testthat (>= 3.0.0)
Expand Down
135 changes: 53 additions & 82 deletions R/api.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
# API Response Helper for CyteType
# HTTP primitives, request utilities, and shared constants for CyteType API communication.

# Upload size limits in bytes, per artifact type. Stored as doubles
# (numeric) deliberately: R's 32-bit integers cap at ~2.1e9, so 2 GB and
# 50 GB would overflow if written as integer literals.
.MAX_UPLOAD_BYTES <- list(
  obs_duckdb = 2 * 1024 * 1024 * 1024, # 2 GB
  vars_h5 = 50 * 1024 * 1024 * 1024 # 50 GB
)

# Chunked-upload retry policy: delays in seconds applied after the 1st,
# 2nd, and 3rd failed attempt, and the HTTP status codes treated as
# transient (server errors and gateway failures, which are worth retrying).
.CHUNK_UPLOAD_BACKOFF_SECS <- c(1L, 5L, 20L)
.CHUNK_UPLOAD_TRANSIENT_STATUSES <- c(500L, 502L, 503L, 504L)

# Build a "/"-joined URL path from the given segments.
# Each segment is coerced to character and stripped of leading/trailing
# slashes; empty segments are dropped. Used instead of file.path(), which
# would produce backslashes on Windows.
.url_path <- function(...) {
  segments <- c(...)
  trimmed <- vapply(
    segments,
    function(seg) gsub("^/+|/+$", "", as.character(seg)),
    character(1)
  )
  paste(trimmed[nzchar(trimmed)], collapse = "/")
}

# Construct an httr2 request for base_url/path, attaching a Bearer
# Authorization header when auth_token is non-NULL.
.make_req <- function(base_url, path, auth_token) {
  req <- request(paste0(base_url, "/", path))
  if (is.null(auth_token)) {
    return(req)
  }
  req_headers(req, Authorization = paste("Bearer", auth_token))
}

#' @importFrom httr2 req_auth_bearer_token req_body_json req_headers req_method req_perform req_timeout request resp_body_json resp_body_string resp_status
.api_response_helper <- function(job_id, api_url, req_item, auth_token = NULL) {
Expand Down Expand Up @@ -77,89 +101,36 @@
})
}

# Make Request for Job Results
.make_results_request <- function(job_id, api_url, auth_token = NULL) {

# Helper for consistent responses
make_response <- function(status, result = NULL, message, raw = NULL) {
list(status = status, result = result, message = message, raw_response = raw)
# PUT raw bytes to a presigned URL with retry, ETag validation, and proper error classification.
.put_to_presigned_url <- function(presigned_url, chunk_data, timeout_seconds) {
resp <- request(presigned_url) |>
req_method("PUT") |>
httr2::req_body_raw(chunk_data, type = "application/octet-stream") |>
req_timeout(timeout_seconds) |>
httr2::req_error(is_error = function(resp) FALSE) |>
httr2::req_retry(
max_tries = length(.CHUNK_UPLOAD_BACKOFF_SECS) + 1L,
retry_on_failure = TRUE,
is_transient = function(resp) resp_status(resp) %in% .CHUNK_UPLOAD_TRANSIENT_STATUSES,
backoff = function(tries) .CHUNK_UPLOAD_BACKOFF_SECS[min(tries, length(.CHUNK_UPLOAD_BACKOFF_SECS))]
) |>
req_perform()

status <- resp_status(resp)
if (status >= 400L) {
stop(cytetype_api_error(
message = paste0("Presigned upload rejected with HTTP ", status),
call = "api"
))
}

tryCatch({
# Get job status
status_resp <- .api_response_helper(job_id, api_url, 'status', auth_token)

# Handle 404 immediately
if (status_resp$status_code == 404) {
return(make_response("not_found", message = "Job not found"))
}

job_status <- status_resp$data$jobStatus
status_data <- status_resp$data

# Process based on job status
if (job_status == "completed") {
# Try to get results
results_resp <- tryCatch(
.api_response_helper(job_id, api_url, 'results', auth_token),
error = function(e) {
make_response(
"failed",
message = paste("Job completed but results unavailable:", e$message),
raw = status_data
)
}
)

if (!is.null(results_resp$status) && results_resp$status == "failed") {
return(results_resp)
}

if (results_resp$status_code == 404) {
return(make_response(
"failed",
message = "Job completed but results are unavailable",
raw = status_data
))
}

return(make_response(
"completed",
result = results_resp$data,
message = "Job completed successfully",
raw = status_data
))
}

if (job_status == "failed") {
return(make_response(
"failed",
message = "Job failed",
raw = status_data
))
}

if (job_status %in% c("processing", "pending")) {
return(make_response(
job_status,
message = paste("Job is", job_status),
raw = status_data
))
}

# Unknown status
return(make_response(
"unknown",
message = paste("Unknown job status:", job_status),
raw = status_data
etag <- httr2::resp_header(resp, "ETag")
if (is.null(etag) || !nzchar(etag)) {
stop(cytetype_api_error(
message = "Presigned PUT succeeded but response is missing ETag header",
call = "network"
))
}

}, error = function(e) {
# Handle any unexpected errors
return(make_response(
"error",
message = paste("Error checking job status:", e$message)
))
})
etag
}

46 changes: 33 additions & 13 deletions R/artifacts.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,35 @@
rhdf5::h5writeDataset(.as_string_values(vec), h5loc = fid, name = col_path)
}
did <- rhdf5::H5Dopen(fid, col_path)
rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name")
rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype")
rhdf5::h5writeAttribute(col_name, h5obj = did, name = "source_name", asScalar = TRUE)
rhdf5::h5writeAttribute(source_dtype, h5obj = did, name = "source_dtype", asScalar = TRUE)
rhdf5::H5Dclose(did)
}
invisible(NULL)
}

# Write a sparse matrix under a named HDF5 group.
# csr = FALSE (default): CSC — indptr over columns (genes), indices are row (cell) indices.
# Input m must be cells × genes (n_obs × n_vars).
# Input m must be cells x genes (n_obs x n_vars).
# csr = TRUE: CSR — indptr over rows (cells), indices are column (gene) indices.
# Input m must be genes × cells (n_vars × n_obs) as returned by Seurat GetAssayData.
# Stored via CSC(genes × cells) CSR(cells × genes); no transpose needed.
# Input m must be genes x cells (n_vars x n_obs) as returned by Seurat GetAssayData.
# Stored via CSC(genes x cells) == CSR(cells x genes); no transpose needed.
.write_sparse_group <- function(fid, group, m, n_obs, col_batch, chunk_size,
csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT") {
csr = FALSE, data_h5type = "H5T_NATIVE_FLOAT",
pb_id = NULL) {
if (csr) {
m <- as(m, "CsparseMatrix")
n_vars <- nrow(m)
} else {
n_vars <- ncol(m)
}
n_cols <- ncol(m)

rhdf5::h5createGroup(fid, group)
gid <- rhdf5::H5Gopen(fid, group)
on.exit(rhdf5::H5Gclose(gid), add = TRUE)
rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs")
rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars")
rhdf5::h5writeAttribute(as.integer(n_obs), h5obj = gid, name = "n_obs", asScalar = TRUE)
rhdf5::h5writeAttribute(as.integer(n_vars), h5obj = gid, name = "n_vars", asScalar = TRUE)

rhdf5::h5createDataset(fid, paste0(group, "/indices"), dims = 0L,
maxdims = rhdf5::H5Sunlimited(), chunk = chunk_size,
Expand All @@ -107,7 +109,7 @@
chunk <- as(m[, start:end, drop = FALSE], "CsparseMatrix")
chunk_indices <- as.integer(chunk@i)
chunk_data <- if (data_h5type == "H5T_NATIVE_INT32") as.integer(chunk@x) else as.numeric(chunk@x)
chunk_nnz <- length(chunk_indices)
chunk_nnz <- length(chunk_indices)
if (chunk_nnz > 0L) {
rhdf5::h5set_extent(fid, paste0(group, "/indices"), current_size + chunk_nnz)
rhdf5::h5writeDataset(chunk_indices, h5loc = fid, name = paste0(group, "/indices"),
Expand All @@ -119,6 +121,7 @@
}
new_indptr <- as.numeric(chunk@p[-1L]) + indptr[length(indptr)]
indptr <- c(indptr, new_indptr)
if (!is.null(pb_id)) cli::cli_progress_update(id = pb_id)
}

rhdf5::h5createDataset(fid, paste0(group, "/indptr"), dims = length(indptr),
Expand All @@ -142,6 +145,21 @@
}
chunk_size <- max(1L, min(n_obs * 10L, min_chunk_size))

raw_col_batch <- if (!is.null(raw_mat)) {
max(1L, as.integer(100000000 / max(nrow(raw_mat), 1)))
} else NULL

vars_n_batches <- length(seq(1L, n_vars, by = col_batch))
raw_n_batches <- if (!is.null(raw_mat)) length(seq(1L, ncol(raw_mat), by = raw_col_batch)) else 0L
total_batches <- vars_n_batches + raw_n_batches

pb_label <- if (raw_n_batches > 0L) "Writing vars.h5 (normalized + raw)" else "Writing vars.h5"
pb_id <- cli::cli_progress_bar(
format = paste0(pb_label, " {cli::pb_bar} {cli::pb_current}/{cli::pb_total} batches ({cli::pb_rate})"),
total = total_batches,
clear = FALSE
)

if (file.exists(out_file) && !file.remove(out_file)) {
stop("Could not remove existing file: ", out_file)
}
Expand All @@ -150,16 +168,16 @@
fid <- rhdf5::H5Fopen(out_file, flags = "H5F_ACC_RDWR")
on.exit(rhdf5::H5Fclose(fid), add = TRUE)

.write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size)
.write_sparse_group(fid, "vars", mat, n_obs, col_batch, chunk_size, pb_id = pb_id)

if (!is.null(raw_mat)) {
raw_col_batch <- max(1L, as.integer(100000000 / max(nrow(raw_mat), 1)))
.write_sparse_group(fid, "raw", raw_mat, n_obs, raw_col_batch, chunk_size,
csr = TRUE, data_h5type = "H5T_NATIVE_INT32")
csr = TRUE, data_h5type = "H5T_NATIVE_INT32", pb_id = pb_id)
}

if (!is.null(feature_df)) {
.write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df, feature_names = feature_names)
.write_var_metadata(fid, n_cols = n_vars, feature_df = feature_df,
feature_names = feature_names)
}

invisible(out_file)
Expand Down Expand Up @@ -187,6 +205,8 @@
}
}

cli::cli_progress_step("Writing obs.duckdb")

if (file.exists(out_file)) file.remove(out_file)
config <- list(threads = as.character(threads), memory_limit = memory_limit, temp_directory = temp_directory)
con <- duckdb::dbConnect(duckdb::duckdb(), out_file, config = config)
Expand Down
Loading
Loading