Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ const fn seed_node_addrs(network: Network) -> &'static [SocketAddr] {
pub struct Net {
pub server: Endpoint,
archive: Archive,
network: Network,
state: State,
active_peers: Arc<RwLock<HashMap<SocketAddr, PeerConnectionHandle>>>,
// None indicates that the stream has ended
Expand Down Expand Up @@ -280,6 +281,7 @@ impl Net {
let connection_ctxt = PeerConnectionCtxt {
env,
archive: self.archive.clone(),
network: self.network,
state: self.state.clone(),
};

Expand Down Expand Up @@ -358,6 +360,7 @@ impl Net {
let net = Net {
server,
archive,
network,
state,
active_peers,
peer_info_tx,
Expand Down Expand Up @@ -430,7 +433,7 @@ impl Net {
remote_address,
}
})?;
Connection::from(raw_conn)
Connection::new(raw_conn, self.network)
}
None => {
tracing::debug!("server endpoint closed");
Expand Down Expand Up @@ -462,6 +465,7 @@ impl Net {
let connection_ctxt = PeerConnectionCtxt {
env,
archive: self.archive.clone(),
network: self.network,
state: self.state.clone(),
};
let (connection_handle, info_rx) =
Expand Down
4 changes: 4 additions & 0 deletions lib/net/peer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ pub(in crate::net::peer) mod connection {

#[derive(Debug, Error)]
pub enum Receive {
#[error("received incorrect magic: {}", hex::encode(.0))]
BadMagic(crate::net::peer::message::MagicBytes),
#[error("bincode error")]
Bincode(#[from] bincode::Error),
#[error("connection error")]
Connection(#[from] quinn::ConnectionError),
#[error("failed to read magic bytes")]
ReadMagic(#[source] quinn::ReadExactError),
#[error("read to end error")]
ReadToEnd(#[from] quinn::ReadToEndError),
}
Expand Down
22 changes: 21 additions & 1 deletion lib/net/peer/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,29 @@ use serde::{Deserialize, Serialize};

use crate::{
net::peer::{PeerState, PeerStateId},
types::{AuthorizedTransaction, BlockHash, Body, Header, Tip, Txid},
types::{
AuthorizedTransaction, BlockHash, Body, Header, Network, Tip, Txid,
},
};

pub const MAGIC_BYTES_LEN: usize = 4;

pub type MagicBytes = [u8; MAGIC_BYTES_LEN];

pub const fn magic_bytes(network: Network) -> MagicBytes {
// First 4 bytes are the US-TTY (LSB Right) Baudot–Murray code for "THNDR".
// Rightmost bits of the 4th byte is the network identifier.
let b0 = 0b1000_0101;
let b1 = 0b0001_1000;
let b2 = 0b1001_0101;
let mut b3 = 0b0000_0000;
match network {
Network::Regtest => (),
Network::Signet => b3 |= 0b0000_0001,
}
[b0, b1, b2, b3]
}

#[derive(BorshSerialize, Clone, Debug, Deserialize, Serialize)]
pub struct Heartbeat(pub PeerState);

Expand Down
97 changes: 69 additions & 28 deletions lib/net/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{spawn, task::JoinHandle, time::Duration};
use crate::{
archive::Archive,
state::State,
types::{AuthorizedTransaction, Hash, Tip, Version, hash, schema},
types::{AuthorizedTransaction, Hash, Network, Tip, Version, hash, schema},
};

mod channel_pool;
Expand Down Expand Up @@ -140,6 +140,7 @@ where
#[derive(Clone)]
pub struct Connection {
pub(in crate::net) inner: quinn::Connection,
pub network: Network,
}

impl Connection {
Expand All @@ -154,14 +155,25 @@ impl Connection {
self.inner.remote_address()
}

pub async fn new(
pub fn new(connection: quinn::Connection, network: Network) -> Self {
Self {
inner: connection,
network,
}
}

pub async fn from_connecting(
connecting: quinn::Connecting,
network: Network,
) -> Result<Self, quinn::ConnectionError> {
let addr = connecting.remote_address();
tracing::trace!(%addr, "connecting to peer");
let connection = connecting.await?;
tracing::info!(%addr, "connected successfully to peer");
Ok(Self { inner: connection })
Ok(Self {
inner: connection,
network,
})
}

async fn receive_request(
Expand All @@ -170,6 +182,15 @@ impl Connection {
{
let (tx, mut rx) = self.inner.accept_bi().await?;
tracing::trace!(recv_id = %rx.id(), "Receiving request");
let mut magic_bytes = [0u8; message::MAGIC_BYTES_LEN];
rx.read_exact(&mut magic_bytes)
.await
.map_err(error::connection::Receive::ReadMagic)?;
if magic_bytes != message::magic_bytes(self.network) {
return Err(
error::connection::Receive::BadMagic(magic_bytes).into()
);
}
let msg_bytes = rx.read_to_end(Connection::READ_REQUEST_LIMIT).await?;
let msg: RequestMessage = bincode::deserialize(&msg_bytes)?;
tracing::trace!(
Expand All @@ -191,8 +212,9 @@ impl Connection {
"Sending heartbeat"
);
let message = RequestMessageRef::from(heartbeat);
let message = bincode::serialize(&message)?;
send.write_all(&message).await.map_err(|err| {
let mut message_buf = message::magic_bytes(self.network).to_vec();
bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &message)?;
send.write_all(&message_buf).await.map_err(|err| {
error::connection::Send::Write {
stream_id: send.id(),
source: err,
Expand All @@ -203,10 +225,20 @@ impl Connection {
}

async fn receive_response(
network: Network,
mut recv: RecvStream,
read_response_limit: NonZeroUsize,
) -> ResponseResult {
tracing::trace!(recv_id = %recv.id(), "Receiving response");
let mut magic_bytes = [0u8; message::MAGIC_BYTES_LEN];
recv.read_exact(&mut magic_bytes)
.await
.map_err(error::connection::Receive::ReadMagic)?;
if magic_bytes != message::magic_bytes(network) {
return Err(
error::connection::Receive::BadMagic(magic_bytes).into()
);
}
let response_bytes =
recv.read_to_end(read_response_limit.get()).await?;
let response: ResponseMessage = bincode::deserialize(&response_bytes)?;
Expand All @@ -230,40 +262,52 @@ impl Connection {
"Sending request"
);
let message = RequestMessageRef::from(request);
let message = bincode::serialize(&message)?;
send.write_all(&message).await.map_err(|err| {
let mut message_buf = message::magic_bytes(self.network).to_vec();
bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &message)?;
send.write_all(&message_buf).await.map_err(|err| {
error::connection::Send::Write {
stream_id: send.id(),
source: err,
}
})?;
send.finish()?;
Ok(Self::receive_response(recv, read_response_limit).await)
Ok(
Self::receive_response(self.network, recv, read_response_limit)
.await,
)
}

// Send a pre-serialized response, where the response does not include
// magic bytes
async fn send_serialized_response(
network: Network,
mut response_tx: SendStream,
serialized_response: &[u8],
) -> Result<(), error::connection::SendResponse> {
tracing::trace!(
send_id = %response_tx.id(),
"Sending response"
);
response_tx
.write_all(serialized_response)
.await
.map_err(|err| {
{
error::connection::Send::Write {
stream_id: response_tx.id(),
source: err,
}
async {
response_tx
.write_all(&message::magic_bytes(network))
.await?;
response_tx.write_all(serialized_response).await
}
.await
.map_err(|err| {
{
error::connection::Send::Write {
stream_id: response_tx.id(),
source: err,
}
.into()
})
}
.into()
})
}

async fn send_response(
network: Network,
mut response_tx: SendStream,
response: ResponseMessage,
) -> Result<(), error::connection::SendResponse> {
Expand All @@ -272,8 +316,9 @@ impl Connection {
send_id = %response_tx.id(),
"Sending response"
);
let response_bytes = bincode::serialize(&response)?;
response_tx.write_all(&response_bytes).await.map_err(|err| {
let mut message_buf = message::magic_bytes(network).to_vec();
bincode::serialize_into::<&mut Vec<_>, _>(&mut message_buf, &response)?;
response_tx.write_all(&message_buf).await.map_err(|err| {
{
error::connection::Send::Write {
stream_id: response_tx.id(),
Expand All @@ -285,15 +330,10 @@ impl Connection {
}
}

impl From<quinn::Connection> for Connection {
fn from(inner: quinn::Connection) -> Self {
Self { inner }
}
}

pub struct ConnectionContext {
pub env: sneed::Env,
pub archive: Archive,
pub network: Network,
pub state: State,
}

Expand Down Expand Up @@ -427,7 +467,8 @@ pub fn connect(
let status_repr = status_repr.clone();
let info_tx = info_tx.clone();
move || async move {
let connection = Connection::new(connecting).await?;
let connection =
Connection::from_connecting(connecting, ctxt.network).await?;
status_repr.store(
PeerConnectionStatus::Connected.as_repr(),
atomic::Ordering::SeqCst,
Expand Down
7 changes: 6 additions & 1 deletion lib/net/peer/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ impl ConnectionTask {
}
(_, _) => ResponseMessage::NoBlock { block_hash },
};
let () = Connection::send_response(response_tx, resp).await?;
let () =
Connection::send_response(ctxt.network, response_tx, resp).await?;
Ok(())
}

Expand Down Expand Up @@ -661,6 +662,7 @@ impl ConnectionTask {
match validate_tx_result {
Err(err) => {
Connection::send_response(
ctxt.network,
response_tx,
ResponseMessage::TransactionRejected(txid),
)
Expand All @@ -669,6 +671,7 @@ impl ConnectionTask {
}
Ok(_) => {
Connection::send_response(
ctxt.network,
response_tx,
ResponseMessage::TransactionAccepted(txid),
)
Expand Down Expand Up @@ -844,8 +847,10 @@ impl ConnectionTask {
serialized_response,
response_tx,
}) => {
let network = ctxt.network;
self.mailbox_tx.send_response_spawner.spawn(async move {
Connection::send_serialized_response(
network,
response_tx,
&serialized_response,
)
Expand Down
Loading