Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build/deps/gen/deps/dep_capnp_cpp.MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
http = use_extension("@//:build/exts/http.bzl", "http")
http.archive(
name = "capnp-cpp",
sha256 = "fc67ba4a0dbf683ebac4ed3acc0c9b7366a0271766e23c4f4038febf5149a13b",
strip_prefix = "capnproto-capnproto-fd3cc9c/c++",
sha256 = "cbf7dcef02deb3a3addcfefacb76672c46a48c953024860bf80fceabc255d41d",
strip_prefix = "capnproto-capnproto-c1bce20/c++",
type = "tgz",
url = "https://github.com/capnproto/capnproto/tarball/fd3cc9ce8e66363bc70a88166e912788fa903173",
url = "https://github.com/capnproto/capnproto/tarball/c1bce2095a8dd76851fe3c1c61550f79b69d671d",
)
use_repo(http, "capnp-cpp")
56 changes: 42 additions & 14 deletions src/workerd/api/basics.c++
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ class AbortTriggerRpcClient final {

namespace {
// The jsrpc handler that receives aborts from the remote and triggers them locally
//
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
// StreamSink-related code. For now I'm not trying to avoid duplication.
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
public:
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
Expand Down Expand Up @@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
return;
}

auto streamCap = externalHandler
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortTrigger();
}).castAs<rpc::AbortTrigger>();
auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();

externalHandler->write(
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortSignal(kj::mv(signal));
});

return pipeline.getTrigger();
} else {
return externalHandler
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortTrigger();
}).castAs<rpc::AbortTrigger>();
}
}();

auto& ioContext = IoContext::current();
// Keep track of every AbortSignal cloned from this one.
// If this->triggerAbort(...) is called, each rpcClient will be informed.
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(streamCap))));
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(triggerCap))));
}

jsg::Ref<AbortSignal> AbortSignal::deserialize(
Expand All @@ -890,20 +907,31 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
return js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
}

auto reader = externalHandler->read();
KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag");

// The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort

auto signal = js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);

auto paf = kj::newPromiseAndFulfiller<void>();
auto pendingReason = IoContext::current().addObject(kj::refcounted<PendingReason>());
auto& ioctx = IoContext::current();

auto reader = externalHandler->read();
if (reader.isAbortTrigger()) {
// Old-style StreamSink.
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
auto paf = kj::newPromiseAndFulfiller<void>();
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());

externalHandler->setLastStream(
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
signal->pendingReason = kj::mv(pendingReason);
} else {
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");

auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());

externalHandler->setLastStream(
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise)));
signal->pendingReason = kj::mv(pendingReason);
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
}

return signal;
}
Expand Down
5 changes: 2 additions & 3 deletions src/workerd/api/basics.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// TODO(cleanup): Rename to events.h?

#include <workerd/io/compatibility-date.capnp.h>
#include <workerd/io/external-pusher.h>
#include <workerd/io/io-own.h>
#include <workerd/io/worker-interface.capnp.h>
#include <workerd/jsg/jsg.h>
Expand Down Expand Up @@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget {
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason = kj::none,
Flag flag = Flag::NONE);

using PendingReason = kj::RefcountedWrapper<
kj::OneOf<kj::Array<kj::byte> /* v8Serialized */, kj::Exception /* if capability is dropped */
>>;
using PendingReason = ExternalPusherImpl::PendingAbortReason;

// The AbortSignal explicitly does not expose a constructor(). It is
// illegal for user code to create an AbortSignal directly.
Expand Down
67 changes: 47 additions & 20 deletions src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ jsg::Optional<uint32_t> ByteLengthQueuingStrategy::size(

namespace {

// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
// StreamSink-related code. For now I'm not trying to avoid duplication.

// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
// wrap the two ends of the pipe in special adapters that track whether end() was called.
class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream {
Expand Down Expand Up @@ -688,18 +692,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
StreamEncoding encoding = controller.getPreferredEncoding();
auto expectedLength = controller.tryGetLength(encoding);

auto streamCap = externalHandler->writeStream(
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setEncoding(encoding);
KJ_IF_SOME(l, expectedLength) {
rs.getExpectedLength().setKnown(l);
capnp::ByteStream::Client streamCap = [&]() {
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0});
KJ_IF_SOME(el, expectedLength) {
req.setLengthPlusOne(el + 1);
}
auto pipeline = req.sendForPipeline();

externalHandler->write([encoding, expectedLength, source = pipeline.getSource()](
rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setStream(kj::mv(source));
rs.setEncoding(encoding);
});

return pipeline.getSink();
} else {
return externalHandler
->writeStream(
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
auto rs = builder.initReadableStream();
rs.setEncoding(encoding);
KJ_IF_SOME(l, expectedLength) {
rs.getExpectedLength().setKnown(l);
}
}).castAs<capnp::ByteStream>();
}
});
}();

kj::Own<capnp::ExplicitEndOutputStream> kjStream =
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(
kj::mv(streamCap).castAs<capnp::ByteStream>());
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap));

auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx);

Expand Down Expand Up @@ -730,21 +753,25 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(

auto& ioctx = IoContext::current();

kj::Maybe<uint64_t> expectedLength;
auto el = rs.getExpectedLength();
if (el.isKnown()) {
expectedLength = el.getKnown();
}
kj::Own<kj::AsyncInputStream> in;
if (rs.hasStream()) {
in = ioctx.getExternalPusher()->unwrapStream(rs.getStream());
} else {
kj::Maybe<uint64_t> expectedLength;
auto el = rs.getExpectedLength();
if (el.isKnown()) {
expectedLength = el.getKnown();
}

auto pipe = kj::newOneWayPipe(expectedLength);
auto pipe = kj::newOneWayPipe(expectedLength);

auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);

auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
auto in =
kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
in = kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);

externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
}

return js.alloc<ReadableStream>(ioctx,
kj::heap<NoDeferredProxyReadableStream>(newSystemStream(kj::mv(in), encoding, ioctx), ioctx));
Expand Down
29 changes: 29 additions & 0 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2066,3 +2066,32 @@ export let sendServiceStubOverRpc = {
}
},
};

// Make sure that calls are delivered in e-order, even in the presence of pushed externals.
export let eOrderTest = {
async test(controller, env, ctx) {
let abortController = new AbortController();
let abortSignal = abortController.signal;

let readableController;
let readableStream = new ReadableStream({
start(c) {
readableController = c;
},
});

let stub = await env.MyService.makeCounter(0);

let promises = [];
promises.push(stub.increment(1));
promises.push(stub.increment(1));
promises.push(stub.increment(1, abortSignal));
promises.push(stub.increment(1));
promises.push(stub.increment(1, readableStream));
promises.push(stub.increment(1));

let results = await Promise.all(promises);

assert.deepEqual(results, [1, 2, 3, 4, 5, 6]);
},
};
Loading