Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/amalthea/src/comm/ui_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ pub struct DidChangePlotsRenderSettingsParams {
pub settings: PlotRenderSettings,
}

/// Parameters for the FrontendReady method.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct FrontendReadyParams {
/// The type of session start: 'new' for new sessions, 'restart' for
/// restarted sessions, 'reconnect' for reconnected sessions
pub start_type: String,
}
Comment on lines +171 to +175
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Architectural (later):

In the future, we think start_type might eventually come through comm_open(). In that world, I think it does make sense for this to be a 3 valued string like is shown here. Because we get a comm_open() for all 3 types of "start".

When we move start_type to comm_open(), this method probably changes from FrontendReady to something more specific like FireHooks (or maybe the name still makes sense, idk yet?). Regardless, what definitely would change is that start_type would be removed from the params here, because it would have already come through in comm_open().

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I stayed away from hard-wiring "fire the hooks!" terminology into the Positron side. Because I figured, Positron really doesn't need to know what the backend is doing with this info. It just needs to say "I am ready".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that is also where I was starting to land with "or maybe the name still makes sense, idk yet?"


/// Parameters for the CallMethod method.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct CallMethodParams {
Expand Down Expand Up @@ -412,6 +420,15 @@ pub enum UiBackendRequest {
#[serde(rename = "did_change_plots_render_settings")]
DidChangePlotsRenderSettings(DidChangePlotsRenderSettingsParams),

/// Notification that the frontend is ready
///
/// This notification is sent by the frontend after the UI comm has been
/// established. The backend uses this signal to run session
/// initialization hooks that may need to communicate with the frontend
/// via RPCs (e.g. rstudioapi calls).
#[serde(rename = "frontend_ready")]
FrontendReady(FrontendReadyParams),

/// Run a method in the interpreter and return the result to the frontend
///
/// Unlike other RPC methods, `call_method` calls into methods implemented
Expand All @@ -437,6 +454,9 @@ pub enum UiBackendReply {
/// Unused response to notification
DidChangePlotsRenderSettingsReply(),

/// Unused response to notification
FrontendReadyReply(),

/// The method result
CallMethodReply(CallMethodResult),

Expand Down
11 changes: 10 additions & 1 deletion crates/amalthea/src/language/shell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ pub trait ShellHandler: Send {
///
/// * `target` - The target name of the comm, such as `positron.variables`
/// * `comm` - The comm channel to use to communicate with the frontend
async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> crate::Result<bool>;
/// * `data` - The `data` payload from the `comm_open` message
async fn handle_comm_open(
&self,
target: Comm,
comm: CommSocket,
data: serde_json::Value,
) -> crate::Result<bool>;

/// Handle an incoming comm message (RPC or data). Return
/// `CommHandled::Handled` if the message was processed, or
Expand All @@ -98,11 +104,14 @@ pub trait ShellHandler: Send {
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name (e.g. `"positron.dataExplorer"`)
/// * `msg` - The parsed `CommMsg`
/// * `originator` - The originator of the Jupyter message, threaded through
/// so that comm handlers can make RPCs back to the frontend
fn handle_comm_msg(
&mut self,
_comm_id: &str,
_comm_name: &str,
_msg: CommMsg,
_originator: Originator,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
}
Expand Down
30 changes: 21 additions & 9 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::wire::comm_info_request::CommInfoRequest;
use crate::wire::comm_msg::CommWireMsg;
use crate::wire::comm_open::CommOpen;
use crate::wire::exception::Exception;
use crate::wire::header::JupyterHeader;
use crate::wire::jupyter_message::JupyterMessage;
use crate::wire::jupyter_message::Message;
use crate::wire::jupyter_message::ProtocolMessage;
Expand Down Expand Up @@ -288,9 +287,9 @@ impl Shell {
},
Message::CommMsg(req) => {
let open_comms = &self.open_comms;
let header = req.header.clone();
let originator = Originator::from(&req);
Self::handle_notification(iopub_tx, req, |msg| {
Self::handle_comm_msg(shell_handler, open_comms, header, msg)
Self::handle_comm_msg(shell_handler, open_comms, originator, msg)
})
},
Message::CommClose(req) => {
Expand Down Expand Up @@ -467,21 +466,21 @@ impl Shell {
fn handle_comm_msg(
shell_handler: &mut Box<dyn ShellHandler>,
open_comms: &[CommSocket],
header: JupyterHeader,
originator: Originator,
msg: &CommWireMsg,
) -> crate::Result<()> {
// The presence of an `id` field means this is a request, not a notification
// https://github.com/posit-dev/positron/issues/7448
let comm_msg = if msg.data.get("id").is_some() {
// Note that the JSON-RPC `id` field must exactly match the one in
// the Jupyter header
let request_id = header.msg_id.clone();
let request_id = originator.header.msg_id.clone();

// Include the header so it can be echoed back in the reply for
// proper message parenting
CommMsg::Rpc {
id: request_id,
parent_header: header,
parent_header: originator.header.clone(),
data: msg.data.clone(),
}
} else {
Expand All @@ -497,7 +496,12 @@ impl Shell {
};

// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
match shell_handler.handle_comm_msg(
&msg.comm_id,
&comm.comm_name,
comm_msg.clone(),
originator,
)? {
CommHandled::Handled => Ok(()),
CommHandled::NotHandled => {
// Fall back to old approach for compatibility while we migrate comms
Expand Down Expand Up @@ -595,14 +599,22 @@ impl Shell {
true
} else {
// No server handler found, pass through to shell handler
block_on(shell_handler.handle_comm_open(comm, comm_socket.clone()))?
block_on(shell_handler.handle_comm_open(
comm,
comm_socket.clone(),
msg.data.clone(),
))?
}
},

// All comms tied to known Positron clients are passed through to the shell handler
_ => {
// Call the shell handler to open the comm
block_on(shell_handler.handle_comm_open(comm, comm_socket.clone()))?
block_on(shell_handler.handle_comm_open(
comm,
comm_socket.clone(),
msg.data.clone(),
))?
},
};

Expand Down
7 changes: 6 additions & 1 deletion crates/amalthea/tests/client/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ impl ShellHandler for Shell {
})
}

async fn handle_comm_open(&self, req: Comm, comm: CommSocket) -> amalthea::Result<bool> {
async fn handle_comm_open(
&self,
req: Comm,
comm: CommSocket,
_data: serde_json::Value,
) -> amalthea::Result<bool> {
// Used to test error replies
match req {
Comm::Other(name) if name == "unknown" => {
Expand Down
5 changes: 5 additions & 0 deletions crates/ark/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ pub(crate) struct Console {
/// the reply should be send to once computation has finished.
active_request: Option<ActiveReadConsoleRequest>,

/// Originator from the current `comm_msg` being handled, if any.
/// This is temporarily set during `comm_handle_msg` so that comm
/// handlers (e.g. `frontend_ready`) can make RPCs back to the frontend.
comm_msg_originator: Option<Originator>,

/// Execution request counter used to populate `In[n]` and `Out[n]` prompts
execution_count: u32,

Expand Down
6 changes: 5 additions & 1 deletion crates/ark/src/console/console_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ impl Console {
let comm = self.comms.get(self.ui_comm_id.as_deref()?)?;
Some(UiCommRef {
comm,
originator: self.active_request.as_ref().map(|r| &r.originator),
originator: self
.active_request
.as_ref()
.map(|r| &r.originator)
.or(self.comm_msg_originator.as_ref()),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key change that makes frontend RPCs work from comm_msg handlers.

stdin_request_tx: &self.stdin_request_tx,
})
}
Expand Down
4 changes: 4 additions & 0 deletions crates/ark/src/console/console_repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ impl Console {
iopub_tx,
kernel_request_rx,
active_request: None,
comm_msg_originator: None,
execution_count: 0,
autoprint_output: String::new(),
ui_comm_id: None,
Expand Down Expand Up @@ -1899,9 +1900,12 @@ impl Console {
KernelRequest::CommMsg {
comm_id,
msg,
originator,
done_tx,
} => {
self.comm_msg_originator = Some(*originator);
self.comm_handle_msg(&comm_id, msg);
self.comm_msg_originator = None;
done_tx.send(()).log_err();
},
KernelRequest::CommClose { comm_id, done_tx } => {
Expand Down
37 changes: 37 additions & 0 deletions crates/ark/src/modules/positron/hooks.R
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,43 @@ check_version <- function(pkg) {
)
}

.ps.run_session_init_hooks <- function(start_type = c("new", "restart")) {
start_type <- match.arg(start_type)
hooks <- getHook("positron.session_init")

for (hook_fn in hooks) {
tryCatch(
hook_fn(start_type),
error = function(err) {
log_warning(sprintf(
"Error in positron.session_init hook: %s",
conditionMessage(err)
))
}
)
}

invisible(NULL)
}

.ps.run_session_reconnect_hooks <- function() {
hooks <- getHook("positron.session_reconnect")

for (hook_fn in hooks) {
tryCatch(
hook_fn(),
error = function(err) {
log_warning(sprintf(
"Error in positron.session_reconnect hook: %s",
conditionMessage(err)
))
}
)
}

invisible(NULL)
}
Comment thread
jennybc marked this conversation as resolved.

# We don't support `utils::recover()` in Ark, but the same functionality is
# provided via the call stack pane of IDEs. So replace it by `browser()` so that
# people can enter the debugger on error using the familiar `options(error =
Expand Down
1 change: 1 addition & 0 deletions crates/ark/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub enum KernelRequest {
CommMsg {
comm_id: String,
msg: CommMsg,
originator: Box<Originator>,
done_tx: Sender<()>,
},

Expand Down
13 changes: 11 additions & 2 deletions crates/ark/src/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,19 @@ impl ShellHandler for Shell {
///
/// Note that there might be multiple requests during a single session if
/// the UI has been disconnected and reconnected.
async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> amalthea::Result<bool> {
async fn handle_comm_open(
&self,
target: Comm,
comm: CommSocket,
data: serde_json::Value,
) -> amalthea::Result<bool> {
match target {
Comm::Variables => handle_comm_open_variables(comm),
Comm::Ui => handle_comm_open_ui(
comm,
self.kernel_request_tx.clone(),
self.graphics_device_tx.clone(),
data,
),
Comm::Help => handle_comm_open_help(comm),
Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm),
Expand All @@ -255,12 +261,14 @@ impl ShellHandler for Shell {
comm_id: &str,
comm_name: &str,
msg: CommMsg,
originator: Originator,
) -> amalthea::Result<CommHandled> {
match comm_name {
DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => {
self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg {
comm_id: comm_id.to_string(),
msg,
originator: Box::new(originator),
done_tx,
})?;
Ok(CommHandled::Handled)
Expand Down Expand Up @@ -316,8 +324,9 @@ fn handle_comm_open_ui(
comm: CommSocket,
kernel_request_tx: Sender<KernelRequest>,
graphics_device_tx: AsyncUnboundedSender<GraphicsDeviceNotification>,
data: serde_json::Value,
) -> amalthea::Result<bool> {
let handler = UiComm::new(graphics_device_tx);
let handler = UiComm::new(graphics_device_tx, data);

let (done_tx, done_rx) = bounded(0);
kernel_request_tx
Expand Down
Loading
Loading