Skip to content

Commit f1cce89

Browse files
authored
fix(grpc): handle bi-stream disconnection with existing reconnect mechanism (#306)
When the gRPC bi-directional stream breaks due to AioRpcError or other exceptions, the _server_request_watcher now catches the exception, marks the client as UNHEALTHY, and triggers switch_server_async to reuse the existing reconnect mechanism in RpcClient. This ensures current_connection is properly updated and all connection listeners are notified. Closes #252 Made-with: Cursor
1 parent 998975e commit f1cce89

1 file changed

Lines changed: 19 additions & 11 deletions

File tree

v2/nacos/transport/grpc_client.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from v2.nacos.transport.model.server_info import ServerInfo
2121
from v2.nacos.transport.nacos_server_connector import NacosServerConnector
2222
from v2.nacos.transport.rec_ability_context import RecAbilityContext
23-
from v2.nacos.transport.rpc_client import RpcClient, ConnectionType
23+
from v2.nacos.transport.rpc_client import RpcClient, RpcClientStatus, ConnectionType
2424
from v2.nacos.transport.server_request_handler import SetupAckRequestHandler
2525

2626

@@ -175,16 +175,24 @@ async def _handle_server_request(self, request: Request, grpc_connection: GrpcCo
175175
f"{grpc_connection.get_connection_id()} failed to send response:{response.get_response_type()}, ackId:{request.requestId},error:{str(e)}")
176176

177177
async def _server_request_watcher(self, grpc_conn: GrpcConnection):
178-
async for payload in grpc_conn.bi_stream_send():
179-
try:
180-
self.logger.info("receive stream server request, connection_id:%s, original info: %s"
181-
% (grpc_conn.get_connection_id(), str(payload)))
182-
request = GrpcUtils.parse(payload)
183-
if request:
184-
await self._handle_server_request(request, grpc_conn)
185-
186-
except Exception as e:
187-
self.logger.error(f"[{grpc_conn.connection_id}] handle server request occur exception: {e}")
178+
try:
179+
async for payload in grpc_conn.bi_stream_send():
180+
try:
181+
self.logger.info("receive stream server request, connection_id:%s, original info: %s"
182+
% (grpc_conn.get_connection_id(), str(payload)))
183+
request = GrpcUtils.parse(payload)
184+
if request:
185+
await self._handle_server_request(request, grpc_conn)
186+
187+
except Exception as e:
188+
self.logger.error(f"[{grpc_conn.connection_id}] handle server request occur exception: {e}")
189+
except Exception as e:
190+
self.logger.warning(f"[{grpc_conn.connection_id}] bi stream broken: {e}")
191+
if not self.is_shutdown():
192+
async with self.lock:
193+
if self.is_running():
194+
self.rpc_client_status = RpcClientStatus.UNHEALTHY
195+
await self.switch_server_async(None, False)
188196

189197
@staticmethod
190198
async def _shunt_down_channel(channel):

0 commit comments

Comments
 (0)