Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 20 additions & 29 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ std::default_random_engine makeSeededRandomEngine() {

} // namespace

AlarmScheduler::AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path)
AlarmScheduler::AlarmScheduler(const kj::Clock& clock,
kj::Timer& timer,
const SqliteDatabase::Vfs& vfs,
kj::Path path,
GetActorFn getActor)
: clock(clock),
timer(timer),
random(makeSeededRandomEngine()),
getActor(kj::mv(getActor)),
db([&] {
auto db = kj::heap<SqliteDatabase>(vfs, kj::mv(path),
kj::WriteMode::CREATE | kj::WriteMode::MODIFY | kj::WriteMode::CREATE_PARENT);
Expand All @@ -49,10 +53,8 @@ void AlarmScheduler::ensureInitialized(SqliteDatabase& db) {

db.run(R"(
CREATE TABLE IF NOT EXISTS _cf_ALARM (
actor_unique_key TEXT,
actor_id TEXT,
scheduled_time INTEGER,
PRIMARY KEY (actor_unique_key, actor_id)
actor_id TEXT PRIMARY KEY,
scheduled_time INTEGER
) WITHOUT ROWID;
)");
}
Expand All @@ -63,27 +65,22 @@ void AlarmScheduler::loadAlarmsFromDb() {
// TODO(someday): don't maintain the entire alarm set in memory -- right now for the usecase of
// local development, doing so is sufficient.
auto query = db->run(R"(
SELECT actor_unique_key, actor_id, scheduled_time FROM _cf_ALARM;
SELECT actor_id, scheduled_time FROM _cf_ALARM;
)");

while (!query.isDone()) {
auto date = kj::UNIX_EPOCH + (kj::NANOSECONDS * query.getInt64(2));
auto date = kj::UNIX_EPOCH + (kj::NANOSECONDS * query.getInt64(1));

auto ownUniqueKey = kj::str(query.getText(0));
auto ownActorId = kj::str(query.getText(1));
auto actor = kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId},
kj::mv(ownUniqueKey), kj::mv(ownActorId));
auto ownActorId = kj::str(query.getText(0));
auto actor = kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId));
auto& actorRef = *actor;

alarms.insert(*actor, scheduleAlarm(now, kj::mv(actor), date));
alarms.insert(actorRef, scheduleAlarm(now, kj::mv(actor), date));

query.nextRow();
}
}

void AlarmScheduler::registerNamespace(kj::StringPtr uniqueKey, GetActorFn getActor) {
namespaces.insert(uniqueKey, Namespace{.getActor = kj::mv(getActor)});
}

kj::Maybe<kj::Date> AlarmScheduler::getAlarm(ActorKey actor) {
// TODO(someday): Might be able to simplify AlarmScheduler somewhat, now that ActorSqlite no
// longer relies on it for getAlarm()?
Expand All @@ -103,16 +100,14 @@ kj::Maybe<kj::Date> AlarmScheduler::getAlarm(ActorKey actor) {

bool AlarmScheduler::setAlarm(ActorKey actor, kj::Date scheduledTime) {
int64_t scheduledTimeNs = (scheduledTime - kj::UNIX_EPOCH) / kj::NANOSECONDS;
auto query = stmtSetAlarm.run(actor.uniqueKey, actor.actorId, scheduledTimeNs);
auto query = stmtSetAlarm.run(actor.actorId, scheduledTimeNs);

bool existing = true;
auto& entry = alarms.findOrCreate(actor, [&]() {
existing = false;

auto ownUniqueKey = kj::str(actor.uniqueKey);
auto ownActorId = kj::str(actor.actorId);
auto ownActor = kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId},
kj::mv(ownUniqueKey), kj::mv(ownActorId));
auto ownActor = kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId));

return decltype(alarms)::Entry{
*ownActor, scheduleAlarm(clock.now(), kj::mv(ownActor), scheduledTime)};
Expand All @@ -132,7 +127,7 @@ bool AlarmScheduler::setAlarm(ActorKey actor, kj::Date scheduledTime) {
}

bool AlarmScheduler::deleteAlarm(ActorKey actor) {
auto query = stmtDeleteAlarm.run(actor.uniqueKey, actor.actorId);
auto query = stmtDeleteAlarm.run(actor.actorId);

KJ_IF_SOME(entry, alarms.findEntry(actor)) {
KJ_IF_SOME(queued, entry.value.queuedAlarm) {
Expand All @@ -155,14 +150,10 @@ bool AlarmScheduler::deleteAlarm(ActorKey actor) {

kj::Promise<AlarmScheduler::RetryInfo> AlarmScheduler::runAlarm(
const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount) {
KJ_IF_SOME(ns, namespaces.find(actor.uniqueKey)) {
auto result = co_await ns.getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount);
auto result = co_await getActor(kj::str(actor.actorId))->runAlarm(scheduledTime, retryCount);

co_return RetryInfo{.retry = result.outcome != EventOutcome::OK && result.retry,
.retryCountsAgainstLimit = result.retryCountsAgainstLimit};
} else {
throw KJ_EXCEPTION(FAILED, "uniqueKey for stored alarm was not registered?");
}
co_return RetryInfo{.retry = result.outcome != EventOutcome::OK && result.retry,
.retryCountsAgainstLimit = result.retryCountsAgainstLimit};
}

AlarmScheduler::ScheduledAlarm AlarmScheduler::scheduleAlarm(
Expand Down
32 changes: 11 additions & 21 deletions src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,22 @@

namespace workerd::server {

using byte = kj::byte;

struct ActorKey {
kj::StringPtr uniqueKey;
kj::StringPtr actorId;

bool operator==(const ActorKey& other) const {
return uniqueKey == other.uniqueKey && actorId == other.actorId;
return actorId == other.actorId;
}

kj::Own<ActorKey> clone() const {
auto ownUniqueKey = kj::str(uniqueKey);
auto ownActorId = kj::str(actorId);

return kj::attachVal(ActorKey{.uniqueKey = ownUniqueKey, .actorId = ownActorId},
kj::mv(ownUniqueKey), kj::mv(ownActorId));
return kj::attachVal(ActorKey{.actorId = ownActorId}, kj::mv(ownActorId));
}
};

inline uint KJ_HASHCODE(const ActorKey& k) {
return kj::hashCode(k.uniqueKey, k.actorId);
return kj::hashCode(k.actorId);
}

// Allows scheduling alarm executions at specific times, returning a promise representing
Expand All @@ -60,25 +55,22 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {

using GetActorFn = kj::Function<kj::Own<WorkerInterface>(kj::String)>;

AlarmScheduler(
const kj::Clock& clock, kj::Timer& timer, const SqliteDatabase::Vfs& vfs, kj::Path path);
AlarmScheduler(const kj::Clock& clock,
kj::Timer& timer,
const SqliteDatabase::Vfs& vfs,
kj::Path path,
GetActorFn getActor);

kj::Maybe<kj::Date> getAlarm(ActorKey actor);
bool setAlarm(ActorKey actor, kj::Date scheduledTime);
bool deleteAlarm(ActorKey actor);

void registerNamespace(kj::StringPtr uniqueKey, GetActorFn getActor);

private:
enum class AlarmStatus { WAITING, STARTED, FINISHED };
const kj::Clock& clock;
kj::Timer& timer;
std::default_random_engine random;

struct Namespace {
GetActorFn getActor;
};
kj::HashMap<kj::StringPtr, Namespace> namespaces;
GetActorFn getActor;
kj::Own<SqliteDatabase> db;
kj::TaskSet tasks;

Expand Down Expand Up @@ -112,8 +104,6 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {
kj::Promise<RetryInfo> runAlarm(
const ActorKey& actor, kj::Date scheduledTime, uint32_t retryCount);

void setAlarmInMemory(kj::Own<ActorKey> actor, kj::Date scheduledTime);

ScheduledAlarm scheduleAlarm(kj::Date now, kj::Own<ActorKey> actor, kj::Date scheduledTime);

kj::Promise<void> makeAlarmTask(
Expand All @@ -122,11 +112,11 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {
kj::Promise<void> checkTimestamp(kj::Duration delay, kj::Date scheduledTime);

SqliteDatabase::Statement stmtSetAlarm = db->prepare(R"(
INSERT INTO _cf_ALARM VALUES(?, ?, ?)
INSERT INTO _cf_ALARM VALUES(?, ?)
ON CONFLICT DO UPDATE SET scheduled_time = excluded.scheduled_time;
)");
SqliteDatabase::Statement stmtDeleteAlarm = db->prepare(R"(
DELETE FROM _cf_ALARM WHERE actor_unique_key = ? AND actor_id = ?
DELETE FROM _cf_ALARM WHERE actor_id = ?
)");

void taskFailed(kj::Exception&& exception) override;
Expand Down
119 changes: 110 additions & 9 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2247,12 +2247,13 @@ KJ_TEST("Server: Durable Objects (on disk)") {
conn.httpGet200("/bar",
"02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79: http://foo/bar 2");

// The storage directory contains .sqlite and .sqlite-wal files for both objects. Note that
// the `-shm` files are missing because SQLite doesn't actually tell the VFS to create these
// as separate files, it leaves it up to the VFS to decide how shared memory works, and our
// KJ-wrapping VFS currently doesn't put this in SHM files. If we were using a real disk
// directory, though, they would be there.
KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 4);
// The storage directory contains .sqlite and .sqlite-wal files for both objects, plus the
// per-namespace metadata.sqlite (alarm scheduler) and its WAL file. Note that the `-shm`
// files are missing because SQLite doesn't actually tell the VFS to create these as separate
// files, it leaves it up to the VFS to decide how shared memory works, and our KJ-wrapping
// VFS currently doesn't put this in SHM files. If we were using a real disk directory,
// though, they would be there.
KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 6);
KJ_EXPECT(dir->exists(kj::Path(
{"mykey", "02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79.sqlite"})));
KJ_EXPECT(dir->exists(kj::Path(
Expand All @@ -2261,10 +2262,12 @@ KJ_TEST("Server: Durable Objects (on disk)") {
{"mykey", "59002eb8cf872e541722977a258a12d6a93bbe8192b502e1c0cb250aa91af234.sqlite"})));
KJ_EXPECT(dir->exists(kj::Path(
{"mykey", "59002eb8cf872e541722977a258a12d6a93bbe8192b502e1c0cb250aa91af234.sqlite-wal"})));
KJ_EXPECT(dir->exists(kj::Path({"mykey", "metadata.sqlite"})));
KJ_EXPECT(dir->exists(kj::Path({"mykey", "metadata.sqlite-wal"})));
}

// Having torn everything down, the WAL files should be gone.
KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 2);
KJ_EXPECT(dir->openSubdir(kj::Path({"mykey"}))->listNames().size() == 3);
Comment thread
kentonv marked this conversation as resolved.
KJ_EXPECT(dir->exists(kj::Path(
{"mykey", "02b496f65dd35cbac90e3e72dc5a398ee93926ea4a3821e26677082d2e6f9b79.sqlite"})));
KJ_EXPECT(dir->exists(kj::Path(
Expand All @@ -2290,6 +2293,102 @@ KJ_TEST("Server: Durable Objects (on disk)") {
}
}

KJ_TEST("Server: Durable Object alarm persistence (on disk)") {
kj::StringPtr config = R"((
services = [
( name = "hello",
worker = (
compatibilityDate = "2024-01-01",
modules = [
( name = "main.js",
esModule =
`export default {
` async fetch(request, env) {
` let id = env.ns.idFromName("alarm-actor")
` let actor = env.ns.get(id)
` return await actor.fetch(request)
` }
`}
`export class MyActorClass {
` constructor(state, env) {
` this.storage = state.storage;
` }
` async fetch(request) {
` let url = new URL(request.url);
` if (url.pathname === "/set") {
` let time = parseInt(url.searchParams.get("t"));
` await this.storage.setAlarm(time);
` return new Response("alarm set to " + time);
` } else if (url.pathname === "/get") {
` let alarm = await this.storage.getAlarm();
` return new Response("alarm=" + alarm);
` } else {
` return new Response("unknown path", {status: 404});
` }
` }
` async alarm() {}
`}
)
],
bindings = [(name = "ns", durableObjectNamespace = "MyActorClass")],
durableObjectNamespaces = [
( className = "MyActorClass",
uniqueKey = "alarmkey",
)
],
durableObjectStorage = (localDisk = "my-disk")
)
),
( name = "my-disk",
disk = (
path = "../../var/do-storage",
writable = true,
)
),
],
sockets = [
( name = "main",
address = "test-addr",
service = "hello"
)
]
))"_kj;

auto dir = kj::newInMemoryDirectory(kj::nullClock());

// A far-future alarm time (won't fire during the test).
kj::StringPtr alarmTime = "4102444800000";
Comment thread
kentonv marked this conversation as resolved.

{
TestServer test(config);
test.root->transfer(kj::Path({"var"_kj, "do-storage"_kj}),
kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT, *dir, nullptr,
kj::TransferMode::LINK);

test.start();
auto conn = test.connect("test-addr");

conn.httpGet200(kj::str("/set?t=", alarmTime), kj::str("alarm set to ", alarmTime));
conn.httpGet200("/get", kj::str("alarm=", alarmTime));
}

// Verify metadata.sqlite exists on disk in the namespace directory.
KJ_EXPECT(dir->exists(kj::Path({"alarmkey", "metadata.sqlite"})));

// Start a new server and verify the alarm is still there.
{
TestServer test(config);
test.root->transfer(kj::Path({"var"_kj, "do-storage"_kj}),
kj::WriteMode::CREATE | kj::WriteMode::CREATE_PARENT, *dir, nullptr,
kj::TransferMode::LINK);

test.start();
auto conn = test.connect("test-addr");

conn.httpGet200("/get", kj::str("alarm=", alarmTime));
}
}

KJ_TEST("Server: Ephemeral Objects") {
TestServer test(R"((
services = [
Expand Down Expand Up @@ -4974,9 +5073,11 @@ KJ_TEST("Server: Durable Object facets") {
kj::Path({"3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a.5.sqlite"})));

// We didn't create any other durable objects in the namespace. All files in the namespace should
// be prefixed with our one DO ID.
// be prefixed with our one DO ID, except for metadata.sqlite (the per-namespace alarm scheduler).
for (auto& name: nsDir->listNames()) {
KJ_EXPECT(name.startsWith("3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a."),
KJ_EXPECT(
name.startsWith("3652ef6221834806dc8df802d1d216e27b7d07e0a6b7adf6cfdaeec90f06459a.") ||
name.startsWith("metadata.sqlite"),
"unexpected file found in namespace storage", name);
}

Expand Down
Loading
Loading