Skip to content

feat: provide per-operation concurrent limit#7329

Open
codetyri0n wants to merge 1 commit intoapache:mainfrom
codetyri0n:climit
Open

feat: provide per-operation concurrent limit#7329
codetyri0n wants to merge 1 commit intoapache:mainfrom
codetyri0n:climit

Conversation

@codetyri0n
Copy link
Copy Markdown

Which issue does this PR close?

Closes #7245

Rationale for this change

What changes are included in this PR?

  • checks to use global limit or per-operation limit for semaphores
  • checks for using which (global/per-operation) semaphore to be used with fallbacks
  • unit tests

Are there any user-facing changes?

yes

AI Usage Statement

Used claude opus 4.6 for code generation and manually reviewed by me, please flag if any concerns!

@codetyri0n codetyri0n requested a review from Xuanwo as a code owner March 27, 2026 16:53
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. releases-note/feat The PR implements a new feature or has a title that begins with "feat" labels Mar 27, 2026
);
}

#[tokio::test]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test to check whether exhausting per operation semaphore is unaffected by exhausting of global semaphore

);
}

#[tokio::test]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks whether block occurs if per-operation semaphore is exhausted

}

#[tokio::test]
async fn operations_without_per_op_limit_use_global() {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks whether only the assigned operation gets per-operation limit and usage of global for other operations

Copy link
Copy Markdown
Contributor

@dentiny dentiny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more comment on unit test: most of the comments seem unnecessary -- the code is already explanatory itself

/// # }
/// ```
///
/// Set per-operation concurrent limits to control different operations
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code example seems duplicate as with_operation_limit, do we need both?

/// # Ok(())
/// # }
/// ```
pub fn with_operation_limit(mut self, op: Operation, permits: usize) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things here:

  • I think certain operations are not supported in the concurrent limit layer, for example, copy/rename/presign, but for your implementation it will pass through if users do with_operation_limit(Operation::Copy, 1) which is a no-op, shall we check it?
  • Should we validate permits are larger than 0? otherwise the operation is permanently blocked.

pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> {
inner: A,
semaphore: S,
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions:

  • Should we follow the template pattern here for customized semaphore?
  • Curious why do we need Arc wrapper for semaphore?
Suggested change
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
operation_limits: Option<Arc<HashMap<Operation, S>>>,

// Stat should still work because it has a dedicated per-operation
// semaphore with 10 permits, bypassing the exhausted global one.
let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await;
assert!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I usually prefer unwrap in unit tests over is_ok, since you can see the error directly.

}
}

/// A permit that can come from either the global semaphore (generic `S`) or
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I'm not convinced the enum here is necessary, does it work?

type Reader = ConcurrentLimitLayerWrapper<A::Reader, S:Permit>;
async fn acquire_for(&self, op: Operation) -> ConcurrentLimitPermit<S::Permit> {
        if let Some(limits) = &self.operation_limits {
            if let Some(sem) = limits.get(&op) {
                return sem.acquire().await;
            }
        }
        self.semaphore.acquire().await
    }

pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
operation_semaphore: S,
http_semaphore: Option<S>,
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a HashMap, we could simply use a struct to hold read_semaphore and write_semaphore.

We can implement the functionality step by step: first implement read, write, and list operations, then later add copy and rename.

Another question is: what is the relationship between operation_semaphore and read_semaphore? When read_semaphore is set, will we simply ignore operation_semaphore?

cc @dentiny and @codetyri0n

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

releases-note/feat The PR implements a new feature or has a title that begins with "feat" size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

new feature: provide per-operation concurrent limit

3 participants