Skip to content

Commit 5c1e60f

Browse files
lambda improvements (#6171)
* make lambda support optional
* map lambda error to actual variant
* retry rate-limited lambdas
1 parent abd8d8b commit 5c1e60f

11 files changed

Lines changed: 149 additions & 32 deletions

File tree

quickwit/quickwit-cli/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ release-feature-set = [
101101
"quickwit-indexing/pulsar",
102102
"quickwit-indexing/sqs",
103103
"quickwit-indexing/vrl",
104+
"quickwit-serve/lambda",
104105
"quickwit-storage/azure",
105106
"quickwit-storage/gcs",
106107
"quickwit-metastore/postgres",
@@ -114,6 +115,7 @@ release-feature-vendored-set = [
114115
"quickwit-indexing/sqs",
115116
"quickwit-indexing/vrl",
116117
"quickwit-indexing/vendored-kafka",
118+
"quickwit-serve/lambda",
117119
"quickwit-storage/azure",
118120
"quickwit-storage/gcs",
119121
"quickwit-metastore/postgres",
@@ -126,6 +128,7 @@ release-macos-feature-vendored-set = [
126128
"quickwit-indexing/sqs",
127129
"quickwit-indexing/vrl",
128130
"quickwit-indexing/vendored-kafka-macos",
131+
"quickwit-serve/lambda",
129132
"quickwit-storage/azure",
130133
"quickwit-storage/gcs",
131134
"quickwit-metastore/postgres",

quickwit/quickwit-lambda-client/src/deploy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
//! - Old versions are garbage collected (keep current + top 5 most recent)
2727
2828
use std::collections::HashMap;
29-
use std::sync::{Arc, OnceLock};
29+
use std::sync::OnceLock;
3030

3131
use anyhow::{Context, anyhow};
3232
use aws_sdk_lambda::Client as LambdaClient;
@@ -108,7 +108,7 @@ fn version_description(deploy_config_opt: Option<&LambdaDeployConfig>) -> String
108108
/// ensuring the deployed Lambda matches the embedded binary.
109109
pub async fn try_get_or_deploy_invoker(
110110
lambda_config: &LambdaConfig,
111-
) -> anyhow::Result<Arc<dyn LambdaLeafSearchInvoker>> {
111+
) -> anyhow::Result<impl LambdaLeafSearchInvoker> {
112112
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
113113
let client = LambdaClient::new(&aws_config);
114114
let function_name = &lambda_config.function_name;

quickwit/quickwit-lambda-client/src/invoker.rs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,65 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use anyhow::Context as _;
1816
use async_trait::async_trait;
1917
use aws_sdk_lambda::Client as LambdaClient;
18+
use aws_sdk_lambda::error::{DisplayErrorContext, SdkError};
19+
use aws_sdk_lambda::operation::invoke::InvokeError;
2020
use aws_sdk_lambda::primitives::Blob;
2121
use aws_sdk_lambda::types::InvocationType;
2222
use base64::prelude::*;
2323
use prost::Message;
24+
use quickwit_common::retry::{RetryParams, retry};
2425
use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload};
2526
use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest};
2627
use quickwit_search::{LambdaLeafSearchInvoker, SearchError};
2728
use tracing::{debug, info, instrument};
2829

2930
use crate::metrics::LAMBDA_METRICS;
3031

32+
/// Maps an AWS Lambda SDK invocation error to a `SearchError`.
33+
fn invoke_error_to_search_error(error: SdkError<InvokeError>) -> SearchError {
34+
if let SdkError::ServiceError(ref service_error) = error
35+
&& matches!(
36+
service_error.err(),
37+
InvokeError::TooManyRequestsException(_)
38+
| InvokeError::EniLimitReachedException(_)
39+
| InvokeError::SubnetIpAddressLimitReachedException(_)
40+
| InvokeError::Ec2ThrottledException(_)
41+
| InvokeError::ResourceConflictException(_)
42+
)
43+
{
44+
return SearchError::TooManyRequests;
45+
}
46+
47+
let is_timeout = match &error {
48+
SdkError::TimeoutError(_) => true,
49+
SdkError::DispatchFailure(failure) => failure.is_timeout(),
50+
SdkError::ServiceError(service_error) => matches!(
51+
service_error.err(),
52+
InvokeError::EfsMountTimeoutException(_) | InvokeError::SnapStartTimeoutException(_)
53+
),
54+
_ => false,
55+
};
56+
57+
let error_msg = format!("lambda invocation failed: {}", DisplayErrorContext(&error));
58+
59+
if is_timeout {
60+
SearchError::Timeout(error_msg)
61+
} else {
62+
SearchError::Internal(error_msg)
63+
}
64+
}
65+
3166
/// Create a Lambda invoker for a specific version.
3267
///
3368
/// The version number is used as the qualifier when invoking, ensuring we call
3469
/// the exact published version (not $LATEST).
3570
pub(crate) async fn create_lambda_invoker_for_version(
3671
function_name: String,
3772
version: String,
38-
) -> anyhow::Result<Arc<dyn LambdaLeafSearchInvoker>> {
73+
) -> anyhow::Result<AwsLambdaInvoker> {
3974
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
4075
let client = LambdaClient::new(&aws_config);
4176
let invoker = AwsLambdaInvoker {
@@ -44,11 +79,11 @@ pub(crate) async fn create_lambda_invoker_for_version(
4479
version,
4580
};
4681
invoker.validate().await?;
47-
Ok(Arc::new(invoker))
82+
Ok(invoker)
4883
}
4984

5085
/// AWS Lambda implementation of `LambdaLeafSearchInvoker`.
51-
struct AwsLambdaInvoker {
86+
pub(crate) struct AwsLambdaInvoker {
5287
client: LambdaClient,
5388
function_name: String,
5489
/// The version number to invoke (e.g., "7", "12").
@@ -79,6 +114,12 @@ impl AwsLambdaInvoker {
79114
}
80115
}
81116

117+
const LAMBDA_RETRY_PARAMS: RetryParams = RetryParams {
118+
base_delay: std::time::Duration::from_secs(1),
119+
max_delay: std::time::Duration::from_secs(10),
120+
max_attempts: 3,
121+
};
122+
82123
#[async_trait]
83124
impl LambdaLeafSearchInvoker for AwsLambdaInvoker {
84125
#[instrument(skip(self, request), fields(function_name = %self.function_name, version = %self.version))]
@@ -88,7 +129,10 @@ impl LambdaLeafSearchInvoker for AwsLambdaInvoker {
88129
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
89130
let start = std::time::Instant::now();
90131

91-
let result = self.invoke_leaf_search_inner(request).await;
132+
let result = retry(&LAMBDA_RETRY_PARAMS, || {
133+
self.invoke_leaf_search_inner(request.clone())
134+
})
135+
.await;
92136

93137
let elapsed = start.elapsed().as_secs_f64();
94138
let status = if result.is_ok() { "success" } else { "error" };
@@ -141,7 +185,7 @@ impl AwsLambdaInvoker {
141185
let response = invoke_builder
142186
.send()
143187
.await
144-
.map_err(|e| SearchError::Internal(format!("Lambda invocation error: {}", e)))?;
188+
.map_err(invoke_error_to_search_error)?;
145189

146190
// Check for function error
147191
if let Some(error) = response.function_error() {

quickwit/quickwit-lambda-server/src/context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ impl LambdaSearcherContext {
3333
info!("initializing lambda searcher context");
3434

3535
let searcher_config = try_searcher_config_from_env()?;
36-
let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None));
36+
let searcher_context =
37+
Arc::new(SearcherContext::new_without_invoker(searcher_config, None));
3738
let storage_resolver = StorageResolver::configured(&Default::default());
3839

3940
Ok(Self {

quickwit/quickwit-search/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use itertools::Itertools;
1616
use quickwit_common::rate_limited_error;
17+
use quickwit_common::retry::Retryable;
1718
use quickwit_doc_mapper::QueryParserError;
1819
use quickwit_proto::error::grpc_error_to_grpc_status;
1920
use quickwit_proto::metastore::{EntityKind, MetastoreError};
@@ -175,6 +176,12 @@ impl From<MetastoreError> for SearchError {
175176
}
176177
}
177178

179+
impl Retryable for SearchError {
180+
fn is_retryable(&self) -> bool {
181+
matches!(self, SearchError::TooManyRequests | SearchError::Timeout(_))
182+
}
183+
}
184+
178185
impl From<JoinError> for SearchError {
179186
fn from(join_error: JoinError) -> SearchError {
180187
SearchError::Internal(format!("spawned task in root join failed: {join_error}"))

quickwit/quickwit-search/src/invoker.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,27 @@ pub trait LambdaLeafSearchInvoker: Send + Sync + 'static {
3636
request: LeafSearchRequest,
3737
) -> Result<Vec<LambdaSingleSplitResult>, SearchError>;
3838
}
39+
40+
#[async_trait]
41+
impl<T> LambdaLeafSearchInvoker for Box<T>
42+
where T: LambdaLeafSearchInvoker + ?Sized
43+
{
44+
async fn invoke_leaf_search(
45+
&self,
46+
request: LeafSearchRequest,
47+
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
48+
(**self).invoke_leaf_search(request).await
49+
}
50+
}
51+
52+
#[async_trait]
53+
impl<T> LambdaLeafSearchInvoker for std::sync::Arc<T>
54+
where T: LambdaLeafSearchInvoker + ?Sized
55+
{
56+
async fn invoke_leaf_search(
57+
&self,
58+
request: LeafSearchRequest,
59+
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
60+
(**self).invoke_leaf_search(request).await
61+
}
62+
}

quickwit/quickwit-search/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ pub async fn single_node_search(
287287
let search_job_placer = SearchJobPlacer::new(searcher_pool.clone());
288288
let cluster_client = ClusterClient::new(search_job_placer);
289289
let searcher_config = SearcherConfig::default();
290-
let searcher_context = Arc::new(SearcherContext::new(searcher_config, None, None));
290+
let searcher_context = Arc::new(SearcherContext::new_without_invoker(searcher_config, None));
291291
let search_service = Arc::new(SearchServiceImpl::new(
292292
metastore.clone(),
293293
storage_resolver,

quickwit/quickwit-search/src/service.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,14 +436,26 @@ impl SearcherContext {
436436
#[cfg(test)]
437437
pub fn for_test() -> SearcherContext {
438438
let searcher_config = SearcherConfig::default();
439-
SearcherContext::new(searcher_config, None, None)
439+
SearcherContext::new_without_invoker(searcher_config, None)
440+
}
441+
442+
/// Creates a new searcher context without a lambda invoker.
443+
pub fn new_without_invoker(
444+
searcher_config: SearcherConfig,
445+
split_cache_opt: Option<Arc<SplitCache>>,
446+
) -> Self {
447+
Self::new(
448+
searcher_config,
449+
split_cache_opt,
450+
None::<Box<dyn LambdaLeafSearchInvoker>>,
451+
)
440452
}
441453

442454
/// Creates a new searcher context, given a searcher config, an optional `SplitCache`, and an
/// optional lambda invoker.
443455
pub fn new(
444456
searcher_config: SearcherConfig,
445457
split_cache_opt: Option<Arc<SplitCache>>,
446-
lambda_invoker: Option<Arc<dyn LambdaLeafSearchInvoker>>,
458+
lambda_invoker: Option<impl LambdaLeafSearchInvoker + 'static>,
447459
) -> Self {
448460
let global_split_footer_cache = MemorySizedCache::from_config(
449461
&searcher_config.split_footer_cache,
@@ -463,6 +475,9 @@ impl SearcherContext {
463475
Some(searcher_config.aggregation_bucket_limit),
464476
);
465477

478+
let lambda_invoker =
479+
lambda_invoker.map(|invoker| Arc::new(invoker) as Arc<dyn LambdaLeafSearchInvoker>);
480+
466481
Self {
467482
searcher_config,
468483
fast_fields_cache: storage_long_term_cache,

quickwit/quickwit-search/src/tests.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,8 +1028,10 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
10281028
max_hits: 100,
10291029
..Default::default()
10301030
});
1031-
let searcher_context: Arc<SearcherContext> =
1032-
Arc::new(SearcherContext::new(SearcherConfig::default(), None, None));
1031+
let searcher_context: Arc<SearcherContext> = Arc::new(SearcherContext::new_without_invoker(
1032+
SearcherConfig::default(),
1033+
None,
1034+
));
10331035

10341036
let search_response = single_doc_mapping_leaf_search(
10351037
searcher_context,
@@ -1666,7 +1668,10 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
16661668
.into_iter()
16671669
.map(|split| extract_split_and_footer_offsets(&split.split_metadata))
16681670
.collect();
1669-
let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None, None));
1671+
let searcher_context = Arc::new(SearcherContext::new_without_invoker(
1672+
SearcherConfig::default(),
1673+
None,
1674+
));
16701675

16711676
{
16721677
let request = ListTermsRequest {

quickwit/quickwit-serve/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ quickwit-opentelemetry = { workspace = true }
7272
quickwit-proto = { workspace = true }
7373
quickwit-query = { workspace = true }
7474
quickwit-search = { workspace = true }
75-
quickwit-lambda-client = { workspace = true }
75+
quickwit-lambda-client = { workspace = true, optional = true }
7676
quickwit-storage = { workspace = true }
7777
quickwit-telemetry = { workspace = true }
7878

@@ -115,3 +115,6 @@ sqs-for-tests = [
115115
"quickwit-indexing/sqs",
116116
"quickwit-indexing/sqs-test-helpers"
117117
]
118+
lambda = [
119+
"quickwit-lambda-client"
120+
]

0 commit comments

Comments
 (0)