44from dataclasses import dataclass , field
55
66import grpc
7- from anyio import connect_unix , create_task_group
7+ from anyio import connect_unix , create_memory_object_stream , create_task_group , sleep
88from google .protobuf import empty_pb2
99from jumpstarter_protocol import (
1010 jumpstarter_pb2 ,
@@ -38,9 +38,12 @@ async def __aexit__(self, exc_type, exc_value, traceback):
3838 )
3939
4040 async def __handle (self , path , endpoint , token , tls_config , grpc_options ):
41- async with await connect_unix (path ) as stream :
42- async with connect_router_stream (endpoint , token , stream , tls_config , grpc_options ):
43- pass
41+ try :
42+ async with await connect_unix (path ) as stream :
43+ async with connect_router_stream (endpoint , token , stream , tls_config , grpc_options ):
44+ pass
45+ except Exception as e :
46+ logger .info ("failed to handle connection: {}" .format (e ))
4447
4548 @asynccontextmanager
4649 async def session (self ):
@@ -65,23 +68,49 @@ async def session(self):
6568 yield path
6669
6770 async def handle (self , lease_name , tg ):
68- controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
6971 logger .info ("Listening for incoming connection requests on lease %s" , lease_name )
72+
73+ listen_tx , listen_rx = create_memory_object_stream ()
74+
75+ async def listen ():
76+ while True :
77+ try :
78+ controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
79+ async for request in controller .Listen (jumpstarter_pb2 .ListenRequest (lease_name = lease_name )):
80+ await listen_tx .send (request )
81+ except Exception as e :
82+ logger .info ("Listen stream interrupted, restarting: {}" .format (e ))
83+ await sleep (3 )
84+
85+ tg .start_soon (listen )
86+
7087 async with self .session () as path :
71- async for request in controller . Listen ( jumpstarter_pb2 . ListenRequest ( lease_name = lease_name )) :
88+ async for request in listen_rx :
7289 logger .info ("Handling new connection request on lease %s" , lease_name )
7390 tg .start_soon (
7491 self .__handle , path , request .router_endpoint , request .router_token , self .tls , self .grpc_options
7592 )
7693
7794 async def serve (self ):
78- controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
7995 # initial registration
8096 async with self .session ():
8197 pass
8298 started = False
99+ status_tx , status_rx = create_memory_object_stream ()
100+
101+ async def status ():
102+ while True :
103+ try :
104+ controller = jumpstarter_pb2_grpc .ControllerServiceStub (self .channel_factory ())
105+ async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
106+ await status_tx .send (status )
107+ except Exception as e :
108+ logger .info ("Status stream interrupted, restarting: {}" .format (e ))
109+ await sleep (3 )
110+
83111 async with create_task_group () as tg :
84- async for status in controller .Status (jumpstarter_pb2 .StatusRequest ()):
112+ tg .start_soon (status )
113+ async for status in status_rx :
85114 if self .lease_name != "" and self .lease_name != status .lease_name :
86115 self .lease_name = status .lease_name
87116 logger .info ("Lease status changed, killing existing connections" )
0 commit comments