Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
## [Unreleased]

### Added
- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running).
- Metadata operations now use SQL SHOW commands for both Thrift and SEA backends,
ensuring consistent behavior for SQL warehouses regardless of underlying
protocol. To revert to native Thrift metadata RPCs, set `UseQueryForMetadata=0`.
- Added `UseBoundedSeaApi` connection property (default `0`/off). When enabled, the driver uses the bounded SEA API contract for CloudFetch: sends `row_offset` on GetResultData requests and uses `next_chunk_index` for chunk discovery instead of `total_chunk_count`. Requires server support.

### Updated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,11 @@ public boolean isCloudFetchEnabled() {
return getParameter(DatabricksJdbcUrlParams.ENABLE_CLOUD_FETCH).equals("1");
}

@Override
public boolean isBoundedSeaApiEnabled() {
return getParameter(DatabricksJdbcUrlParams.USE_BOUNDED_SEA_API).equals("1");
}

@Override
public int getThriftMaxBatchesInMemory() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum ResultSetType {
private ResultSetType resultSetType = ResultSetType.UNASSIGNED;

private boolean complexDatatypeSupport = false;
private boolean boundedSeaApiEnabled = false;

// Cached telemetry collector resolved once at construction time to avoid
// per-row overhead in next(). The connection-to-collector mapping is stable
Expand Down Expand Up @@ -128,6 +129,7 @@ public DatabricksResultSet(
resultSetMetaData = null;
}
this.complexDatatypeSupport = session.getConnectionContext().isComplexDatatypeSupportEnabled();
this.boundedSeaApiEnabled = session.getConnectionContext().isBoundedSeaApiEnabled();
this.statementType = statementType;
this.updateCount = null;
this.parentStatement = parentStatement;
Expand Down Expand Up @@ -933,8 +935,15 @@ public boolean isBeforeFirst() throws SQLException {
public boolean isAfterLast() throws SQLException {
checkIfClosed();
// Account for client-side maxRows truncation
return truncatedByMaxRows
|| executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
if (truncatedByMaxRows) {
return true;
}
// Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
// which derives end-of-stream from next_chunk_index via the chunk provider.
if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
return executionResult.getCurrentRow() >= resultSetMetaData.getTotalRows();
}

@Override
Expand Down Expand Up @@ -974,6 +983,10 @@ public boolean isLast() throws SQLException {
|| executionResult instanceof StreamingInlineArrowResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
// Bounded SEA API: manifest.total_row_count is not populated, so use hasNext()
if (boundedSeaApiEnabled && executionResult instanceof ArrowStreamResult) {
return executionResult.getCurrentRow() >= 0 && !executionResult.hasNext();
}
return executionResult.getCurrentRow() == resultSetMetaData.getTotalRows() - 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ private static ChunkProvider createRemoteChunkProvider(

IDatabricksConnectionContext connectionContext = session.getConnectionContext();

if (connectionContext.isStreamingChunkProviderEnabled()) {
// Bounded SEA API forces StreamingChunkProvider — it doesn't rely on total_chunk_count
if (connectionContext.isStreamingChunkProviderEnabled()
|| connectionContext.isBoundedSeaApiEnabled()) {
LOGGER.info(
"Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId());

Expand All @@ -113,10 +115,13 @@ private static ChunkProvider createRemoteChunkProvider(
int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds();
double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold();

// Convert ExternalLinks to ChunkLinkFetchResult for the provider
// Convert ExternalLinks to ChunkLinkFetchResult for the provider.
// Bounded SEA API: pass null for totalChunkCount — we must not depend on
// manifest.{chunks, total_chunk_count, total_row_count} per the bounded API contract.
Long totalChunkCount =
connectionContext.isBoundedSeaApiEnabled() ? null : resultManifest.getTotalChunkCount();
ChunkLinkFetchResult initialLinks =
convertToChunkLinkFetchResult(
resultData.getExternalLinks(), resultManifest.getTotalChunkCount());
convertToChunkLinkFetchResult(resultData.getExternalLinks(), totalChunkCount);

return new StreamingChunkProvider(
linkFetcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public SeaChunkLinkFetcher(IDatabricksSession session, StatementId statementId)
@Override
public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset)
throws SQLException {
// SEA uses startChunkIndex; startRowOffset is ignored
// SEA uses startChunkIndex; startRowOffset is passed through for bounded SEA API
LOGGER.debug(
"Fetching links starting from chunk index {} for statement {}",
startChunkIndex,
Expand All @@ -45,7 +45,7 @@ public ChunkLinkFetchResult fetchLinks(long startChunkIndex, long startRowOffset

@Override
public ExternalLink refetchLink(long chunkIndex, long rowOffset) throws SQLException {
// SEA uses chunkIndex; rowOffset is ignored
// SEA uses chunkIndex; rowOffset is passed through for bounded SEA API
LOGGER.info("Refetching expired link for chunk {} of statement {}", chunkIndex, statementId);

ChunkLinkFetchResult result =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.databricks.jdbc.api.impl.arrow;

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
Expand Down Expand Up @@ -29,6 +31,10 @@ public class StreamingChunkDownloadTask implements Callable<Void> {
private final LinkRefresher linkRefresher;
private final double cloudFetchSpeedThreshold;

// Capture caller's thread context for telemetry/logging on the download thread
private final IDatabricksConnectionContext connectionContext;
private final String statementId;

public StreamingChunkDownloadTask(
ArrowResultChunk chunk,
IDatabricksHttpClient httpClient,
Expand All @@ -40,13 +46,21 @@ public StreamingChunkDownloadTask(
this.compressionCodec = compressionCodec;
this.linkRefresher = linkRefresher;
this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold;
this.connectionContext = DatabricksThreadContextHolder.getConnectionContext();
this.statementId = DatabricksThreadContextHolder.getStatementId();
}

@Override
public Void call() throws DatabricksSQLException {
int retries = 0;
boolean downloadSuccessful = false;
Throwable uncaughtException = null;

// Propagate caller's thread context for telemetry/logging
DatabricksThreadContextHolder.setConnectionContext(this.connectionContext);
DatabricksThreadContextHolder.setStatementId(this.statementId);

long taskStartTime = System.nanoTime();
try {
while (!downloadSuccessful) {
try {
Expand All @@ -62,7 +76,12 @@ public Void call() throws DatabricksSQLException {
chunk.downloadData(httpClient, compressionCodec, cloudFetchSpeedThreshold);
downloadSuccessful = true;

LOGGER.debug("Successfully downloaded chunk {}", chunk.getChunkIndex());
long taskTotalMs = (System.nanoTime() - taskStartTime) / 1_000_000;
LOGGER.debug(
"Chunk download complete: chunkIndex={}, totalMs={}, retries={}",
chunk.getChunkIndex(),
taskTotalMs,
retries);

} catch (IOException | SQLException e) {
retries++;
Expand All @@ -72,7 +91,7 @@ public Void call() throws DatabricksSQLException {
chunk.getChunkIndex(),
MAX_RETRIES,
e.getMessage());
// Status will be set to DOWNLOAD_FAILED in the finally block
// Status set to DOWNLOAD_FAILED in the finally block
throw new DatabricksSQLException(
String.format(
"Failed to download chunk %d after %d attempts",
Expand All @@ -95,18 +114,28 @@ public Void call() throws DatabricksSQLException {
}
}
}
} catch (Throwable t) {
uncaughtException = t;
throw t;
} finally {
if (downloadSuccessful) {
chunk.getChunkReadyFuture().complete(null);
} else {
LOGGER.info(
"Download failed for chunk {}: {}",
chunk.getChunkIndex(),
uncaughtException != null ? uncaughtException.getMessage() : "unknown");
chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED);
chunk
.getChunkReadyFuture()
.completeExceptionally(
new DatabricksSQLException(
"Download failed for chunk " + chunk.getChunkIndex(),
uncaughtException,
DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR));
}

DatabricksThreadContextHolder.clearAllContext();
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
*/
public class StreamingChunkProvider implements ChunkProvider {

/** Immutable holder for the next fetch position — ensures atomic reads of (index, rowOffset). */
private static final class FetchPosition {
final long chunkIndex;
final long rowOffset;

FetchPosition(long chunkIndex, long rowOffset) {
this.chunkIndex = chunkIndex;
this.rowOffset = rowOffset;
}
}

private static final JdbcLogger LOGGER =
JdbcLoggerFactory.getLogger(StreamingChunkProvider.class);
private static final String DOWNLOAD_THREAD_PREFIX = "databricks-jdbc-streaming-downloader-";
Expand Down Expand Up @@ -75,8 +86,11 @@ public class StreamingChunkProvider implements ChunkProvider {
// - nextDownloadIndex: written only under downloadLock, but AtomicLong for consistency
private final AtomicLong currentChunkIndex = new AtomicLong(-1);
private final AtomicLong highestKnownChunkIndex = new AtomicLong(-1);
private volatile long nextLinkFetchIndex = 0;
private volatile long nextRowOffsetToFetch = 0;
// Bundled as an immutable pair for atomic reads/writes across threads.
// The prefetch thread reads this (fetchNextLinkBatch) while the download thread
// may update it (getRefreshedLink reconciliation). A volatile reference to an
// immutable holder ensures both fields are always read consistently.
private volatile FetchPosition nextFetchPosition = new FetchPosition(0, 0);
private final AtomicLong nextDownloadIndex = new AtomicLong(0);

// State flags
Expand Down Expand Up @@ -347,11 +361,11 @@ private void linkPrefetchLoop() {
long targetIndex = currentChunkIndex.get() + linkPrefetchWindow;

// Wait if we're caught up
while (!endOfStreamReached && nextLinkFetchIndex > targetIndex) {
while (!endOfStreamReached && nextFetchPosition.chunkIndex > targetIndex) {
if (closed) break;
LOGGER.debug(
"Prefetch caught up, waiting for consumer. next={}, target={}",
nextLinkFetchIndex,
nextFetchPosition.chunkIndex,
targetIndex);
consumerAdvanced.await();
targetIndex = currentChunkIndex.get() + linkPrefetchWindow;
Expand Down Expand Up @@ -396,13 +410,14 @@ private void fetchNextLinkBatch() throws SQLException {
return;
}

FetchPosition pos = nextFetchPosition;
LOGGER.debug(
"Fetching links starting from index {}, row offset {} for statement {}",
nextLinkFetchIndex,
nextRowOffsetToFetch,
pos.chunkIndex,
pos.rowOffset,
statementId);

ChunkLinkFetchResult result = linkFetcher.fetchLinks(nextLinkFetchIndex, nextRowOffsetToFetch);
ChunkLinkFetchResult result = linkFetcher.fetchLinks(pos.chunkIndex, pos.rowOffset);

if (result.isEndOfStream()) {
LOGGER.info("End of stream reached for statement {}", statementId);
Expand All @@ -415,10 +430,9 @@ private void fetchNextLinkBatch() throws SQLException {
createChunkFromLink(link);
}

// Update next fetch positions
// Update next fetch position atomically
if (result.hasMore()) {
nextLinkFetchIndex = result.getNextFetchIndex();
nextRowOffsetToFetch = result.getNextRowOffset();
nextFetchPosition = new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
} else {
endOfStreamReached = true;
LOGGER.info("End of stream reached for statement {} (hasMore=false)", statementId);
Expand Down Expand Up @@ -450,14 +464,15 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
createChunkFromLink(link);
}

// Set next fetch positions using unified API
// Set next fetch position atomically
if (initialLinks.hasMore()) {
nextLinkFetchIndex = initialLinks.getNextFetchIndex();
nextRowOffsetToFetch = initialLinks.getNextRowOffset();
FetchPosition pos =
new FetchPosition(initialLinks.getNextFetchIndex(), initialLinks.getNextRowOffset());
nextFetchPosition = pos;
LOGGER.debug(
"Next fetch position set to chunk index {}, row offset {} from initial links",
nextLinkFetchIndex,
nextRowOffsetToFetch);
pos.chunkIndex,
pos.rowOffset);
} else {
endOfStreamReached = true;
LOGGER.info("End of stream reached from initial links for statement {}", statementId);
Expand All @@ -471,11 +486,6 @@ private void processInitialLinks(ChunkLinkFetchResult initialLinks)
*/
private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException {
long chunkIndex = link.getChunkIndex();
if (chunks.containsKey(chunkIndex)) {
LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
return;
}

long rowCount = link.getRowCount();
long rowOffset = link.getRowOffset();

Expand All @@ -488,7 +498,16 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingExce
.build();

chunk.setChunkLink(link);
chunks.put(chunkIndex, chunk);

// Atomic insert — if another thread already created this chunk, skip.
// This is safe because createChunkFromLink can be called concurrently from
// the prefetch thread (fetchNextLinkBatch) and download threads (getRefreshedLink).
ArrowResultChunk existing = chunks.putIfAbsent(chunkIndex, chunk);
if (existing != null) {
LOGGER.debug("Chunk {} already exists, skipping creation", chunkIndex);
return;
}

highestKnownChunkIndex.updateAndGet(current -> Math.max(current, chunkIndex));
totalRowCount.addAndGet(rowCount);

Expand Down Expand Up @@ -596,22 +615,48 @@ ExternalLink getRefreshedLink(long chunkIndex, long rowOffset) throws SQLExcepti
// Single batch FetchResults RPC from the lowest expired offset
ChunkLinkFetchResult result = linkFetcher.fetchLinks(minExpiredIndex, minExpiredRowOffset);

// Update ALL pre-download chunks that received fresh links.
// Always overwrite even if the current link hasn't expired yet, since the
// server-provided link has a later expiry and prevents near-expiry races.
// Reconcile ALL links from the refresh response with local chunk state.
for (ExternalLink link : result.getChunkLinks()) {
ArrowResultChunk c = chunks.get(link.getChunkIndex());
if (c != null) {
// Existing chunk: update link only for pre-download states.
// DOWNLOADING stays as-is (download task owns the state machine).
// DOWNLOADED/RELEASED/etc. stay as-is (bytes already in memory).
ChunkStatus status = c.getStatus();
if (status == ChunkStatus.PENDING
|| status == ChunkStatus.URL_FETCHED
|| status == ChunkStatus.DOWNLOAD_FAILED
|| status == ChunkStatus.DOWNLOAD_RETRY) {
c.setChunkLink(link);
}
} else {
// Server returned a chunk not yet in our map — create it.
// Handles cases where refresh response includes chunks beyond
// our current highestKnownChunkIndex.
try {
createChunkFromLink(link);
} catch (Exception e) {
LOGGER.warn(
"Failed to create chunk {} from refresh response: {}",
link.getChunkIndex(),
e.getMessage());
}
}
}

// Update end-of-stream and prefetch position from refresh response
if (!result.hasMore()) {
endOfStreamReached = true;
} else if (result.getNextFetchIndex() > nextFetchPosition.chunkIndex) {
// Avoid re-fetching chunks that the refresh already discovered.
// Atomic update of both fields via immutable holder.
nextFetchPosition =
new FetchPosition(result.getNextFetchIndex(), result.getNextRowOffset());
}

// Trigger downloads for any newly-created chunks
triggerDownloads();

// Check if our target chunk was refreshed by the batch
targetChunk = chunks.get(chunkIndex);
if (targetChunk != null && !targetChunk.isChunkLinkInvalid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ default int getHeartbeatIntervalSeconds() {
*/
boolean isCloudFetchEnabled();

/** Returns whether bounded SEA API mode is enabled for CloudFetch. */
boolean isBoundedSeaApiEnabled();

/**
* Returns the maximum number of batches to keep in memory for Thrift streaming.
*
Expand Down
Loading
Loading