@@ -72,15 +72,26 @@ async def handle(self, lease_name, tg):
7272
7373 listen_tx , listen_rx = create_memory_object_stream ()
7474
75- async def listen ():
75+ async def listen (retries = 5 , backoff = 3 ):
76+ retries_left = retries
7677 while True :
7778 try :
7879 controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
7980 async for request in controller .Listen (jumpstarter_pb2 .ListenRequest (lease_name = lease_name )):
8081 await listen_tx .send (request )
8182 except Exception as e :
82- logger .info ("Listen stream interrupted, restarting: {}" .format (e ))
83- await sleep (3 )
83+ if retries_left > 0 :
84+ retries_left -= 1
85+ logger .info (
86+ "Listen stream interrupted, restarting in {}s, {} retries left: {}" .format (
87+ backoff , retries_left , e
88+ )
89+ )
90+ await sleep (backoff )
91+ else :
92+ raise
93+ else :
94+ retries_left = retries
8495
8596 tg .start_soon (listen )
8697
@@ -91,22 +102,33 @@ async def listen():
91102 self .__handle , path , request .router_endpoint , request .router_token , self .tls , self .grpc_options
92103 )
93104
94- async def serve (self ):
105+ async def serve (self ): # noqa: C901
95106 # initial registration
96107 async with self .session ():
97108 pass
98109 started = False
99110 status_tx , status_rx = create_memory_object_stream ()
100111
101- async def status ():
112+ async def status (retries = 5 , backoff = 3 ):
113+ retries_left = retries
102114 while True :
103115 try :
104116 controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
105117 async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
106118 await status_tx .send (status )
107119 except Exception as e :
108- logger .info ("Status stream interrupted, restarting: {}" .format (e ))
109- await sleep (3 )
120+ if retries_left > 0 :
121+ retries_left -= 1
122+ logger .info (
123+ "Status stream interrupted, restarting in {}s, {} retries left: {}" .format (
124+ backoff , retries_left , e
125+ )
126+ )
127+ await sleep (backoff )
128+ else :
129+ raise
130+ else :
131+ retries_left = retries
110132
111133 async with create_task_group () as tg :
112134 tg .start_soon (status )
0 commit comments