Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions crates/ark/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ use harp::utils::r_typeof;
use harp::CONSOLE_THREAD_ID;
use libr::R_BaseNamespace;
use libr::R_ProcessEvents;
use libr::R_RunPendingFinalizers;
use libr::Rf_ScalarInteger;
use libr::Rf_error;
use libr::Rf_findVarInFrame;
Expand Down Expand Up @@ -128,8 +127,7 @@ use console_filter::ConsoleFilter;
pub use console_repl::catching_panics;
pub(crate) use console_repl::console_inputs;
pub(crate) use console_repl::r_busy;
#[cfg(unix)]
pub(crate) use console_repl::r_polled_events;
pub(crate) use console_repl::r_process_events;
pub(crate) use console_repl::r_read_console;
pub(crate) use console_repl::r_show_message;
pub(crate) use console_repl::r_suicide;
Expand Down Expand Up @@ -235,7 +233,6 @@ pub struct Console {
autoprint_output: String,

/// Channel to send and receive tasks from `QueuedRTask`s
tasks_interrupt_rx: Receiver<QueuedRTask>,
tasks_idle_rx: Receiver<QueuedRTask>,
tasks_idle_any_rx: Receiver<QueuedRTask>,
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo, Option<String>)>,
Expand Down
1 change: 0 additions & 1 deletion crates/ark/src/console/console_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ impl ConsoleFilter {
/// Check for timeout and handle state transitions.
/// Timeout means we didn't reach ReadConsole to confirm debug output,
/// so we emit the accumulated content back to the user.
#[cfg(any(unix, test))]
pub(super) fn check_timeout(&mut self) -> Option<String> {
self.drain_on_timeout()
}
Expand Down
212 changes: 105 additions & 107 deletions crates/ark/src/console/console_repl.rs

Large diffs are not rendered by default.

42 changes: 32 additions & 10 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use amalthea::comm::server_comm::ServerStartMessage;
use amalthea::comm::server_comm::ServerStartedMessage;
Expand Down Expand Up @@ -85,7 +86,7 @@ macro_rules! cast_response {
},
RequestResponse::Crashed(err) => {
// Notify user that the LSP has crashed and is no longer active
report_crash();
report_crash($self.client()).await;

// The backtrace is reported via `err` and eventually shows up
// in the LSP logs on the client side
Expand All @@ -99,21 +100,34 @@ macro_rules! cast_response {
}};
}

fn report_crash() {
/// Send via `request::ShowMessageRequest` not `notification::ShowMessage` so that we can
/// ensure that the message has been received on the frontend side. We are about to shut
/// the LSP down, and sending out a fire-and-forget notification often won't get sent out
/// before shutdown occurs. The request returns control to us when the user acknowledges
/// the message. It doesn't matter if that takes awhile because we shut down right after,
/// and we've already flipped the `LSP_HAS_CRASHED` global flag, but we do bound it with
/// a 5 second timeout just in case the user ignores the message entirely, so we can still
/// shutdown.
async fn report_crash(client: &Client) {
let user_message = concat!(
"The R language server has crashed and has been disabled. ",
"Smart features such as completions will no longer work in this session. ",
"Please report this crash to https://github.com/posit-dev/positron/issues ",
"with full logs (see https://positron.posit.co/troubleshooting.html#python-and-r-logs)."
);

// NOTE: This is a legit use of interrupt-time task. No R access here, and
// we need to go through Console since it owns the UI comm.
r_task(|| {
if let Some(ui) = Console::get().ui_comm() {
ui.show_message(String::from(user_message));
}
let request = client.send_request::<request::ShowMessageRequest>(ShowMessageRequestParams {
typ: MessageType::ERROR,
message: String::from(user_message),
actions: None,
});
match tokio::time::timeout(Duration::from_secs(5), request).await {
Ok(result) => {
result.log_err();
},
Err(_) => {
log::warn!("Timed out waiting for frontend to acknowledge LSP crash notification");
},
}
Comment on lines -110 to +130
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

report_crash() moves off needing to use the ui comm!

}

#[derive(Debug)]
Expand Down Expand Up @@ -228,6 +242,9 @@ struct Backend {
/// Channel for communication with the main loop.
events_tx: TokioUnboundedSender<Event>,

/// Copy of the Client, for reporting crash messages.
client: Client,

/// Handle to main loop. Drop it to cancel the loop, all associated tasks,
/// and drop all owned state.
_main_loop: tokio::task::JoinSet<()>,
Expand Down Expand Up @@ -256,6 +273,10 @@ impl Backend {
.send(Event::Lsp(LspMessage::Notification(notif)))
.unwrap();
}

fn client(&self) -> &Client {
&self.client
}
}

#[tower_lsp::async_trait]
Expand Down Expand Up @@ -557,7 +578,7 @@ pub(crate) fn start_lsp(
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);

let init = |client: Client| {
let state = GlobalState::new(client, r_home, console_notification_tx);
let state = GlobalState::new(client.clone(), r_home, console_notification_tx);
let events_tx = state.events_tx();

// Start main loop and hold onto the handle that keeps it alive
Expand All @@ -579,6 +600,7 @@ pub(crate) fn start_lsp(
Backend {
shutdown_tx,
events_tx,
client,
_main_loop: main_loop,
}
};
Expand Down
119 changes: 47 additions & 72 deletions crates/ark/src/r_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ use crate::console::Console;
use crate::console::ConsoleOutputCapture;
use crate::fixtures::r_test_init;

/// Task channels for interrupt-time tasks
static INTERRUPT_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

/// Task channels for idle-time tasks
/// Task channels for idle-time tasks (top-level only)
static IDLE_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

/// Task channels for idle tasks that run at any idle prompt (top-level or browser)
/// Task channels for idle tasks that run at any idle prompt (top-level or browser, but
/// not input)
static IDLE_ANY_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

// Compared to `futures::BoxFuture`, this doesn't require the future to be Send.
Expand Down Expand Up @@ -64,19 +62,11 @@ impl TaskChannels {
}
}

/// Returns receivers for interrupt, idle, and debug-idle tasks.
/// Returns receivers for idle and idle-any tasks.
/// Initializes the task channels if they haven't been initialized yet.
/// Can only be called once (intended for `Console` during init).
pub(crate) fn take_receivers() -> (
Receiver<QueuedRTask>,
Receiver<QueuedRTask>,
Receiver<QueuedRTask>,
) {
(
INTERRUPT_TASKS.take_rx(),
IDLE_TASKS.take_rx(),
IDLE_ANY_TASKS.take_rx(),
)
pub(crate) fn take_receivers() -> (Receiver<QueuedRTask>, Receiver<QueuedRTask>) {
(IDLE_TASKS.take_rx(), IDLE_ANY_TASKS.take_rx())
}

pub enum QueuedRTask {
Expand Down Expand Up @@ -148,7 +138,7 @@ impl std::task::Wake for RTaskWaker {
}

impl RTaskStartInfo {
pub(crate) fn new(idle: bool) -> Self {
pub(crate) fn new() -> Self {
let thread = std::thread::current();
let thread_id = thread.id();
let thread_name = thread
Expand All @@ -158,7 +148,7 @@ impl RTaskStartInfo {
.to_owned();

let start_time = std::time::Instant::now();
let span = tracing::trace_span!("R task", thread = thread_name, interrupt = !idle,);
let span = tracing::trace_span!("R task", thread = thread_name);

Self {
thread_id,
Expand Down Expand Up @@ -191,6 +181,9 @@ impl RTaskStartInfo {
// running, so borrowing is allowed even though we send it to another
// thread. See also `Crossbeam::thread::ScopedThreadBuilder` (from which
// `r_task()` is adapted) for a similar approach.
//
// `r_task()`s run via `IDLE_ANY_TASKS`, i.e. they run at top level, and at a debugger
// prompt (but notably not the input prompt as a way to reduce risk of reentrancy).

pub fn r_task<'env, F, T>(f: F) -> T
where
Expand Down Expand Up @@ -242,9 +235,9 @@ where
let task = QueuedRTask::Sync(RTaskSync {
fun: closure,
status_tx: Some(status_tx),
start_info: RTaskStartInfo::new(false),
start_info: RTaskStartInfo::new(),
});
INTERRUPT_TASKS.tx().send(task).unwrap();
IDLE_ANY_TASKS.tx().send(task).unwrap();

// Block until we get the signal that the task has started
let status = status_rx.recv().unwrap();
Expand Down Expand Up @@ -289,23 +282,18 @@ where

/// An async task to be run on the R thread.
///
/// Construct via `RTask::interrupt`, `RTask::idle`, or `RTask::idle_any_prompt`
/// when spawning from the R thread. Use the `Send` variants
/// (`RTask::send_interrupt`, etc.) when spawning from other threads.
/// Construct via [RTask::idle] or [RTask::idle_any_prompt] when spawning from the R
/// thread. Use the `Send` variants ([RTask::send_idle], etc.) when spawning from other
/// threads.
///
/// For idle modes, console output is automatically captured during the task's
/// execution via a `ConsoleOutputCapture` passed to the closure.
/// Console output is automatically captured during the task's execution via a
/// `ConsoleOutputCapture` passed to the closure.
pub(crate) enum RTask {
/// Run at the next interrupt check. Must be spawned from the R thread.
Interrupt(BoxFuture<'static, ()>),
/// Run when R is at a top-level idle prompt. Must be spawned from the R thread.
Idle(BoxFuture<'static, ()>),
/// Run when R is at any idle prompt (top-level or browser). Must be spawned
/// from the R thread.
IdleAnyPrompt(BoxFuture<'static, ()>),
/// Like `Interrupt`, but can be spawned from any thread. The constructor
/// enforces `Send` on the closure.
SendInterrupt(BoxFuture<'static, ()>),
/// Like `Idle`, but can be spawned from any thread. The constructor
/// enforces `Send` on the closure.
SendIdle(BoxFuture<'static, ()>),
Expand All @@ -315,14 +303,6 @@ pub(crate) enum RTask {
}

impl RTask {
pub(crate) fn interrupt<F, Fut>(fun: F) -> Self
where
F: FnOnce() -> Fut + 'static,
Fut: Future<Output = ()> + 'static,
{
RTask::Interrupt(Box::pin(fun()))
}

pub(crate) fn idle<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
Expand All @@ -340,90 +320,85 @@ impl RTask {
RTask::IdleAnyPrompt(Self::pin_with_capture(fun))
}

fn pin_with_capture<F, Fut>(fun: F) -> BoxFuture<'static, ()>
pub(crate) fn send_idle<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
Box::pin(async move {
let capture = if Console::is_initialized() {
Console::get_mut().start_capture()
} else {
// Unit tests run without a Console. The dummy capture is
// inert and doesn't interact with Console state.
debug_assert!(stdext::IS_TESTING);
ConsoleOutputCapture::dummy()
};
fun(capture).await
})
RTask::SendIdle(Self::pin_with_capture(fun))
}

pub(crate) fn send_interrupt<F, Fut>(fun: F) -> Self
/// For rare performance sensitive cases where you'd like to avoid the cost of
/// poking R options via [Self::pin_with_capture()] and you know you don't need
/// the safety of capturing output because you aren't running R code
pub(crate) fn send_idle_without_capture<F, Fut>(fun: F) -> Self
Comment on lines +331 to +334
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did end up wanting this for Drop in RThreadSafe

where
F: FnOnce() -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
RTask::SendInterrupt(Box::pin(fun()))
RTask::SendIdle(Box::pin(fun()))
}

pub(crate) fn send_idle<F, Fut>(fun: F) -> Self
pub(crate) fn send_idle_any_prompt<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
RTask::SendIdle(Self::pin_with_capture(fun))
RTask::SendIdleAnyPrompt(Self::pin_with_capture(fun))
}

pub(crate) fn send_idle_any_prompt<F, Fut>(fun: F) -> Self
fn pin_with_capture<F, Fut>(fun: F) -> BoxFuture<'static, ()>
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
Fut: Future<Output = ()> + 'static,
{
RTask::SendIdleAnyPrompt(Self::pin_with_capture(fun))
Box::pin(async move {
let capture = if Console::is_initialized() {
Console::get_mut().start_capture()
} else {
// Unit tests run without a Console. The dummy capture is
// inert and doesn't interact with Console state.
debug_assert!(stdext::IS_TESTING);
ConsoleOutputCapture::dummy()
};
fun(capture).await
})
}
}

/// Spawn an async task on the R thread.
///
/// For `Send` variants (`RTask::send_interrupt`, etc.) this can be called from
/// For `Send` variants ([RTask::send_idle], etc.) this can be called from
/// any thread. Non-`Send` variants must be called from the R thread.
pub(crate) fn spawn(task: RTask) {
if stdext::IS_TESTING && !Console::is_initialized() {
let _lock = harp::fixtures::R_TEST_LOCK.lock();
let fut = match task {
RTask::Interrupt(fut) |
RTask::Idle(fut) |
RTask::IdleAnyPrompt(fut) |
RTask::SendInterrupt(fut) |
RTask::SendIdle(fut) |
RTask::SendIdleAnyPrompt(fut) => fut,
};
futures::executor::block_on(fut);
return;
}

let needs_r_thread = matches!(
task,
RTask::Interrupt(_) | RTask::Idle(_) | RTask::IdleAnyPrompt(_)
);
let needs_r_thread = matches!(task, RTask::Idle(_) | RTask::IdleAnyPrompt(_));
if needs_r_thread && !Console::on_main_thread() {
let thread = std::thread::current();
let name = thread.name().unwrap_or("<unnamed>");
panic!("`spawn()` must be called from the R thread, not thread '{name}'");
}

let (fut, tasks_tx, only_idle) = match task {
RTask::Interrupt(fut) | RTask::SendInterrupt(fut) => (fut, INTERRUPT_TASKS.tx(), false),
RTask::Idle(fut) | RTask::SendIdle(fut) => (fut, IDLE_TASKS.tx(), true),
RTask::IdleAnyPrompt(fut) | RTask::SendIdleAnyPrompt(fut) => {
(fut, IDLE_ANY_TASKS.tx(), true)
},
let (fut, tasks_tx) = match task {
RTask::Idle(fut) | RTask::SendIdle(fut) => (fut, IDLE_TASKS.tx()),
RTask::IdleAnyPrompt(fut) | RTask::SendIdleAnyPrompt(fut) => (fut, IDLE_ANY_TASKS.tx()),
};

let task = QueuedRTask::Async(RTaskAsync {
fut,
tasks_tx: tasks_tx.clone(),
start_info: RTaskStartInfo::new(only_idle),
start_info: RTaskStartInfo::new(),
});

tasks_tx.send(task).unwrap();
Expand Down
Loading
Loading