From 221d3ee6bf53ca246ef43647fa66579859d42a9f Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Tue, 18 Mar 2025 09:23:57 -0400 Subject: [PATCH 1/3] treewide: improved handling of storage device --- .../jumpstarter_driver_dutlink/driver.py | 40 +----- .../jumpstarter_driver_sdwire/driver.py | 43 +------ .../jumpstarter/jumpstarter/common/storage.py | 118 ++++++++++++++++++ 3 files changed, 129 insertions(+), 72 deletions(-) create mode 100644 packages/jumpstarter/jumpstarter/common/storage.py diff --git a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py index 3c3494b90..a97c08cb2 100644 --- a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py +++ b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import time from collections.abc import AsyncGenerator from dataclasses import dataclass, field @@ -8,14 +7,14 @@ import pyudev import usb.core import usb.util -from anyio import fail_after, sleep -from anyio.streams.file import FileReadStream, FileWriteStream +from anyio import sleep from jumpstarter_driver_composite.driver import CompositeInterface from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface from jumpstarter_driver_power.driver import PowerInterface, PowerReading from jumpstarter_driver_pyserial.driver import PySerial from serial.serialutil import SerialException +from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device from jumpstarter.driver import Driver, export @@ -187,44 +186,17 @@ def dut(self): def off(self): return self.control("off") - async def wait_for_storage_device(self): - with fail_after(20): - while True: - self.logger.debug(f"waiting for storage device {self.storage_device}") - if os.path.exists(self.storage_device): - self.logger.debug(f"storage device {self.storage_device} is ready") - # https://stackoverflow.com/a/2774125 - fd = os.open(self.storage_device, os.O_WRONLY) - try: - if os.lseek(fd, 0, os.SEEK_END) > 0: - break - finally: - os.close(fd) - await sleep(1) - @export async def write(self, src: str): self.host() - await self.wait_for_storage_device() - async with await FileWriteStream.from_path(self.storage_device) as stream: - async with self.resource(src) as res: - total_bytes = 0 - next_print = 0 - async for chunk in res: - await stream.send(chunk) - if total_bytes > next_print: - self.logger.debug(f"{self.storage_device} written {total_bytes / (1024 * 1024)} MB") - next_print += 50 * 1024 * 1024 - total_bytes += len(chunk) + async with self.resource(src) as res: + await write_to_storage_device(self.storage_device, res, logger=self.logger) @export async def read(self, dst: str): self.host() - await self.wait_for_storage_device() - async with await FileReadStream.from_path(self.storage_device) as stream: - async with self.resource(dst) as res: - async for chunk in stream: - await res.send(chunk) + async with self.resource(dst) as res: + await read_from_storage_device(self.storage_device, res, logger=self.logger) @dataclass(kw_only=True) diff --git a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py index 28bfaeb7e..8cd0009e1 100644 --- a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py +++ b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py @@ -1,15 +1,13 @@ from __future__ import annotations -import os from dataclasses import dataclass, field import pyudev import usb.core import usb.util -from anyio import fail_after, sleep -from anyio.streams.file import FileReadStream, FileWriteStream from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface +from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device from jumpstarter.driver import Driver, export @@ -101,45 +99,14 @@ def dut(self): def off(self): self.host() - async def wait_for_storage_device(self): - with fail_after(10): - storage_device = self.effective_storage_device() - - while True: - # https://stackoverflow.com/a/2774125 - try: - fd = os.open(storage_device, os.O_WRONLY) - if os.lseek(fd, 0, os.SEEK_END) > 0: - break - except OSError as e: - match e.errno: - case 123: # No medium found - pass - case 5: # Input/output error - pass - case _: - raise - finally: - if "fd" in locals(): - os.close(fd) - await sleep(1) - - return storage_device - @export async def write(self, src: str): self.host() - storage_device = await self.wait_for_storage_device() - async with await FileWriteStream.from_path(storage_device) as stream: - async with self.resource(src) as res: - async for chunk in res: - await stream.send(chunk) + async with self.resource(src) as res: + await write_to_storage_device(self.effective_storage_device(), res, logger=self.logger) @export async def read(self, dst: str): self.host() - storage_device = await self.wait_for_storage_device() - async with await FileReadStream.from_path(storage_device) as stream: - async with self.resource(dst) as res: - async for chunk in stream: - await res.send(chunk) + async with self.resource(dst) as res: + await read_from_storage_device(self.effective_storage_device(), res, logger=self.logger) diff --git a/packages/jumpstarter/jumpstarter/common/storage.py b/packages/jumpstarter/jumpstarter/common/storage.py new file mode 100644 index 000000000..4bbea94a8 --- /dev/null +++ b/packages/jumpstarter/jumpstarter/common/storage.py @@ -0,0 +1,118 @@ +import errno +import os +from logging import Logger +from typing import Literal + +from anyio import fail_after, sleep +from anyio.abc import AnyByteStream +from anyio.streams.file import FileReadStream, FileWriteStream + + +async def wait_for_storage_device( # noqa: C901 + storage_device: os.PathLike, + mode: Literal["wb", "rb"], + timeout: int = 10, + *, + logger: Logger | None = None, +) -> os.PathLike: + with fail_after(timeout): + while True: + # https://stackoverflow.com/a/2774125 + try: + match mode: + case "wb": + fd = os.open(storage_device, os.O_WRONLY) + case "rb": + fd = os.open(storage_device, os.O_RDONLY) + case _: + raise ValueError("invalid mode: {}".format(mode)) + with os.fdopen(fd, mode): # to prevent fd from leaking + if os.lseek(fd, 0, os.SEEK_END) > 0: + if logger: + logger.info("storage device {} is ready".format(storage_device)) + break + if logger: + logger.debug("waiting for storage device {} to have a nonzero size".format(storage_device)) + except FileNotFoundError: + if logger: + logger.debug("waiting for storage device {} to appear".format(storage_device)) + except OSError as e: + match e.errno: + case errno.ENOMEDIUM | errno.EIO: + if logger: + logger.debug("waiting for storage device {} to be ready".format(storage_device)) + case _: + raise + + await sleep(1) + + return storage_device + + +async def write_to_storage_device( + storage_device: os.PathLike, + resource: AnyByteStream, + timeout: int = 10, + fsync_timeout: int = 900, + leeway: int = 6, + *, + logger: Logger | None = None, +): + path = await wait_for_storage_device( + storage_device, + mode="wb", + timeout=timeout, + logger=logger, + ) + with os.fdopen(os.open(path, os.O_WRONLY), "wb") as file: + async with FileWriteStream(file) as stream: + total_bytes = 0 + next_print = 0 + async for chunk in resource: + await stream.send(chunk) + if logger: + total_bytes += len(chunk) + if total_bytes > next_print: + logger.info( + "written {} MB to storage device {}".format( + total_bytes / (1024 * 1024), + storage_device, + ) + ) + next_print += 50 * 1024 * 1024 + + with fail_after(fsync_timeout): + while True: + try: + if logger: + logger.info("fsyncing storage device {}, please wait".format(storage_device)) + os.fsync(file.fileno()) + except OSError as e: + if e.errno == errno.EIO: + await sleep(1) + continue + else: + raise + else: + break + + await sleep(leeway) + + +async def read_from_storage_device( + storage_device: os.PathLike, + resource: AnyByteStream, + timeout: int = 10, + *, + logger: Logger | None = None, +): + path = await wait_for_storage_device( + storage_device, + mode="rb", + timeout=timeout, + logger=logger, + ) + with os.fdopen(os.open(path, os.O_RDONLY), "rb") as file: + async with FileReadStream(file) as stream: + async for chunk in stream: + await resource.send(chunk) From 44170c80853c9897effde9e4b7e86bbc695b5e8a Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Tue, 18 Mar 2025 11:57:36 -0400 Subject: [PATCH 2/3] Make storage timeout configurable --- .../jumpstarter_driver_dutlink/driver.py | 19 +++++++++++++++++-- .../jumpstarter_driver_sdwire/driver.py | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py index a97c08cb2..8f70dea2b 100644 --- a/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py +++ b/packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py @@ -22,6 +22,9 @@ class DutlinkConfig: serial: str | None = field(default=None) timeout_s: int = field(default=20) # 20 seconds, power control sequences can block USB for a long time + storage_timeout: int = field(default=10) + storage_leeway: int = field(default=6) + storage_fsync_timeout: int = field(default=900) dev: usb.core.Device = field(init=False) itf: usb.core.Interface = field(init=False) @@ -190,13 +193,25 @@ def off(self): async def write(self, src: str): self.host() async with self.resource(src) as res: - await write_to_storage_device(self.storage_device, res, logger=self.logger) + await write_to_storage_device( + self.storage_device, + res, + timeout=self.storage_timeout, + leeway=self.storage_leeway, + fsync_timeout=self.storage_fsync_timeout, + logger=self.logger, + ) @export async def read(self, dst: str): self.host() async with self.resource(dst) as res: - await read_from_storage_device(self.storage_device, res, logger=self.logger) + await read_from_storage_device( + self.storage_device, + res, + timeout=self.storage_timeout, + logger=self.logger, + ) @dataclass(kw_only=True) diff --git a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py index 8cd0009e1..7f33d5d2b 100644 --- a/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py +++ b/packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py @@ -18,6 +18,9 @@ class SDWire(StorageMuxFlasherInterface, Driver): itf: usb.core.Interface = field(init=False) storage_device: str | None = field(default=None) + storage_timeout: int = field(default=10) + storage_leeway: int = field(default=6) + storage_fsync_timeout: int = field(default=900) def effective_storage_device(self): if self.storage_device is None: @@ -103,10 +106,22 @@ def off(self): async def write(self, src: str): self.host() async with self.resource(src) as res: - await write_to_storage_device(self.effective_storage_device(), res, logger=self.logger) + await write_to_storage_device( + self.effective_storage_device(), + res, + timeout=self.storage_timeout, + leeway=self.storage_leeway, + fsync_timeout=self.storage_fsync_timeout, + logger=self.logger, + ) @export async def read(self, dst: str): self.host() async with self.resource(dst) as res: - await read_from_storage_device(self.effective_storage_device(), res, logger=self.logger) + await read_from_storage_device( + self.effective_storage_device(), + res, + timeout=self.storage_timeout, + logger=self.logger, + ) From 9c1b7d8f8c59c62f67da7933f471c1428b229aa6 Mon Sep 17 00:00:00 2001 From: Nick Cao Date: Tue, 18 Mar 2025 12:25:23 -0400 Subject: [PATCH 3/3] Also log read progress --- packages/jumpstarter/jumpstarter/common/storage.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/jumpstarter/jumpstarter/common/storage.py b/packages/jumpstarter/jumpstarter/common/storage.py index 4bbea94a8..0518a503e 100644 --- a/packages/jumpstarter/jumpstarter/common/storage.py +++ b/packages/jumpstarter/jumpstarter/common/storage.py @@ -114,5 +114,17 @@ async def read_from_storage_device( ) with os.fdopen(os.open(path, os.O_RDONLY), "rb") as file: async with FileReadStream(file) as stream: + total_bytes = 0 + next_print = 0 async for chunk in stream: await resource.send(chunk) + if logger: + total_bytes += len(chunk) + if total_bytes > next_print: + logger.info( + "read {} MB from storage device {}".format( + total_bytes / (1024 * 1024), + storage_device, + ) + ) + next_print += 50 * 1024 * 1024