Skip to content

Commit 24f7591

Browse files
authored
fix(qwp): fix ingestion stalls after append failures and on sender startup (#32)
1 parent 510d9b5 commit 24f7591

4 files changed

Lines changed: 124 additions & 6 deletions

File tree

core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3072,6 +3072,18 @@ private void sealAndSwapBuffer() {
30723072
cursorEngine.appendBlocking(toSend.getBufferPtr(), toSend.getBufferPos());
30733073
toSend.markRecycled();
30743074
} catch (Throwable t) {
3075+
// appendBlocking failed synchronously on the user thread — the
3076+
// payload never reached the engine, so no I/O thread will
3077+
// recycle toSend. Recycle it here so a later flush can swap
3078+
// back to it; flushPendingRows aborts its post-enqueue state
3079+
// updates after this throw, so the source rows and the
3080+
// sent-schema watermark stay intact and the next batch re-emits
3081+
// the same rows along with the full schema + symbol-dict delta.
3082+
if (toSend.isSending()) {
3083+
toSend.markRecycled();
3084+
} else if (toSend.isSealed()) {
3085+
toSend.rollbackSealForRetry();
3086+
}
30753087
// Surface any I/O thread error first — appendBlocking itself only
30763088
// throws on PAYLOAD_TOO_LARGE / backpressure deadline, but the
30773089
// I/O loop can have failed independently.

core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,17 @@ public void register(SegmentRing ring, String dir, AckWatermark watermark) {
247247
}
248248
}
249249
ring.setManagerWakeup(this::wakeWorker);
250+
// Nudge the worker so it picks up the new ring on its very next
251+
// iteration. Without this, register-after-start has a race window:
252+
// start() schedules the worker thread, and if that thread reaches
253+
// workerLoop and takes `lock` before this method does, it observes
254+
// an empty `rings` snapshot, services nothing, then parkNanos
255+
// (potentially seconds). A new ring whose first append does not
256+
// cross the high-water mark fires no producer-side wakeup either,
257+
// leaving the ring without a spare for the full poll interval.
258+
// wakeWorker is cheap (a single LockSupport.unpark) and a no-op
259+
// when the worker has not been started yet.
260+
wakeWorker();
250261
}
251262

252263
public synchronized void start() {

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,22 @@
2727
import io.questdb.client.cutlass.line.LineSenderException;
2828
import io.questdb.client.cutlass.line.array.DoubleArray;
2929
import io.questdb.client.cutlass.line.array.LongArray;
30+
import io.questdb.client.cutlass.qwp.client.MicrobatchBuffer;
3031
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
32+
import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine;
3133
import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer;
3234
import io.questdb.client.std.Decimal128;
3335
import io.questdb.client.std.Decimal256;
3436
import io.questdb.client.std.Decimal64;
3537
import io.questdb.client.std.bytes.DirectByteSlice;
38+
import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
3639
import org.junit.Assert;
3740
import org.junit.Test;
3841

42+
import java.lang.reflect.Field;
3943
import java.time.Instant;
4044
import java.time.temporal.ChronoUnit;
45+
import java.util.concurrent.TimeUnit;
4146

4247
import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
4348

@@ -326,6 +331,54 @@ public void testDoubleColumnAfterCloseThrows() throws Exception {
326331
});
327332
}
328333

334+
@Test
335+
public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exception {
336+
assertMemoryLeak(() -> {
337+
int port = TestPorts.findUnusedPort();
338+
try (TestWebSocketServer server = new TestWebSocketServer(port, new TestWebSocketServer.WebSocketServerHandler() {
339+
})) {
340+
server.start();
341+
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));
342+
343+
// Memory-only engine with a 33-byte budget and a 1 ns append
344+
// deadline guarantees every appendBlocking() call trips the
345+
// backpressure deadline and throws.
346+
CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L);
347+
QwpWebSocketSender sender = QwpWebSocketSender.connect(
348+
"localhost", port, null, Integer.MAX_VALUE, 0, 0L, null,
349+
QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L);
350+
try {
351+
sender.table("t").longColumn("v", 1L).atNow();
352+
353+
try {
354+
sender.flushAndGetSequence();
355+
Assert.fail("Expected LineSenderException");
356+
} catch (LineSenderException e) {
357+
Assert.assertTrue(e.getMessage().contains("cursor SF append failed"));
358+
}
359+
360+
MicrobatchBuffer buffer0 = getMicrobatchBuffer(sender, "buffer0");
361+
MicrobatchBuffer buffer1 = getMicrobatchBuffer(sender, "buffer1");
362+
Assert.assertFalse(
363+
"failed append must not leave any buffer in use [buffer0="
364+
+ MicrobatchBuffer.stateName(buffer0.getState())
365+
+ ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]",
366+
buffer0.isInUse() || buffer1.isInUse());
367+
} finally {
368+
// close() drains pending rows, which appendBlocking still
369+
// rejects because the engine is permanently wedged in this
370+
// test. The bug under test is about microbatch buffer
371+
// state, not about close() being lenient toward residual
372+
// unflushed rows — swallow the predictable rethrow here.
373+
try {
374+
sender.close();
375+
} catch (LineSenderException ignored) {
376+
}
377+
}
378+
}
379+
});
380+
}
381+
329382
@Test
330383
public void testGeoHashColumnLongAfterCloseThrows() throws Exception {
331384
assertMemoryLeak(() -> {
@@ -705,6 +758,12 @@ private static void assertClosed(Runnable r) {
705758
}
706759
}
707760

761+
private static MicrobatchBuffer getMicrobatchBuffer(QwpWebSocketSender sender, String fieldName) throws Exception {
762+
Field field = QwpWebSocketSender.class.getDeclaredField(fieldName);
763+
field.setAccessible(true);
764+
return (MicrobatchBuffer) field.get(sender);
765+
}
766+
708767
/**
709768
* Creates a sender without connecting.
710769
* For unit tests that don't need actual connectivity.

core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,18 +218,20 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception {
218218
TestUtils.assertMemoryLeak(() -> {
219219
// pollNanos is intentionally long enough that the 5s park can be
220220
// ruled out as the mechanism by which the first spare arrives.
221-
// The worker thread enters workerLoop on start(), takes the lock,
222-
// sees the just-registered ring with needsHotSpare()==true, and
223-
// provisions the spare BEFORE parking. The spare must therefore
224-
// land within seconds of register(), not minutes -- the 5s park is
225-
// never reached on the first iteration.
221+
// register() unparks the worker after publishing the new ring,
222+
// so the worker re-iterates and provisions the spare even when
223+
// its first loop snapshot ran before register() acquired `lock`.
224+
// The spare must therefore land within seconds of register(),
225+
// not minutes -- the 5s park is never reached.
226226
//
227227
// The append below is incidental to the contract under test; it
228228
// does NOT cross the SegmentRing high-water mark for this 4-frame
229229
// segment (HEADER_SIZE 24 + FRAME_HEADER_SIZE 8 + 16 = 48 vs
230230
// signalAtBytes = (120 >> 2) * 3 = 90), so no producer-side wakeup
231231
// fires. The rotation/high-water wakeup paths are covered by
232-
// testRotationWakeupTriggersImmediateSparePrep.
232+
// testRotationWakeupTriggersImmediateSparePrep, and the
233+
// deterministic register-after-park case is covered by
234+
// testRegisterAfterWorkerParkedWakesWorker.
233235
long pollNanos = 5_000_000_000L; // 5 seconds
234236
long segSize = MmapSegment.HEADER_SIZE
235237
+ 4 * (MmapSegment.FRAME_HEADER_SIZE + 16);
@@ -256,6 +258,40 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception {
256258
});
257259
}
258260

261+
@Test
262+
public void testRegisterAfterWorkerParkedWakesWorker() throws Exception {
263+
TestUtils.assertMemoryLeak(() -> {
264+
// Deterministic version of testFirstSpareLandsBeforeFirstPoll:
265+
// sleep between start() and register() long enough for the worker
266+
// to definitely complete its first (empty) iteration and enter
267+
// parkNanos. Without register()'s wakeWorker() the spare would
268+
// not land for the full 5s poll interval; with it the spare lands
269+
// promptly because register() unparks the worker out of its park.
270+
// No append at all, so no producer-side wakeup can mask a missing
271+
// register-side wakeup.
272+
long pollNanos = 5_000_000_000L; // 5 seconds
273+
long segSize = MmapSegment.HEADER_SIZE
274+
+ 4 * (MmapSegment.FRAME_HEADER_SIZE + 16);
275+
MmapSegment seg0 = MmapSegment.create(tmpDir + "/0000000000000000.sfa", 0, segSize);
276+
try (SegmentRing ring = new SegmentRing(seg0, segSize);
277+
SegmentManager mgr = new SegmentManager(segSize, pollNanos)) {
278+
mgr.start();
279+
// Give the worker plenty of time to enter workerLoop, snapshot
280+
// an empty rings list, and reach parkNanos. 250ms is far more
281+
// than the OS scheduling + thread startup cost on any sane
282+
// CI runner, and still well below the 5s poll interval.
283+
Thread.sleep(250);
284+
long t0 = System.nanoTime();
285+
mgr.register(ring, tmpDir);
286+
assertTrue("register must wake a worker that has already parked",
287+
waitFor(() -> !ring.needsHotSpare(), 2000));
288+
long elapsedMs = (System.nanoTime() - t0) / 1_000_000L;
289+
assertTrue("spare arrived in " + elapsedMs + "ms -- should be <<5000ms",
290+
elapsedMs < 4000);
291+
}
292+
});
293+
}
294+
259295
@Test
260296
public void testRotationWakeupTriggersImmediateSparePrep() throws Exception {
261297
TestUtils.assertMemoryLeak(() -> {

0 commit comments

Comments
 (0)