diff --git a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py index 3c3494b90..8f70dea2b 100644 --- a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py +++ b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import time from collections.abc import AsyncGenerator from dataclasses import dataclass, field @@ -8,14 +7,14 @@ import pyudev import usb.core import usb.util -from anyio import fail_after, sleep -from anyio.streams.file import FileReadStream, FileWriteStream +from anyio import sleep from jumpstarter_driver_composite.driver import CompositeInterface from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface from jumpstarter_driver_power.driver import PowerInterface, PowerReading from jumpstarter_driver_pyserial.driver import PySerial from serial.serialutil import SerialException +from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device from jumpstarter.driver import Driver, export @@ -23,6 +22,9 @@ class DutlinkConfig: serial: str | None = field(default=None) timeout_s: int = field(default=20) # 20 seconds, power control sequences can block USB for a long time + storage_timeout: int = field(default=10) + storage_leeway: int = field(default=6) + storage_fsync_timeout: int = field(default=900) dev: usb.core.Device = field(init=False) itf: usb.core.Interface = field(init=False) @@ -187,44 +189,29 @@ def dut(self): def off(self): return self.control("off") - async def wait_for_storage_device(self): - with fail_after(20): - while True: - self.logger.debug(f"waiting for storage device {self.storage_device}") - if os.path.exists(self.storage_device): - self.logger.debug(f"storage device {self.storage_device} is ready") - # https://stackoverflow.com/a/2774125 - fd = os.open(self.storage_device, os.O_WRONLY) - try: - if os.lseek(fd, 0, os.SEEK_END) > 0: - break - finally: - os.close(fd) - await sleep(1) - @export async def write(self, src: str): self.host() - await self.wait_for_storage_device() - async with await FileWriteStream.from_path(self.storage_device) as stream: - async with self.resource(src) as res: - total_bytes = 0 - next_print = 0 - async for chunk in res: - await stream.send(chunk) - if total_bytes > next_print: - self.logger.debug(f"{self.storage_device} written {total_bytes / (1024 * 1024)} MB") - next_print += 50 * 1024 * 1024 - total_bytes += len(chunk) + async with self.resource(src) as res: + await write_to_storage_device( + self.storage_device, + res, + timeout=self.storage_timeout, + leeway=self.storage_leeway, + fsync_timeout=self.storage_fsync_timeout, + logger=self.logger, + ) @export async def read(self, dst: str): self.host() - await self.wait_for_storage_device() - async with await FileReadStream.from_path(self.storage_device) as stream: - async with self.resource(dst) as res: - async for chunk in stream: - await res.send(chunk) + async with self.resource(dst) as res: + await read_from_storage_device( + self.storage_device, + res, + timeout=self.storage_timeout, + logger=self.logger, + ) @dataclass(kw_only=True) diff --git a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py index 28bfaeb7e..7f33d5d2b 100644 --- a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py +++ b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py @@ -1,15 +1,13 @@ from __future__ import annotations -import os from dataclasses import dataclass, field import pyudev import usb.core import usb.util -from anyio import fail_after, sleep -from anyio.streams.file import FileReadStream, FileWriteStream from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface +from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device from jumpstarter.driver import Driver, export @@ -20,6 +18,9 @@ class SDWire(StorageMuxFlasherInterface, Driver): itf: usb.core.Interface = field(init=False) storage_device: str | None = field(default=None) + storage_timeout: int = field(default=10) + storage_leeway: int = field(default=6) + storage_fsync_timeout: int = field(default=900) def effective_storage_device(self): if self.storage_device is None: @@ -101,45 +102,26 @@ def dut(self): def off(self): self.host() - async def wait_for_storage_device(self): - with fail_after(10): - storage_device = self.effective_storage_device() - - while True: - # https://stackoverflow.com/a/2774125 - try: - fd = os.open(storage_device, os.O_WRONLY) - if os.lseek(fd, 0, os.SEEK_END) > 0: - break - except OSError as e: - match e.errno: - case 123: # No medium found - pass - case 5: # Input/output error - pass - case _: - raise - finally: - if "fd" in locals(): - os.close(fd) - await sleep(1) - - return storage_device - @export async def write(self, src: str): self.host() - storage_device = await self.wait_for_storage_device() - async with await FileWriteStream.from_path(storage_device) as stream: - async with self.resource(src) as res: - async for chunk in res: - await stream.send(chunk) + async with self.resource(src) as res: + await write_to_storage_device( + self.effective_storage_device(), + res, + timeout=self.storage_timeout, + leeway=self.storage_leeway, + fsync_timeout=self.storage_fsync_timeout, + logger=self.logger, + ) @export async def read(self, dst: str): self.host() - storage_device = await self.wait_for_storage_device() - async with await FileReadStream.from_path(storage_device) as stream: - async with self.resource(dst) as res: - async for chunk in stream: - await res.send(chunk) + async with self.resource(dst) as res: + await read_from_storage_device( + self.effective_storage_device(), + res, + timeout=self.storage_timeout, + logger=self.logger, + ) diff --git a/packages/jumpstarter/jumpstarter/common/storage.py b/packages/jumpstarter/jumpstarter/common/storage.py new file mode 100644 index 000000000..0518a503e --- /dev/null +++ b/packages/jumpstarter/jumpstarter/common/storage.py @@ -0,0 +1,130 @@ +import errno +import os +from logging import Logger +from typing import Literal + +from anyio import fail_after, sleep +from anyio.abc import AnyByteStream +from anyio.streams.file import FileReadStream, FileWriteStream + + +async def wait_for_storage_device( # noqa: C901 + storage_device: os.PathLike, + mode: Literal["wb", "rb"], + timeout: int = 10, + *, + logger: Logger | None = None, +) -> os.PathLike: + with fail_after(timeout): + while True: + # https://stackoverflow.com/a/2774125 + try: + match mode: + case "wb": + fd = os.open(storage_device, os.O_WRONLY) + case "rb": + fd = os.open(storage_device, os.O_RDONLY) + case _: + raise ValueError("invalid mode: {}".format(mode)) + with os.fdopen(fd, mode): # to prevent fd from leaking + if os.lseek(fd, 0, os.SEEK_END) > 0: + if logger: + logger.info("storage device {} is ready".format(storage_device)) + break + if logger: + logger.debug("waiting for storage device {} to have a nonzero size".format(storage_device)) + except FileNotFoundError: + if logger: + logger.debug("waiting for storage device {} to appear".format(storage_device)) + except OSError as e: + match e.errno: + case errno.ENOMEDIUM | errno.EIO: + if logger: + logger.debug("waiting for storage device {} to be ready".format(storage_device)) + case _: + raise + + await sleep(1) + + return storage_device + + +async def write_to_storage_device( + storage_device: os.PathLike, + resource: AnyByteStream, + timeout: int = 10, + fsync_timeout: int = 900, + leeway: int = 6, + *, + logger: Logger | None = None, +): + path = await wait_for_storage_device( + storage_device, + mode="wb", + timeout=timeout, + logger=logger, + ) + with os.fdopen(os.open(path, os.O_WRONLY), "wb") as file: + async with FileWriteStream(file) as stream: + total_bytes = 0 + next_print = 0 + async for chunk in resource: + await stream.send(chunk) + if logger: + total_bytes += len(chunk) + if total_bytes > next_print: + logger.info( + "written {} MB to storage device {}".format( + total_bytes / (1024 * 1024), + storage_device, + ) + ) + next_print += 50 * 1024 * 1024 + + with fail_after(fsync_timeout): + while True: + try: + if logger: + logger.info("fsyncing storage device {}, please wait".format(storage_device)) + os.fsync(file.fileno()) + except OSError as e: + if e.errno == errno.EIO: + await sleep(1) + continue + else: + raise + else: + break + + await sleep(leeway) + + +async def read_from_storage_device( + storage_device: os.PathLike, + resource: AnyByteStream, + timeout: int = 10, + *, + logger: Logger | None = None, +): + path = await wait_for_storage_device( + storage_device, + mode="rb", + timeout=timeout, + logger=logger, + ) + with os.fdopen(os.open(path, os.O_RDONLY), "rb") as file: + async with FileReadStream(file) as stream: + total_bytes = 0 + next_print = 0 + async for chunk in stream: + await resource.send(chunk) + if logger: + total_bytes += len(chunk) + if total_bytes > next_print: + logger.info( + "read {} MB from storage device {}".format( + total_bytes / (1024 * 1024), + storage_device, + ) + ) + next_print += 50 * 1024 * 1024