Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions tensorrt_llm/serve/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,39 +822,49 @@ async def get_next_server(
server for server in self._server_state.keys()
if server != exclude_server
]
# Tokenize + block-hash is CPU-bound (~50 ms p50 for a 40 k-token
# chat request with a Rust-backed tokenizer). Running it directly
# inside the async handler blocks the orchestrator's event loop and
# serializes all concurrent requests through it; with HuggingFace
# tokenizers releasing the GIL, offloading to a thread lets multiple
# tokenize calls run in parallel and frees the event loop to
# dispatch HTTP traffic to the CTX/GEN workers meanwhile.
token_lists, block_hashes = await asyncio.to_thread(
self._tokenize_and_compute_block_hashes, request)
padded_tokens = sum(
len(hash_list)
for hash_list in block_hashes) * self._tokens_per_block
# select the server by (KV match - load)
# TODO: more options
workloads = [
state.num_active_requests()
for state in self._server_state.values()
]
scores = []
block_hashes = []
token_lists = []
matches = []
for i in range(len(servers)):
server = servers[i]
# https://github.com/ai-dynamo/dynamo/blob/main/docs/kv_cache_routing.md#kv-cache-routing-and-load-balancing
matches.append(
await self._server_state[server].matched_tokens(block_hashes))
score = matches[-1] / padded_tokens - workloads[
i] / self._max_batch_size
scores.append(score)
max_score = max(scores)
tied = [i for i, s in enumerate(scores) if s == max_score]
winner = tied[self._rr_counter % len(tied)]
self._rr_counter += 1
server = servers[winner]
if len(servers) == 0:
raise RuntimeError(
f"No available servers after excluding {exclude_server}")
elif len(servers) == 1:
server = servers[0]
else:
# Tokenize + block-hash is CPU-bound (~50 ms p50 for a 40 k-token
# chat request with a Rust-backed tokenizer). Running it directly
# inside the async handler blocks the orchestrator's event loop and
# serializes all concurrent requests through it; with HuggingFace
# tokenizers releasing the GIL, offloading to a thread lets multiple
# tokenize calls run in parallel and frees the event loop to
# dispatch HTTP traffic to the CTX/GEN workers meanwhile.
token_lists, block_hashes = await asyncio.to_thread(
self._tokenize_and_compute_block_hashes, request)
padded_tokens = sum(
len(hash_list)
for hash_list in block_hashes) * self._tokens_per_block
# select the server by (KV match - load)
# TODO: more options
workloads = [
state.num_active_requests()
for state in self._server_state.values()
]
scores = []
matches = []
for i in range(len(servers)):
server = servers[i]
# https://github.com/ai-dynamo/dynamo/blob/main/docs/kv_cache_routing.md#kv-cache-routing-and-load-balancing
matches.append(
await
self._server_state[server].matched_tokens(block_hashes))
score = matches[-1] / padded_tokens - workloads[
i] / self._max_batch_size
scores.append(score)
max_score = max(scores)
tied = [i for i, s in enumerate(scores) if s == max_score]
winner = tied[self._rr_counter % len(tied)]
self._rr_counter += 1
server = servers[winner]
async with self._lock:
await self._register_request(server, request)
return server, {
Expand Down
Loading