Skip to content

Conversation

@cxhello
Copy link
Contributor

@cxhello cxhello commented Sep 22, 2025

What is the purpose of the change

Fixes #252

Brief changelog

fix(grpc): improve _server_request_watcher with error handling and auto-reconnect

  • wrap bi_stream_send loop with exception handling to prevent watcher exit
  • add exponential backoff (1s → 2s → ... → 60s) for reconnect attempts
  • implement _reconnect() to close old channel, create new channel, run server check, and resend ConnectionSetupRequest
  • ensure watcher recovers from socket closed / UNAVAILABLE errors instead of exiting permanently

This prevents ephemeral instances from being removed when the gRPC stream is interrupted in Kubernetes / LB environments.

@lingma-agents
Copy link

lingma-agents bot commented Sep 22, 2025

gRPC客户端增强:自动重连与异常处理

变更概述
  • 问题修复
    • 修复了 _server_request_watcher 在流中断时无法自动恢复的问题
    • 添加了对 gRPC 流异常的全面捕获和处理机制
    • 实现了指数退避重连策略(1s → 2s → ... → 60s)
  • 新功能
    • 新增 _server_request_watcher_new 方法,提供更健壮的流监听能力
    • 引入 _reconnect 方法实现完整的重连逻辑:关闭旧连接、创建新通道、重新发送连接设置请求
    • 增加了对连接断开后的自动恢复机制,防止临时实例被误删
  • 重构
    • 重构了 gRPC 连接管理逻辑,将重连功能模块化
    • 改进了错误日志记录,提供更清晰的调试信息
变更文件
文件路径 变更说明
v2/​nacos/​transport/​grpc_​client.​py 新增了 gRPC 流的自动重连和异常处理机制,包括指数退避重连、连接重建和请求重发功能。

💡 小贴士

与 lingma-agents 交流的方式

📜 直接回复评论
直接回复本条评论,lingma-agents 将自动处理您的请求。例如:

  • 在当前代码中添加详细的注释说明。

  • 请详细介绍一下你说的 LRU 改造方案,并使用伪代码加以说明。

📜 在代码行处标记
在文件的特定位置创建评论并 @lingma-agents。例如:

  • @lingma-agents 分析这个方法的性能瓶颈并提供优化建议。

  • @lingma-agents 对这个方法生成优化代码。

📜 在讨论中提问
在任何讨论中 @lingma-agents 来获取帮助。例如:

  • @lingma-agents 请总结上述讨论并提出解决方案。

  • @lingma-agents 请根据讨论内容生成优化代码。

Copy link

@lingma-agents lingma-agents bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔎 代码评审报告

🎯 评审意见概览
严重度 数量 说明
🔴 Blocker 0 阻断性问题,需立即修复。例如:系统崩溃、关键功能不可用或严重安全漏洞。
🟠 Critical 0 严重问题,高优先级修复。例如:核心功能异常或性能瓶颈影响用户体验。
🟡 Major 1 主要问题,建议修复。例如:非核心功能缺陷或代码维护性较差。
🟢 Minor 1 次要问题,酬情优化。例如:代码格式不规范或注释缺失。

总计: 2 个问题

📋 评审意见详情
💡 代码实现建议
以下是文件级别的代码建议,聚焦于代码的可读性、可维护性和潜在问题。
🐍 v2/nacos/transport/grpc_client.py (2 💬)
🚀 架构设计建议
以下是对代码架构和设计的综合分析,聚焦于跨文件交互、系统一致性和潜在优化空间。
🔍1. 架构一致性问题:新增的 _server_request_watcher_new 方法与原有 _server_request_watcher 方法功能重复

PR 中引入了新的方法 _server_request_watcher_new 来替代原有的 _server_request_watcher 方法,但并未移除旧方法。这导致代码库中存在两个功能相似的方法,违反了单一职责原则和架构一致性。建议明确废弃旧方法或统一使用新方法以保持架构清晰。

📌 关键代码

async def _server_request_watcher_new(self, grpc_conn: GrpcConnection): ...

⚠️ 潜在风险

维护成本增加、潜在的行为不一致、未来开发人员可能误用旧方法

🔍2. 可维护性问题:错误处理逻辑未充分解耦

在 _server_request_watcher_new 和 _reconnect 方法中,错误处理逻辑较为集中且嵌套较深,影响代码可读性和可维护性。建议将部分逻辑抽象为独立函数,如重试延迟计算、连接状态检查等,提升模块化程度。

📌 关键代码

except grpc.aio.AioRpcError as e:
    self.logger.warning(...)
    delay = next(backoff)
    ...

⚠️ 潜在风险

代码难以理解和修改、单元测试覆盖难度大

🔍3. 技术债务:自动重连机制缺乏超时控制

当前的自动重连机制没有设置最大重试次数或总超时时间,可能导致在网络持续异常情况下无限重试,消耗系统资源。建议引入最大重试次数或总体超时限制,防止资源耗尽。

📌 关键代码

backoff = (min(60, 2 ** i) for i in itertools.count(0))  # 1,2,4,8,...,60s

⚠️ 潜在风险

系统资源被无限制占用、服务响应延迟

🔍4. 测试策略不足:缺少对新引入的 _server_request_watcher_new 方法的全面测试

虽然添加了新的重连和异常处理逻辑,但从 PR 内容看,没有提供相应的单元测试或集成测试来验证这些关键功能的正确性。建议补充针对该方法的测试用例,包括正常流程、异常路径及重连场景。

📌 关键代码

async def _server_request_watcher_new(self, grpc_conn: GrpcConnection): ...

⚠️ 潜在风险

潜在的运行时错误未被发现、上线后出现不可预期行为

🔍5. 性能影响:频繁的日志记录可能影响性能

在 _server_request_watcher_new 方法中,每次接收到请求或发生异常时都会进行日志记录操作,尤其在高并发场景下可能会造成性能瓶颈。建议评估日志级别和频率,必要时采用异步日志或批量处理方式优化。

📌 关键代码

self.logger.error(f"[{grpc_conn.connection_id}] handle server request error: {e}")

⚠️ 潜在风险

高并发下性能下降、I/O 阻塞

🔍6. 依赖管理问题:引入 itertools 但未说明其用途合理性

PR 引入了 itertools 模块用于生成退避序列,但在实际项目中是否已有类似工具类或标准库实现未作说明。建议确认是否有必要引入新依赖,并评估是否有更合适的替代方案。

📌 关键代码

import itertools

⚠️ 潜在风险

增加不必要的外部依赖、降低代码可移植性

审查详情
📒 文件清单 (1 个文件)
📝 变更: 1 个文件

📝 变更文件:

  • v2/nacos/transport/grpc_client.py

💡 小贴士

与 lingma-agents 交流的方式

📜 直接回复评论
直接回复本条评论,lingma-agents 将自动处理您的请求。例如:

  • 在当前代码中添加详细的注释说明。

  • 请详细介绍一下你说的 LRU 改造方案,并使用伪代码加以说明。

📜 在代码行处标记
在文件的特定位置创建评论并 @lingma-agents。例如:

  • @lingma-agents 分析这个方法的性能瓶颈并提供优化建议。

  • @lingma-agents 对这个方法生成优化代码。

📜 在讨论中提问
在任何讨论中 @lingma-agents 来获取帮助。例如:

  • @lingma-agents 请总结上述讨论并提出解决方案。

  • @lingma-agents 请根据讨论内容生成优化代码。

await asyncio.sleep(delay)
try:
grpc_conn = await self._reconnect(grpc_conn)
backoff = (min(60, 2 ** i) for i in itertools.count(0)) # 重置 backoff
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在 gRPC 流异常时应重置退避延迟序列,避免长时间延迟重连。

🟡 Major | 🧹 Code Smells

📋 问题详情

当前代码在重连成功后重置了退避延迟序列,但如果 _reconnect 抛出异常,退避序列不会重置,可能导致后续重连尝试间隔过长,影响服务恢复速度。应在每次进入重连逻辑时都重置退避序列,确保异常恢复的及时性。

💡 解决方案

建议将退避序列的重置操作提前到进入 _reconnect 调用前,确保即使重连失败也能正确重置退避序列。

-                delay = next(backoff)
-                self.logger.info(f"will reconnect after {delay}s ...")
-                await asyncio.sleep(delay)
-                try:
-                    grpc_conn = await self._reconnect(grpc_conn)
-                    backoff = (min(60, 2 ** i) for i in itertools.count(0))  # 重置 backoff
-                    self.logger.info(f"reconnected. new connection_id={grpc_conn.get_connection_id()}")
+                delay = next(backoff)
+                self.logger.info(f"will reconnect after {delay}s ...")
+                await asyncio.sleep(delay)
+                backoff = (min(60, 2 ** i) for i in itertools.count(0))  # 重置 backoff
+                try:
+                    grpc_conn = await self._reconnect(grpc_conn)
+                    self.logger.info(f"reconnected. new connection_id={grpc_conn.get_connection_id()}")

您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)

有用意见👍无用意见👎错误意见❌

Comment on lines +195 to +200
try:
await self._shunt_down_channel(old_conn.channel)
except Exception:
pass

server_info = old_conn.server_info
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应在重连过程中关闭旧连接前记录连接 ID,防止连接对象失效后无法获取 ID。

🟢 Minor | 🧹 Code Smells

📋 问题详情

_reconnect 方法中,关闭旧连接的代码在获取 server_info 之前,如果 old_conn 在关闭后失效,后续获取 server_info 可能失败或获取到错误信息。应在关闭前记录必要的连接信息,确保后续流程的稳定性。

💡 解决方案

建议在关闭旧连接前先记录 server_infoconnection_id,防止连接对象失效后无法获取相关信息。

-        try:
-            await self._shunt_down_channel(old_conn.channel)
-        except Exception:
-            pass
-
-        server_info = old_conn.server_info
+        server_info = old_conn.server_info
+        try:
+            await self._shunt_down_channel(old_conn.channel)
+        except Exception:
+            pass

您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)

有用意见👍无用意见👎错误意见❌

@Sunrisea
Copy link
Contributor

这边我注意到 reconnect 之后会创建一个全新的grpc_connection ,但是上层的 RpcClient中的current_connection没有做替换。另外 RpcClient中也有一个 reconnect 方法提供重连机制,两个是否会产生冲突?有做过测试吗

@cxhello
Copy link
Contributor Author

cxhello commented Oct 29, 2025

@Sunrisea 目前我改动的这个版本已经在我们测试&生产环境跑了一个多月了,暂时还没发现异常。重新创建的连接是在自动重连,改动之前的逻辑好像是如果断开连接了,就不会自动重连了。

@jay376
Copy link

jay376 commented Dec 25, 2025

@Sunrisea 同样碰到这个问题了,能合么

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

UNKNOWN:Error received from peer {grpc_message:"Socket closed", grpc_status:14}

3 participants