Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions docs/source/api-reference/adapters/network.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@
Network adapters are for transforming network connections exposed by drivers

```{eval-rst}
.. autoclass:: jumpstarter_driver_network.adapters.TcpPortforwardAdapter
:members:
.. autofunction:: jumpstarter_driver_network.adapters.TcpPortforwardAdapter
```

```{eval-rst}
.. autoclass:: jumpstarter_driver_network.adapters.UnixPortforwardAdapter
:members:
.. autofunction:: jumpstarter_driver_network.adapters.UnixPortforwardAdapter
```

```{eval-rst}
.. autoclass:: jumpstarter_driver_network.adapters.NovncAdapter
:members:
.. autofunction:: jumpstarter_driver_network.adapters.NovncAdapter
```

```{eval-rst}
.. autoclass:: jumpstarter_driver_network.adapters.PexpectAdapter
:members:
.. autofunction:: jumpstarter_driver_network.adapters.PexpectAdapter
```

```{eval-rst}
.. autoclass:: jumpstarter_driver_network.adapters.FabricAdapter
:members:
.. autofunction:: jumpstarter_driver_network.adapters.FabricAdapter
```

## Examples
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
from dataclasses import dataclass
from contextlib import contextmanager
from os import environ, getenv

from .portforward import TcpPortforwardAdapter
from jumpstarter.client import DriverClient


@dataclass(kw_only=True)
class DbusAdapter(TcpPortforwardAdapter):
async def __aenter__(self):
addr = await super().__aenter__()
match self.client.kind:
case "system":
self.varname = "DBUS_SYSTEM_BUS_ADDRESS"
pass
case "session":
self.varname = "DBUS_SESSION_BUS_ADDRESS"
pass
case _:
raise ValueError(f"invalid bus type: {self.client.kind}")
self.oldenv = getenv(self.varname)
environ[self.varname] = f"tcp:host={addr[0]},port={addr[1]}"
@contextmanager
def DbusAdapter(*, client: DriverClient):
match client.kind:
case "system":
varname = "DBUS_SYSTEM_BUS_ADDRESS"
pass
case "session":
varname = "DBUS_SESSION_BUS_ADDRESS"
pass
case _:
raise ValueError(f"invalid bus type: {client.kind}")

async def __aexit__(self, exc_type, exc_value, traceback):
await super().__aexit__(exc_type, exc_value, traceback)
if self.oldenv is None:
del environ[self.varname]
else:
environ[self.varname] = self.oldenv
oldenv = getenv(varname)

with TcpPortforwardAdapter(client=client) as addr:
environ[varname] = f"tcp:host={addr[0]},port={addr[1]}"

try:
yield
finally:
if oldenv is None:
del environ[varname]
else:
environ[varname] = oldenv
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
from dataclasses import dataclass
from contextlib import asynccontextmanager
from functools import partial
from typing import Any

from fabric.config import Config
from fabric.connection import Connection

from .portforward import TcpPortforwardAdapter
from .portforward import handler
from jumpstarter.client import DriverClient
from jumpstarter.client.adapters import blocking
from jumpstarter.common import TemporaryTcpListener


@dataclass(kw_only=True)
class FabricAdapter(TcpPortforwardAdapter):
user: str | None = None
config: Config | None = None
forward_agent: bool | None = None
connect_timeout: int | None = None
connect_kwargs: dict[str, Any] | None = None
inline_ssh_env: bool | None = None

async def __aenter__(self):
addr = await super().__aenter__()
return Connection(
@blocking
@asynccontextmanager
async def FabricAdapter(
*,
Comment thread
NickCao marked this conversation as resolved.
client: DriverClient,
method: str = "connect",
user: str | None = None,
config: Config | None = None,
forward_agent: bool | None = None,
connect_timeout: int | None = None,
connect_kwargs: dict[str, Any] | None = None,
inline_ssh_env: bool | None = None,
):
async with TemporaryTcpListener(partial(handler, client, method)) as addr:
yield Connection(
addr[0],
user=self.user,
user=user,
port=addr[1],
config=self.config,
forward_agent=self.forward_agent,
connect_timeout=self.connect_timeout,
connect_kwargs=self.connect_kwargs,
inline_ssh_env=self.inline_ssh_env,
config=config,
forward_agent=forward_agent,
connect_timeout=connect_timeout,
connect_kwargs=connect_kwargs,
inline_ssh_env=inline_ssh_env,
)
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
from dataclasses import dataclass
from contextlib import asynccontextmanager
from urllib.parse import urlencode, urlunparse

from ..streams import WebsocketServerStream
from .portforward import TcpPortforwardAdapter
from jumpstarter.client import DriverClient
from jumpstarter.client.adapters import blocking
from jumpstarter.common import TemporaryTcpListener
from jumpstarter.streams import forward_stream


@dataclass(kw_only=True)
class NovncAdapter(TcpPortforwardAdapter):
async def __aenter__(self):
addr = await super().__aenter__()
return urlunparse(
@blocking
@asynccontextmanager
async def NovncAdapter(*, client: DriverClient, method: str = "connect"):
async def handler(conn):
async with conn:
async with client.stream_async(method) as stream:
async with WebsocketServerStream(stream=stream) as stream:
async with forward_stream(conn, stream):
pass

async with TemporaryTcpListener(handler) as addr:
yield urlunparse(
(
"https",
"novnc.com",
Expand All @@ -20,10 +29,3 @@ async def __aenter__(self):
"",
)
)

async def handler(self, conn):
async with conn:
async with self.client.stream_async(self.method) as stream:
async with WebsocketServerStream(stream=stream) as stream:
async with forward_stream(conn, stream):
pass
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import socket
from dataclasses import dataclass
from contextlib import contextmanager

from pexpect.fdpexpect import fdspawn

from .portforward import TcpPortforwardAdapter
from jumpstarter.client import DriverClient


@dataclass(kw_only=True)
class PexpectAdapter(TcpPortforwardAdapter):
async def __aenter__(self):
addr = await super().__aenter__()
@contextmanager
def PexpectAdapter(*, client: DriverClient, method: str = "connect"):
with TcpPortforwardAdapter(client=client, method=method) as addr:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(addr)

self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(addr)

return fdspawn(self.socket)

async def __aexit__(self, exc_type, exc_value, traceback):
self.socket.close()

await super().__aexit__(exc_type, exc_value, traceback)
try:
yield fdspawn(sock)
finally:
sock.close()
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
from dataclasses import dataclass
from contextlib import asynccontextmanager
from functools import partial

from jumpstarter.client.adapters import ClientAdapter
from jumpstarter.client import DriverClient
from jumpstarter.client.adapters import blocking
from jumpstarter.common import TemporaryTcpListener, TemporaryUnixListener
from jumpstarter.streams import forward_stream


@dataclass(kw_only=True)
class PortforwardAdapter(ClientAdapter):
method: str = "connect"

async def __aexit__(self, exc_type, exc_value, traceback):
return await self.listener.__aexit__(exc_type, exc_value, traceback)

async def handler(self, conn):
async with conn:
async with self.client.stream_async(self.method) as stream:
async with forward_stream(conn, stream):
pass


@dataclass(kw_only=True)
class TcpPortforwardAdapter(PortforwardAdapter):
local_host: str = "127.0.0.1"
local_port: int = 0

async def __aenter__(self):
self.listener = TemporaryTcpListener(
self.handler, local_host=self.local_host, local_port=self.local_port, reuse_port=True
)

return await self.listener.__aenter__()


@dataclass(kw_only=True)
class UnixPortforwardAdapter(PortforwardAdapter):
async def __aenter__(self):
self.listener = TemporaryUnixListener(self.handler)

return await self.listener.__aenter__()
async def handler(client, method, conn):
async with conn:
async with client.stream_async(method) as stream:
async with forward_stream(conn, stream):
pass


@blocking
@asynccontextmanager
async def TcpPortforwardAdapter(
*,
client: DriverClient,
method: str = "connect",
local_host: str = "127.0.0.1",
local_port: int = 0,
):
async with TemporaryTcpListener(
partial(handler, client, method),
local_host=local_host,
local_port=local_port,
) as addr:
yield addr


@blocking
@asynccontextmanager
async def UnixPortforwardAdapter(
*,
client: DriverClient,
method: str = "connect",
):
async with TemporaryUnixListener(partial(handler, client, method)) as addr:
yield addr
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from contextlib import suppress
from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass
from typing import Literal

Expand All @@ -7,7 +7,8 @@
from opendal import AsyncFile, Operator
from opendal.exceptions import Error

from jumpstarter.client.adapters import ClientAdapter
from jumpstarter.client import DriverClient
from jumpstarter.client.adapters import blocking
from jumpstarter.common.resources import PresignedRequestResource


Expand Down Expand Up @@ -44,26 +45,24 @@ async def aclose(self):
await self.file.close()


@dataclass(kw_only=True)
class OpendalAdapter(ClientAdapter):
operator: Operator # opendal.Operator for the storage backend
path: str # file path in storage backend relative to the storage root
mode: Literal["rb", "wb"] = "rb" # binary read or binary write mode

async def __aenter__(self):
# if the access mode is binary read, and the storage backend supports presigned read requests
if self.mode == "rb" and self.operator.capability().presign_read:
# create presigned url for the specified file with a 60 second expiration
presigned = await self.operator.to_async_operator().presign_read(self.path, expire_second=60)
return PresignedRequestResource(
headers=presigned.headers, url=presigned.url, method=presigned.method
).model_dump(mode="json")
# otherwise stream the file content from the client to the exporter
else:
file = await self.operator.to_async_operator().open(self.path, self.mode)
self.resource = self.client.resource_async(AsyncFileStream(file=file))
return await self.resource.__aenter__()

async def __aexit__(self, exc_type, exc_value, traceback):
if hasattr(self, "resource"):
await self.resource.__aexit__(exc_type, exc_value, traceback)
@blocking
@asynccontextmanager
async def OpendalAdapter(
*,
client: DriverClient,
operator: Operator, # opendal.Operator for the storage backend
path: str, # file path in storage backend relative to the storage root
mode: Literal["rb", "wb"] = "rb", # binary read or binary write mode
):
# if the access mode is binary read, and the storage backend supports presigned read requests
if mode == "rb" and operator.capability().presign_read:
# create presigned url for the specified file with a 60 second expiration
presigned = await operator.to_async_operator().presign_read(path, expire_second=60)
yield PresignedRequestResource(
headers=presigned.headers, url=presigned.url, method=presigned.method
).model_dump(mode="json")
# otherwise stream the file content from the client to the exporter
else:
file = await operator.to_async_operator().open(path, mode)
async with client.resource_async(AsyncFileStream(file=file)) as res:
yield res
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ def open(self) -> fdspawn:
Returns:
fdspawn: The pexpect session object.
"""
self._context_manager = self.pexpect()
return self._context_manager.__enter__()

def close(self):
if hasattr(self, "_context_manager"):
self._context_manager.__exit__(None, None, None)
return self.stack.enter_context(self.pexpect())

@contextmanager
def pexpect(self):
Expand Down
Loading