44from dataclasses import dataclass , field
55
66import grpc
7- from anyio import connect_unix , create_task_group
7+ from anyio import connect_unix , create_memory_object_stream , create_task_group , sleep
88from google .protobuf import empty_pb2
99from jumpstarter_protocol import (
1010 jumpstarter_pb2 ,
@@ -37,9 +37,12 @@ async def __aexit__(self, exc_type, exc_value, traceback):
3737 )
3838
3939 async def __handle (self , path , endpoint , token , tls_config ):
40- async with await connect_unix (path ) as stream :
41- async with connect_router_stream (endpoint , token , stream , tls_config ):
42- pass
40+ try :
41+ async with await connect_unix (path ) as stream :
42+ async with connect_router_stream (endpoint , token , stream , tls_config ):
43+ pass
44+ except Exception as e :
45+ logger .info ("failed to handle connection: {}" .format (e ))
4346
4447 @asynccontextmanager
4548 async def session (self ):
@@ -64,21 +67,47 @@ async def session(self):
6467 yield path
6568
6669 async def handle (self , lease_name , tg ):
67- controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
6870 logger .info ("Listening for incoming connection requests on lease %s" , lease_name )
71+
72+ listen_tx , listen_rx = create_memory_object_stream ()
73+
74+ async def listen ():
75+ while True :
76+ try :
77+ controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
78+ async for request in controller .Listen (jumpstarter_pb2 .ListenRequest (lease_name = lease_name )):
79+ await listen_tx .send (request )
80+ except Exception as e :
81+ logger .info ("Listen stream interrupted, restarting: {}" .format (e ))
82+ await sleep (3 )
83+
84+ tg .start_soon (listen )
85+
6986 async with self .session () as path :
70- async for request in controller . Listen ( jumpstarter_pb2 . ListenRequest ( lease_name = lease_name )) :
87+ async for request in listen_rx :
7188 logger .info ("Handling new connection request on lease %s" , lease_name )
7289 tg .start_soon (self .__handle , path , request .router_endpoint , request .router_token , self .tls )
7390
7491 async def serve (self ):
75- controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
7692 # initial registration
7793 async with self .session ():
7894 pass
7995 started = False
96+ status_tx , status_rx = create_memory_object_stream ()
97+
98+ async def status ():
99+ while True :
100+ try :
101+ controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
102+ async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
103+ await status_tx .send (status )
104+ except Exception as e :
105+ logger .info ("Status stream interrupted, restarting: {}" .format (e ))
106+ await sleep (3 )
107+
80108 async with create_task_group () as tg :
81- async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
109+ tg .start_soon (status )
110+ async for status in status_rx :
82111 if self .lease_name != "" and self .lease_name != status .lease_name :
83112 self .lease_name = status .lease_name
84113 logger .info ("Lease status changed, killing existing connections" )
0 commit comments