[not merge] xpu glm test #7748

base: develop
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -831,58 +831,71 @@ def request_match_blocks(self, task: Request, block_size, *args): | |
|
|
||
| if self.kvcache_storage_backend and no_match_token_num >= block_size and not envs.FD_AS_ONLY_FLUSH: | ||
| if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num, try_free_gpu_blocks=False): | ||
| raise Exception( | ||
| "request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache" | ||
| logger.warning( | ||
| "request_match_blocks: skip storage cache prefetch because GPU blocks are insufficient, " | ||
| f"req_id {req_id}, need {no_match_block_num}, free {len(self.gpu_free_block_list)}" | ||
| ) | ||
|
|
||
| logger.debug( | ||
| f"request_match_blocks: req_id {req_id}, allocate {no_match_block_num} block to receive storage cache" | ||
| ) | ||
| gpu_recv_storage_block_ids = self.allocate_gpu_blocks(no_match_block_num) | ||
|
|
||
| prefix_block_key = [] if match_block_node.hash_value is None else [match_block_node.hash_value] | ||
| cur_token_idx = match_token_num | ||
| no_match_block_keys = [] | ||
| mm_idx = 0 | ||
| while cur_token_idx <= input_token_num - block_size: | ||
| cur_block_token_ids = input_token_ids[cur_token_idx : cur_token_idx + block_size] | ||
| # Get extra hash keys for multimodal content (images, videos, etc.) | ||
| mm_idx, extra_keys = self.get_block_hash_extra_keys( | ||
| request=task, | ||
| start_idx=cur_token_idx, | ||
| end_idx=cur_token_idx + block_size, | ||
| mm_idx=mm_idx, | ||
| else: | ||
| logger.debug( | ||
| f"request_match_blocks: req_id {req_id}, allocate {no_match_block_num} block to receive storage cache" | ||
| ) | ||
| prefix_block_key.extend(extra_keys) | ||
| cur_block_key = get_hash_str(cur_block_token_ids, prefix_block_key) | ||
| no_match_block_keys.append(cur_block_key) | ||
| cur_token_idx += block_size | ||
| prefix_block_key = [cur_block_key] | ||
| gpu_recv_storage_block_ids = self.allocate_gpu_blocks(no_match_block_num) | ||
|
|
||
| prefix_block_key = [] if match_block_node.hash_value is None else [match_block_node.hash_value] | ||
| cur_token_idx = match_token_num | ||
| no_match_block_keys = [] | ||
| mm_idx = 0 | ||
| while cur_token_idx <= input_token_num - block_size: | ||
| cur_block_token_ids = input_token_ids[cur_token_idx : cur_token_idx + block_size] | ||
| # Get extra hash keys for multimodal content (images, videos, etc.) | ||
| mm_idx, extra_keys = self.get_block_hash_extra_keys( | ||
| request=task, | ||
| start_idx=cur_token_idx, | ||
| end_idx=cur_token_idx + block_size, | ||
| mm_idx=mm_idx, | ||
| ) | ||
| prefix_block_key.extend(extra_keys) | ||
| cur_block_key = get_hash_str(cur_block_token_ids, prefix_block_key) | ||
| no_match_block_keys.append(cur_block_key) | ||
| cur_token_idx += block_size | ||
| prefix_block_key = [cur_block_key] | ||
|
|
||
| logger.info( | ||
| f"start prefetch cache from storage, req_id: {req_id}, block num: {len(no_match_block_keys)}" | ||
| ) | ||
| start_time = time.time() | ||
| read_storage_task = ReadStorageTask( | ||
| task_id=req_id, | ||
| keys=no_match_block_keys, | ||
| token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None, | ||
| gpu_block_ids=gpu_recv_storage_block_ids, | ||
| start_read_block_idx=match_token_num // block_size, | ||
| ) | ||
| logger.debug(f"issue read storage task: {read_storage_task}") | ||
| storage_matched_block_ids = self.issue_prefetch_storage_task(read_storage_task) | ||
| storage_matched_block_num = len(storage_matched_block_ids) | ||
| storage_match_token_num = storage_matched_block_num * block_size | ||
| cost_time = time.time() - start_time | ||
| metrics["storage_cache_prepare_time"] = cost_time | ||
| logger.info( | ||
| f"finish prefetch cache from storage, req_id: {req_id}, " | ||
| f"matched block num: {storage_matched_block_num}, cost_time:{cost_time:.6f}s" | ||
| ) | ||
| try: | ||
| logger.info( | ||
| f"start prefetch cache from storage, req_id: {req_id}, block num: {len(no_match_block_keys)}" | ||
| ) | ||
| start_time = time.time() | ||
| read_storage_task = ReadStorageTask( | ||
| task_id=req_id, | ||
| keys=no_match_block_keys, | ||
| token_ids=( | ||
| input_token_ids if self.kvcache_storage_backend == "attention_store" else None | ||
| ), | ||
| gpu_block_ids=gpu_recv_storage_block_ids, | ||
| start_read_block_idx=match_token_num // block_size, | ||
| ) | ||
| logger.debug(f"issue read storage task: {read_storage_task}") | ||
| storage_matched_block_ids = self.issue_prefetch_storage_task(read_storage_task) | ||
| storage_matched_block_num = len(storage_matched_block_ids) | ||
| storage_match_token_num = storage_matched_block_num * block_size | ||
| cost_time = time.time() - start_time | ||
| metrics["storage_cache_prepare_time"] = cost_time | ||
| logger.info( | ||
| f"finish prefetch cache from storage, req_id: {req_id}, " | ||
| f"matched block num: {storage_matched_block_num}, cost_time:{cost_time:.6f}s" | ||
| ) | ||
|
|
||
| match_storage_block_ids = gpu_recv_storage_block_ids[:storage_matched_block_num] | ||
| self.recycle_gpu_blocks(gpu_recv_storage_block_ids[storage_matched_block_num:]) | ||
| match_storage_block_ids = gpu_recv_storage_block_ids[:storage_matched_block_num] | ||
| self.recycle_gpu_blocks(gpu_recv_storage_block_ids[storage_matched_block_num:]) | ||
| except Exception as e: | ||
| logger.warning( | ||
| "request_match_blocks: storage cache prefetch failed, fallback to cache miss, " | ||
| f"req_id {req_id}, error: {type(e)} {e}" | ||
| ) | ||
| self.recycle_gpu_blocks(gpu_recv_storage_block_ids, req_id) | ||
| gpu_recv_storage_block_ids = [] | ||
| storage_match_token_num = 0 | ||
| match_storage_block_ids = [] | ||
|
|
||
| # 4. update metrics | ||
| match_token_num = gpu_match_token_num + cpu_match_token_num + storage_match_token_num | ||
|
|
@@ -1127,10 +1140,7 @@ def write_cache_to_storage(self, request: Request): | |
| if isinstance(token_ids, np.ndarray): | ||
| token_ids = token_ids.tolist() | ||
|
|
||
| if self.config.cache_config.enable_output_caching: | ||
| input_token_ids = token_ids + request.output_token_ids | ||
| else: | ||
| input_token_ids = token_ids | ||
| input_token_ids = token_ids + request.output_token_ids | ||
|
🔴 Bug: the enable_output_caching branch is removed … a subsequent line already adds the truncation (see the sketch after this file's diff). |
||
|
|
||
| req_id = request.request_id | ||
| keys = [] | ||
|
|
@@ -1144,6 +1154,7 @@ def write_cache_to_storage(self, request: Request): | |
|
|
||
| trace_print(LoggingEventName.WRITE_CACHE_TO_STORAGE_START, request.request_id, getattr(request, "user", "")) | ||
| gpu_block_ids = request.block_tables[: len(keys)] | ||
| input_token_ids = input_token_ids[: len(keys) * self.config.cache_config.block_size] | ||
| logger.info(f"start write cache back to storage, req_id: {req_id}, block num: {len(keys)}") | ||
| write_storage_task = WriteStorageTask( | ||
| task_id=req_id, | ||
|
|
||
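The truncated review comment above flags the removal of the enable_output_caching branch in write_cache_to_storage. A minimal sketch of the behavioral difference, using standalone names for illustration (the real code reads self.config.cache_config and request):

```python
def build_write_back_token_ids(token_ids, output_token_ids, enable_output_caching):
    # Pre-PR behavior, reconstructed from the removed branch: output tokens are
    # appended only when output caching is enabled in the cache config.
    if enable_output_caching:
        return token_ids + output_token_ids
    return token_ids


def build_write_back_token_ids_after_pr(token_ids, output_token_ids):
    # Post-PR behavior: output tokens are always appended; the later
    # `input_token_ids[: len(keys) * block_size]` slice only limits the length,
    # it does not restore the old gating on the config flag.
    return token_ids + output_token_ids
```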
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,7 +77,10 @@ def padding_sampling_params(top_p, top_k, infer_seed, seq_lens_this_time, seq_le | |
| top_k_padding = paddle.repeat_interleave(top_k[:real_bsz], repeats).unsqueeze(1) | ||
| topp_seed = paddle.repeat_interleave(infer_seed[:real_bsz], repeats).unsqueeze(1) | ||
|
|
||
| MAX_INFER_SEED = 9223372036854775806 | ||
| if current_platform.is_xpu(): | ||
| MAX_INFER_SEED = 2147483646 | ||
| else: | ||
| MAX_INFER_SEED = 9223372036854775806 | ||
|
|
||
| token_lens = paddle.where( | ||
| seq_lens_encoder[:real_bsz] == 0, | ||
|
|
@@ -97,7 +100,7 @@ def padding_sampling_params(top_p, top_k, infer_seed, seq_lens_this_time, seq_le | |
|
|
||
| offsets = paddle.where( | ||
| is_decoder, | ||
| local_pos * 4, | ||
| local_pos * 32, | ||
|
🔴 Bug: in the same function, if …

    offsets = paddle.where(
        is_decoder,
        local_pos * 32 if current_platform.is_xpu() else local_pos * 4,
        paddle.zeros_like(local_pos),
    )

If this change is intended to hold for all platforms, please explain why in the PR description (see also the sketch after this diff). |
||
| paddle.zeros_like(local_pos), | ||
| ) | ||
|
|
||
|
|
||
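A minimal sketch of how the two XPU-conditional values in this hunk could share one platform check, following the reviewer's suggestion above; the current_platform import path and the helper name are assumptions, not part of this diff:

```python
import paddle

from fastdeploy.platforms import current_platform  # assumed import path

# 2**63 - 2 in the default branch, 2**31 - 2 on XPU, presumably because the
# XPU sampling kernel expects a 32-bit seed (assumption based on the values).
MAX_INFER_SEED = 2147483646 if current_platform.is_xpu() else 9223372036854775806


def decoder_offsets(is_decoder, local_pos):
    # Reviewer's suggested form: keep the XPU-specific stride behind the same
    # platform check instead of changing the constant for every platform.
    stride = 32 if current_platform.is_xpu() else 4
    return paddle.where(is_decoder, local_pos * stride, paddle.zeros_like(local_pos))
```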
🔴 Bug
flush_token_index now runs before the data is written, opening a timing window in which the index is available before the actual data. Previously this call sat in the finally block of write_back_storage_task, which guaranteed the index was flushed only after the whole write flow had completed (successfully or not). Moving it before the write means that if the subsequent sdk.write fails, the index in storage has been updated while the data was never written, so other requests can hit the index yet read no valid KV Cache. Please confirm whether this change is an XPU-specific requirement; if so, consider guarding it with a platform or backend check (a sketch follows below), or explain the intended new ordering in the PR description.
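A hypothetical sketch of the backend guard suggested above; flush_token_index, write_back_storage_task, and sdk.write are taken from the comment, while their signatures and the surrounding method are assumptions since that code is not shown in this diff:

```python
def write_back_storage_task(self, write_storage_task, req_id, keys):
    # Hypothetical guard: only the XPU path flushes the index before the write,
    # as this PR does; other backends keep the original ordering in which the
    # index is flushed only after the write flow has finished.
    if current_platform.is_xpu():
        self.flush_token_index(req_id, keys)
        self.sdk.write(write_storage_task)
        return
    try:
        self.sdk.write(write_storage_task)
    finally:
        # Original ordering described in the comment: the index becomes visible
        # only once the data write has completed (successfully or not).
        self.flush_token_index(req_id, keys)
```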