From 922d6d10f7cd14bb39a61ce3419cfc893573701f Mon Sep 17 00:00:00 2001 From: Enam Mijbah Noor Date: Tue, 21 Apr 2026 13:07:30 +0600 Subject: [PATCH] chore: remove legacy async organic jobs The validators have been switched to the new sync code path. Impacts: validator --- .../src/compute_horde_validator/settings.py | 1 - .../commands/debug_run_organic_job.py | 35 +- .../validator/organic_jobs/miner_client.py | 45 -- .../validator/organic_jobs/miner_driver.py | 533 ------------------ .../tests/test_job_routing_incident.py | 87 ++- .../validator/tasks.py | 9 +- .../validator/tests/conftest.py | 19 +- .../validator/tests/helpers.py | 194 ++++--- .../tests/test_organic_jobs/conftest.py | 26 +- .../test_facilitator_client.py | 20 +- .../test_organic_jobs/test_happy_path.py | 41 +- .../test_organic_jobs/test_jobs_namespace.py | 21 +- .../test_organic_jobs/test_miner_driver.py | 219 ------- .../tests/test_other/test_commands.py | 4 +- 14 files changed, 243 insertions(+), 1011 deletions(-) delete mode 100644 validator/app/src/compute_horde_validator/validator/organic_jobs/miner_client.py delete mode 100644 validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py delete mode 100644 validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver.py diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py index 368eb4081..6dd7356f9 100644 --- a/validator/app/src/compute_horde_validator/settings.py +++ b/validator/app/src/compute_horde_validator/settings.py @@ -139,7 +139,6 @@ def wrapped(*args, **kwargs): "Whether this validator is serving jobs and setting weights", bool, ), - "DYNAMIC_SYNC_ORGANIC_JOBS": (False, "Run the sync implementation of organic jobs", bool), "DYNAMIC_DUMMY": (0, "Dummy config for the purpose of testing", int), "DYNAMIC_MANIFEST_SCORE_MULTIPLIER": ( 1.05, diff --git 
a/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py index 699e05b6f..59a9571fc 100644 --- a/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py +++ b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py @@ -3,7 +3,6 @@ import time import uuid -from asgiref.sync import async_to_sync from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS from compute_horde.fv_protocol.facilitator_requests import V2JobRequest from compute_horde.fv_protocol.validator_requests import JobStatusUpdate @@ -12,14 +11,15 @@ from django.core.management.base import BaseCommand from compute_horde_validator.validator.allowance.default import allowance +from compute_horde_validator.validator.dynamic_config import get_config from compute_horde_validator.validator.models import ( Miner, MinerBlacklist, OrganicJob, ) -from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient -from compute_horde_validator.validator.organic_jobs.miner_driver import ( - drive_organic_job, +from compute_horde_validator.validator.organic_jobs.miner_driver_sync import ( + MinerClient, + SyncOrganicJobDriver, ) @@ -27,7 +27,7 @@ def get_keypair(): return settings.BITTENSOR_WALLET().get_hotkey() -async def notify_job_status_update(msg: JobStatusUpdate): +def notify_job_status_update(msg: JobStatusUpdate): print(f"\njob status: {msg.status}") if msg.metadata: if details := ( @@ -143,24 +143,29 @@ def handle(self, *args, **options): block=block, ) - async def _run_job(): + try: keypair = get_keypair() + ws_url = ( + f"ws://{miner_address}:{miner_port}/v0.1/validator_interface/{keypair.ss58_address}" + ) miner_client = MinerClient( + url=ws_url, miner_hotkey=miner.hotkey, - miner_address=miner_address, - miner_port=miner_port, - job_uuid=str(job.job_uuid), - 
my_keypair=keypair, + validator_keypair=keypair, ) - await drive_organic_job( + driver = SyncOrganicJobDriver( miner_client, job, job_request, - notify_callback=notify_job_status_update, + miner_hotkey=miner.hotkey, + my_keypair=keypair, + allowed_leeway=get_config("DYNAMIC_ORGANIC_JOB_ALLOWED_LEEWAY_TIME"), + reservation_time_limit=get_config("DYNAMIC_EXECUTOR_RESERVATION_TIME_LIMIT"), + executor_startup_time_limit=get_config("DYNAMIC_EXECUTOR_STARTUP_TIME_LIMIT"), + max_overall_time_limit=get_config("DYNAMIC_MAX_OVERALL_ORGANIC_JOB_TIME_LIMIT"), + status_callback=notify_job_status_update, ) - - try: - async_to_sync(_run_job)() + driver.run() except Exception as e: print(f"Failed to run job {job.job_uuid}: {e}") sys.exit(1) diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_client.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_client.py deleted file mode 100644 index f33083471..000000000 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_client.py +++ /dev/null @@ -1,45 +0,0 @@ -import logging - -from compute_horde.miner_client.organic import OrganicMinerClient -from compute_horde.protocol_messages import GenericError, UnauthorizedError -from django.conf import settings - -from compute_horde_validator.validator.models import SystemEvent - -logger = logging.getLogger(__name__) - - -class MinerClient(OrganicMinerClient): - async def notify_generic_error(self, msg: GenericError) -> None: - desc = f"Received error message from miner {self.miner_name}: {msg.model_dump_json()}" - await SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( - type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, - subtype=SystemEvent.EventSubType.GENERIC_ERROR, - long_description=desc, - data={}, - ) - - async def notify_unauthorized_error(self, msg: UnauthorizedError) -> None: - desc = f"Unauthorized in {self.miner_name}: {msg.code}, details: {msg.details}" - await 
SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( - type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, - subtype=SystemEvent.EventSubType.UNAUTHORIZED, - long_description=desc, - data={}, - ) - - async def notify_receipt_failure(self, comment: str) -> None: - await SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( - type=SystemEvent.EventType.RECEIPT_FAILURE, - subtype=SystemEvent.EventSubType.RECEIPT_SEND_ERROR, - long_description=comment, - data={"job_uuid": self.job_uuid, "miner_hotkey": self.miner_hotkey}, - ) - - async def notify_send_failure(self, msg: str) -> None: - await SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( - type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, - subtype=SystemEvent.EventSubType.MINER_SEND_ERROR, - long_description=msg, - data={"job_uuid": self.job_uuid, "miner_hotkey": self.miner_hotkey}, - ) diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py deleted file mode 100644 index 14a7ae007..000000000 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py +++ /dev/null @@ -1,533 +0,0 @@ -import logging -import time -from collections.abc import Awaitable, Callable -from functools import partial - -import sentry_sdk -from asgiref.sync import sync_to_async -from channels.layers import get_channel_layer -from compute_horde.executor_class import EXECUTOR_CLASS -from compute_horde.fv_protocol.facilitator_requests import OrganicJobRequest -from compute_horde.fv_protocol.validator_requests import ( - HordeFailureDetails, - JobFailureDetails, - JobRejectionDetails, - JobResultDetails, - JobStatusMetadata, - JobStatusUpdate, - StreamingServerDetails, -) -from compute_horde.job_errors import HordeError -from compute_horde.miner_client.organic import ( - MinerConnectionFailed, - MinerRejectedJob, - MinerReportedHordeFailed, - MinerReportedJobFailed, - 
MinerTimedOut, - OrganicJobDetails, - execute_organic_job_on_miner, -) -from compute_horde.protocol_consts import ( - HordeFailureReason, - JobFailureReason, - JobParticipantType, - JobRejectionReason, - JobStatus, -) -from compute_horde.protocol_messages import ( - MinerToValidatorMessage, - V0StreamingJobReadyRequest, -) -from compute_horde.receipts.models import JobStartedReceipt -from compute_horde_core.executor_class import ExecutorClass -from django.conf import settings -from pydantic import JsonValue - -from compute_horde_validator.validator import job_excuses -from compute_horde_validator.validator.allowance.default import allowance -from compute_horde_validator.validator.allowance.types import ValidatorModel -from compute_horde_validator.validator.allowance.utils.supertensor import supertensor -from compute_horde_validator.validator.dynamic_config import aget_config -from compute_horde_validator.validator.models import ( - Miner, - OrganicJob, - SystemEvent, -) -from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient -from compute_horde_validator.validator.routing.default import routing -from compute_horde_validator.validator.routing.types import JobRoute, MinerIncidentType -from compute_horde_validator.validator.utils import TRUSTED_MINER_FAKE_KEY - -logger = logging.getLogger(__name__) - - -def _get_current_block() -> int: - return allowance().get_current_block() - - -@sync_to_async -def _get_active_validators(block: int | None) -> list[ValidatorModel]: - if block is None: - block = supertensor().get_current_block() - return supertensor().list_validators(block) - - -def status_update_from_success(job: OrganicJob) -> JobStatusUpdate: - metadata = JobStatusMetadata( - miner_response=JobResultDetails( - docker_process_stdout=job.stdout, - docker_process_stderr=job.stderr, - artifacts=job.artifacts, - upload_results=job.upload_results, - ), - ) - return JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.COMPLETED, - 
metadata=metadata, - ) - - -def status_update_from_miner_rejection( - job: OrganicJob, rejection: MinerRejectedJob, comment: str | None = None -) -> JobStatusUpdate: - comment = comment or rejection.msg.message - metadata = JobStatusMetadata( - job_rejection_details=JobRejectionDetails( - rejected_by=JobParticipantType.MINER, - reason=rejection.msg.reason, - message=comment, - context=rejection.msg.context, - ), - ) - return JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.REJECTED, - metadata=metadata, - ) - - -def status_update_from_miner_job_failure( - job: OrganicJob, failure: MinerReportedJobFailed -) -> JobStatusUpdate: - metadata = JobStatusMetadata( - job_failure_details=JobFailureDetails( - reason=failure.msg.reason, - stage=failure.msg.stage, - message=failure.msg.message, - context=failure.msg.context, - docker_process_exit_status=failure.msg.docker_process_exit_status, - docker_process_stdout=failure.msg.docker_process_stdout, - docker_process_stderr=failure.msg.docker_process_stderr, - ), - ) - return JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.FAILED, - metadata=metadata, - ) - - -def status_update_from_miner_horde_failure( - job: OrganicJob, failure: MinerReportedHordeFailed -) -> JobStatusUpdate: - metadata = JobStatusMetadata( - horde_failure_details=HordeFailureDetails( - reported_by=failure.msg.reported_by, - reason=failure.msg.reason, - message=failure.msg.message, - context=failure.msg.context, - ), - ) - return JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.HORDE_FAILED, - metadata=metadata, - ) - - -def status_update_from_horde_error(job: OrganicJob, error: HordeError) -> JobStatusUpdate: - metadata = JobStatusMetadata( - horde_failure_details=HordeFailureDetails( - reported_by=JobParticipantType.VALIDATOR, - reason=error.reason, - message=error.message, - context=error.context, - ), - ) - return JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.FAILED, - metadata=metadata, - ) - - 
-async def save_job_execution_event( - subtype: str, long_description: str, data: JsonValue = None, success: bool = False -) -> None: - await SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( - type=SystemEvent.EventType.MINER_ORGANIC_JOB_SUCCESS - if success - else SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, - subtype=subtype, - long_description=long_description, - data=data or {}, - ) - - -async def _dummy_notify_callback(_: JobStatusUpdate) -> None: - pass - - -async def execute_organic_job_request( - job_request: OrganicJobRequest, job_route: JobRoute -) -> OrganicJob: - if ( - job_route.miner.hotkey_ss58 == settings.DEBUG_MINER_KEY - and settings.DEBUG_MINER_ADDRESS - and settings.DEBUG_MINER_PORT - ): - miner_ip = settings.DEBUG_MINER_ADDRESS - miner_port = settings.DEBUG_MINER_PORT - ip_type = 4 - on_trusted_miner = False - elif job_route.miner.hotkey_ss58 == TRUSTED_MINER_FAKE_KEY: - miner_ip = settings.TRUSTED_MINER_ADDRESS - miner_port = settings.TRUSTED_MINER_PORT - ip_type = 4 - on_trusted_miner = True - else: - miner_ip = job_route.miner.address - miner_port = job_route.miner.port - ip_type = job_route.miner.ip_version - on_trusted_miner = False - - if settings.DEBUG_USE_MOCK_BLOCK_NUMBER: - block = 5136476 + int((time.time() - 1742076533) / 12) - else: - block = await sync_to_async(_get_current_block, thread_sensitive=False)() - - miner = await Miner.objects.aget(hotkey=job_route.miner.hotkey_ss58) - job = await OrganicJob.objects.acreate( - job_uuid=str(job_request.uuid), - miner=miner, - miner_address=miner_ip, - miner_address_ip_version=ip_type, - miner_port=miner_port, - namespace=job_request.job_namespace or job_request.docker_image or None, - executor_class=job_request.executor_class, - job_description="User job from facilitator", - block=block, - on_trusted_miner=on_trusted_miner, - streaming_details=job_request.streaming_details.model_dump() - if job_request.streaming_details - else None, - 
allowance_blocks=job_route.allowance_blocks, - allowance_reservation_id=job_route.allowance_reservation_id, - allowance_job_value=job_route.allowance_job_value or 0, - ) - - miner_client = MinerClient( - miner_hotkey=job_route.miner.hotkey_ss58, - miner_address=job.miner_address, - miner_port=job.miner_port, - job_uuid=str(job.job_uuid), - my_keypair=settings.BITTENSOR_WALLET().hotkey, - ) - - async def job_status_callback(status_update: JobStatusUpdate): - await get_channel_layer().send( - f"job_status_updates__{status_update.uuid}", - {"type": "job_status_update", "payload": status_update.model_dump(mode="json")}, - ) - - await drive_organic_job( - miner_client, - job, - job_request, - notify_callback=job_status_callback, - ) - - return job - - -async def drive_organic_job( - miner_client: MinerClient, - job: OrganicJob, - job_request: OrganicJobRequest, - notify_callback: Callable[[JobStatusUpdate], Awaitable[None]] | None = None, -) -> bool: - """ - Execute an organic job on a miner client. - Returns True if the job was successfully executed, False otherwise. 
- """ - if notify_callback is None: - notify_callback = _dummy_notify_callback - - if job.on_trusted_miner and await aget_config("DYNAMIC_DISABLE_TRUSTED_ORGANIC_JOB_EVENTS"): - # ignore trusted system events - async def save_event(*args, **kwargs): - pass - else: - data: JsonValue = {"job_uuid": str(job.job_uuid), "miner_hotkey": miner_client.my_hotkey} - save_event = partial(save_job_execution_event, data=data) - - def status_callback(status: JobStatus): - async def relay(msg: MinerToValidatorMessage) -> None: - await notify_callback(JobStatusUpdate(uuid=str(job.job_uuid), status=status)) - - return relay - - async def streaming_ready_callback(msg: V0StreamingJobReadyRequest) -> None: - await notify_callback( - JobStatusUpdate( - uuid=str(job.job_uuid), - status=JobStatus.STREAMING_READY, - metadata=JobStatusMetadata( - streaming_details=StreamingServerDetails( - streaming_server_cert=msg.public_key, - streaming_server_address=msg.ip, - streaming_server_port=msg.port, - ), - ), - ) - ) - - miner_client.notify_job_accepted = status_callback(JobStatus.ACCEPTED) # type: ignore[method-assign] - miner_client.notify_executor_ready = status_callback(JobStatus.EXECUTOR_READY) # type: ignore[method-assign] - miner_client.notify_volumes_ready = status_callback(JobStatus.VOLUMES_READY) # type: ignore[method-assign] - miner_client.notify_execution_done = status_callback(JobStatus.EXECUTION_DONE) # type: ignore[method-assign] - miner_client.notify_streaming_readiness = streaming_ready_callback # type: ignore[method-assign] - # TODO: remove method assignment above and properly handle notify_* cases - - executor_class = ExecutorClass(job_request.executor_class) - has_gpu = EXECUTOR_CLASS[executor_class].has_gpu - - job_details = OrganicJobDetails( - job_uuid=str(job.job_uuid), - executor_class=executor_class, - docker_image=job_request.docker_image, - docker_run_options_preset="nvidia_all" if has_gpu else "none", - docker_run_cmd=job_request.get_args(), - env=job_request.env, - 
total_job_timeout=OrganicJobDetails.total_job_timeout, - volume=job_request.volume, - output=job_request.output_upload, - artifacts_dir=job_request.artifacts_dir, - job_timing=OrganicJobDetails.TimingDetails( - allowed_leeway=await aget_config("DYNAMIC_ORGANIC_JOB_ALLOWED_LEEWAY_TIME"), - download_time_limit=job_request.download_time_limit, - execution_time_limit=job_request.execution_time_limit, - upload_time_limit=job_request.upload_time_limit, - streaming_start_time_limit=job_request.streaming_start_time_limit, - ), - streaming_details=job.streaming_details, - allowance_blocks=job.allowance_blocks or [], - allowance_job_value=job.allowance_job_value, - ) - - try: - stdout, stderr, artifacts, upload_results = await execute_organic_job_on_miner( - miner_client, - job_details, - reservation_time_limit=await aget_config("DYNAMIC_EXECUTOR_RESERVATION_TIME_LIMIT"), - executor_startup_time_limit=await aget_config("DYNAMIC_EXECUTOR_STARTUP_TIME_LIMIT"), - ) - - if job.allowance_reservation_id is not None: - try: - await sync_to_async(allowance().spend_allowance)(job.allowance_reservation_id) - logger.info( - "Successfully spent allowance for reservation %s for job %s", - job.allowance_reservation_id, - job.job_uuid, - ) - except Exception as e: - logger.error( - "Failed to spend allowance for reservation %s for job %s: %s", - job.allowance_reservation_id, - job.job_uuid, - e, - exc_info=True, - ) - - comment = f"Miner {miner_client.miner_name} hotkey={job.miner.hotkey} finished: {stdout=} {stderr=}" - job.stdout = stdout - job.stderr = stderr - job.artifacts = artifacts - job.upload_results = upload_results - job.status = OrganicJob.Status.COMPLETED - job.comment = comment - await job.asave() - logger.info(comment) - await save_event( - subtype=SystemEvent.EventSubType.SUCCESS, long_description=comment, success=True - ) - await notify_callback(status_update_from_success(job)) - return True - - except MinerRejectedJob as rejection: - # The only valid reason for rejection 
is being busy and providing the receipts to prove it - if rejection.msg.reason != JobRejectionReason.BUSY: - comment = rejection.msg.message - status = ( - OrganicJob.Status.FAILED - ) # As far as the validator is concerned, the job is as good as failed - system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED - else: # rejection.msg.reason == JobRejectionReason.BUSY - job_started_receipt = await JobStartedReceipt.objects.aget(job_uuid=str(job.job_uuid)) - job_request_time = job_started_receipt.timestamp - active_validators = await _get_active_validators(job.block) - valid_excuses = job_excuses.filter_valid_excuse_receipts( - receipts_to_check=rejection.msg.receipts or [], - check_time=job_request_time, - declined_job_uuid=str(job.job_uuid), - declined_job_executor_class=ExecutorClass(job.executor_class), - declined_job_is_synthetic=False, - minimum_validator_stake_for_excuse=await aget_config( - "DYNAMIC_MINIMUM_VALIDATOR_STAKE_FOR_EXCUSE" - ), - miner_hotkey=job.miner.hotkey, - active_validators=active_validators, - ) - expected_executor_count = await job_excuses.get_expected_miner_executor_count( - check_time=job_request_time, - miner_hotkey=job.miner.hotkey, - executor_class=ExecutorClass(job.executor_class), - ) - if len(valid_excuses) >= expected_executor_count: - comment = "Miner properly excused job" - status = OrganicJob.Status.EXCUSED - system_event_subtype = SystemEvent.EventSubType.JOB_EXCUSED - else: - comment = "Miner failed to excuse job" - status = OrganicJob.Status.FAILED - system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED - - logger.info(comment) - job.comment = comment - job.status = status - await job.asave() - if status != OrganicJob.Status.EXCUSED: - await sync_to_async(routing().report_miner_incident)( - MinerIncidentType.MINER_JOB_REJECTED, - hotkey_ss58address=job.miner.hotkey, - job_uuid=str(job.job_uuid), - executor_class=ExecutorClass(job.executor_class), - ) - await save_event(subtype=system_event_subtype, 
long_description=comment) - status_update = status_update_from_miner_rejection(job, rejection, comment) - await notify_callback(status_update) - - except MinerReportedJobFailed as failure: - job.status = OrganicJob.Status.FAILED - job.comment = failure.msg.message - await job.asave() - await sync_to_async(routing().report_miner_incident)( - MinerIncidentType.MINER_JOB_FAILED, - hotkey_ss58address=job.miner.hotkey, - job_uuid=str(job.job_uuid), - executor_class=ExecutorClass(job.executor_class), - ) - await save_event( - subtype=_job_event_subtype_map.get( - failure.msg.reason, SystemEvent.EventSubType.GENERIC_JOB_FAILURE - ), - long_description=failure.msg.message, - ) - status_update = status_update_from_miner_job_failure(job, failure) - await notify_callback(status_update) - - except MinerReportedHordeFailed as failure: - job.status = OrganicJob.Status.FAILED - job.comment = failure.msg.message - await job.asave() - await sync_to_async(routing().report_miner_incident)( - MinerIncidentType.MINER_HORDE_FAILED, - hotkey_ss58address=job.miner.hotkey, - job_uuid=str(job.job_uuid), - executor_class=ExecutorClass(job.executor_class), - ) - await save_event( - subtype=_horde_event_subtype_map.get( - failure.msg.reason, SystemEvent.EventSubType.GENERIC_ERROR - ), - long_description=failure.msg.message, - ) - status_update = status_update_from_miner_horde_failure(job, failure) - await notify_callback(status_update) - - except (MinerConnectionFailed, MinerTimedOut) as e: - comment = str(e) - logger.warning(comment) - job.status = OrganicJob.Status.FAILED - job.comment = comment - await job.asave() - event_subtype = _horde_event_subtype_map.get( - e.reason, SystemEvent.EventSubType.GENERIC_ERROR - ) - await save_event(subtype=event_subtype, long_description=comment) - status_update = status_update_from_horde_error(job, e) - await notify_callback(status_update) - - except Exception as e: - sentry_sdk.capture_exception(e) - e = HordeError.wrap_unhandled(e) - comment = str(e) - 
logger.warning(comment) - job.status = OrganicJob.Status.FAILED - job.comment = comment - await job.asave() - - event_subtype = _horde_event_subtype_map.get( - e.reason, SystemEvent.EventSubType.GENERIC_ERROR - ) - await save_event(subtype=event_subtype, long_description=comment) - - status_update = status_update_from_horde_error(job, e) - await notify_callback(status_update) - - # Undo allowance reservation for any job failure - if job.allowance_reservation_id is not None: - try: - await sync_to_async(allowance().undo_allowance_reservation)( - job.allowance_reservation_id - ) - logger.info( - "Successfully undid allowance reservation %s for failed job %s", - job.allowance_reservation_id, - job.job_uuid, - ) - except Exception as e: - logger.error( - "Failed to undo allowance reservation %s for failed job %s: %s", - job.allowance_reservation_id, - job.job_uuid, - e, - exc_info=True, - ) - - return False - - -_job_event_subtype_map: dict[JobFailureReason, str] = { - JobFailureReason.UNKNOWN: SystemEvent.EventSubType.GENERIC_ERROR, - JobFailureReason.TIMEOUT: SystemEvent.EventSubType.JOB_TIMEOUT, - JobFailureReason.NONZERO_RETURN_CODE: SystemEvent.EventSubType.JOB_PROCESS_NONZERO_EXIT_CODE, - JobFailureReason.DOWNLOAD_FAILED: SystemEvent.EventSubType.JOB_VOLUME_DOWNLOAD_FAILED, - JobFailureReason.UPLOAD_FAILED: SystemEvent.EventSubType.JOB_RESULT_UPLOAD_FAILED, -} - -_horde_event_subtype_map: dict[HordeFailureReason, str] = { - HordeFailureReason.MINER_CONNECTION_FAILED: SystemEvent.EventSubType.MINER_CONNECTION_ERROR, - HordeFailureReason.INITIAL_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - HordeFailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - HordeFailureReason.STREAMING_JOB_READY_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - HordeFailureReason.VOLUMES_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - 
HordeFailureReason.EXECUTION_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - HordeFailureReason.FINAL_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, - HordeFailureReason.SECURITY_CHECK_FAILED: SystemEvent.EventSubType.ERROR_FAILED_SECURITY_CHECK, - HordeFailureReason.STREAMING_FAILED: SystemEvent.EventSubType.GENERIC_ERROR, - HordeFailureReason.UNHANDLED_EXCEPTION: SystemEvent.EventSubType.GENERIC_ERROR, - HordeFailureReason.UNKNOWN: SystemEvent.EventSubType.GENERIC_ERROR, -} diff --git a/validator/app/src/compute_horde_validator/validator/routing/tests/test_job_routing_incident.py b/validator/app/src/compute_horde_validator/validator/routing/tests/test_job_routing_incident.py index daf6acf13..94ce6999a 100644 --- a/validator/app/src/compute_horde_validator/validator/routing/tests/test_job_routing_incident.py +++ b/validator/app/src/compute_horde_validator/validator/routing/tests/test_job_routing_incident.py @@ -10,7 +10,6 @@ from asgiref.sync import async_to_sync from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS from compute_horde.fv_protocol.facilitator_requests import V2JobRequest -from compute_horde.miner_client.organic import OrganicMinerClient from compute_horde.protocol_messages import V0DeclineJobRequest from compute_horde.transport import AbstractTransport from compute_horde_core.executor_class import ExecutorClass as CoreExecutorClass @@ -33,7 +32,7 @@ BlockAllowance as _DbgBlockAllowance, ) from compute_horde_validator.validator.organic_jobs.facilitator_client import FacilitatorClient -from compute_horde_validator.validator.organic_jobs.miner_driver import drive_organic_job +from compute_horde_validator.validator.organic_jobs.miner_driver_sync import SyncOrganicJobDriver from compute_horde_validator.validator.routing.default import routing from compute_horde_validator.validator.routing.types import JobRoute from compute_horde_validator.validator.tests.transport import 
SimulationTransport @@ -64,6 +63,25 @@ class MinerScenario: pytestmark = pytest.mark.override_config(DYNAMIC_MINIMUM_COLLATERAL_AMOUNT_WEI=0) +class DeclineMinerClient: + ws = None + + def __init__(self, job_uuid: str): + self._job_uuid = job_uuid + + def connect(self) -> None: + pass + + def send(self, msg) -> None: + pass + + def recv(self, timeout: float): + return V0DeclineJobRequest(job_uuid=self._job_uuid) + + def close(self) -> None: + pass + + def reliability_env( *, monkeypatch, @@ -96,25 +114,25 @@ def _mk_neuron(uid: int, hotkey: str, stake: float, port: int): # routing builds miners from manifests/supertensor neuron list, where we set port=8000+idx port_by_hotkey = {m.hotkey: 8000 + idx for idx, m in enumerate(miners, start=1)} - async def _report_incidents(miner_hotkey: str, incidents: int, executor_class): + def _report_incidents(miner_hotkey: str, incidents: int, executor_class): """Simulate miner incidents by running organic job flows that are declined. - Uses SimulationTransport to feed a V0DeclineJobRequest during the reservation stage - which triggers the incident reporting path in miner_driver (MinerRejectedJob). + Uses DeclineMinerClient to feed a V0DeclineJobRequest during the reservation stage + which triggers the incident reporting path in the sync driver (miner_driver_sync). 
""" - # Ensure Miner model exists (minimal fields for OrganicJob FK) expected_port = port_by_hotkey[miner_hotkey] - miner_model, created = await Miner.objects.aget_or_create( + miner_model, created = Miner.objects.get_or_create( hotkey=miner_hotkey, defaults={"address": "127.0.0.1", "port": expected_port, "ip_version": 4}, ) # If it already existed (shouldn't normally) but with a mismatching port, align it if not created and miner_model.port != expected_port: miner_model.port = expected_port - await miner_model.asave(update_fields=["port"]) - for i in range(incidents): + miner_model.save(update_fields=["port"]) + + for _ in range(incidents): job_uuid = str(uuid.uuid4()) - job = await OrganicJob.objects.acreate( + job = OrganicJob.objects.create( job_uuid=job_uuid, miner=miner_model, miner_address=miner_model.address or "127.0.0.1", @@ -125,14 +143,6 @@ async def _report_incidents(miner_hotkey: str, incidents: int, executor_class): block=base_block, ) - transport = SimulationTransport(f"decline_sim_{miner_hotkey}_{i}") - # Miner will decline right after the initial job request is sent by the validator - await transport.add_message( - V0DeclineJobRequest(job_uuid=job_uuid), - send_before=1, - ) - - # Build a V2JobRequest mirroring JOB_REQUEST but with unique uuid simulated_request = V2JobRequest( uuid=job_uuid, executor_class=JOB_REQUEST.executor_class, @@ -145,22 +155,19 @@ async def _report_incidents(miner_hotkey: str, incidents: int, executor_class): upload_time_limit=JOB_REQUEST.upload_time_limit, ) - miner_client = OrganicMinerClient( + client = DeclineMinerClient(job_uuid) + driver = SyncOrganicJobDriver( + client, + job, + simulated_request, miner_hotkey=miner_hotkey, - miner_address=miner_model.address or "127.0.0.1", - miner_port=miner_model.port or 9000, - job_uuid=job_uuid, my_keypair=settings.BITTENSOR_WALLET().hotkey, - transport=transport, - ) - - # Run the job driver; this will catch MinerRejectedJob and record an incident - # Use default internal 
async dummy notify callback (omit custom one) - await drive_organic_job( - miner_client=miner_client, - job=job, - job_request=simulated_request, + allowed_leeway=10, + reservation_time_limit=10, + executor_startup_time_limit=10, + max_overall_time_limit=60, ) + driver.run() target_executor_class = CoreExecutorClass(JOB_REQUEST.executor_class) @@ -234,7 +241,7 @@ def _advance_current_block(_st, bn): for m in miners: if m.incidents: - async_to_sync(_report_incidents)(m.hotkey, m.incidents, target_executor_class) + _report_incidents(m.hotkey, m.incidents, target_executor_class) def _dbg_read(): return list( @@ -313,22 +320,6 @@ def test_excused_job_no_incident(monkeypatch): job_uuid = str(uuid.uuid4()) - # Monkeypatch excuse helpers to simulate a valid excuse: 1 expected executor, 1 valid receipt - async def fake_filter_valid_excuse_receipts(**_kwargs): - return [object()] - - async def fake_get_expected_miner_executor_count(**_kwargs): - return 1 - - monkeypatch.setattr( - "compute_horde_validator.validator.organic_jobs.miner_driver.job_excuses.filter_valid_excuse_receipts", - fake_filter_valid_excuse_receipts, - ) - monkeypatch.setattr( - "compute_horde_validator.validator.organic_jobs.miner_driver.job_excuses.get_expected_miner_executor_count", - fake_get_expected_miner_executor_count, - ) - # Facilitator transport (auth success + job request) faci_transport = SimulationTransport("facilitator_excused_case") async_to_sync(faci_transport.add_message)( diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py index 997376ab5..a63b39f0c 100644 --- a/validator/app/src/compute_horde_validator/validator/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tasks.py @@ -29,9 +29,6 @@ OrganicJob, SystemEvent, ) -from compute_horde_validator.validator.organic_jobs.miner_driver import ( - execute_organic_job_request, -) from compute_horde_validator.validator.routing.types import 
JobRoute from . import ( @@ -277,11 +274,7 @@ async def execute_organic_job_request_on_worker( def _execute_organic_job_on_worker(job_request: JsonValue, job_route: JsonValue) -> None: request: OrganicJobRequest = TypeAdapter(OrganicJobRequest).validate_python(job_request) route: JobRoute = TypeAdapter(JobRoute).validate_python(job_route) - - if config.DYNAMIC_SYNC_ORGANIC_JOBS: - execute_organic_job_request_sync(request, route) - else: - async_to_sync(execute_organic_job_request)(request, route) + execute_organic_job_request_sync(request, route) @app.task(bind=True, max_retries=SLASH_COLLATERAL_TASK_MAX_RETRIES) diff --git a/validator/app/src/compute_horde_validator/validator/tests/conftest.py b/validator/app/src/compute_horde_validator/validator/tests/conftest.py index d62fc739e..d21783b85 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/conftest.py +++ b/validator/app/src/compute_horde_validator/validator/tests/conftest.py @@ -1,3 +1,4 @@ +import asyncio import logging import uuid from collections.abc import Generator @@ -9,7 +10,7 @@ from compute_horde_core.executor_class import ExecutorClass from pylon_client.v1 import PylonClient -from ..organic_jobs.miner_driver import execute_organic_job_request +from ..organic_jobs.miner_driver_sync import execute_organic_job_request_sync from .helpers import MockNeuron logger = logging.getLogger(__name__) @@ -25,7 +26,7 @@ def some() -> Generator[int, None, None]: @pytest.fixture(autouse=True) def _patch_current_block(): with patch( - "compute_horde_validator.validator.organic_jobs.miner_driver._get_current_block", + "compute_horde_validator.validator.organic_jobs.miner_driver_sync._get_current_block_sync", return_value=1337, ): yield @@ -33,9 +34,21 @@ def _patch_current_block(): @pytest.fixture(autouse=True) def _patch_celery_job_execution(): + async def _patched(job_request, job_route): + # RedisPubSubChannelLayer is fire-and-forget: messages published before a subscriber + # is ready are silently 
dropped. handle_job_status_updates must SUBSCRIBE before the + # sync driver fires its first channel send, so we yield here to let it establish the + # subscription first. + await asyncio.sleep(0.1) + # asyncio.to_thread (not asgiref.sync_to_async) is intentional: the sync driver calls + # async_to_sync(channel_layer.send) internally. With sync_to_async(thread_sensitive=True) + # the function runs in the event loop's own thread, blocking all coroutines, and that + # inner async_to_sync deadlocks trying to schedule back onto the blocked loop. + return await asyncio.to_thread(execute_organic_job_request_sync, job_request, job_route) + with patch( "compute_horde_validator.validator.organic_jobs.facilitator_client.execute_organic_job_request_on_worker", - execute_organic_job_request, + _patched, ): yield diff --git a/validator/app/src/compute_horde_validator/validator/tests/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/helpers.py index babc883ae..e8aa91aef 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/helpers.py @@ -1,9 +1,12 @@ +import asyncio import glob import logging import numbers import os import shlex import subprocess +import threading +from collections import deque from datetime import timedelta from pathlib import Path from time import monotonic @@ -17,21 +20,22 @@ from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS from compute_horde.fv_protocol.facilitator_requests import V0JobCheated, V2JobRequest from compute_horde.protocol_messages import ( + MinerToValidatorMessage, V0AcceptJobRequest, V0ExecutionDoneRequest, V0ExecutorReadyRequest, + V0InitialJobRequest, V0JobFailedRequest, V0JobFinishedRequest, V0VolumesReadyRequest, - ValidatorToMinerMessage, ) -from compute_horde.utils import ValidatorInfo from compute_horde_core.signature import Signature from django.conf import settings -from pydantic import TypeAdapter from 
compute_horde_validator.validator.models import SystemEvent -from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient +from compute_horde_validator.validator.organic_jobs.miner_driver_sync import ( + MinerClient as SyncMinerClient, +) NUM_NEURONS = 5 @@ -47,99 +51,116 @@ def get_keypair(): return settings.BITTENSOR_WALLET().get_hotkey() -def get_miner_client(MINER_CLIENT, job_uuid: str) -> MinerClient: - return MINER_CLIENT( - miner_hotkey="miner_hotkey", - miner_address="ignore", - miner_port=9999, - job_uuid=job_uuid, - my_keypair=get_keypair(), - ) - +class SyncMockMinerClient(SyncMinerClient): + ws = None -class MockAxonInfo: - def __init__(self, ip="0.0.0.0", port=8000, ip_type=0, hotkey="hotkey"): - self.ip = ip - self.port = port - self.ip_type = ip_type - hotkey = (hotkey,) - - def is_serving(self): - return self.ip == "0.0.0.0" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._responses: list = [] - async def connect(self) -> None: + def connect(self) -> None: pass - async def send(self, data: str | bytes, error_event_callback=None): - msg = TypeAdapter(ValidatorToMinerMessage).validate_json(data) - self._sent_models.append(msg) + def recv(self, timeout: float): + return self._responses.pop(0) - def _query_sent_models(self, condition=None, model_class=None): - result = [] - for model in self._sent_models: - if model_class is not None and not isinstance(model, model_class): - continue - if not condition(model): - continue - result.append(model) - return result + def close(self) -> None: + pass -class MockMinerClient(MinerClient): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._sent_models = [] +class SyncMockSuccessfulMinerClient(SyncMockMinerClient): + def send(self, msg) -> None: + if isinstance(msg, V0InitialJobRequest): + job_uuid = msg.job_uuid + self._responses = [ + V0AcceptJobRequest(job_uuid=job_uuid), + V0ExecutorReadyRequest(job_uuid=job_uuid), + 
V0VolumesReadyRequest(job_uuid=job_uuid), + V0ExecutionDoneRequest(job_uuid=job_uuid), + V0JobFinishedRequest( + job_uuid=job_uuid, + docker_process_stdout="", + docker_process_stderr="", + artifacts={}, + ), + ] - def miner_url(self) -> str: - return "ws://miner" - async def connect(self): - return +class SyncMockFaillingMinerClient(SyncMockMinerClient): + def send(self, msg) -> None: + if isinstance(msg, V0InitialJobRequest): + job_uuid = msg.job_uuid + self._responses = [ + V0AcceptJobRequest(job_uuid=job_uuid), + V0ExecutorReadyRequest(job_uuid=job_uuid), + V0VolumesReadyRequest(job_uuid=job_uuid), + V0ExecutionDoneRequest(job_uuid=job_uuid), + V0JobFailedRequest( + job_uuid=job_uuid, + docker_process_stdout="", + docker_process_stderr="", + docker_process_exit_status=1, + ), + ] - async def send_model(self, model, error_event_callback=None): - self._sent_models.append(model) - - def _query_sent_models(self, condition=None, model_class=None): - result = [] - for model in self._sent_models: - if model_class is not None and not isinstance(model, model_class): - continue - if not condition(model): - continue - result.append(model) - return result +class SyncSimulationMinerClient(SyncMockMinerClient): + """ + Sync counterpart to SimulationTransport for use in test_happy_path.py. + Responses are queued via add_message() with send_before semantics identical + to SimulationTransport: a response is only delivered once at least + send_before messages have been sent by the driver to the miner. 
+ """ -class MockSuccessfulMinerClient(MockMinerClient): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.job_accepted_future.set_result(V0AcceptJobRequest(job_uuid=self.job_uuid)) - self.executor_ready_future.set_result(V0ExecutorReadyRequest(job_uuid=self.job_uuid)) - self.volumes_ready_future.set_result(V0VolumesReadyRequest(job_uuid=self.job_uuid)) - self.execution_done_future.set_result(V0ExecutionDoneRequest(job_uuid=self.job_uuid)) - self.job_finished_future.set_result( - V0JobFinishedRequest( - job_uuid=self.job_uuid, - docker_process_stdout="", - docker_process_stderr="", - artifacts={}, - ) - ) + def __init__(self, name: str, loop: asyncio.AbstractEventLoop) -> None: + # Bypass SyncMinerClient.__init__ — we don't have real connection params. + self._name = name + self._loop = loop + self._responses: list = [] + self._threading_condition = threading.Condition() + self.sent: list[str] = [] + self._to_receive: deque[tuple[int, MinerToValidatorMessage]] = deque() + self._receive_at_counter = 0 + self.receive_condition = asyncio.Condition() + + def connect(self) -> None: + pass + def close(self) -> None: + pass -class MockFaillingMinerClient(MockMinerClient): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.job_accepted_future.set_result(V0AcceptJobRequest(job_uuid=self.job_uuid)) - self.executor_ready_future.set_result(V0ExecutorReadyRequest(job_uuid=self.job_uuid)) - self.job_finished_future.set_result( - V0JobFailedRequest( - job_uuid=self.job_uuid, - docker_process_stdout="", - docker_process_stderr="", - docker_process_exit_status=1, - ) - ) + def send(self, msg) -> None: + with self._threading_condition: + self.sent.append(msg.model_dump_json()) + self._threading_condition.notify_all() + asyncio.run_coroutine_threadsafe(self._notify_async(), self._loop) + + async def _notify_async(self) -> None: + async with self.receive_condition: + self.receive_condition.notify_all() + + def recv(self, 
timeout: float) -> MinerToValidatorMessage: + if not self._to_receive: + raise TimeoutError(f"[{self._name}] No messages queued") + receive_at, message = self._to_receive.popleft() + with self._threading_condition: + if not self._threading_condition.wait_for( + lambda: len(self.sent) >= receive_at, timeout=timeout + ): + raise TimeoutError( + f"[{self._name}] Timed out waiting for {receive_at} sends, got {len(self.sent)}" + ) + return message + + async def add_message( + self, + message: str | MinerToValidatorMessage, + send_before: int = 0, + ) -> None: + if isinstance(message, str): + message = MinerToValidatorMessage.model_validate_json(message) + self._receive_at_counter += send_before + self._to_receive.append((self._receive_at_counter, message)) def get_dummy_signature() -> Signature: @@ -316,13 +337,6 @@ def __init__(self, hotkey, uid, axon_info=None): self.axon_info = axon_info -def neurons_to_validator_infos(neurons: list[MockNeuron]) -> list[ValidatorInfo]: - return [ - ValidatorInfo(uid=neuron.uid, hotkey=neuron.hotkey, stake=neuron.stake.tao) - for neuron in neurons - ] - - class MockBlock: def __init__(self, value=1000): self.value = value diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/conftest.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/conftest.py index 36fbc7099..ed224c6b2 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/conftest.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/conftest.py @@ -8,12 +8,12 @@ import pytest_asyncio from compute_horde.executor_class import EXECUTOR_CLASS from compute_horde.fv_protocol.facilitator_requests import V2JobRequest -from compute_horde.miner_client.organic import OrganicMinerClient from compute_horde.transport import AbstractTransport from compute_horde_core.executor_class import ExecutorClass from compute_horde_validator.validator.models import Miner from 
compute_horde_validator.validator.organic_jobs.facilitator_client import FacilitatorClient +from compute_horde_validator.validator.tests.helpers import SyncSimulationMinerClient from compute_horde_validator.validator.tests.transport import SimulationTransport @@ -66,29 +66,25 @@ async def faci_transport(): @pytest_asyncio.fixture async def miner_transports(): """ - In case of multiple job attempts within a single test, the transports will be used sequentially. + In case of multiple job attempts within a single test, the clients will be used sequentially. This does not mean each one will use a different miner - the miner used depends on actual job routing. """ - - transports = [ - SimulationTransport("miner_connection_1"), - SimulationTransport("miner_connection_2"), - SimulationTransport("miner_connection_3"), + loop = asyncio.get_running_loop() + clients = [ + SyncSimulationMinerClient("miner_connection_1", loop), + SyncSimulationMinerClient("miner_connection_2", loop), + SyncSimulationMinerClient("miner_connection_3", loop), ] - - transports_iter = iter(transports) + clients_iter = iter(clients) def fake_miner_client_factory(*args, **kwargs): - """ - Creates a real organic miner client, but replaces the WS transport with a pre-programmed sequence. 
- """ - return OrganicMinerClient(*args, **kwargs, transport=next(transports_iter)) + return next(clients_iter) with patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.MinerClient", + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.MinerClient", fake_miner_client_factory, ): - yield transports + yield clients @pytest_asyncio.fixture diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_facilitator_client.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_facilitator_client.py index 9a295db12..f392b7c57 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_facilitator_client.py @@ -37,8 +37,8 @@ from compute_horde_validator.validator.utils import MACHINE_SPEC_CHANNEL, TRUSTED_MINER_FAKE_KEY from ..helpers import ( - MockFaillingMinerClient, - MockSuccessfulMinerClient, + SyncMockFaillingMinerClient, + SyncMockSuccessfulMinerClient, get_dummy_job_cheated_request_v0, get_dummy_job_request_v2, get_keypair, @@ -273,8 +273,8 @@ async def serve(self, ws): ], ) @patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.MinerClient", - MockSuccessfulMinerClient, + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.MinerClient", + SyncMockSuccessfulMinerClient, ) async def test_facilitator_client__job_completed(ws_server_cls): await setup_db() @@ -363,8 +363,8 @@ async def test_facilitator_client__cheated_job(): @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) @pytest.mark.skip(reason="Validator-side job retry is disabled for now") @patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.MinerClient", - MockFaillingMinerClient, + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.MinerClient", + SyncMockFaillingMinerClient, ) async 
def test_facilitator_client__failed_job_retries(): await setup_db() @@ -423,8 +423,8 @@ async def serve(self, ws): @pytest.mark.asyncio @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) @patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.MinerClient", - MockSuccessfulMinerClient, + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.MinerClient", + SyncMockSuccessfulMinerClient, ) async def test_wait_for_specs(specs_msg: dict): layer = get_channel_layer() @@ -447,8 +447,8 @@ async def test_wait_for_specs(specs_msg: dict): @pytest.mark.asyncio @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) @patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.MinerClient", - MockSuccessfulMinerClient, + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.MinerClient", + SyncMockSuccessfulMinerClient, ) async def test_routing_to_trusted_miner(): await setup_db() diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_happy_path.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_happy_path.py index 25dcc72ec..3ff9f799e 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_happy_path.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_happy_path.py @@ -116,7 +116,7 @@ async def test_two_jobs( # Job 1 await faci_transport.add_message(job_request, send_before=0) - await miner_transport.add_message(V0AcceptJobRequest(job_uuid=job_request.uuid), send_before=2) + await miner_transport.add_message(V0AcceptJobRequest(job_uuid=job_request.uuid), send_before=1) await miner_transport.add_message( V0ExecutorReadyRequest(job_uuid=job_request.uuid), send_before=1 ) @@ -139,7 +139,7 @@ async def test_two_jobs( await faci_transport.add_message(another_job_request, send_before=2) await miner_transport_2.add_message( - 
V0AcceptJobRequest(job_uuid=another_job_request.uuid), send_before=2 + V0AcceptJobRequest(job_uuid=another_job_request.uuid), send_before=1 ) await miner_transport_2.add_message( V0ExecutorReadyRequest(job_uuid=another_job_request.uuid), send_before=1 @@ -157,16 +157,33 @@ async def test_two_jobs( send_before=0, ) - # Expected messages: auth, job1 status=accepted, job1 status=finished, job2 status=accepted await execute_scenario(until=lambda: len(faci_transport.sent) >= 13, timeout_seconds=3) assert len(faci_transport.sent) >= 13 - j1_accepted_msg = JobStatusUpdate.model_validate_json(faci_transport.sent[2]) - j1_finished_msg = JobStatusUpdate.model_validate_json(faci_transport.sent[6]) - j2_accepted_msg = JobStatusUpdate.model_validate_json(faci_transport.sent[8]) - j2_finished_msg = JobStatusUpdate.model_validate_json(faci_transport.sent[12]) - - assert j1_accepted_msg.status == "accepted" - assert j1_finished_msg.status == "completed" - assert j2_accepted_msg.status == "accepted" - assert j2_finished_msg.status == "completed" + j1_updates = [ + JobStatusUpdate.model_validate_json(msg).status + for msg in faci_transport.sent + if job_request.uuid in msg + ] + j2_updates = [ + JobStatusUpdate.model_validate_json(msg).status + for msg in faci_transport.sent + if another_job_request.uuid in msg + ] + + assert j1_updates == [ + "received", + "accepted", + "executor_ready", + "volumes_ready", + "execution_done", + "completed", + ] + assert j2_updates == [ + "received", + "accepted", + "executor_ready", + "volumes_ready", + "execution_done", + "completed", + ] diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_jobs_namespace.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_jobs_namespace.py index 429942de4..b87413fe6 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_jobs_namespace.py +++ 
b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_jobs_namespace.py @@ -1,25 +1,26 @@ import uuid -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from compute_horde_validator.validator.allowance.types import Miner as AllowanceMiner from compute_horde_validator.validator.models import Miner -from compute_horde_validator.validator.organic_jobs.miner_driver import execute_organic_job_request +from compute_horde_validator.validator.organic_jobs.miner_driver_sync import ( + execute_organic_job_request_sync, +) from compute_horde_validator.validator.routing.types import JobRoute from compute_horde_validator.validator.tests.helpers import get_dummy_job_request_v2 -@pytest.mark.asyncio @pytest.mark.django_db @pytest.mark.parametrize( "job_namespace,namespace_value", [("SN123.1.0", "SN123.1.0"), ("", "docker_image")] ) -async def test_organic_job_namespace_priority(job_namespace, namespace_value): +def test_organic_job_namespace_priority(job_namespace, namespace_value): """ Test OrganicJob uses namespace with fallback to docker_image. 
""" - miner_model = await Miner.objects.acreate( + miner_model = Miner.objects.create( hotkey=f"test-miner-{str(uuid.uuid4())[:8]}", address="127.0.0.1", port=8000, @@ -43,13 +44,13 @@ async def test_organic_job_namespace_priority(job_namespace, namespace_value): job_request.docker_image = "docker_image" with patch( - "compute_horde_validator.validator.organic_jobs.miner_driver._get_current_block" + "compute_horde_validator.validator.organic_jobs.miner_driver_sync._get_current_block_sync" ) as mock_block: mock_block.return_value = 1000 with patch( - "compute_horde_validator.validator.organic_jobs.miner_driver.drive_organic_job" - ) as mock_drive: - mock_drive.return_value = True + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.SyncOrganicJobDriver" + ) as mock_driver_cls: + mock_driver_cls.return_value = MagicMock() - job = await execute_organic_job_request(job_request, job_route) + job = execute_organic_job_request_sync(job_request, job_route) assert job.namespace == namespace_value diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver.py deleted file mode 100644 index 133d1705c..000000000 --- a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver.py +++ /dev/null @@ -1,219 +0,0 @@ -import uuid -from functools import partial - -import pytest -from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS -from compute_horde.fv_protocol.validator_requests import JobStatusUpdate -from compute_horde.protocol_consts import JobStatus -from compute_horde.protocol_messages import ( - V0AcceptJobRequest, - V0DeclineJobRequest, - V0ExecutionDoneRequest, - V0ExecutorFailedRequest, - V0ExecutorReadyRequest, - V0JobAcceptedReceiptRequest, - V0JobFailedRequest, - V0JobFinishedReceiptRequest, - V0JobFinishedRequest, - V0VolumesReadyRequest, -) - -from 
compute_horde_validator.validator.models import Miner -from compute_horde_validator.validator.organic_jobs.facilitator_client import OrganicJob -from compute_horde_validator.validator.organic_jobs.miner_driver import drive_organic_job - -from ..helpers import ( - MockMinerClient, - get_dummy_job_request_v2, - get_miner_client, -) - -WEBSOCKET_TIMEOUT = 10 - - -@pytest.mark.asyncio -@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) -@pytest.mark.parametrize( - ( - "messages_from_miner", - "expected_job_status_updates", - "organic_job_status", - "dummy_job_factory", - "expected_job_accepted_receipt", - "expected_job_finished_receipt", - ), - [ - ( - [], - [JobStatus.FAILED], - OrganicJob.Status.FAILED, - get_dummy_job_request_v2, - False, - False, - ), - ( - [ - V0DeclineJobRequest, - ], - [JobStatus.REJECTED], - OrganicJob.Status.FAILED, - get_dummy_job_request_v2, - False, - False, - ), - ( - [ - V0AcceptJobRequest, - V0ExecutorFailedRequest, - ], - [JobStatus.ACCEPTED, JobStatus.HORDE_FAILED], - OrganicJob.Status.FAILED, - get_dummy_job_request_v2, - True, - False, - ), - ( - [ - V0AcceptJobRequest, - V0ExecutorReadyRequest, - ], - [JobStatus.ACCEPTED, JobStatus.EXECUTOR_READY, JobStatus.FAILED], - OrganicJob.Status.FAILED, - get_dummy_job_request_v2, - True, - False, - ), - ( - [ - V0AcceptJobRequest, - V0ExecutorReadyRequest, - V0VolumesReadyRequest, - V0ExecutionDoneRequest, - V0JobFailedRequest, - ], - [ - JobStatus.ACCEPTED, - JobStatus.EXECUTOR_READY, - JobStatus.VOLUMES_READY, - JobStatus.EXECUTION_DONE, - JobStatus.FAILED, - ], - OrganicJob.Status.FAILED, - get_dummy_job_request_v2, - True, - False, - ), - ( - [ - V0AcceptJobRequest, - V0ExecutorReadyRequest, - V0VolumesReadyRequest, - V0ExecutionDoneRequest, - partial( - V0JobFinishedRequest, - docker_process_stdout="mocked stdout", - docker_process_stderr="mocked stderr", - artifacts={}, - ), - ], - [ - JobStatus.ACCEPTED, - JobStatus.EXECUTOR_READY, - 
JobStatus.VOLUMES_READY, - JobStatus.EXECUTION_DONE, - JobStatus.COMPLETED, - ], - OrganicJob.Status.COMPLETED, - get_dummy_job_request_v2, - True, - True, - ), - ( - [ - V0AcceptJobRequest, - V0ExecutorReadyRequest, - V0VolumesReadyRequest, - V0ExecutionDoneRequest, - partial( - V0JobFinishedRequest, - docker_process_stdout="mocked stdout", - docker_process_stderr="mocked stderr", - artifacts={}, - ), - ], - [ - JobStatus.ACCEPTED, - JobStatus.EXECUTOR_READY, - JobStatus.VOLUMES_READY, - JobStatus.EXECUTION_DONE, - JobStatus.COMPLETED, - ], - OrganicJob.Status.COMPLETED, - get_dummy_job_request_v2, - True, - True, - ), - ], -) -async def test_miner_driver( - messages_from_miner, - expected_job_status_updates, - organic_job_status, - dummy_job_factory, - expected_job_accepted_receipt, - expected_job_finished_receipt, - settings, -): - miner, _ = await Miner.objects.aget_or_create(hotkey="miner_client") - validator, _ = await Miner.objects.aget_or_create( - hotkey=settings.BITTENSOR_WALLET().hotkey.ss58_address - ) - job_uuid = str(uuid.uuid4()) - job_request = dummy_job_factory(job_uuid) - job = await OrganicJob.objects.acreate( - job_uuid=job_uuid, - miner=miner, - miner_address="irrelevant", - miner_address_ip_version=4, - miner_port=9999, - executor_class=DEFAULT_EXECUTOR_CLASS, - job_description="User job from facilitator", - block=42, - ) - miner_client = get_miner_client(MockMinerClient, job_uuid) - - for msg_factory in messages_from_miner: - await miner_client.handle_message(msg_factory(job_uuid=job_uuid)) - - job_status_updates: list[JobStatusUpdate] = [] - - async def track_job_status_updates(x: JobStatusUpdate): - job_status_updates.append(x) - - await drive_organic_job( - miner_client, - job, - job_request, - notify_callback=track_job_status_updates, - ) - - assert len(job_status_updates) == len(expected_job_status_updates), ( - f"{[u.status.value for u in job_status_updates]} != {expected_job_status_updates}" - ) - for job_status, expected_status in 
zip(job_status_updates, expected_job_status_updates): - assert job_status.status == expected_status - assert job_status.uuid == job_uuid - - job = await OrganicJob.objects.aget(job_uuid=job_uuid) - assert job.status == organic_job_status - if organic_job_status == OrganicJob.Status.COMPLETED: - assert job.stdout == "mocked stdout" - assert job.stderr == "mocked stderr" - - def condition(_): - return True - - if expected_job_accepted_receipt: - assert miner_client._query_sent_models(condition, V0JobAcceptedReceiptRequest) - if expected_job_finished_receipt: - assert miner_client._query_sent_models(condition, V0JobFinishedReceiptRequest) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_other/test_commands.py b/validator/app/src/compute_horde_validator/validator/tests/test_other/test_commands.py index f80b285f0..9df498ac5 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_other/test_commands.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_other/test_commands.py @@ -9,7 +9,7 @@ from compute_horde_validator.validator.models import Miner, OrganicJob, SystemEvent from ..helpers import ( - MockSuccessfulMinerClient, + SyncMockSuccessfulMinerClient, check_system_events, throw_error, ) @@ -21,7 +21,7 @@ def patch_miner_client(): with patch( "compute_horde_validator.validator.management.commands.debug_run_organic_job.MinerClient", - MockSuccessfulMinerClient, + SyncMockSuccessfulMinerClient, ): yield