Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 131 additions & 12 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import java.security.PrivateKey;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -729,7 +731,9 @@ public int getTimeout() {
// build() time. 0 or negative is a documented "disable" value, so
// a Long.MIN_VALUE sentinel keeps it distinguishable from "unset".
private static final long DURABLE_ACK_KEEPALIVE_NOT_SET = Long.MIN_VALUE;
private long authTimeoutMillis = QwpWebSocketSender.DEFAULT_AUTH_TIMEOUT_MS;
private long durableAckKeepaliveIntervalMillis = DURABLE_ACK_KEEPALIVE_NOT_SET;
private boolean gorillaEnabled = true;
// Drives the initial-connect strategy. OFF is fail-fast (default).
// SYNC retries on the user thread up to the reconnect cap. ASYNC
// returns immediately and lets the I/O thread retry in the
Expand Down Expand Up @@ -833,6 +837,58 @@ public LineSenderBuilder address(CharSequence address) {
return this;
}

private void addAddressEntry(CharSequence src, int start, int end, int defaultPort) {
int colon = Chars.indexOf(src, start, end, ':');
if (colon == end - 1) {
throw new LineSenderException("invalid address, use IPv4 address or a domain name [address=")
.put(src.subSequence(start, end)).put("]");
}
int hostEnd = colon < 0 ? end : colon;
int parsedPort = -1;
if (colon >= 0) {
try {
parsedPort = Numbers.parseInt(src, colon + 1, end);
if (parsedPort < 1 || parsedPort > 65535) {
throw new LineSenderException("invalid port [port=").put(parsedPort).put("]");
}
} catch (NumericException e) {
throw new LineSenderException("cannot parse a port from the address, use IPv4 address or a domain name")
.put(" [address=").put(src.subSequence(start, end)).put("]");
}
}
if (hostEnd == start) {
throw new LineSenderException("empty host in addr entry [address=")
.put(src.subSequence(start, end)).put("]");
}
int effectivePort = parsedPort != -1 ? parsedPort : defaultPort;
for (int i = 0, n = hosts.size(); i < n; i++) {
String storedHost = hosts.get(i);
if (charsEqualsRange(storedHost, src, start, hostEnd)) {
if (ports.size() > i && ports.getQuick(i) == effectivePort) {
throw new LineSenderException("duplicated addresses are not allowed [address=")
.put(src.subSequence(start, end)).put("]");
}
}
}
hosts.add(src.subSequence(start, hostEnd).toString());
if (parsedPort != -1) {
ports.add(parsedPort);
}
}

private static boolean charsEqualsRange(CharSequence a, CharSequence b, int bStart, int bEnd) {
int len = bEnd - bStart;
if (a.length() != len) {
return false;
}
for (int i = 0; i < len; i++) {
if (a.charAt(i) != b.charAt(bStart + i)) {
return false;
}
}
return true;
}

/**
* Advanced TLS configuration. Most users should not need to use this.
*
Expand Down Expand Up @@ -1029,8 +1085,8 @@ public Sender build() {
}

if (protocol == PROTOCOL_WEBSOCKET) {
if (hosts.size() != 1 || ports.size() != 1) {
throw new LineSenderException("only a single address (host:port) is supported for WebSocket transport");
if (hosts.size() < 1 || ports.size() != hosts.size()) {
throw new LineSenderException("WebSocket transport requires at least one host:port pair");
}

int actualAutoFlushRows = autoFlushRows == PARAMETER_NOT_SET_EXPLICITLY ? DEFAULT_WS_AUTO_FLUSH_ROWS : autoFlushRows;
Expand Down Expand Up @@ -1134,11 +1190,15 @@ public Sender build() {
int actualErrorInboxCapacity = errorInboxCapacity != PARAMETER_NOT_SET_EXPLICITLY
? errorInboxCapacity
: io.questdb.client.cutlass.qwp.client.sf.cursor.SenderErrorDispatcher.DEFAULT_CAPACITY;
List<QwpWebSocketSender.Endpoint> wsEndpoints =
new ArrayList<>(hosts.size());
for (int i = 0, n = hosts.size(); i < n; i++) {
wsEndpoints.add(new QwpWebSocketSender.Endpoint(hosts.getQuick(i), ports.getQuick(i)));
}
QwpWebSocketSender connected;
try {
connected = QwpWebSocketSender.connect(
hosts.getQuick(0),
ports.getQuick(0),
wsEndpoints,
wsTlsConfig,
actualAutoFlushRows,
actualAutoFlushBytes,
Expand All @@ -1155,7 +1215,9 @@ public Sender build() {
initialConnectMode,
errorHandler,
actualErrorInboxCapacity,
actualDurableAckKeepaliveIntervalMillis
actualDurableAckKeepaliveIntervalMillis,
authTimeoutMillis,
gorillaEnabled
);
} catch (Throwable t) {
// connect() failed before ownership of cursorEngine
Expand Down Expand Up @@ -1971,6 +2033,30 @@ public LineSenderBuilder durableAckKeepaliveIntervalMillis(long millis) {
return this;
}

/**
* Per-endpoint timeout on the WebSocket upgrade response read. Default
* {@value QwpWebSocketSender#DEFAULT_AUTH_TIMEOUT_MS} ms.
*/
public LineSenderBuilder authTimeoutMillis(long millis) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException(
"auth_timeout_ms is only supported for WebSocket transport");
}
if (millis <= 0L) {
throw new LineSenderException("auth_timeout_ms must be > 0: ").put(millis);
}
this.authTimeoutMillis = millis;
return this;
}

public LineSenderBuilder gorilla(boolean enabled) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("gorilla is only supported for WebSocket transport");
}
this.gorillaEnabled = enabled;
return this;
}

/**
* Per-outage cap on the cursor I/O loop's reconnect retry budget.
* Once a wire failure occurs, the loop retries with exponential
Expand Down Expand Up @@ -2434,13 +2520,28 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
}
if (Chars.equals("addr", sink)) {
pos = getValue(configurationString, pos, sink, "address");
address(sink);
if (ports.size() == hosts.size() - 1) {
// not set
port(protocol == PROTOCOL_HTTP ? DEFAULT_HTTP_PORT
: protocol == PROTOCOL_UDP ? DEFAULT_UDP_PORT
: protocol == PROTOCOL_WEBSOCKET ? DEFAULT_WEBSOCKET_PORT
: DEFAULT_TCP_PORT);
int defaultPort = protocol == PROTOCOL_HTTP ? DEFAULT_HTTP_PORT
: protocol == PROTOCOL_UDP ? DEFAULT_UDP_PORT
: protocol == PROTOCOL_WEBSOCKET ? DEFAULT_WEBSOCKET_PORT
: DEFAULT_TCP_PORT;
int valLen = sink.length();
int entryStart = 0;
for (int i = 0; i <= valLen; i++) {
if (i == valLen || sink.charAt(i) == ',') {
int s = entryStart;
int e = i;
while (s < e && sink.charAt(s) == ' ') s++;
while (e > s && sink.charAt(e - 1) == ' ') e--;
if (s == e) {
throw new LineSenderException("empty addr entry");
}
int portsBefore = ports.size();
addAddressEntry(sink, s, e, defaultPort);
if (ports.size() == portsBefore) {
port(defaultPort);
}
entryStart = i + 1;
}
}
} else if (Chars.equals("user", sink)) {
// deprecated key: user, new key: username
Expand Down Expand Up @@ -2666,6 +2767,24 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
}
pos = getValue(configurationString, pos, sink, "close_flush_timeout_millis");
closeFlushTimeoutMillis(parseLongValue(sink, "close_flush_timeout_millis"));
} else if (Chars.equals("auth_timeout_ms", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("auth_timeout_ms is only supported for WebSocket transport");
}
pos = getValue(configurationString, pos, sink, "auth_timeout_ms");
authTimeoutMillis(parseLongValue(sink, "auth_timeout_ms"));
} else if (Chars.equals("gorilla", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("gorilla is only supported for WebSocket transport");
}
pos = getValue(configurationString, pos, sink, "gorilla");
if (Chars.equals("on", sink) || Chars.equals("true", sink)) {
gorilla(true);
} else if (Chars.equals("off", sink) || Chars.equals("false", sink)) {
gorilla(false);
} else {
throw new LineSenderException("invalid gorilla [value=").put(sink).put(", allowed=[on, off]]");
}
} else if (Chars.equals("durable_ack_keepalive_interval_millis", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public abstract class WebSocketClient implements QuietCloseable {
private static final int PARSE_INCOMPLETE = 0;
private static final int PARSE_NEED_MORE = -1;
private static final int PARSE_OK = 1;
private static final String QUESTDB_ROLE_HEADER_NAME = "X-QuestDB-Role:";
private static final String QWP_DURABLE_ACK_ENABLED_VALUE = "enabled";
private static final String QWP_DURABLE_ACK_HEADER_NAME = "X-QWP-Durable-Ack:";
private static final String QWP_VERSION_HEADER_NAME = "X-QWP-Version:";
Expand Down Expand Up @@ -133,6 +134,8 @@ public abstract class WebSocketClient implements QuietCloseable {
// setQwpRequestDurableAck) is the early-fail signal.
private boolean serverDurableAckEnabled;
private int serverQwpVersion = 1;
private String upgradeRejectRole;
private int upgradeStatusCode;
private boolean upgraded;

public WebSocketClient(HttpClientConfiguration configuration, SocketFactory socketFactory) {
Expand Down Expand Up @@ -296,6 +299,22 @@ public int getServerQwpVersion() {
return serverQwpVersion;
}

/**
* Role from {@code X-QuestDB-Role} on the most recent rejected upgrade,
* or null when no such header was present.
*/
public String getUpgradeRejectRole() {
return upgradeRejectRole;
}

/**
* HTTP status code from the most recent rejected upgrade, or 0 if no
* upgrade rejection has been observed yet.
*/
public int getUpgradeStatusCode() {
return upgradeStatusCode;
}

/**
* Returns whether the WebSocket is connected and upgraded.
*/
Expand Down Expand Up @@ -504,6 +523,8 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
if (upgraded) {
return; // Already upgraded
}
upgradeRejectRole = null;
upgradeStatusCode = 0;

// Generate random key
byte[] keyBytes = new byte[16];
Expand Down Expand Up @@ -624,6 +645,35 @@ private static boolean extractDurableAckEnabled(String response) {
return false;
}

private static int parseStatusCode(String statusLine) {
int sp1 = statusLine.indexOf(' ');
if (sp1 < 0 || sp1 + 4 > statusLine.length()) return 0;
int code = 0;
for (int i = sp1 + 1; i < sp1 + 4; i++) {
char c = statusLine.charAt(i);
if (c < '0' || c > '9') return 0;
code = code * 10 + (c - '0');
}
return code;
}

private static String extractRoleHeader(String response) {
int headerLen = QUESTDB_ROLE_HEADER_NAME.length();
int responseLen = response.length();
for (int i = 0; i <= responseLen - headerLen; i++) {
if (response.regionMatches(true, i, QUESTDB_ROLE_HEADER_NAME, 0, headerLen)) {
int valueStart = i + headerLen;
int lineEnd = response.indexOf('\r', valueStart);
if (lineEnd < 0) {
lineEnd = responseLen;
}
String value = response.substring(valueStart, lineEnd).trim();
return value.isEmpty() ? null : value.toUpperCase(java.util.Locale.ROOT);
}
}
return null;
}

private static int extractQwpVersion(String response) {
int headerLen = QWP_VERSION_HEADER_NAME.length();
int responseLen = response.length();
Expand Down Expand Up @@ -1028,9 +1078,12 @@ private void validateUpgradeResponse(int headerEnd) {
}
String response = new String(responseBytes, StandardCharsets.US_ASCII);

// Check status line
if (!response.startsWith("HTTP/1.1 101")) {
String statusLine = response.split("\r\n")[0];
upgradeStatusCode = parseStatusCode(statusLine);
if (upgradeStatusCode == 421) {
upgradeRejectRole = extractRoleHeader(response);
}
throw new HttpClientException("WebSocket upgrade failed: ").put(statusLine);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public class QueryEvent {
* having to reconstruct the classification from a side-channel latch.
*/
public static final int KIND_TRANSPORT_ERROR = 4;
/**
* Permanent protocol-level disagreement (unsupported QWP version, framing
* corruption that won't recover). {@code execute()} surfaces this directly
* instead of triggering failover -- version mismatch is cluster-wide and
* retrying against another endpoint masks the disagreement.
*/
public static final int KIND_PROTOCOL_ERROR = 5;

public QwpBatchBuffer buffer; // valid for KIND_BATCH (must be released to pool by consumer)
public String errorMessage; // valid for KIND_ERROR
Expand Down Expand Up @@ -90,6 +97,14 @@ public QueryEvent asTransportError(byte status, String message) {
return this;
}

public QueryEvent asProtocolError(byte status, String message) {
this.kind = KIND_PROTOCOL_ERROR;
this.buffer = null;
this.errorStatus = status;
this.errorMessage = message;
return this;
}

/**
* Clears object references and resets primitive fields so a pooled event is
* safe to reuse across queries. The I/O thread calls the {@code asX(...)}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2026 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.client.cutlass.qwp.client;

import io.questdb.client.cutlass.http.client.HttpClientException;

/**
* WebSocket upgrade rejected with {@code 401}/{@code 403}/{@code 404}. Terminal
* across all configured endpoints: a rejected credential is uniformly rejected
* across the cluster, so failing fast keeps server logs clean and surfaces the
* configuration error immediately.
*/
public final class QwpAuthFailedException extends HttpClientException {
private final String host;
private final int port;
private final int statusCode;

public QwpAuthFailedException(int statusCode, String host, int port) {
super("WebSocket upgrade rejected with HTTP ");
put(statusCode).put(" for ").put(host).put(':').put(port);
this.statusCode = statusCode;
this.host = host;
this.port = port;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public int getStatusCode() {
return statusCode;
}
}
Loading
Loading