Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 29 additions & 39 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use payjoin::bitcoin::consensus::encode::serialize_hex;
use payjoin::bitcoin::{Amount, FeeRate};
use payjoin::persist::OptionalTransitionOutcome;
use payjoin::receive::v2::{
process_err_res, replay_event_log as replay_receiver_event_log, Initialized, MaybeInputsOwned,
MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal, ReceiveSession,
Receiver, ReceiverBuilder, SessionHistory, UncheckedOriginalPayload, WantsFeeRange,
replay_event_log as replay_receiver_event_log, HasReplyableError, Initialized,
MaybeInputsOwned, MaybeInputsSeen, OutputsUnknown, PayjoinProposal, ProvisionalProposal,
ReceiveSession, Receiver, ReceiverBuilder, UncheckedOriginalPayload, WantsFeeRange,
WantsInputs, WantsOutputs,
};
use payjoin::send::v2::{
Expand Down Expand Up @@ -70,7 +70,7 @@ impl StatusText for ReceiveSession {
| ReceiveSession::WantsFeeRange(_)
| ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
ReceiveSession::TerminalFailure => "Session failure",
ReceiveSession::HasReplyableError(_) => "Session failure",
}
}
}
Expand Down Expand Up @@ -350,13 +350,17 @@ impl AppTrait for App {
self.db.get_inactive_send_session_ids()?.into_iter().try_for_each(
|(session_id, completed_at)| {
let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
if let Ok((sender_state, session_history)) = replay_sender_event_log(&persister) {
if let Ok((sender_state, _)) = replay_sender_event_log(&persister) {
let row = SessionHistoryRow {
session_id,
role: Role::Sender,
status: sender_state,
status: sender_state.clone(),
completed_at: Some(completed_at),
error_message: session_history.terminal_error(),
error_message: match sender_state {
SendSession::TerminalFailure =>
Some("Sender terminal failure".to_string()),
_ => None,
},
};
send_rows.push(row);
}
Expand All @@ -367,14 +371,17 @@ impl AppTrait for App {
self.db.get_inactive_recv_session_ids()?.into_iter().try_for_each(
|(session_id, completed_at)| {
let persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
if let Ok((receiver_state, session_history)) = replay_receiver_event_log(&persister)
{
if let Ok((receiver_state, _)) = replay_receiver_event_log(&persister) {
let row = SessionHistoryRow {
session_id,
role: Role::Receiver,
status: receiver_state,
status: receiver_state.clone(),
completed_at: Some(completed_at),
error_message: session_history.terminal_error().map(|e| e.0),
error_message: match &receiver_state {
ReceiveSession::HasReplyableError(replyable_error) =>
Some(replyable_error.error_reply().to_json().to_string()),
_ => None,
},
};
recv_rows.push(row);
}
Expand Down Expand Up @@ -519,22 +526,11 @@ impl App {
self.finalize_proposal(proposal, persister).await,
ReceiveSession::PayjoinProposal(proposal) =>
self.send_payjoin_proposal(proposal, persister).await,
ReceiveSession::TerminalFailure =>
return Err(anyhow!("Terminal receiver session")),
ReceiveSession::HasReplyableError(error) =>
self.handle_error(error, persister).await,
}
};

match res {
Ok(_) => Ok(()),
Err(e) => {
let (_, session_history) = replay_receiver_event_log(persister)?;
let pj_uri = session_history.pj_uri().extras.endpoint().clone();
let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(pj_uri)).await?;
self.handle_recoverable_error(&ohttp_relay, &session_history).await?;

Err(e)
}
}
res
}

#[allow(clippy::incompatible_msrv)]
Expand Down Expand Up @@ -700,20 +696,14 @@ impl App {
Ok(ohttp_relay)
}

/// Handle request error by sending an error response over the directory
async fn handle_recoverable_error(
/// Handle error by attempting to send an error response over the directory
async fn handle_error(
&self,
ohttp_relay: &payjoin::Url,
session_history: &SessionHistory,
session: Receiver<HasReplyableError>,
persister: &ReceiverPersister,
) -> Result<()> {
let e = match session_history.terminal_error() {
Some((_, Some(e))) => e,
_ => return Ok(()),
};
let (err_req, err_ctx) = session_history
.extract_err_req(ohttp_relay.as_str())?
.expect("If JsonReply is Some, then err_req and err_ctx should be Some");
let to_return = anyhow!("Replied with error: {}", e.to_json());
let (err_req, err_ctx) =
session.create_error_request(self.unwrap_relay_or_else_fetch(None).await?.as_str())?;

let err_response = match self.post_request(err_req).await {
Ok(response) => response,
Expand All @@ -725,11 +715,11 @@ impl App {
Err(e) => return Err(anyhow!("Failed to get error response bytes: {}", e)),
};

if let Err(e) = process_err_res(&err_bytes, err_ctx) {
if let Err(e) = session.process_error_response(&err_bytes, err_ctx).save(persister) {
return Err(anyhow!("Failed to process error response: {}", e));
}

Err(to_return)
Ok(())
}

async fn post_request(&self, req: payjoin::Request) -> Result<reqwest::Response> {
Expand Down
128 changes: 83 additions & 45 deletions payjoin-ffi/src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub enum ReceiveSession {
WantsFeeRange { inner: Arc<WantsFeeRange> },
ProvisionalProposal { inner: Arc<ProvisionalProposal> },
PayjoinProposal { inner: Arc<PayjoinProposal> },
TerminalFailure,
HasReplyableError { inner: Arc<HasReplyableError> },
}

impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
Expand All @@ -112,7 +112,8 @@ impl From<payjoin::receive::v2::ReceiveSession> for ReceiveSession {
Self::ProvisionalProposal { inner: Arc::new(inner.into()) },
ReceiveSession::PayjoinProposal(inner) =>
Self::PayjoinProposal { inner: Arc::new(inner.into()) },
ReceiveSession::TerminalFailure => Self::TerminalFailure,
ReceiveSession::HasReplyableError(inner) =>
Self::HasReplyableError { inner: Arc::new(inner.into()) },
}
}
}
Expand Down Expand Up @@ -146,19 +147,6 @@ impl From<payjoin::receive::v2::SessionHistory> for SessionHistory {
fn from(value: payjoin::receive::v2::SessionHistory) -> Self { Self(value) }
}

#[derive(uniffi::Object)]
pub struct TerminalErr {
error: String,
reply: Option<JsonReply>,
}

#[uniffi::export]
impl TerminalErr {
pub fn error(&self) -> String { self.error.clone() }

pub fn reply(&self) -> Option<Arc<JsonReply>> { self.reply.clone().map(Arc::new) }
}

#[uniffi::export]
impl SessionHistory {
/// Receiver session Payjoin URI
Expand All @@ -169,33 +157,10 @@ impl SessionHistory {
self.0.psbt_ready_for_signing().map(|psbt| Arc::new(psbt.into()))
}

/// Terminal error from the session if present
pub fn terminal_error(&self) -> Option<Arc<TerminalErr>> {
self.0.terminal_error().map(|(error, reply)| {
Arc::new(TerminalErr { error, reply: reply.map(|reply| reply.into()) })
})
}

/// Fallback transaction from the session if present
pub fn fallback_tx(&self) -> Option<Arc<crate::Transaction>> {
self.0.fallback_tx().map(|tx| Arc::new(tx.into()))
}

/// Construct the error request to be posted on the directory if an error occurred.
/// To process the response, use [process_err_res]
pub fn extract_err_req(
&self,
ohttp_relay: String,
) -> Result<Option<RequestResponse>, SessionError> {
match self.0.extract_err_req(ohttp_relay) {
Ok(Some((request, ctx))) => Ok(Some(RequestResponse {
request: request.into(),
client_response: Arc::new(ctx.into()),
})),
Ok(None) => Ok(None),
Err(e) => Err(SessionError::from(e)),
}
}
}

#[derive(uniffi::Object)]
Expand Down Expand Up @@ -502,12 +467,6 @@ impl UncheckedOriginalPayload {
}
}

/// Process an OHTTP Encapsulated HTTP POST Error response
/// to ensure it has been posted properly
#[uniffi::export]
pub fn process_err_res(body: &[u8], context: &ClientResponse) -> Result<(), SessionError> {
payjoin::receive::v2::process_err_res(body, context.into()).map_err(Into::into)
}
#[derive(Clone, uniffi::Object)]
pub struct MaybeInputsOwned(payjoin::receive::v2::Receiver<payjoin::receive::v2::MaybeInputsOwned>);

Expand Down Expand Up @@ -961,7 +920,7 @@ pub struct PayjoinProposalTransition(
payjoin::persist::MaybeSuccessTransition<
payjoin::receive::v2::SessionEvent,
(),
payjoin::receive::Error,
payjoin::receive::ProtocolError,
>,
>,
>,
Expand Down Expand Up @@ -1036,6 +995,85 @@ impl PayjoinProposal {
}
}

#[derive(Clone, uniffi::Object)]
pub struct HasReplyableError(
pub payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
);

impl From<HasReplyableError>
for payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>
{
fn from(value: HasReplyableError) -> Self { value.0 }
}

impl From<payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>>
for HasReplyableError
{
fn from(
value: payjoin::receive::v2::Receiver<payjoin::receive::v2::HasReplyableError>,
) -> Self {
Self(value)
}
}

#[derive(uniffi::Object)]
pub struct HasReplyableErrorTransition(
Arc<
RwLock<
Option<
payjoin::persist::MaybeSuccessTransition<
payjoin::receive::v2::SessionEvent,
(),
payjoin::receive::Error,
>,
>,
>,
>,
);

#[uniffi::export]
impl HasReplyableErrorTransition {
pub fn save(
&self,
persister: Arc<dyn JsonReceiverSessionPersister>,
) -> Result<(), ReceiverPersistedError> {
let adapter = CallbackPersisterAdapter::new(persister);
let mut inner =
self.0.write().map_err(|_| ImplementationError::from("Lock poisoned".to_string()))?;

let value = inner
.take()
.ok_or_else(|| ImplementationError::from("Already saved or moved".to_string()))?;

value.save(&adapter).map_err(ReceiverPersistedError::from)?;
Ok(())
}
}

#[uniffi::export]
impl HasReplyableError {
pub fn create_error_request(
&self,
ohttp_relay: String,
) -> Result<RequestResponse, SessionError> {
self.0.clone().create_error_request(ohttp_relay).map_err(Into::into).map(|(req, ctx)| {
RequestResponse { request: req.into(), client_response: Arc::new(ctx.into()) }
})
}

pub fn process_error_response(
&self,
body: &[u8],
ohttp_context: &ClientResponse,
) -> PayjoinProposalTransition {
PayjoinProposalTransition(Arc::new(RwLock::new(Some(
self.0.clone().process_error_response(body, ohttp_context.into()),
))))
}

pub fn error_reply(&self) -> String { self.0.error_reply().to_json().to_string() }
}

/// Session persister that should save and load events as JSON strings.
#[uniffi::export(with_foreign)]
pub trait JsonReceiverSessionPersister: Send + Sync {
Expand Down
Loading
Loading