-
Notifications
You must be signed in to change notification settings - Fork 153
fix(grpc): add error handling and auto-reconnect in _server_request_watcher #253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
gRPC客户端增强:自动重连与异常处理变更概述
变更文件
💡 小贴士与 lingma-agents 交流的方式📜 直接回复评论
📜 在代码行处标记
📜 在讨论中提问
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔎 代码评审报告
🎯 评审意见概览
| 严重度 | 数量 | 说明 |
|---|---|---|
| 🔴 Blocker | 0 | 阻断性问题,需立即修复。例如:系统崩溃、关键功能不可用或严重安全漏洞。 |
| 🟠 Critical | 0 | 严重问题,高优先级修复。例如:核心功能异常或性能瓶颈影响用户体验。 |
| 🟡 Major | 1 | 主要问题,建议修复。例如:非核心功能缺陷或代码维护性较差。 |
| 🟢 Minor | 1 | 次要问题,酬情优化。例如:代码格式不规范或注释缺失。 |
总计: 2 个问题
📋 评审意见详情
💡 代码实现建议
以下是文件级别的代码建议,聚焦于代码的可读性、可维护性和潜在问题。
🐍 v2/nacos/transport/grpc_client.py (2 💬)
- 在 gRPC 流异常时应重置退避延迟序列,避免长时间延迟重连。 (L183)
- 应在重连过程中关闭旧连接前记录连接 ID,防止连接对象失效后无法获取 ID。 (L195-L200)
🚀 架构设计建议
以下是对代码架构和设计的综合分析,聚焦于跨文件交互、系统一致性和潜在优化空间。
🔍1. 架构一致性问题:新增的 _server_request_watcher_new 方法与原有 _server_request_watcher 方法功能重复
PR 中引入了新的方法 _server_request_watcher_new 来替代原有的 _server_request_watcher 方法,但并未移除旧方法。这导致代码库中存在两个功能相似的方法,违反了单一职责原则和架构一致性。建议明确废弃旧方法或统一使用新方法以保持架构清晰。
📌 关键代码
async def _server_request_watcher_new(self, grpc_conn: GrpcConnection): ...维护成本增加、潜在的行为不一致、未来开发人员可能误用旧方法
🔍2. 可维护性问题:错误处理逻辑未充分解耦
在 _server_request_watcher_new 和 _reconnect 方法中,错误处理逻辑较为集中且嵌套较深,影响代码可读性和可维护性。建议将部分逻辑抽象为独立函数,如重试延迟计算、连接状态检查等,提升模块化程度。
📌 关键代码
except grpc.aio.AioRpcError as e:
self.logger.warning(...)
delay = next(backoff)
...代码难以理解和修改、单元测试覆盖难度大
🔍3. 技术债务:自动重连机制缺乏超时控制
当前的自动重连机制没有设置最大重试次数或总超时时间,可能导致在网络持续异常情况下无限重试,消耗系统资源。建议引入最大重试次数或总体超时限制,防止资源耗尽。
📌 关键代码
backoff = (min(60, 2 ** i) for i in itertools.count(0)) # 1,2,4,8,...,60s系统资源被无限制占用、服务响应延迟
🔍4. 测试策略不足:缺少对新引入的 _server_request_watcher_new 方法的全面测试
虽然添加了新的重连和异常处理逻辑,但从 PR 内容看,没有提供相应的单元测试或集成测试来验证这些关键功能的正确性。建议补充针对该方法的测试用例,包括正常流程、异常路径及重连场景。
📌 关键代码
async def _server_request_watcher_new(self, grpc_conn: GrpcConnection): ...潜在的运行时错误未被发现、上线后出现不可预期行为
🔍5. 性能影响:频繁的日志记录可能影响性能
在 _server_request_watcher_new 方法中,每次接收到请求或发生异常时都会进行日志记录操作,尤其在高并发场景下可能会造成性能瓶颈。建议评估日志级别和频率,必要时采用异步日志或批量处理方式优化。
📌 关键代码
self.logger.error(f"[{grpc_conn.connection_id}] handle server request error: {e}")高并发下性能下降、I/O 阻塞
🔍6. 依赖管理问题:引入 itertools 但未说明其用途合理性
PR 引入了 itertools 模块用于生成退避序列,但在实际项目中是否已有类似工具类或标准库实现未作说明。建议确认是否有必要引入新依赖,并评估是否有更合适的替代方案。
📌 关键代码
import itertools增加不必要的外部依赖、降低代码可移植性
审查详情
📒 文件清单 (1 个文件)
📝 变更: 1 个文件
📝 变更文件:
v2/nacos/transport/grpc_client.py
💡 小贴士
与 lingma-agents 交流的方式
📜 直接回复评论
直接回复本条评论,lingma-agents 将自动处理您的请求。例如:
-
在当前代码中添加详细的注释说明。
-
请详细介绍一下你说的 LRU 改造方案,并使用伪代码加以说明。
📜 在代码行处标记
在文件的特定位置创建评论并 @lingma-agents。例如:
-
@lingma-agents 分析这个方法的性能瓶颈并提供优化建议。
-
@lingma-agents 对这个方法生成优化代码。
📜 在讨论中提问
在任何讨论中 @lingma-agents 来获取帮助。例如:
-
@lingma-agents 请总结上述讨论并提出解决方案。
-
@lingma-agents 请根据讨论内容生成优化代码。
| await asyncio.sleep(delay) | ||
| try: | ||
| grpc_conn = await self._reconnect(grpc_conn) | ||
| backoff = (min(60, 2 ** i) for i in itertools.count(0)) # 重置 backoff |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在 gRPC 流异常时应重置退避延迟序列,避免长时间延迟重连。
🟡 Major | 🧹 Code Smells
📋 问题详情
当前代码在重连成功后重置了退避延迟序列,但如果 _reconnect 抛出异常,退避序列不会重置,可能导致后续重连尝试间隔过长,影响服务恢复速度。应在每次进入重连逻辑时都重置退避序列,确保异常恢复的及时性。
💡 解决方案
建议将退避序列的重置操作提前到进入 _reconnect 调用前,确保即使重连失败也能正确重置退避序列。
- delay = next(backoff)
- self.logger.info(f"will reconnect after {delay}s ...")
- await asyncio.sleep(delay)
- try:
- grpc_conn = await self._reconnect(grpc_conn)
- backoff = (min(60, 2 ** i) for i in itertools.count(0)) # 重置 backoff
- self.logger.info(f"reconnected. new connection_id={grpc_conn.get_connection_id()}")
+ delay = next(backoff)
+ self.logger.info(f"will reconnect after {delay}s ...")
+ await asyncio.sleep(delay)
+ backoff = (min(60, 2 ** i) for i in itertools.count(0)) # 重置 backoff
+ try:
+ grpc_conn = await self._reconnect(grpc_conn)
+ self.logger.info(f"reconnected. new connection_id={grpc_conn.get_connection_id()}")您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
| try: | ||
| await self._shunt_down_channel(old_conn.channel) | ||
| except Exception: | ||
| pass | ||
|
|
||
| server_info = old_conn.server_info |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应在重连过程中关闭旧连接前记录连接 ID,防止连接对象失效后无法获取 ID。
🟢 Minor | 🧹 Code Smells
📋 问题详情
在 _reconnect 方法中,关闭旧连接的代码在获取 server_info 之前,如果 old_conn 在关闭后失效,后续获取 server_info 可能失败或获取到错误信息。应在关闭前记录必要的连接信息,确保后续流程的稳定性。
💡 解决方案
建议在关闭旧连接前先记录 server_info 和 connection_id,防止连接对象失效后无法获取相关信息。
- try:
- await self._shunt_down_channel(old_conn.channel)
- except Exception:
- pass
-
- server_info = old_conn.server_info
+ server_info = old_conn.server_info
+ try:
+ await self._shunt_down_channel(old_conn.channel)
+ except Exception:
+ pass您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)
|
这边我注意到 reconnect 之后会创建一个全新的grpc_connection ,但是上层的 RpcClient中的current_connection没有做替换。另外 RpcClient中也有一个 reconnect 方法提供重连机制,两个是否会产生冲突?有做过测试吗 |
|
@Sunrisea 目前我改动的这个版本已经在我们测试&生产环境跑了一个多月了,暂时还没发现异常。重新创建的连接是在自动重连,改动之前的逻辑好像是如果断开连接了,就不会自动重连了。 |
|
@Sunrisea 同样碰到这个问题了,能合么 |
What is the purpose of the change
Fixes #252
Brief changelog
fix(grpc): improve _server_request_watcher with error handling and auto-reconnect
This prevents ephemeral instances from being removed when the gRPC stream is interrupted in Kubernetes / LB environments.