-
-
Notifications
You must be signed in to change notification settings - Fork 34.2k
src: add support for AbortSignal in backup method #59333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,32 @@ | ||
| 'use strict'; | ||
| const { | ||
| PromiseReject, | ||
| } = primordials; | ||
| const { | ||
| AbortError, | ||
| } = require('internal/errors'); | ||
| const { | ||
| validateAbortSignal, | ||
| validateObject, | ||
| validateString, | ||
| } = require('internal/validators'); | ||
| const { emitExperimentalWarning } = require('internal/util'); | ||
|
|
||
| emitExperimentalWarning('SQLite'); | ||
|
|
||
| module.exports = internalBinding('sqlite'); | ||
| const binding = internalBinding('sqlite'); | ||
|
|
||
| function backup(sourceDb, path, options = {}) { | ||
| validateObject(sourceDb, 'sourceDb'); | ||
| validateString(path, 'options.headers.host'); | ||
| validateAbortSignal(options.signal, 'options.signal'); | ||
jasnell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (options.signal?.aborted) { | ||
| return PromiseReject(new AbortError(undefined, { cause: options.signal.reason })); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One issue I can see with this, which does not need to be addressed now, is that this will mean we have two
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out, I wasn't sure if I understood this correctly when I was told in a previous comment! I'd like to follow up on this after landing the current PR. It definitely makes sense to reconcile the two Would you happen to have any pointers on the preferred approach for reusing the js native I have put the comment here: https://github.com/nodejs/node/pull/59333/files#diff-dd810db4fe69364c3a67a274e0725f386040c0fd1dcfade7093f23c8514328aeR119-R120 |
||
| } | ||
| return binding.backup(sourceDb, path, options); | ||
| } | ||
|
|
||
| module.exports = { | ||
| ...binding, | ||
| backup, | ||
| }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| #include "threadpoolwork-inl.h" | ||
| #include "util-inl.h" | ||
|
|
||
| #include <atomic> | ||
| #include <cinttypes> | ||
|
|
||
| namespace node { | ||
|
|
@@ -120,6 +121,55 @@ using v8::Value; | |
| UNREACHABLE("Bad SQLite value"); \ | ||
| } \ | ||
| } while (0) | ||
| // TODO(@lluisemper) This is a copy of node::AbortError, use js native | ||
| // AbortError constructor to allow instanceof checks in JS. | ||
| class AbortError { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @himself65 I have tried to implement this class as you suggested, I hope I have not misunderstood 😂 I have check its js implementation from here: https://github.com/nodejs/node/blob/main/lib/internal/errors.js#L976-L988 |
||
| public: | ||
| static MaybeLocal<Object> New( | ||
| Isolate* isolate, | ||
| std::string_view message = "The operation was aborted", | ||
| Local<Value> cause = Local<Value>()) { | ||
| Local<String> js_msg; | ||
| Local<Object> error_obj; | ||
| Local<Context> context = isolate->GetCurrentContext(); | ||
| Environment* env = Environment::GetCurrent(isolate); | ||
|
|
||
| if (!String::NewFromUtf8(isolate, | ||
| message.data(), | ||
| NewStringType::kNormal, | ||
| static_cast<int>(message.size())) | ||
| .ToLocal(&js_msg) || | ||
| !Exception::Error(js_msg)->ToObject(context).ToLocal(&error_obj)) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
|
|
||
| Local<String> error_name; | ||
| if (!String::NewFromUtf8(isolate, "AbortError").ToLocal(&error_name)) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
| if (error_obj->Set(context, env->name_string(), error_name).IsNothing()) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
|
|
||
| Local<String> code_key; | ||
| Local<String> code_value; | ||
| if (!String::NewFromUtf8(isolate, "code").ToLocal(&code_key) || | ||
| !String::NewFromUtf8(isolate, "ABORT_ERR").ToLocal(&code_value)) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
| if (error_obj->Set(context, code_key, code_value).IsNothing()) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
|
|
||
| if (!cause.IsEmpty() && !cause->IsUndefined()) { | ||
| if (error_obj->Set(context, env->cause_string(), cause).IsNothing()) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
| } | ||
|
|
||
| return error_obj; | ||
| } | ||
| }; | ||
|
|
||
| namespace { | ||
| Local<DictionaryTemplate> getLazyIterTemplate(Environment* env) { | ||
|
|
@@ -134,11 +184,16 @@ Local<DictionaryTemplate> getLazyIterTemplate(Environment* env) { | |
| } // namespace | ||
|
|
||
| inline MaybeLocal<Object> CreateSQLiteError(Isolate* isolate, | ||
| const char* message) { | ||
| std::string_view message) { | ||
| Local<String> js_msg; | ||
| Local<Object> e; | ||
| Environment* env = Environment::GetCurrent(isolate); | ||
| if (!String::NewFromUtf8(isolate, message).ToLocal(&js_msg) || | ||
|
|
||
| if (!String::NewFromUtf8(isolate, | ||
| message.data(), | ||
| NewStringType::kNormal, | ||
| static_cast<int>(message.size())) | ||
| .ToLocal(&js_msg) || | ||
| !Exception::Error(js_msg) | ||
| ->ToObject(isolate->GetCurrentContext()) | ||
| .ToLocal(&e) || | ||
|
|
@@ -148,6 +203,7 @@ inline MaybeLocal<Object> CreateSQLiteError(Isolate* isolate, | |
| .IsNothing()) { | ||
| return MaybeLocal<Object>(); | ||
| } | ||
|
|
||
| return e; | ||
| } | ||
|
|
||
|
|
@@ -450,16 +506,21 @@ class BackupJob : public ThreadPoolWork { | |
| std::string destination_name, | ||
| std::string dest_db, | ||
| int pages, | ||
| Local<Function> progressFunc) | ||
| Local<Function> progress_func, | ||
| Local<Object> abort_signal = Local<Object>()) | ||
| : ThreadPoolWork(env, "node_sqlite3.BackupJob"), | ||
| env_(env), | ||
| source_(source), | ||
| pages_(pages), | ||
| source_db_(std::move(source_db)), | ||
| destination_name_(std::move(destination_name)), | ||
| dest_db_(std::move(dest_db)) { | ||
| dest_db_(std::move(dest_db)), | ||
| is_aborted_(false) { | ||
| resolver_.Reset(env->isolate(), resolver); | ||
| progressFunc_.Reset(env->isolate(), progressFunc); | ||
| progress_func_.Reset(env->isolate(), progress_func); | ||
| if (!abort_signal.IsEmpty()) { | ||
| abort_signal_.Reset(env->isolate(), abort_signal); | ||
| } | ||
| } | ||
|
|
||
| void ScheduleBackup() { | ||
|
|
@@ -488,6 +549,10 @@ class BackupJob : public ThreadPoolWork { | |
| } | ||
|
|
||
| void DoThreadPoolWork() override { | ||
| if (is_aborted_.load(std::memory_order_acquire)) { | ||
| backup_status_ = SQLITE_INTERRUPT; | ||
| return; | ||
| } | ||
| backup_status_ = sqlite3_backup_step(backup_, pages_); | ||
| } | ||
|
|
||
|
|
@@ -496,6 +561,12 @@ class BackupJob : public ThreadPoolWork { | |
| Local<Promise::Resolver> resolver = | ||
| Local<Promise::Resolver>::New(env()->isolate(), resolver_); | ||
|
|
||
| if (is_aborted_.load(std::memory_order_acquire) || | ||
| backup_status_ == SQLITE_INTERRUPT) { | ||
| HandleAbortError(resolver); | ||
| return; | ||
| } | ||
|
|
||
| if (!(backup_status_ == SQLITE_OK || backup_status_ == SQLITE_DONE || | ||
| backup_status_ == SQLITE_BUSY || backup_status_ == SQLITE_LOCKED)) { | ||
| HandleBackupError(resolver, backup_status_); | ||
|
|
@@ -506,7 +577,7 @@ class BackupJob : public ThreadPoolWork { | |
| int remaining_pages = sqlite3_backup_remaining(backup_); | ||
| if (remaining_pages != 0) { | ||
| Local<Function> fn = | ||
| Local<Function>::New(env()->isolate(), progressFunc_); | ||
| Local<Function>::New(env()->isolate(), progress_func_); | ||
| if (!fn.IsEmpty()) { | ||
| Local<Object> progress_info = Object::New(env()->isolate()); | ||
| if (progress_info | ||
|
|
@@ -531,6 +602,14 @@ class BackupJob : public ThreadPoolWork { | |
| return; | ||
| } | ||
| } | ||
| if (CheckAbortSignal()) { | ||
| // TODO(@lluisemper): BackupJob does not implement proper async context | ||
| // tracking yet. | ||
| // Consider inheriting from AsyncWrap and using CallbackScope to | ||
| // propagate async context, similar to other ThreadPoolWork items. | ||
| HandleAbortError(resolver); | ||
jasnell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return; | ||
| } | ||
|
|
||
| // There's still work to do | ||
| this->ScheduleWork(); | ||
|
|
@@ -564,6 +643,10 @@ class BackupJob : public ThreadPoolWork { | |
| sqlite3_close_v2(dest_); | ||
| dest_ = nullptr; | ||
| } | ||
|
|
||
| if (!abort_signal_.IsEmpty()) { | ||
| abort_signal_.Reset(); | ||
| } | ||
| } | ||
|
|
||
| private: | ||
|
|
@@ -589,19 +672,73 @@ class BackupJob : public ThreadPoolWork { | |
| resolver->Reject(env()->context(), e).ToChecked(); | ||
| } | ||
|
|
||
| inline MaybeLocal<Object> CreateAbortError( | ||
| Isolate* isolate, | ||
| std::string_view message = "The operation was aborted") { | ||
| Environment* env = Environment::GetCurrent(isolate); | ||
| HandleScope scope(isolate); | ||
| Local<Value> cause; | ||
|
|
||
| if (!abort_signal_.IsEmpty()) { | ||
| Local<Object> signal = abort_signal_.Get(isolate); | ||
| Local<String> reason_key = env->reason_string(); | ||
|
|
||
| if (!signal->Get(isolate->GetCurrentContext(), reason_key) | ||
| .ToLocal(&cause)) { | ||
| cause = Local<Value>(); | ||
| } | ||
| } | ||
|
|
||
| return AbortError::New(isolate, message, cause); | ||
| } | ||
|
|
||
| void HandleAbortError(Local<Promise::Resolver> resolver) { | ||
| Local<Object> e; | ||
| if (!CreateAbortError(env()->isolate()).ToLocal(&e)) { | ||
| Finalize(); | ||
| return; | ||
| } | ||
|
|
||
| Finalize(); | ||
| resolver->Reject(env()->context(), e).ToChecked(); | ||
| } | ||
|
|
||
| bool CheckAbortSignal() { | ||
| if (abort_signal_.IsEmpty()) { | ||
| return false; | ||
| } | ||
|
|
||
| Isolate* isolate = env()->isolate(); | ||
| HandleScope scope(isolate); | ||
| Local<Object> signal = abort_signal_.Get(isolate); | ||
|
|
||
| Local<Value> aborted_value; | ||
| if (signal->Get(env()->context(), env()->aborted_string()) | ||
| .ToLocal(&aborted_value)) { | ||
| if (aborted_value->BooleanValue(isolate)) { | ||
| is_aborted_.store(true, std::memory_order_release); | ||
| return true; | ||
| } | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| Environment* env() const { return env_; } | ||
|
|
||
| Environment* env_; | ||
| DatabaseSync* source_; | ||
| Global<Promise::Resolver> resolver_; | ||
| Global<Function> progressFunc_; | ||
| Global<Function> progress_func_; | ||
| Global<Object> abort_signal_; | ||
| sqlite3* dest_ = nullptr; | ||
| sqlite3_backup* backup_ = nullptr; | ||
| int pages_; | ||
| int backup_status_ = SQLITE_OK; | ||
| std::string source_db_; | ||
| std::string destination_name_; | ||
| std::string dest_db_; | ||
| std::atomic<bool> is_aborted_; | ||
| }; | ||
|
|
||
| UserDefinedFunction::UserDefinedFunction(Environment* env, | ||
|
|
@@ -1597,7 +1734,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) { | |
| int rate = 100; | ||
| std::string source_db = "main"; | ||
| std::string dest_db = "main"; | ||
| Local<Function> progressFunc = Local<Function>(); | ||
| Local<Function> progress_func = Local<Function>(); | ||
| Local<Object> abort_signal = Local<Object>(); | ||
|
|
||
| if (args.Length() > 2) { | ||
| if (!args[2]->IsObject()) { | ||
|
|
@@ -1669,7 +1807,13 @@ void Backup(const FunctionCallbackInfo<Value>& args) { | |
| "The \"options.progress\" argument must be a function."); | ||
| return; | ||
| } | ||
| progressFunc = progress_v.As<Function>(); | ||
| progress_func = progress_v.As<Function>(); | ||
| } | ||
|
|
||
| Local<Value> signal_v; | ||
| if (!options->Get(env->context(), env->signal_string()) | ||
| .ToLocal(&signal_v)) { | ||
| return; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1686,7 +1830,8 @@ void Backup(const FunctionCallbackInfo<Value>& args) { | |
| dest_path.value(), | ||
| std::move(dest_db), | ||
| rate, | ||
| progressFunc); | ||
| progress_func, | ||
| abort_signal); | ||
| db->AddBackup(job); | ||
| job->ScheduleBackup(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this function all in C++
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering why we would need to send it to the C++ layer if it is not valid. On the other hand, it might look cleaner to leave this file as it is and add everything in C++ instead.