Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def wrapped(*args, **kwargs):
"Whether this validator is serving jobs and setting weights",
bool,
),
"DYNAMIC_SYNC_ORGANIC_JOBS": (False, "Run the sync implementation of organic jobs", bool),
"DYNAMIC_DUMMY": (0, "Dummy config for the purpose of testing", int),
"DYNAMIC_MANIFEST_SCORE_MULTIPLIER": (
1.05,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
import uuid

from asgiref.sync import async_to_sync
from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS
from compute_horde.fv_protocol.facilitator_requests import V2JobRequest
from compute_horde.fv_protocol.validator_requests import JobStatusUpdate
Expand All @@ -12,22 +11,23 @@
from django.core.management.base import BaseCommand

from compute_horde_validator.validator.allowance.default import allowance
from compute_horde_validator.validator.dynamic_config import get_config
from compute_horde_validator.validator.models import (
Miner,
MinerBlacklist,
OrganicJob,
)
from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient
from compute_horde_validator.validator.organic_jobs.miner_driver import (
drive_organic_job,
from compute_horde_validator.validator.organic_jobs.miner_driver_sync import (
MinerClient,
SyncOrganicJobDriver,
)


def get_keypair():
return settings.BITTENSOR_WALLET().get_hotkey()


async def notify_job_status_update(msg: JobStatusUpdate):
def notify_job_status_update(msg: JobStatusUpdate):
print(f"\njob status: {msg.status}")
if msg.metadata:
if details := (
Expand Down Expand Up @@ -143,24 +143,29 @@ def handle(self, *args, **options):
block=block,
)

async def _run_job():
try:
keypair = get_keypair()
ws_url = (
f"ws://{miner_address}:{miner_port}/v0.1/validator_interface/{keypair.ss58_address}"
)
miner_client = MinerClient(
url=ws_url,
miner_hotkey=miner.hotkey,
miner_address=miner_address,
miner_port=miner_port,
job_uuid=str(job.job_uuid),
my_keypair=keypair,
validator_keypair=keypair,
)
await drive_organic_job(
driver = SyncOrganicJobDriver(
miner_client,
job,
job_request,
notify_callback=notify_job_status_update,
miner_hotkey=miner.hotkey,
my_keypair=keypair,
allowed_leeway=get_config("DYNAMIC_ORGANIC_JOB_ALLOWED_LEEWAY_TIME"),
reservation_time_limit=get_config("DYNAMIC_EXECUTOR_RESERVATION_TIME_LIMIT"),
executor_startup_time_limit=get_config("DYNAMIC_EXECUTOR_STARTUP_TIME_LIMIT"),
max_overall_time_limit=get_config("DYNAMIC_MAX_OVERALL_ORGANIC_JOB_TIME_LIMIT"),
status_callback=notify_job_status_update,
)

try:
async_to_sync(_run_job)()
driver.run()
except Exception as e:
print(f"Failed to run job {job.job_uuid}: {e}")
sys.exit(1)
Expand Down

This file was deleted.

Loading
Loading