Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "internal.h"
#include "readable.h"
#include "standard.h"
#include "writable.h"

#include <workerd/jsg/jsg-test.h>
Expand Down Expand Up @@ -352,5 +353,105 @@ KJ_TEST("WritableStreamInternalController observability") {
KJ_ASSERT(observer.queueSizeBytes == 0);
}

// Test for use-after-free fix in pipeLoop when abort is called during pending read.
// This tests the scenario where:
// 1. A JavaScript-backed ReadableStream is piped to an internal WritableStream
// 2. The pipeLoop is waiting for a read from the JS stream
// 3. abort() is called on the writable stream, which triggers drain()
// 4. drain() destroys the Pipe object
// 5. The pending read callback must not access the freed Pipe
//
// The fix ensures the Pipe::State is ref-counted and survives until all callbacks complete.
KJ_TEST("WritableStreamInternalController pipeLoop abort during pending read") {
capnp::MallocMessageBuilder message;
auto flags = message.initRoot<CompatibilityFlags>();
flags.setNodeJsCompat(true);
flags.setWorkerdExperimental(true);
flags.setStreamsJavaScriptControllers(true);
// Enable the flag that causes abort to call drain() immediately
flags.setInternalWritableStreamAbortClearsQueue(true);

TestFixture fixture({.featureFlags = flags.asReader()});

class MySink final: public WritableStreamSink {
public:
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
return kj::READY_NOW;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
kj::Promise<void> end() override {
return kj::READY_NOW;
}
void abort(kj::Exception reason) override {}
};

fixture.runInIoContext([&](const TestFixture::Environment& env) {
// Create a JavaScript-backed ReadableStream.
// The pull function will be called when the pipe tries to read.
// We use a JS-backed stream so that pipeLoop is used (not the kj pipe path).
//
// We need to simulate:
// 1. First read succeeds with some data
// 2. Second read is pending (the promise from pull is not resolved)
// 3. While pending, we abort the writable stream
//
// Using an UnderlyingSource with a pull callback that enqueues data once,
// then on the second call returns without enqueuing (leaving the read pending).

int pullCount = 0;
jsg::Ref<ReadableStream> source = ReadableStream::constructor(env.js,
UnderlyingSource{.pull =
[&pullCount](jsg::Lock& js, UnderlyingSource::Controller controller) {
pullCount++;
auto& c = KJ_ASSERT_NONNULL(controller.tryGet<jsg::Ref<ReadableStreamDefaultController>>());
if (pullCount == 1) {
// First pull: enqueue some data so the pipe loop can make progress
auto data = js.bytes(kj::heapArray<kj::byte>({1, 2, 3, 4}));
c->enqueue(js, data.getHandle(js));
}
// Second pull onwards: don't enqueue anything, leaving the read pending.
// This simulates an async data source that hasn't received data yet.
// The promise returned by read() will be pending.
return js.resolvedPromise();
}},
kj::none);

jsg::Ref<WritableStream> sink =
env.js.alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::none);

// Start the pipe. This will:
// 1. Call pull() which enqueues data
// 2. pipeLoop reads the data and writes it to the sink
// 3. pipeLoop calls read() again, which calls pull()
// 4. pull() returns without enqueuing, so read() returns a pending promise
// 5. pipeLoop's callback is now waiting for that promise
auto pipeTo = source->pipeTo(env.js, sink.addRef(), PipeToOptions{});
pipeTo.markAsHandled(env.js);

// Run microtasks to let the pipe make progress (first read/write cycle)
env.js.runMicrotasks();

// At this point, pipeLoop should be waiting for the second read.
// Now abort the writable stream. This should:
// 1. Call doAbort() which calls drain()
// 2. drain() destroys the Pipe (setting state->aborted = true)
// 3. The pending read callback should check aborted and bail out safely

// Before the fix, this would cause a use-after-free when the pending callback
// tried to access the freed Pipe.
auto abortPromise = sink->getController().abort(env.js, env.js.v8TypeError("Test abort"_kj));
abortPromise.markAsHandled(env.js);

// Run microtasks to process the abort and any pending callbacks
env.js.runMicrotasks();

// If we get here without crashing, the test passes.
// The fix ensures that the Pipe::State survives until all callbacks complete.
KJ_ASSERT(pullCount >= 1); // Verify pull was called at least once
});
}

} // namespace
} // namespace workerd::api
Loading
Loading