Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions CONTRIB.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ To create a new driver scaffold, you can use the `create_driver.sh` script in th

From the root directory of the project, run the following command:
```shell
$ ./contrib/__templates__/create_driver.sh shell Shell "Miguel Angel Ajo" miguelangel@ajo.es
$ ./__templates__/create_driver.sh yepkit Ykush "Miguel Angel Ajo" "miguelangel@ajo.es"

Creating: contrib/drivers/shell/jumpstarter_driver_shell/__init__.py
Creating: contrib/drivers/shell/jumpstarter_driver_shell/client.py
Creating: contrib/drivers/shell/jumpstarter_driver_shell/driver_test.py
Creating: contrib/drivers/shell/jumpstarter_driver_shell/driver.py
Creating: contrib/drivers/shell/.gitignore
Creating: contrib/drivers/shell/pyproject.toml
Creating: contrib/drivers/shell/README.md
Creating: contrib/drivers/shell/examples/exporter.yaml
Creating: packages/jumpstarter_driver_yepkit/jumpstarter_driver_yepkit/__init__.py
Creating: packages/jumpstarter_driver_yepkit/jumpstarter_driver_yepkit/client.py
Creating: packages/jumpstarter_driver_yepkit/jumpstarter_driver_yepkit/driver_test.py
Creating: packages/jumpstarter_driver_yepkit/jumpstarter_driver_yepkit/driver.py
Creating: packages/jumpstarter_driver_yepkit/.gitignore
Creating: packages/jumpstarter_driver_yepkit/pyproject.toml
Creating: packages/jumpstarter_driver_yepkit/README.md
Creating: packages/jumpstarter_driver_yepkit/examples/exporter.yaml

$ make sync
uv sync --all-packages --all-extras
Expand Down
6 changes: 3 additions & 3 deletions __templates__/create_driver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export AUTHOR_NAME=$3
export AUTHOR_EMAIL=$4

# create the driver directory
DRIVER_DIRECTORY=packages/jumpstarter_driver_${DRIVER_NAME}
DRIVER_DIRECTORY=packages/jumpstarter-driver-${DRIVER_NAME}
MODULE_DIRECTORY=${DRIVER_DIRECTORY}/jumpstarter_driver_${DRIVER_NAME}
# create the module directories
mkdir -p ${MODULE_DIRECTORY}
Expand All @@ -30,10 +30,10 @@ mkdir -p ${DRIVER_DIRECTORY}/examples

for f in __init__.py client.py driver_test.py driver.py; do
echo "Creating: ${MODULE_DIRECTORY}/${f}"
envsubst < contrib/__templates__/driver/jumpstarter_driver/${f}.tmpl > ${MODULE_DIRECTORY}/${f}
envsubst < __templates__/driver/jumpstarter_driver/${f}.tmpl > ${MODULE_DIRECTORY}/${f}
done

for f in .gitignore pyproject.toml README.md examples/exporter.yaml; do
echo "Creating: ${DRIVER_DIRECTORY}/${f}"
envsubst < packages/__templates__/driver/${f}.tmpl > ${DRIVER_DIRECTORY}/${f}
envsubst < __templates__/driver/${f}.tmpl > ${DRIVER_DIRECTORY}/${f}
done
74 changes: 74 additions & 0 deletions docs/source/api-reference/drivers/yepkit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Yepkit

Drivers for yepkit products.

## Ykush driver

This driver provides a client for the [Ykush USB switch](https://www.yepkit.com/products/ykush).
**driver**: `jumpstarter_driver_yepkit.driver.Ykush`

### Driver configuration
```yaml
export:
power:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
serial: "YK25838"
port: "1"

power2:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
serial: "YK25838"
port: "2"

```
### Config parameters

| Parameter | Description | Type | Required | Default |
|-----------|-------------|------|----------|---------|
| serial | The serial number of the ykush hub, empty means auto-detection | no | None | |
| port | The port number to be managed, "0", "1", "2", "a" which means all | str | yes | "a" |


### PowerClient API

The yepkit ykush driver provides a `PowerClient` with the following API:

```{eval-rst}
.. autoclass:: jumpstarter_driver_power.client.PowerClient
:members: on, off
```

### Examples
Powering on and off a device
```{testcode}
client.power.on()
time.sleep(1)
client.power.off()
```

### CLI access
```bash
$ sudo ~/.cargo/bin/uv run jmp exporter shell -c ./packages/jumpstarter-driver-yepkit/examples/exporter.yaml
WARNING:Ykush:No serial number provided for ykush, using the first one found: YK25838
INFO:Ykush:Power OFF for Ykush YK25838 on port 1
INFO:Ykush:Power OFF for Ykush YK25838 on port 2

$$ j
Usage: j [OPTIONS] COMMAND [ARGS]...

Generic composite device

Options:
--help Show this message and exit.

Commands:
power Generic power
power2 Generic power

$$ j power on
INFO:Ykush:Power ON for Ykush YK25838 on port 1

$$ exit
```
11 changes: 11 additions & 0 deletions packages/jumpstarter-driver-yepkit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Jumpstarter Driver for the ykush USB Hub from Yepkit

This driver is for the ykush USB Hub from Yepkit. It allows you to control the power of each port of the hub.

If you want to test this locally, you can use the following commands from the root of the repository:

```bash
sudo $(which uv) run jmp exporter shell --config ./packages/jumpstarter-driver-yepkit/examples/exporter.yaml
```

Please note that sudo is necessary to gain access to the raw USB interfaces.
29 changes: 29 additions & 0 deletions packages/jumpstarter-driver-yepkit/examples/exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: jumpstarter.dev/v1alpha1
kind: ExporterConfig
metadata:
name: exporter
namespace: default
endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082
token: "<token>"
export:
power:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
port: "1"

power2:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
serial: "YK25838"
port: "2"

power3:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
port: "3"

all:
type: jumpstarter_driver_yepkit.driver.Ykush
config:
port: "all"

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
import usb


def pytest_runtest_call(item):
try:
item.runtest()
except FileNotFoundError:
pytest.skip("yepkit not available")
except usb.core.USBError:
pytest.skip("USB not available, could need root permissions")
except usb.core.NoBackendError:
pytest.skip("No USB backend")
145 changes: 145 additions & 0 deletions packages/jumpstarter-driver-yepkit/jumpstarter_driver_yepkit/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import threading
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field

import usb.core
import usb.util
from jumpstarter_driver_power.driver import PowerInterface, PowerReading

from jumpstarter.driver import Driver, export

VID = 0x04D8
PID = 0xF2F7

PORT_UP_COMMANDS = {
'1': 0x11,
'2': 0x12,
'3': 0x13,
'all': 0x1A
}

PORT_DOWN_COMMANDS = {
'1': 0x01,
'2': 0x02,
'3': 0x03,
'all': 0x0A
}

PORT_STATUS_COMMANDS = {
'1': 0x21,
'2': 0x22,
'3': 0x23
}

VALID_DEFAULTS = ["on", "off", "keep"]

# static shared array of usb devices, interfaces on same device cannot be claimed multiple times
_USB_DEVS = {}
_USB_DEVS_LOCK = threading.Lock() # Lock for synchronizing access, we don't do multithread, but just in case..

@dataclass(kw_only=True)
class Ykush(PowerInterface, Driver):
""" driver for Yepkit Ykush USB Hub with Power control """
serial: str | None = field(default=None)
default: str = "off"
port: str = "all"

dev: usb.core.Device = field(init=False)

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

keys = PORT_UP_COMMANDS.keys()
if self.port not in keys:
raise ValueError(
f"The ykush driver port must be any of the following values: {keys}")

if self.default not in VALID_DEFAULTS:
raise ValueError(
f"The ykush driver default must be any of the following values: {VALID_DEFAULTS}")

with _USB_DEVS_LOCK:
# another instance already claimed this device?
if self.serial is None and len(_USB_DEVS.keys()) > 0:
self.serial = list(_USB_DEVS.keys())[0]
self.dev = _USB_DEVS[self.serial]
return

if self.serial in _USB_DEVS:
self.dev = _USB_DEVS[self.serial]
return

for dev in usb.core.find(idVendor=VID, idProduct=PID, find_all=True):
serial = usb.util.get_string(dev, dev.iSerialNumber, 0)
if serial == self.serial or self.serial is None:
_USB_DEVS[serial] = dev
if self.serial is None:
self.logger.warning(
f"No serial number provided for ykush, using the first one found: {serial}")
self.serial = serial
self.dev = dev
return

raise FileNotFoundError("failed to find ykush device")

def _send_cmd(self, cmd, report_size=64):
out_ep, in_ep = self._get_endpoints(self.dev)
out_buf = [0x00] * report_size
out_buf[0] = cmd # YKUSH command

# Write to the OUT endpoint
out_ep.write(out_buf)

# Read from the IN endpoint
in_buf = in_ep.read(report_size, timeout=2000)
return list(in_buf)

def _get_endpoints(self, dev):
"""
From the active configuration, find the first IN and OUT endpoints.
"""
cfg = self.dev.get_active_configuration()
interface = cfg[(0, 0)]

out_endpoint = usb.util.find_descriptor(
interface,
custom_match=lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
)

in_endpoint = usb.util.find_descriptor(
interface,
custom_match=lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
)

if not out_endpoint or not in_endpoint:
raise RuntimeError("Could not find both IN and OUT endpoints for ykush.")

return out_endpoint, in_endpoint

# reset function is called by the exporter to setup the default state
def reset(self):
if self.default == "on":
self.on()
elif self.default == "off":
self.off()

@export
def on(self):
self.logger.info(f"Power ON for Ykush {self.serial} on port {self.port}")
cmd = PORT_UP_COMMANDS.get(self.port)
_ = self._send_cmd(cmd)
return

@export
def off(self):
self.logger.info(f"Power OFF for Ykush {self.serial} on port {self.port}")
cmd = PORT_DOWN_COMMANDS.get(self.port)
_ = self._send_cmd(cmd)
return

@export
def read(self) -> AsyncGenerator[PowerReading, None]:
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .driver import Ykush
from jumpstarter.common.utils import serve


def test_drivers_yepkit():
instance = Ykush()

with serve(instance) as client:
client.on()
client.off()
40 changes: 40 additions & 0 deletions packages/jumpstarter-driver-yepkit/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[project]
name = "jumpstarter-driver-yepkit"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
{ name = "Miguel Angel Ajo", email = "miguelangel@ajo.es" }
]
requires-python = ">=3.11"
dependencies = [
"anyio>=4.6.2.post1",
"pyusb>=1.2.1",
"jumpstarter_driver_power",
"jumpstarter",
]

[tool.hatch.version]
source = "vcs"
raw-options = { 'root' = '../../'}

[tool.hatch.metadata.hooks.vcs.urls]
Homepage = "https://jumpstarter.dev"
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"

[tool.pytest.ini_options]
addopts = "--cov --cov-report=html --cov-report=xml"
log_cli = true
log_cli_level = "INFO"
testpaths = ["jumpstarter_driver_yepkit"]
asyncio_mode = "auto"

[build-system]
requires = ["hatchling", "hatch-vcs"]
build-backend = "hatchling.build"

[dependency-groups]
dev = [
"pytest-cov>=6.0.0",
"pytest>=8.3.3",
]
Loading