Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/pubsub/src/subscriber/keepalive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ pub(super) fn spawn(
#[cfg(test)]
mod tests {
use super::*;
use google_cloud_test_macros::tokio_test_no_panics;
use tokio::sync::mpsc::channel;

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn keepalive_interval() {
let start = Instant::now();
let (request_tx, mut request_rx) = channel(1);
Expand All @@ -74,7 +75,7 @@ mod tests {
assert_eq!(start.elapsed(), KEEPALIVE_PERIOD * 3);
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn shutdown_immediately() -> anyhow::Result<()> {
let start = Instant::now();
let (request_tx, mut request_rx) = channel(1);
Expand Down
11 changes: 6 additions & 5 deletions src/pubsub/src/subscriber/lease_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ mod tests {
use super::super::lease_state::tests::{sorted, test_id, test_ids, test_info};
use super::super::leaser::tests::MockLeaser;
use super::*;
use google_cloud_test_macros::tokio_test_no_panics;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, Instant};
Expand All @@ -103,7 +104,7 @@ mod tests {
}
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn flush_acks_nacks_on_interval() -> anyhow::Result<()> {
const FLUSH_PERIOD: Duration = Duration::from_secs(1);
const FLUSH_START: Duration = Duration::from_millis(200);
Expand Down Expand Up @@ -202,7 +203,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn deadline_interval() -> anyhow::Result<()> {
const EXTEND_PERIOD: Duration = Duration::from_secs(1);
const EXTEND_START: Duration = Duration::from_millis(200);
Expand Down Expand Up @@ -266,7 +267,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn drop_does_not_wait_for_pending_operations() -> anyhow::Result<()> {
let start = Instant::now();
let mock = MockLeaser::new();
Expand All @@ -293,7 +294,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn close_waits_for_flush() -> anyhow::Result<()> {
const EXPECTED_SLEEP: Duration = Duration::from_millis(100);

Expand Down Expand Up @@ -338,7 +339,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn no_add_and_ack_race() -> anyhow::Result<()> {
// This test validates the use of `biased` in the select statement.
//
Expand Down
35 changes: 18 additions & 17 deletions src/pubsub/src/subscriber/message_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ mod tests {
use super::*;
use gaxi::grpc::tonic::{Response as TonicResponse, Status as TonicStatus};
use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
use google_cloud_test_macros::tokio_test_no_panics;
use pubsub_grpc_mock::google::pubsub::v1;
use pubsub_grpc_mock::{MockSubscriber, start};
use tokio::sync::mpsc::{channel, unbounded_channel};
Expand Down Expand Up @@ -326,7 +327,7 @@ mod tests {
.await?)
}

#[tokio::test]
#[tokio_test_no_panics]
async fn error_starting_stream() -> anyhow::Result<()> {
let mut mock = MockSubscriber::new();
mock.expect_streaming_pull()
Expand All @@ -350,7 +351,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn initial_request() -> anyhow::Result<()> {
const MIB: i64 = 1024 * 1024;

Expand Down Expand Up @@ -406,7 +407,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn basic_success() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);
let (ack_tx, mut ack_rx) = unbounded_channel();
Expand Down Expand Up @@ -450,7 +451,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn basic_lease_management() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);
let (ack_tx, mut ack_rx) = unbounded_channel();
Expand Down Expand Up @@ -542,7 +543,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn delayed_responses() -> anyhow::Result<()> {
// In this test, we verify the case where an application asks for a
// message, but a response is not immediately available on the stream.
Expand Down Expand Up @@ -573,7 +574,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn serves_messages_immediately() -> anyhow::Result<()> {
// This test verifies we do not do something crazy like draining the
// stream (which would never end) before serving messages to the
Expand Down Expand Up @@ -603,7 +604,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn handles_empty_response() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);

Expand Down Expand Up @@ -632,7 +633,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn handles_missing_message_field() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);
let (extend_tx, mut extend_rx) = unbounded_channel();
Expand Down Expand Up @@ -695,7 +696,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn permanent_error_midstream() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);

Expand Down Expand Up @@ -734,7 +735,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn keepalives() -> anyhow::Result<()> {
// We use this channel to surface writes (requests) from outside our
// mock expectation.
Expand Down Expand Up @@ -791,7 +792,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn client_id() -> anyhow::Result<()> {
// We use this channel to surface writes (requests) from outside our
// mock expectation.
Expand Down Expand Up @@ -860,7 +861,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn no_immediate_message() -> anyhow::Result<()> {
const TEST_TIMEOUT: Duration = Duration::from_secs(42);

Expand All @@ -881,7 +882,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn retry_transient_when_starting_stream() -> anyhow::Result<()> {
// The policy should retry forever. Our default retry policies have an
// attempt limit of 10. So we arbitrarily pick a number greater than 10
Expand Down Expand Up @@ -931,7 +932,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn resume_midstream_success() -> anyhow::Result<()> {
let (response_tx_1, response_rx_1) = channel(10);
let (response_tx_2, response_rx_2) = channel(10);
Expand Down Expand Up @@ -1002,7 +1003,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn resume_midstream_hits_permanent_error() -> anyhow::Result<()> {
let (response_tx, response_rx) = channel(10);
let (ack_tx, mut ack_rx) = unbounded_channel();
Expand Down Expand Up @@ -1076,7 +1077,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn routing_header() -> anyhow::Result<()> {
let mut mock = MockSubscriber::new();

Expand Down Expand Up @@ -1104,7 +1105,7 @@ mod tests {
}

#[cfg(feature = "unstable-stream")]
#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn into_stream() -> anyhow::Result<()> {
use futures::TryStreamExt;
let (response_tx, response_rx) = channel(10);
Expand Down
11 changes: 6 additions & 5 deletions src/pubsub/src/subscriber/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ mod tests {
use google_cloud_gax::backoff_policy::BackoffPolicy;
use google_cloud_gax::error::rpc::{Code, Status};
use google_cloud_gax::retry_state::RetryState;
use google_cloud_test_macros::tokio_test_no_panics;

mockall::mock! {
#[derive(Debug)]
Expand Down Expand Up @@ -212,7 +213,7 @@ mod tests {
StreamingPullRequest::default()
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn success() -> anyhow::Result<()> {
let (response_tx, response_rx) = mpsc::channel(10);

Expand All @@ -236,7 +237,7 @@ mod tests {
Ok(())
}

#[tokio::test(start_paused = true)]
#[tokio_test_no_panics(start_paused = true)]
async fn keepalives() -> anyhow::Result<()> {
let (response_tx, response_rx) = mpsc::channel(10);
// We use this channel to surface writes (requests) from outside our
Expand Down Expand Up @@ -281,7 +282,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn error() -> anyhow::Result<()> {
let mut mock = MockStub::new();
mock.expect_streaming_pull()
Expand All @@ -297,7 +298,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn retry_then_success() -> anyhow::Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock_stub = MockStub::new();
Expand Down Expand Up @@ -344,7 +345,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio_test_no_panics]
async fn retry_then_permanent_failure() -> anyhow::Result<()> {
let mut seq = mockall::Sequence::new();
let mut mock_stub = MockStub::new();
Expand Down
Loading