Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 80 additions & 16 deletions src/workerd/server/container-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
kj::String imageName,
kj::Maybe<kj::String> containerEgressInterceptorImage,
kj::TaskSet& waitUntilTasks,
kj::Function<void()> cleanupCallback,
kj::Promise<void> pendingCleanup,
kj::Function<void(kj::Promise<void>)> cleanupCallback,
ChannelTokenHandler& channelTokenHandler)
: byteStreamFactory(byteStreamFactory),
timer(timer),
Expand All @@ -191,23 +192,29 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
imageName(kj::mv(imageName)),
containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImage)),
waitUntilTasks(waitUntilTasks),
pendingCleanup(kj::mv(pendingCleanup).fork()),
cleanupCallback(kj::mv(cleanupCallback)),
channelTokenHandler(channelTokenHandler) {}

ContainerClient::~ContainerClient() noexcept(false) {
  stopEgressListener();

  // Issue force-DELETE requests for both containers. Each request is best-effort:
  // a failure (e.g. the container is already gone) must not propagate out of a
  // destructor, so errors are swallowed with .catch_().
  //
  // Sidecar shares main container's network namespace, so must be destroyed first.
  auto sidecarCleanup = dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
      kj::str("/containers/", sidecarContainerName, "?force=true"))
                            .ignoreResult()
                            .catch_([](kj::Exception&&) {});

  auto mainCleanup = dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::DELETE,
      kj::str("/containers/", containerName, "?force=true"))
                         .ignoreResult()
                         .catch_([](kj::Exception&&) {});

  // Pass the joined cleanup promise to the callback. The callback wraps it with the
  // canceler (so a future client creation can cancel it), stores it so the next
  // ContainerClient can await it, and adds a branch to waitUntilTasks to keep the
  // underlying I/O alive.
  cleanupCallback(kj::joinPromises(kj::arr(kj::mv(sidecarCleanup), kj::mv(mainCleanup))));
}

// Docker-specific Port implementation that implements rpc::Container::Port::Server
Expand Down Expand Up @@ -417,14 +424,15 @@ static kj::Maybe<kj::String> gatewayForPlatform(kj::String gateway) {
#endif
}

kj::Promise<uint16_t> ContainerClient::startEgressListener(kj::String listenAddress) {
kj::Promise<uint16_t> ContainerClient::startEgressListener(
kj::String listenAddress, uint16_t port) {
auto service = kj::heap<EgressHttpService>(*this, headerTable);
auto httpServer = kj::heap<kj::HttpServer>(timer, headerTable, *service);
auto& httpServerRef = *httpServer;

egressHttpServer = httpServer.attach(kj::mv(service));

auto addr = co_await network.parseAddress(kj::str(listenAddress, ":0"));
auto addr = co_await network.parseAddress(kj::str(listenAddress, ":", port));
auto listener = addr->listen();

uint16_t chosenPort = listener->getPort();
Expand Down Expand Up @@ -534,6 +542,43 @@ kj::Promise<ContainerClient::InspectResponse> ContainerClient::inspectContainer(
co_return InspectResponse{.isRunning = running, .ports = kj::mv(portMappings)};
}

// Inspect the sidecar container and extract the --http-egress-port value from the
// args it was started with. Returns kj::none if the sidecar doesn't exist, is not
// running, or was started without the flag.
kj::Promise<kj::Maybe<uint16_t>> ContainerClient::inspectSidecarEgressPort() {
  auto endpoint = kj::str("/containers/", sidecarContainerName, "/json");
  auto response = co_await dockerApiRequest(
      network, kj::str(dockerPath), kj::HttpMethod::GET, kj::mv(endpoint));

  // 404 simply means no sidecar exists for this container; not an error.
  if (response.statusCode == 404) {
    co_return kj::Maybe<uint16_t>(kj::none);
  }

  JSG_REQUIRE(response.statusCode == 200, Error, "Sidecar container inspect failed");

  auto jsonRoot = decodeJsonResponse<docker_api::Docker::ContainerInspectResponse>(response.body);

  // Check if sidecar is actually running; a stopped/exited sidecar cannot be
  // reconnected to, so treat it the same as "no sidecar".
  if (jsonRoot.hasState()) {
    auto state = jsonRoot.getState();
    if (state.hasStatus()) {
      auto status = state.getStatus();
      if (status != "running" && status != "restarting") {
        co_return kj::Maybe<uint16_t>(kj::none);
      }
    }
  }

  // Parse args to find the --http-egress-port value the sidecar was started with.
  if (jsonRoot.hasArgs()) {
    auto args = jsonRoot.getArgs();
    for (auto i = 0u; i < args.size(); i++) {
      if (args[i] == "--http-egress-port" && i + 1 < args.size()) {
        // A sidecar we started always carries a numeric value here; anything else
        // is unexpected state and must fail loudly. Use tryParseAs so the failure
        // surfaces as a descriptive JSG error instead of a raw kj parse exception.
        KJ_IF_SOME(port, kj::str(args[i + 1]).tryParseAs<uint16_t>()) {
          co_return port;
        }
        JSG_FAIL_REQUIRE(
            Error, "Sidecar --http-egress-port value is not a valid port: ", args[i + 1]);
      }
    }
  }

  co_return kj::Maybe<uint16_t>(kj::none);
}

kj::Promise<void> ContainerClient::createContainer(
kj::Maybe<capnp::List<capnp::Text>::Reader> entrypoint,
kj::Maybe<capnp::List<capnp::Text>::Reader> environment,
Expand Down Expand Up @@ -701,7 +746,6 @@ kj::Promise<void> ContainerClient::createSidecarContainer(
// Sidecar needs NET_ADMIN capability for iptables/TPROXY
auto capAdd = hostConfig.initCapAdd(1);
capAdd.set(0, "NET_ADMIN");
hostConfig.setAutoRemove(true);

auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
kj::str("/containers/create?name=", sidecarContainerName), codec.encode(jsonRoot));
Expand Down Expand Up @@ -755,8 +799,29 @@ ContainerClient::RpcTurn ContainerClient::getRpcTurn() {
}

// RPC handler: reports whether the container is currently running, and recovers
// host-side egress state if a sidecar survived a workerd restart.
kj::Promise<void> ContainerClient::status(StatusContext context) {
  // Wait for any pending cleanup from a previous ContainerClient (Docker DELETE).
  // If the cleanup was already cancelled via containerCleanupCanceler the .catch_()
  // in the destructor resolves it immediately, so this is a no-op in that case.
  co_await pendingCleanup.addBranch();

  // Take our turn in the RPC serialization queue; release it when done (even on throw).
  auto [ready, done] = getRpcTurn();
  co_await ready;
  KJ_DEFER(done->fulfill());

  const auto [isRunning, _ports] = co_await inspectContainer();
  // Release store pairs with acquire loads elsewhere reading containerStarted.
  containerStarted.store(isRunning, std::memory_order_release);

  if (isRunning && containerEgressInterceptorImage != kj::none) {
    // If the sidecar container is already running (e.g. workerd restarted while
    // containers stayed up), recover the egress port it was started with and
    // start the host-side egress listener on that same port so the sidecar can
    // reconnect.
    KJ_IF_SOME(port, co_await inspectSidecarEgressPort()) {
      containerSidecarStarted.store(true, std::memory_order_release);
      co_await ensureEgressListenerStarted(port);
    }
  }

  context.getResults().setRunning(isRunning);
}

Expand Down Expand Up @@ -788,8 +853,7 @@ kj::Promise<void> ContainerClient::start(StartContext context) {
// for now. In the future, it will always run when a user container is running
if (!egressMappings.empty()) {
// The user container will be blocked on network connectivity until this finishes.
// When workerd-network is more battle-tested and goes out of experimental so it's non-optional,
// we should make the sidecar start first and _then_ make the user container join the sidecar network.
containerSidecarStarted = false;
co_await ensureSidecarStarted();
}

Expand Down Expand Up @@ -898,7 +962,7 @@ kj::Promise<void> ContainerClient::ensureSidecarStarted() {
co_await startSidecarContainer();
}

kj::Promise<void> ContainerClient::ensureEgressListenerStarted() {
kj::Promise<void> ContainerClient::ensureEgressListenerStarted(uint16_t port) {
if (egressListenerStarted.exchange(true, std::memory_order_acquire)) {
co_return;
}
Expand All @@ -910,7 +974,7 @@ kj::Promise<void> ContainerClient::ensureEgressListenerStarted() {
// routes host-gateway to host loopback through the VM).
auto ipamConfig = co_await getDockerBridgeIPAMConfig();
egressListenerPort = co_await startEgressListener(
gatewayForPlatform(kj::mv(ipamConfig.gateway)).orDefault(kj::str("127.0.0.1")));
gatewayForPlatform(kj::mv(ipamConfig.gateway)).orDefault(kj::str("127.0.0.1")), port);
}

kj::Promise<void> ContainerClient::setEgressHttp(SetEgressHttpContext context) {
Expand Down
40 changes: 33 additions & 7 deletions src/workerd/server/container-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@

namespace workerd::server {

// Tracks the canceler and cleanup promise for a Docker container's lifecycle cleanup.
// Lets a newly created ContainerClient await (or cancel) the asynchronous Docker
// DELETE requests issued by a previous ContainerClient's destructor for the same
// container ID, in case those requests have not resolved yet.
struct ContainerCleanupState {
  // Canceler that wraps the promise fired in ~ContainerClient. Replacing
  // it cancels any pending cleanup, which resolves the promise immediately.
  kj::Own<kj::Canceler> canceler;

  // Forked cleanup promise. A branch is added to waitUntilTasks to keep the I/O alive,
  // and another branch is passed to the next ContainerClient so its status() can await.
  // Defaults to an already-resolved promise so the first client for a container ID
  // has nothing to wait for.
  kj::ForkedPromise<void> promise = kj::Promise<void>(kj::READY_NOW).fork();
};

// Docker-based implementation that implements the rpc::Container::Server interface
// so it can be used as a rpc::Container::Client via kj::heap<ContainerClient>().
// This allows the Container JSG class to use Docker directly without knowing
Expand All @@ -41,7 +54,8 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::String imageName,
kj::Maybe<kj::String> containerEgressInterceptorImage,
kj::TaskSet& waitUntilTasks,
kj::Function<void()> cleanupCallback,
kj::Promise<void> pendingCleanup,
kj::Function<void(kj::Promise<void>)> cleanupCallback,
ChannelTokenHandler& channelTokenHandler);

~ContainerClient() noexcept(false);
Expand Down Expand Up @@ -74,6 +88,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte

kj::TaskSet& waitUntilTasks;

// Forked promise representing pending cleanup from a previous ContainerClient for the same
// container ID. status() co_awaits a branch so that Docker inspect only runs after any
// in-flight DELETE from the previous client has settled (either completed or been cancelled
// via containerCleanupCanceler, in which case the .catch_() resolves it immediately).
kj::ForkedPromise<void> pendingCleanup;

static constexpr kj::StringPtr defaultEnv[] = {"CLOUDFLARE_COUNTRY_A2=XX"_kj,
"CLOUDFLARE_DEPLOYMENT_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"_kj,
"CLOUDFLARE_LOCATION=loc01"_kj, "CLOUDFLARE_REGION=REGN"_kj,
Expand Down Expand Up @@ -108,6 +128,9 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::String endpoint,
kj::Maybe<kj::String> body = kj::none);
kj::Promise<InspectResponse> inspectContainer();
// Inspect the sidecar container and extract the --http-egress-port from its args.
// Returns kj::none if the sidecar doesn't exist or is not running.
kj::Promise<kj::Maybe<uint16_t>> inspectSidecarEgressPort();
kj::Promise<void> createContainer(kj::Maybe<capnp::List<capnp::Text>::Reader> entrypoint,
kj::Maybe<capnp::List<capnp::Text>::Reader> environment,
rpc::Container::StartParams::Reader params);
Expand All @@ -122,8 +145,10 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::Promise<void> destroySidecarContainer();
kj::Promise<void> monitorSidecarContainer();

// Cleanup callback to remove from ActorNamespace map when destroyed
kj::Function<void()> cleanupCallback;
// Cleanup callback invoked from the destructor. Receives the joined cleanup promise so
// ActorNamespace can wrap it with the canceler, store it for the next ContainerClient
// to await, and add a branch to waitUntilTasks to keep the cleanup tasks alive.
kj::Function<void(kj::Promise<void>)> cleanupCallback;

// For redeeming channel tokens received via setEgressHttp
ChannelTokenHandler& channelTokenHandler;
Expand Down Expand Up @@ -172,12 +197,13 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
// Check if the Docker daemon has IPv6 enabled by inspecting the default bridge network's
// IPAM config for IPv6 subnets.
kj::Promise<bool> isDaemonIpv6Enabled();
// Start the egress listener on the given address with an OS-chosen port.
kj::Promise<uint16_t> startEgressListener(kj::String listenAddress);
// Start the egress listener on the given address. If port is 0, an OS-chosen port is used.
kj::Promise<uint16_t> startEgressListener(kj::String listenAddress, uint16_t port = 0);
void stopEgressListener();
// Ensure the egress listener is started exactly once.
// Uses egressListenerStarted as a guard. Called from setEgressHttp().
kj::Promise<void> ensureEgressListenerStarted();
// Uses egressListenerStarted as a guard. Called from setEgressHttp() and status().
// If port is non-zero, binds to that specific port (for reconnecting to an existing sidecar).
kj::Promise<void> ensureEgressListenerStarted(uint16_t port = 0);
// Ensure the egress listener and sidecar container are started exactly once.
// Uses containerSidecarStarted as a guard. Called from both start() and setEgressHttp().
kj::Promise<void> ensureSidecarStarted();
Expand Down
56 changes: 48 additions & 8 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2855,15 +2855,52 @@ class Server::WorkerService final: public Service,
auto& dockerPathRef = KJ_ASSERT_NONNULL(
dockerPath, "dockerPath must be defined to enable containers on this Durable Object.");

// Remove from the map when the container is destroyed
kj::Function<void()> cleanupCallback = [this, containerId = kj::str(containerId)]() {
containerClients.erase(containerId);
// Grab a branch of any pending cleanup from a previous ContainerClient for this
// container. If it exists, pass it to the container client so it knows that it has to sync.
kj::Promise<void> previousCleanup = kj::READY_NOW;
KJ_IF_SOME(state, containerCleanupState.find(containerId)) {
previousCleanup = state.promise.addBranch();
}

// Upsert the cleanup state for this container ID. Replacing the
// canceler auto-cancels any in-flight cleanup tasks from the previous
// client's destructor.
auto canceler = kj::heap<kj::Canceler>();
auto* cancelerPtr = canceler.get();
containerCleanupState.upsert(kj::str(containerId),
ContainerCleanupState{.canceler = kj::mv(canceler)},
[](ContainerCleanupState& existing, ContainerCleanupState&& incoming) {
existing.canceler = kj::mv(incoming.canceler);
});

// Cleanup callback: invoked from the ContainerClient destructor with the
// joined cleanup promise covering both container DELETE requests.
kj::Function<void(kj::Promise<void>)> cleanupCallback =
[this, containerId = kj::str(containerId), cancelerPtr](
kj::Promise<void> cleanupPromise) mutable {
KJ_IF_SOME(state, containerCleanupState.find(containerId)) {
if (state.canceler.get() != cancelerPtr) {
// A newer ContainerClient has replaced us already with another destructor.
// drop the promise.
return;
}

containerClients.erase(containerId);
// Wrap with the canceler so a future client creation can cancel these
// tasks
auto cancellable =
state.canceler->wrap(kj::mv(cleanupPromise)).catch_([](kj::Exception&&) {});

auto forked = kj::mv(cancellable).fork();
waitUntilTasks.add(forked.addBranch());
state.promise = kj::mv(forked);
}
};

auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName),
containerEgressInterceptorImage.map([](kj::StringPtr s) { return kj::str(s); }),
waitUntilTasks, kj::mv(cleanupCallback), channelTokenHandler);
waitUntilTasks, kj::mv(previousCleanup), kj::mv(cleanupCallback), channelTokenHandler);

// Store raw pointer in map (does not own)
containerClients.insert(kj::str(containerId), client.get());
Expand Down Expand Up @@ -2895,16 +2932,19 @@ class Server::WorkerService final: public Service,
// declare `actorStorage` before `actors`.
kj::Maybe<ActorStorage> actorStorage;

// If the actor is broken, we remove it from the map. However, if it's just evicted due to
// inactivity, we keep the ActorContainer in the map but drop the Own<Worker::Actor>. When a new
// request comes in, we recreate the Own<Worker::Actor>.
ActorMap actors;
// Per-container cleanup state: canceler + forked cleanup promise.
kj::HashMap<kj::String, ContainerCleanupState> containerCleanupState;

// Map of container IDs to ContainerClients (for reconnection support with inactivity timeouts).
// The map holds raw pointers (not ownership) - ContainerClients are owned by actors and timers.
// When the last reference is dropped, the destructor removes the entry from this map.
kj::HashMap<kj::String, ContainerClient*> containerClients;

// If the actor is broken, we remove it from the map. However, if it's just evicted due to
// inactivity, we keep the ActorContainer in the map but drop the Own<Worker::Actor>. When a new
// request comes in, we recreate the Own<Worker::Actor>.
ActorMap actors;

kj::Maybe<kj::Promise<void>> cleanupTask;
kj::Timer& timer;
capnp::ByteStreamFactory& byteStreamFactory;
Expand Down
Loading
Loading