Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def launch_cache_manager(
# Start additional threads
if cache_config.kvcache_storage_backend or self.num_cpu_blocks > 0:
logger.info("Enable hierarchical cache.")
threading.Thread(target=self.recv_data_transfer_result).start()
threading.Thread(target=self.recv_data_transfer_result, daemon=True).start()
if cache_config.enable_prefix_caching:
threading.Thread(target=self.clear_prefix_cache, daemon=True).start()

Expand Down
14 changes: 11 additions & 3 deletions fastdeploy/engine/expert_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ def _exit_sub_services(self):
for p in self.cache_manager_processes:
self.llm_logger.info(f"Killing cache manager process {p.pid}")
try:
os.killpg(p.pid, signal.SIGTERM)
except:
pass
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
except Exception as e:
console_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只有 EP 场景下才会出现进程杀不干净的情况吗?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> 只有 EP 场景会杀不干净吗?

@liyonghua0910 目前来看,残留进程主要是 cache_transfer_manager 进程;其余场景暂未发现 cache_transfer_manager 进程残留。


if hasattr(self, "zmq_server") and self.zmq_server is not None:
self.zmq_server.close()
Expand Down Expand Up @@ -218,3 +221,8 @@ def deamon_thread():
t_deamon.join()
except Exception as e:
llm_logger.exception(f"Expert service failed to start: {e}, {str(traceback.format_exc())}")
finally:
try:
expert_service._exit_sub_services()
except Exception:
pass
5 changes: 4 additions & 1 deletion tests/engine/test_expert_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def test_reset_kvcache_blocks(self, mock_time, mock_ipc_signal, mock_engine_serv
def test_exit_sub_services(self, mock_signal, mock_os, mock_engine_service):
"""测试退出子服务功能"""
local_data_parallel_id = 0
pgid = 5678

# 创建 ExpertService 实例
expert_service = ExpertService(self.mock_cfg, local_data_parallel_id)
Expand All @@ -164,6 +165,7 @@ def test_exit_sub_services(self, mock_signal, mock_os, mock_engine_service):
mock_process = Mock()
mock_process.pid = 1234
expert_service.cache_manager_processes = [mock_process]
mock_os.getpgid.return_value = pgid

# 设置模拟引擎资源管理器
expert_service.engine = Mock()
Expand All @@ -179,7 +181,8 @@ def test_exit_sub_services(self, mock_signal, mock_os, mock_engine_service):

# 验证缓存管理器清理
expert_service.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear.assert_called_once()
mock_os.killpg.assert_called_once_with(1234, mock_signal.SIGTERM)
mock_os.getpgid.assert_called_once_with(1234)
mock_os.killpg.assert_called_once_with(pgid, mock_signal.SIGTERM)

# 验证ZMQ服务器关闭
expert_service.zmq_server.close.assert_called_once()
Expand Down
Loading