Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/workerd/api/streams-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,16 @@ KJ_TEST("PumpToReader regression") {
}

kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return kj::NEVER_DONE;
events.add(kj::str("got the write"));
paf.fulfiller->fulfill();
// Concatenate pieces into a single buffer for the pipe write.
kj::Vector<byte> data;
for (auto& piece: pieces) {
data.addAll(piece);
}
auto arr = data.releaseAsArray();
return pipe.ends[0]->write(arr).attach(
kj::mv(arr), kj::defer([this] { events.add(kj::str("write promise was dropped")); }));
}

kj::Promise<void> end() override {
Expand Down
36 changes: 36 additions & 0 deletions src/workerd/api/streams/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ jsg::Promise<DrainingReadResult> ValueQueue::Consumer::drainingRead(jsg::Lock& j
size_t prevChunkCount = chunks.size();
bool pullCompletedSync = listener.onConsumerWantsData(js);

// The pull callback may have closed or errored the consumer, which
// destroys the Ready state (and its RingBuffer). We must not touch
// `ready` after that.
if (!impl.state.isActive()) break;

// Drain all buffered data that was added by the pull.
KJ_IF_SOME(errorPromise, drainBuffer(js, impl, ready, chunks, totalRead, isClosing)) {
return kj::mv(errorPromise);
Expand All @@ -244,6 +249,19 @@ jsg::Promise<DrainingReadResult> ValueQueue::Consumer::drainingRead(jsg::Lock& j
}
}

// If the consumer was closed or errored during pumping, the `ready`
// reference is dangling. Return what we have or the appropriate error.
if (!impl.state.isActive()) {
KJ_IF_SOME(errored, impl.state.tryGetErrorUnsafe()) {
return js.rejectedPromise<DrainingReadResult>(errored.reason.getHandle(js));
}
// Closed — all data was already drained. Return collected chunks.
return js.resolvedPromise(DrainingReadResult{
.chunks = chunks.releaseAsArray(),
.done = true,
});
}

// If we collected data, return it immediately.
if (!chunks.empty() || isClosing) {
ready.hasPendingDrainingRead = false;
Expand Down Expand Up @@ -656,6 +674,11 @@ jsg::Promise<DrainingReadResult> ByteQueue::Consumer::drainingRead(jsg::Lock& js
size_t prevChunkCount = chunks.size();
bool pullCompletedSync = listener.onConsumerWantsData(js);

// The pull callback may have closed or errored the consumer, which
// destroys the Ready state (and its RingBuffer). We must not touch
// `ready` after that.
if (!impl.state.isActive()) break;

// Drain all buffered data that was added by the pull.
drainBuffer(ready, chunks, totalRead, isClosing);

Expand All @@ -666,6 +689,19 @@ jsg::Promise<DrainingReadResult> ByteQueue::Consumer::drainingRead(jsg::Lock& js
}
}

// If the consumer was closed or errored during pumping, the `ready`
// reference is dangling. Return what we have or the appropriate error.
if (!impl.state.isActive()) {
KJ_IF_SOME(errored, impl.state.tryGetErrorUnsafe()) {
return js.rejectedPromise<DrainingReadResult>(errored.reason.getHandle(js));
}
// Closed — all data was already drained. Return collected chunks.
return js.resolvedPromise(DrainingReadResult{
.chunks = chunks.releaseAsArray(),
.done = true,
});
}

// If we collected data, return it immediately.
if (!chunks.empty() || isClosing) {
ready.hasPendingDrainingRead = false;
Expand Down
79 changes: 79 additions & 0 deletions src/workerd/api/streams/standard-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,85 @@ KJ_TEST("DrainingReader read from byte stream with BYOB support") {
});
}

KJ_TEST("DrainingReader error during pull in value stream") {
// Test: Calling controller.error() synchronously inside pull during a draining
// read must not cause a use-after-free. The pull callback transitions the
// ConsumerImpl from Ready→Errored which destroys the Ready struct (and its
// RingBuffer). The drainingRead loop must detect this and stop.
preamble([](jsg::Lock& js) {
auto rs = js.alloc<ReadableStream>(newReadableStreamJsController());
// clang-format off
rs->getController().setup(js, UnderlyingSource{
.pull = [](jsg::Lock& js, UnderlyingSource::Controller controller) {
KJ_SWITCH_ONEOF(controller) {
KJ_CASE_ONEOF(c, jsg::Ref<ReadableStreamDefaultController>) {
c->enqueue(js, toBytes(js, kj::str("before-error")));
c->error(js, js.error("deliberate error"));
return js.resolvedPromise();
}
KJ_CASE_ONEOF(c, jsg::Ref<ReadableByteStreamController>) {}
}
KJ_UNREACHABLE;
}
}, StreamQueuingStrategy{.highWaterMark = 0});
// clang-format on

KJ_IF_SOME(reader, DrainingReader::create(js, *rs)) {
bool readCompleted = false;
auto promise = reader->read(js).then(js, [&](jsg::Lock& js, DrainingReadResult&& result) {
KJ_FAIL_ASSERT("Should have rejected, not resolved");
}, [&](jsg::Lock& js, jsg::Value&& err) {
// The draining read should reject with the error from controller.error().
readCompleted = true;
});

js.runMicrotasks();
KJ_ASSERT(readCompleted, "Read should have completed with rejection");

reader->releaseLock(js);
} else {
KJ_FAIL_ASSERT("Failed to create DrainingReader");
}
});
}

KJ_TEST("DrainingReader error during pull in byte stream") {
// Test: Same as above but for byte streams (ByteQueue path).
preamble([](jsg::Lock& js) {
auto rs = js.alloc<ReadableStream>(newReadableStreamJsController());
// clang-format off
rs->getController().setup(js, UnderlyingSource{
.type = kj::str("bytes"),
.pull = [](jsg::Lock& js, UnderlyingSource::Controller controller) {
KJ_SWITCH_ONEOF(controller) {
KJ_CASE_ONEOF(c, jsg::Ref<ReadableStreamDefaultController>) {}
KJ_CASE_ONEOF(c, jsg::Ref<ReadableByteStreamController>) {
c->enqueue(js, toBufferSource(js, kj::str("before-error")));
c->error(js, js.error("deliberate error"));
return js.resolvedPromise();
}
}
KJ_UNREACHABLE;
}
}, StreamQueuingStrategy{.highWaterMark = 0});
// clang-format on

KJ_IF_SOME(reader, DrainingReader::create(js, *rs)) {
bool readCompleted = false;
auto promise = reader->read(js).then(js, [&](jsg::Lock& js, DrainingReadResult&& result) {
KJ_FAIL_ASSERT("Should have rejected, not resolved");
}, [&](jsg::Lock& js, jsg::Value&& err) { readCompleted = true; });

js.runMicrotasks();
KJ_ASSERT(readCompleted, "Read should have completed with rejection");

reader->releaseLock(js);
} else {
KJ_FAIL_ASSERT("Failed to create DrainingReader");
}
});
}

KJ_TEST("DrainingReader read from stream with transform-like pattern") {
// Test: DrainingReader works correctly with a stream that simulates the
// TransformStream pattern where data is written to writable and read from readable
Expand Down
Loading