Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ public int getTimeout() {
private long reconnectMaxDurationMillis = PARAMETER_NOT_SET_EXPLICITLY;
private boolean requestDurableAck;
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
private boolean transactional;
private String senderId = DEFAULT_SENDER_ID;
// Per-append deadline for SF appendBlocking spin-then-throw. Used to
// be a hardcoded 30s constant; expose so tight-SLA users can lower
Expand Down Expand Up @@ -1531,6 +1532,7 @@ public Sender build() {
// closing the engine alone would leak the I/O thread,
// dispatcher daemon, drainer pool, microbatch buffers and
// WebSocketClient inside the abandoned `connected`.
connected.setTransactional(transactional);
try {
// Once the foreground sender is up, dispatch drainers
// for any sibling orphan slots. Scan AFTER we acquire
Expand Down Expand Up @@ -2402,6 +2404,24 @@ public LineSenderBuilder requestDurableAck(boolean enabled) {
return this;
}

/**
* Enables transactional mode. Auto-flush sends data to the server
* with {@code FLAG_DEFER_COMMIT}; only an explicit {@code flush()}
* triggers the server-side WAL commit. This allows accumulating
* datasets larger than the server's recv buffer while committing
* atomically per table.
*
* @param enabled true to enable transactional mode
* @return this instance for method chaining
*/
public LineSenderBuilder transactional(boolean enabled) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("transactional is only supported for WebSocket transport");
}
this.transactional = enabled;
return this;
}

/**
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
* <br>
Expand Down Expand Up @@ -3122,6 +3142,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else {
throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
}
} else if (Chars.equals("transaction", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("transaction is only supported for WebSocket transport");
}
pos = getValue(configurationString, pos, sink, "transaction");
if (Chars.equalsIgnoreCase("on", sink)) {
transactional(true);
} else if (Chars.equalsIgnoreCase("off", sink)) {
transactional(false);
} else {
throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]");
}
} else if (Chars.equals("max_schemas_per_connection", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ public int getWritePos() {
return writePos;
}

@Override
public void patchByte(int offset, byte value) {
Unsafe.getUnsafe().putByte(bufPtr + offset, value);
}

/**
* Patches an int value at the specified offset.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public int getWritableBytes() {
return capacity - position;
}

@Override
public void patchByte(int offset, byte value) {
Unsafe.getUnsafe().putByte(bufferPtr + offset, value);
}

/**
* Patches an int value at the specified offset.
* Used for updating length fields after writing content.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public interface QwpBufferWriter extends ArrayBufferAppender {
*/
int getWritableBytes();

/**
* Patches a byte value at the specified offset in the buffer.
*
* @param offset the byte offset from buffer start
* @param value the byte value to write
*/
void patchByte(int offset, byte value);

/**
* Patches an int value at the specified offset in the buffer.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private void writeSymbolColumn(QwpTableBuffer.ColumnBuffer col, int count, int d
long dataAddr = col.getDataAddress();
buffer.putVarint(dictionarySize);
for (int i = 0; i < dictionarySize; i++) {
buffer.putString((String) col.getSymbolValue(i));
buffer.putString(col.getSymbolValue(i));
}

for (int i = 0; i < count; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ public boolean isGorillaEnabled() {
return (flags & FLAG_GORILLA) != 0;
}

public void setDeferCommit(boolean defer) {
if (defer) {
flags |= FLAG_DEFER_COMMIT;
} else {
flags &= ~FLAG_DEFER_COMMIT;
}
}

public void setGorillaEnabled(boolean enabled) {
if (enabled) {
flags |= FLAG_GORILLA;
Expand Down
Loading
Loading