Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ lazy_static = "1.5.0"
quickcheck = "1.0.3"
nix = { version = "0.29", features = ["process"] }
serde = { version = "1.0.216", default-features = false, features = ["derive"] }
bincode = "1.3.3"
tokio = { version = "1.42.0", features = ["full"] }
tokio-util = { version = "0.7.13", features = ["codec"] }
tokio-stream = "0.1.17"
futures = "0.3.31"
tokio-serde = { version = "0.9.0", features = ["bincode"] }
tokio-serde = { version = "0.9.0", features = ["messagepack"] }
tracing = { version = "0.1.41", features = ["attributes"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
futures-util = "0.3.31"
Expand Down
1 change: 0 additions & 1 deletion torture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ libc.workspace = true
anyhow.workspace = true
cfg-if.workspace = true
serde.workspace = true
bincode.workspace = true
nomt = { path = "../nomt" }
tokio.workspace = true
tokio-util.workspace = true
Expand Down
10 changes: 5 additions & 5 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
},
time::{error::Elapsed, sleep, timeout},
};
use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed};
use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed};
use tokio_stream::StreamExt as _;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::trace;
Expand Down Expand Up @@ -458,12 +458,12 @@ struct Stream {
rd_stream: SymmetricallyFramed<
FramedRead<BufReader<OwnedReadHalf>, LengthDelimitedCodec>,
Envelope<ToAgent>,
SymmetricalBincode<Envelope<ToAgent>>,
SymmetricalMessagePack<Envelope<ToAgent>>,
>,
wr_stream: SymmetricallyFramed<
FramedWrite<BufWriter<OwnedWriteHalf>, LengthDelimitedCodec>,
Envelope<ToSupervisor>,
SymmetricalBincode<Envelope<ToSupervisor>>,
SymmetricalMessagePack<Envelope<ToSupervisor>>,
>,
}

Expand All @@ -479,7 +479,7 @@ impl Stream {
.max_frame_length(MAX_ENVELOPE_SIZE)
.new_codec(),
),
SymmetricalBincode::default(),
SymmetricalMessagePack::default(),
);
let wr_stream = SymmetricallyFramed::new(
FramedWrite::new(
Expand All @@ -489,7 +489,7 @@ impl Stream {
.max_frame_length(MAX_ENVELOPE_SIZE)
.new_codec(),
),
SymmetricalBincode::default(),
SymmetricalMessagePack::default(),
);
Self {
rd_stream,
Expand Down
14 changes: 7 additions & 7 deletions torture/src/supervisor/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,33 @@ use tokio::{
sync::{oneshot, Mutex},
time::timeout,
};
use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed};
use tokio_serde::{formats::SymmetricalMessagePack, SymmetricallyFramed};
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use crate::message::{self, Envelope, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE};

/// The type definition of a sink which is built:
///
/// - bincode serializer using [`Envelope<ToAgent>`].
/// - MessagePack serializer using [`Envelope<ToAgent>`].
/// - length-delimited codec.
/// - buf writer.
/// - unix stream (write half).
type WrStream = SymmetricallyFramed<
FramedWrite<BufWriter<OwnedWriteHalf>, LengthDelimitedCodec>,
Envelope<ToAgent>,
SymmetricalBincode<Envelope<ToAgent>>,
SymmetricalMessagePack<Envelope<ToAgent>>,
>;
/// The type definition of a stream which is built:
///
/// - unix stream (read half).
/// - buf reader.
/// - length-delimited codec.
/// - bincode deserializer using [`Envelope<ToSupervisor>`].
/// - MessagePack deserializer using [`Envelope<ToSupervisor>`].
type RdStream = SymmetricallyFramed<
FramedRead<BufReader<OwnedReadHalf>, LengthDelimitedCodec>,
Envelope<ToSupervisor>,
SymmetricalBincode<Envelope<ToSupervisor>>,
SymmetricalMessagePack<Envelope<ToSupervisor>>,
>;

/// A means to communicate with an agent.
Expand Down Expand Up @@ -147,7 +147,7 @@ pub fn run(stream: UnixStream) -> (RequestResponse, impl Future<Output = anyhow:
.max_frame_length(MAX_ENVELOPE_SIZE)
.new_codec(),
),
SymmetricalBincode::default(),
SymmetricalMessagePack::default(),
);
let rd_stream = SymmetricallyFramed::new(
FramedRead::new(
Expand All @@ -157,7 +157,7 @@ pub fn run(stream: UnixStream) -> (RequestResponse, impl Future<Output = anyhow:
.max_frame_length(MAX_ENVELOPE_SIZE)
.new_codec(),
),
SymmetricalBincode::default(),
SymmetricalMessagePack::default(),
);

let timeout = Duration::from_secs(60);
Expand Down