Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
from __future__ import annotations

import os
import time
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field

import pyudev
import usb.core
import usb.util
from anyio import fail_after, sleep
from anyio.streams.file import FileReadStream, FileWriteStream
from anyio import sleep
from jumpstarter_driver_composite.driver import CompositeInterface
from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface
from jumpstarter_driver_power.driver import PowerInterface, PowerReading
from jumpstarter_driver_pyserial.driver import PySerial
from serial.serialutil import SerialException

from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device
from jumpstarter.driver import Driver, export


@dataclass(kw_only=True)
class DutlinkConfig:
serial: str | None = field(default=None)
timeout_s: int = field(default=20) # 20 seconds, power control sequences can block USB for a long time
storage_timeout: int = field(default=10)
storage_leeway: int = field(default=6)
storage_fsync_timeout: int = field(default=900)

dev: usb.core.Device = field(init=False)
itf: usb.core.Interface = field(init=False)
Expand Down Expand Up @@ -187,44 +189,29 @@ def dut(self):
def off(self):
return self.control("off")

async def wait_for_storage_device(self):
with fail_after(20):
while True:
self.logger.debug(f"waiting for storage device {self.storage_device}")
if os.path.exists(self.storage_device):
self.logger.debug(f"storage device {self.storage_device} is ready")
# https://stackoverflow.com/a/2774125
fd = os.open(self.storage_device, os.O_WRONLY)
try:
if os.lseek(fd, 0, os.SEEK_END) > 0:
break
finally:
os.close(fd)
await sleep(1)

@export
async def write(self, src: str):
self.host()
await self.wait_for_storage_device()
async with await FileWriteStream.from_path(self.storage_device) as stream:
async with self.resource(src) as res:
total_bytes = 0
next_print = 0
async for chunk in res:
await stream.send(chunk)
if total_bytes > next_print:
self.logger.debug(f"{self.storage_device} written {total_bytes / (1024 * 1024)} MB")
next_print += 50 * 1024 * 1024
total_bytes += len(chunk)
async with self.resource(src) as res:
await write_to_storage_device(
self.storage_device,
res,
timeout=self.storage_timeout,
leeway=self.storage_leeway,
fsync_timeout=self.storage_fsync_timeout,
logger=self.logger,
)

@export
async def read(self, dst: str):
self.host()
await self.wait_for_storage_device()
async with await FileReadStream.from_path(self.storage_device) as stream:
async with self.resource(dst) as res:
async for chunk in stream:
await res.send(chunk)
async with self.resource(dst) as res:
await read_from_storage_device(
self.storage_device,
res,
timeout=self.storage_timeout,
logger=self.logger,
)


@dataclass(kw_only=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from __future__ import annotations

import os
from dataclasses import dataclass, field

import pyudev
import usb.core
import usb.util
from anyio import fail_after, sleep
from anyio.streams.file import FileReadStream, FileWriteStream
from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface

from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device
from jumpstarter.driver import Driver, export


Expand All @@ -20,6 +18,9 @@ class SDWire(StorageMuxFlasherInterface, Driver):
itf: usb.core.Interface = field(init=False)

storage_device: str | None = field(default=None)
storage_timeout: int = field(default=10)
storage_leeway: int = field(default=6)
storage_fsync_timeout: int = field(default=900)

def effective_storage_device(self):
if self.storage_device is None:
Expand Down Expand Up @@ -101,45 +102,26 @@ def dut(self):
def off(self):
self.host()

async def wait_for_storage_device(self):
with fail_after(10):
storage_device = self.effective_storage_device()

while True:
# https://stackoverflow.com/a/2774125
try:
fd = os.open(storage_device, os.O_WRONLY)
if os.lseek(fd, 0, os.SEEK_END) > 0:
break
except OSError as e:
match e.errno:
case 123: # No medium found
pass
case 5: # Input/output error
pass
case _:
raise
finally:
if "fd" in locals():
os.close(fd)
await sleep(1)

return storage_device

@export
async def write(self, src: str):
self.host()
storage_device = await self.wait_for_storage_device()
async with await FileWriteStream.from_path(storage_device) as stream:
async with self.resource(src) as res:
async for chunk in res:
await stream.send(chunk)
async with self.resource(src) as res:
await write_to_storage_device(
self.effective_storage_device(),
res,
timeout=self.storage_timeout,
leeway=self.storage_leeway,
fsync_timeout=self.storage_fsync_timeout,
logger=self.logger,
)

@export
async def read(self, dst: str):
self.host()
storage_device = await self.wait_for_storage_device()
async with await FileReadStream.from_path(storage_device) as stream:
async with self.resource(dst) as res:
async for chunk in stream:
await res.send(chunk)
async with self.resource(dst) as res:
await read_from_storage_device(
self.effective_storage_device(),
res,
timeout=self.storage_timeout,
logger=self.logger,
)
130 changes: 130 additions & 0 deletions packages/jumpstarter/jumpstarter/common/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import errno
import os
from logging import Logger
from typing import Literal

from anyio import fail_after, sleep
from anyio.abc import AnyByteStream
from anyio.streams.file import FileReadStream, FileWriteStream


async def wait_for_storage_device( # noqa: C901
storage_device: os.PathLike,
mode: Literal["wb", "rb"],
timeout: int = 10,
*,
logger: Logger | None = None,
) -> os.PathLike:
with fail_after(timeout):
while True:
# https://stackoverflow.com/a/2774125
try:
match mode:
case "wb":
fd = os.open(storage_device, os.O_WRONLY)
case "rb":
fd = os.open(storage_device, os.O_RDONLY)
case _:
raise ValueError("invalid mode: {}".format(mode))
with os.fdopen(fd, mode): # to prevent fd from leaking
if os.lseek(fd, 0, os.SEEK_END) > 0:
if logger:
logger.info("storage device {} is ready".format(storage_device))
break
if logger:
logger.debug("waiting for storage device {} to have a nonzero size".format(storage_device))
except FileNotFoundError:
if logger:
logger.debug("waiting for storage device {} to appear".format(storage_device))
except OSError as e:
match e.errno:
case errno.ENOMEDIUM | errno.EIO:
if logger:
logger.debug("waiting for storage device {} to be ready".format(storage_device))
case _:
raise

await sleep(1)

return storage_device


async def write_to_storage_device(
storage_device: os.PathLike,
resource: AnyByteStream,
timeout: int = 10,
fsync_timeout: int = 900,
leeway: int = 6,
*,
logger: Logger | None = None,
):
path = await wait_for_storage_device(
storage_device,
mode="wb",
timeout=timeout,
logger=logger,
)
with os.fdopen(os.open(path, os.O_WRONLY), "wb") as file:
async with FileWriteStream(file) as stream:
total_bytes = 0
next_print = 0
async for chunk in resource:
await stream.send(chunk)
if logger:
total_bytes += len(chunk)
if total_bytes > next_print:
logger.info(
"written {} MB to storage device {}".format(
total_bytes / (1024 * 1024),
storage_device,
)
)
next_print += 50 * 1024 * 1024

with fail_after(fsync_timeout):
while True:
try:
if logger:
logger.info("fsyncing storage device {}, please wait".format(storage_device))
os.fsync(file.fileno())
Comment thread
NickCao marked this conversation as resolved.
except OSError as e:
if e.errno == errno.EIO:
await sleep(1)
continue
else:
raise
else:
break

await sleep(leeway)


async def read_from_storage_device(
storage_device: os.PathLike,
resource: AnyByteStream,
timeout: int = 10,
*,
logger: Logger | None = None,
):
path = await wait_for_storage_device(
storage_device,
mode="rb",
timeout=timeout,
logger=logger,
)
with os.fdopen(os.open(path, os.O_RDONLY), "rb") as file:
async with FileReadStream(file) as stream:
total_bytes = 0
next_print = 0
async for chunk in stream:
await resource.send(chunk)
if logger:
total_bytes += len(chunk)
if total_bytes > next_print:
logger.info(
"read {} MB from storage device {}".format(
total_bytes / (1024 * 1024),
storage_device,
)
)
next_print += 50 * 1024 * 1024