Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/source/cli/clients.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ metadata:
name: john
endpoint: grpc.jumpstarter.192.168.1.10.nip.io:8082
token: <<token>>
grpcConfig:
# please refer to the https://grpc.github.io/grpc/core/group__grpc__arg__keys.html documentation
grpc.keepalive_time_ms: 20000
tls:
ca: ''
insecure: False
Expand Down
3 changes: 3 additions & 0 deletions docs/source/introduction/exporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ metadata:
name: demo
endpoint: grpc.jumpstarter.example.com:443
token: xxxxx
grpcConfig:
# Please refer to the https://grpc.github.io/grpc/core/group__grpc__arg__keys.html documentation
grpc.keepalive_time_ms: 20000
export:
power:
type: jumpstarter_driver_yepkit.driver.Ykush
Expand Down
6 changes: 5 additions & 1 deletion packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any

from anyio import create_task_group, fail_after, sleep
from anyio.from_thread import BlockingPortal
Expand Down Expand Up @@ -39,6 +40,7 @@ class Lease(AbstractContextManager, AbstractAsyncContextManager):
release: bool = True # release on contexts exit
controller: jumpstarter_pb2_grpc.ControllerServiceStub = field(init=False)
tls_config: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
grpc_options: dict[str, Any] = field(default_factory=dict)
Comment thread
mangelajo marked this conversation as resolved.

def __post_init__(self):
if hasattr(super(), "__post_init__"):
Expand Down Expand Up @@ -149,7 +151,9 @@ def __exit__(self, exc_type, exc_value, traceback):
async def handle_async(self, stream):
logger.debug("Connecting to Lease with name %s", self.name)
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
async with connect_router_stream(response.router_endpoint, response.router_token, stream, self.tls_config):
async with connect_router_stream(
response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options
):
pass

@asynccontextmanager
Expand Down
25 changes: 17 additions & 8 deletions packages/jumpstarter/jumpstarter/common/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket
import ssl
from contextlib import contextmanager
from typing import Any, Sequence, Tuple
from urllib.parse import urlparse

import grpc
Expand Down Expand Up @@ -34,20 +35,28 @@ def ssl_channel_credentials(target: str, tls_config):
return grpc.ssl_channel_credentials()


def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials):
def aio_secure_channel(target: str, credentials: grpc.ChannelCredentials, grpc_options: dict[str, Any] | None):
return grpc.aio.secure_channel(
target,
credentials,
options=(
("grpc.lb_policy_name", "round_robin"),
("grpc.keepalive_time_ms", 350000),
("grpc.keepalive_timeout_ms", 5000),
("grpc.http2.max_pings_without_data", 5),
("grpc.keepalive_permit_without_calls", 1),
),
options=_override_default_grpc_options(grpc_options),
)


def _override_default_grpc_options(grpc_options: dict[str, str | int] | None) -> Sequence[Tuple[str, Any]]:
defaults = (
("grpc.lb_policy_name", "round_robin"),
# we keep a low keepalive time to avoid idle timeouts on cloud load balancers
("grpc.keepalive_time_ms", 20000),
("grpc.keepalive_timeout_ms", 5000),
("grpc.http2.max_pings_without_data", 5),
("grpc.keepalive_permit_without_calls", 1),
)
options = dict(defaults)
options.update(grpc_options or {})
return tuple(options.items())


def configure_grpc_env():
# disable informative logs by default, i.e.:
# WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
Expand Down
4 changes: 2 additions & 2 deletions packages/jumpstarter/jumpstarter/common/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ class StreamRequestMetadata(BaseModel):


@asynccontextmanager
async def connect_router_stream(endpoint, token, stream, tls_config):
async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
credentials = grpc.composite_channel_credentials(
ssl_channel_credentials(endpoint, tls_config),
grpc.access_token_call_credentials(token),
)

async with aio_secure_channel(endpoint, credentials) as channel:
async with aio_secure_channel(endpoint, credentials, grpc_options) as channel:
router = router_pb2_grpc.RouterServiceStub(channel)
context = router.Stream(metadata=())
async with RouterStream(context=context) as s:
Expand Down
5 changes: 4 additions & 1 deletion packages/jumpstarter/jumpstarter/config/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ClientConfigV1Alpha1(BaseModel):
endpoint: str
tls: TLSConfigV1Alpha1 = Field(default_factory=TLSConfigV1Alpha1)
token: str
grpcOptions: dict[str, str | int] | None = Field(default_factory=dict)

drivers: ClientConfigV1Alpha1Drivers

Expand All @@ -57,7 +58,7 @@ async def channel(self):
call_credentials("Client", self.metadata, self.token),
)

return aio_secure_channel(self.endpoint, credentials)
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)

@contextmanager
def lease(self, metadata_filter: MetadataFilter, lease_name: str | None = None):
Expand Down Expand Up @@ -122,6 +123,7 @@ async def request_lease_async(
allow=self.drivers.allow,
unsafe=self.drivers.unsafe,
tls_config=self.tls,
grpc_options=self.grpcOptions,
)
with translate_grpc_exceptions():
return await lease.request_async()
Expand Down Expand Up @@ -161,6 +163,7 @@ async def lease_async(
unsafe=self.drivers.unsafe,
release=release_lease,
tls_config=self.tls,
grpc_options=self.grpcOptions,
) as lease:
yield lease

Expand Down
3 changes: 3 additions & 0 deletions packages/jumpstarter/jumpstarter/config/client_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def test_client_config_save(monkeypatch: pytest.MonkeyPatch):
ca: ''
insecure: false
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
grpcOptions: {}
drivers:
allow:
- jumpstarter.drivers.*
Expand Down Expand Up @@ -241,6 +242,7 @@ def test_client_config_save_explicit_path():
ca: ''
insecure: false
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
grpcOptions: {}
drivers:
allow:
- jumpstarter.drivers.*
Expand Down Expand Up @@ -274,6 +276,7 @@ def test_client_config_save_unsafe_drivers():
ca: ''
insecure: false
token: dGhpc2lzYXRva2VuLTEyMzQxMjM0MTIzNEyMzQtc2Rxd3Jxd2VycXdlcnF3ZXJxd2VyLTEyMzQxMjM0MTIz
grpcOptions: {}
drivers:
allow: []
unsafe: true
Expand Down
4 changes: 3 additions & 1 deletion packages/jumpstarter/jumpstarter/config/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class ExporterConfigV1Alpha1(BaseModel):
endpoint: str
tls: TLSConfigV1Alpha1 = Field(default_factory=TLSConfigV1Alpha1)
token: str
grpcOptions: dict[str, str | int] | None = Field(default_factory=dict)

export: dict[str, ExporterConfigV1Alpha1DriverInstance] = Field(default_factory=dict)

Expand Down Expand Up @@ -163,12 +164,13 @@ def channel_factory():
ssl_channel_credentials(self.endpoint, self.tls),
call_credentials("Exporter", self.metadata, self.token),
)
return aio_secure_channel(self.endpoint, credentials)
return aio_secure_channel(self.endpoint, credentials, self.grpcOptions)

async with Exporter(
channel_factory=channel_factory,
device_factory=ExporterConfigV1Alpha1DriverInstance(children=self.export).instantiate,
tls=self.tls,
grpc_options=self.grpcOptions,
) as exporter:
await exporter.serve()

Expand Down
9 changes: 6 additions & 3 deletions packages/jumpstarter/jumpstarter/exporter/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Exporter(AbstractAsyncContextManager, Metadata):
device_factory: Callable[[], Driver]
lease_name: str = field(init=False, default="")
tls: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1)
grpc_options: dict[str, str] = field(default_factory=dict)

async def __aexit__(self, exc_type, exc_value, traceback):
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
Expand All @@ -36,9 +37,9 @@ async def __aexit__(self, exc_type, exc_value, traceback):
)
)

async def __handle(self, path, endpoint, token, tls_config):
async def __handle(self, path, endpoint, token, tls_config, grpc_options):
async with await connect_unix(path) as stream:
async with connect_router_stream(endpoint, token, stream, tls_config):
async with connect_router_stream(endpoint, token, stream, tls_config, grpc_options):
pass

@asynccontextmanager
Expand Down Expand Up @@ -69,7 +70,9 @@ async def handle(self, lease_name, tg):
async with self.session() as path:
async for request in controller.Listen(jumpstarter_pb2.ListenRequest(lease_name=lease_name)):
logger.info("Handling new connection request on lease %s", lease_name)
tg.start_soon(self.__handle, path, request.router_endpoint, request.router_token, self.tls)
tg.start_soon(
self.__handle, path, request.router_endpoint, request.router_token, self.tls, self.grpc_options
)

async def serve(self):
controller = jumpstarter_pb2_grpc.ControllerServiceStub(self.channel_factory())
Expand Down