Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
322f622
fix: harden engine actor messaging
artrixdotdev May 21, 2026
645211b
fix: harden peer actor communication
artrixdotdev May 21, 2026
c3e0774
refactor: split torrent public facade
artrixdotdev May 21, 2026
074a1ad
refactor: extract torrent domain primitives
artrixdotdev May 21, 2026
4df9c72
refactor: split tracker actor modules
artrixdotdev May 21, 2026
3a0bb93
refactor: return engine API errors
artrixdotdev May 21, 2026
699fcb6
docs: mark illustrative doctests
artrixdotdev May 21, 2026
f9e2e36
docs: document actor boundaries
artrixdotdev May 21, 2026
20a9fa2
refactor: bound actor broadcast concurrency
artrixdotdev May 21, 2026
20ac661
fix: timeout incoming peer handshakes
artrixdotdev May 21, 2026
e0f498f
feat: add PieceScheduler for block-level request tracking
artrixdotdev May 22, 2026
18dac50
feat: add PieceStoreActor for disk I/O offloading
artrixdotdev May 22, 2026
eddffe3
refactor: move PieceManager trait to pieces module
artrixdotdev May 22, 2026
b063fff
refactor: expose pieces module from lib
artrixdotdev May 22, 2026
ea43f7c
refactor: register pieces module in lib.rs
artrixdotdev May 22, 2026
0ef612f
refactor: extract piece flow logic into dedicated module
artrixdotdev May 22, 2026
b2fd539
refactor: extract swarm management into dedicated module
artrixdotdev May 22, 2026
f68e4d0
refactor: reorganize torrent module declarations
artrixdotdev May 22, 2026
1d54730
refactor: point handle.rs PieceManager import to pieces module
artrixdotdev May 22, 2026
0c6086c
refactor: restructure torrent actor internal state
artrixdotdev May 22, 2026
fd57fad
refactor: add PeerConnected variant to TorrentMessage
artrixdotdev May 22, 2026
d0d15f5
refactor: add PeerRejectedRequest variant to TorrentMessage
artrixdotdev May 22, 2026
35a4543
refactor: widen IncomingPiece to carry PeerId
artrixdotdev May 22, 2026
b0ff388
refactor: remove unused CurrentPeers and CurrentTrackers request types
artrixdotdev May 22, 2026
729a657
feat: implement piece serving via Request handler
artrixdotdev May 22, 2026
aa9e2a9
refactor: update KillPeer and PeerReady message handlers
artrixdotdev May 22, 2026
86bd597
refactor: update InfoBytes and Bitfield response for plain BitVec
artrixdotdev May 22, 2026
3c43e8b
feat: add on_stop lifecycle hook to peer actor
artrixdotdev May 22, 2026
3b4ea79
refactor: add notify_ready and reject_piece_request helpers to peer a…
artrixdotdev May 22, 2026
411f96d
refactor: simplify send_message choked handling
artrixdotdev May 22, 2026
0377913
refactor: update NeedPiece handler for choked queuing and send errors
artrixdotdev May 22, 2026
de21500
refactor: improve Piece handler with PeerId validation and error repo…
artrixdotdev May 22, 2026
2b66bd1
refactor: notify torrent when peer becomes ready after unchoke and bi…
artrixdotdev May 22, 2026
a1b965e
fix: handle disconnected peer stream in remote_addr and Display
artrixdotdev May 22, 2026
b91c2cf
feat: add announce retry on stale connection ID
artrixdotdev May 22, 2026
754c8db
chore: remove deprecated torrent/piece.rs
artrixdotdev May 22, 2026
a7cc4a3
chore: remove deprecated torrent/piece_manager.rs
artrixdotdev May 22, 2026
a5ed5c3
chore: formatting
artrixdotdev May 22, 2026
c7e6bc7
perf: move interest calculation to torrent
artrixdotdev May 22, 2026
031ac7e
chore: use shorter import syntax
artrixdotdev May 22, 2026
f02294c
fix: apply CodeRabbit auto-fixes
coderabbitai[bot] May 22, 2026
fd98eb1
refactor(#1930: improve actor efficiency
artrixdotdev May 22, 2026
ca9fbf3
chore: fix clippy errors
artrixdotdev May 22, 2026
98271b3
chore: format code
artrixdotdev May 22, 2026
0cde487
fix: apply CodeRabbit auto-fixes
coderabbitai[bot] May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions crates/libtortillas/src/ARCHITECTURE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# libtortillas Architecture

The library is organized around a small actor hierarchy:

- `EngineActor` owns global listeners, the shared UDP tracker socket, and the torrent registry.
- `TorrentActor` owns per-torrent state and coordinates peers, trackers, piece progress, and exports.
- `PeerActor` owns one peer connection and peer-local protocol state.
- `TrackerActor` owns one tracker announce loop and forwards discovered peers to its torrent supervisor.

Module facades should export stable public types while keeping actor internals private to the crate.
Domain types such as torrent state, storage strategy, exported snapshots, tracker model types, and tracker stats live outside actor files so actors can focus on orchestration.
53 changes: 39 additions & 14 deletions crates/libtortillas/src/engine/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use kameo::{
mailbox,
prelude::{ActorRef, Context, Message},
};
use tokio::time::{Duration, timeout};
use tracing::{error, warn};

use super::{EngineActor, EngineExport};
Expand All @@ -22,6 +23,8 @@ use crate::{
},
};

const INCOMING_PEER_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

pub(crate) enum EngineMessage {
/// Handles an incoming peer connection. The peer has been neither handshaked
/// nor verified at this point.
Expand Down Expand Up @@ -49,32 +52,56 @@ impl Message<EngineMessage> for EngineActor {
) -> Self::Reply {
match msg {
EngineMessage::IncomingPeer(mut stream) => {
let msg = stream.recv().await.expect("Can't read from peer stream");
let peer_addr = stream.remote_addr().expect("Can't get remote address");
let msg = match timeout(INCOMING_PEER_HANDSHAKE_TIMEOUT, stream.recv()).await {
Ok(Ok(msg)) => msg,
Ok(Err(err)) => {
warn!(error = %err, %stream, "Failed to read incoming peer handshake");
return;
}
Err(_) => {
warn!(%stream, timeout = ?INCOMING_PEER_HANDSHAKE_TIMEOUT, "Timed out reading incoming peer handshake");
return;
}
};
let peer_addr = match stream.remote_addr() {
Ok(addr) => addr,
Err(err) => {
warn!(error = %err, %stream, "Failed to get incoming peer remote address");
return;
}
};

if let PeerMessages::Handshake(handshake) = msg {
let info_hash = *handshake.info_hash;
let peer = Peer::from_socket_addr(peer_addr);
let mut peer = Peer::from_socket_addr(peer_addr);

// Populate peer fields from parsed handshake
peer.id = Some(handshake.peer_id);
peer.reserved = handshake.reserved;

if let Some(torrent) = self.torrents.get(&info_hash) {
torrent
if let Err(err) = torrent
.tell(TorrentMessage::IncomingPeer(peer, stream))
.await
.expect("Failed to tell torrent about incoming peer");
{
warn!(error = %err, %info_hash, "Failed to route incoming peer to torrent");
}
} else {
error!(%stream, "Received incoming peer for unknown torrent, killing connection");
drop(stream);
}
} else {
error!("Received unexpected message from peer");
error!(message = %msg, "Received unexpected message from peer");
}
}
EngineMessage::StartAll => {
for torrent in self.torrents.iter() {
torrent
if let Err(err) = torrent
.tell(TorrentMessage::SetState(TorrentState::Downloading))
.await
.expect("Failed to start torrent");
{
warn!(error = %err, "Failed to start torrent");
}
}
}
};
Expand All @@ -88,12 +115,10 @@ impl Message<EngineRequest> for EngineActor {
) -> Self::Reply {
match msg {
EngineRequest::Torrent(metainfo) => {
let info_hash = metainfo
.info_hash()
.map_err(|e| {
error!(error = %e, "Failed to unwrap info hash");
})
.expect("Failed to unwrap info hash");
let info_hash = metainfo.info_hash().map_err(|e| {
error!(error = %e, "Failed to unwrap info hash");
EngineError::Other(e)
})?;

if self.torrents.contains_key(&info_hash) {
error!(
Expand Down
35 changes: 20 additions & 15 deletions crates/libtortillas/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,19 @@ use crate::{
///
/// # Example
/// ```no_run
/// use std::net::SocketAddr;
///
/// use libtortillas::prelude::*;
/// // Create an engine with no explicit addresses
/// let engine = Engine::builder()
/// // Optionally provide addresses for our sockets to listen on
/// .tcp_addr("127.0.0.1:6881".parse().unwrap())
/// .utp_addr("127.0.0.1:6882".parse().unwrap())
/// .udp_addr("127.0.0.1:6883".parse().unwrap())
/// .tcp_addr("127.0.0.1:6881".parse::<SocketAddr>().unwrap())
/// .utp_addr("127.0.0.1:6882".parse::<SocketAddr>().unwrap())
/// .udp_addr("127.0.0.1:6883".parse::<SocketAddr>().unwrap())
/// .build();
/// ```
/// Or with all default settings
/// ```
/// ```no_run
/// use libtortillas::prelude::*;
/// let engine = Engine::default();
/// ```
Expand Down Expand Up @@ -239,14 +241,15 @@ impl Engine {
})?
};

let info_hash = metainfo.info_hash().expect("Failed to fetch info hash");
let info_hash = metainfo.info_hash()?;

let torrent_ref = match self
let response = self
.actor()
.ask(EngineRequest::Torrent(Box::new(metainfo)))
.await
.expect("Failed to add torrent")
{
.map_err(|e| EngineError::Other(anyhow::anyhow!(e.to_string())))?;

let torrent_ref = match response {
EngineResponse::Torrent(torrent_ref) => torrent_ref,
#[allow(unreachable_patterns)]
_ => unreachable!(),
Expand All @@ -258,24 +261,26 @@ impl Engine {
}
/// Starts all torrents managed by the engine.
/// See [`Torrent::start`] for more information.
pub async fn start_all(&self) {
pub async fn start_all(&self) -> Result<(), EngineError> {
self
.actor()
.tell(EngineMessage::StartAll)
.await
.expect("Failed to start all torrents");
.map_err(|e| EngineError::Other(anyhow::anyhow!(e.to_string())))?;
Ok(())
}

/// Exports the current state of the engine.
/// See [`Torrent::export`] for more information.
pub async fn export(&self) -> EngineExport {
match self
pub async fn export(&self) -> Result<EngineExport, EngineError> {
let response = self
.actor()
.ask(EngineRequest::Export)
.await
.expect("Failed to get torrents")
{
EngineResponse::Export(export) => export,
.map_err(|e| EngineError::Other(anyhow::anyhow!(e.to_string())))?;

match response {
EngineResponse::Export(export) => Ok(export),
_ => unreachable!(),
}
}
Expand Down
10 changes: 6 additions & 4 deletions crates/libtortillas/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ pub mod errors;
pub mod hashes;
pub mod metainfo;
pub mod peer;
pub mod pieces;
pub mod protocol;
pub mod torrent;
pub mod tracker;
pub(crate) mod util;

#[cfg(test)]
pub(crate) mod testing {
use std::{net::SocketAddr, path::PathBuf, str::FromStr};
use std::{env, net::SocketAddr, path::PathBuf, process, str::FromStr};

use rand::random_range;
use tokio::fs::read_to_string;

use crate::{
metainfo::{MagnetUri, MetaInfo, TorrentFile},
Expand Down Expand Up @@ -47,7 +49,7 @@ pub(crate) mod testing {
}

pub(crate) async fn read_magnet_fixture(file_name: &str) -> MetaInfo {
let contents = tokio::fs::read_to_string(magnet_fixture_path(file_name))
let contents = read_to_string(magnet_fixture_path(file_name))
.await
.unwrap();
MagnetUri::parse(contents).unwrap()
Expand All @@ -70,9 +72,9 @@ pub(crate) mod testing {
}

pub(crate) fn torrent_temp_path() -> PathBuf {
std::env::temp_dir().join(format!(
env::temp_dir().join(format!(
"tortillas-{}-{}",
std::process::id(),
process::id(),
rand::random::<u64>()
))
}
Expand Down
Loading
Loading