From 89472518a726fed7e723430e77d9f0f6ca73e35d Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 19 Mar 2026 17:43:34 +0100 Subject: [PATCH] feat: port applicable chain-dist restart logic from PR 271 --- .../business/chain_dist/chain_dist_monitor.py | 124 +++++++++++++++++- extensions/business/deeploy/deeploy_mixin.py | 13 +- 2 files changed, 123 insertions(+), 14 deletions(-) diff --git a/extensions/business/chain_dist/chain_dist_monitor.py b/extensions/business/chain_dist/chain_dist_monitor.py index fc9c9fd0..067c903c 100644 --- a/extensions/business/chain_dist/chain_dist_monitor.py +++ b/extensions/business/chain_dist/chain_dist_monitor.py @@ -36,6 +36,7 @@ """ from naeural_core.business.base import BasePluginExecutor as BasePlugin from extensions.business.deeploy.deeploy_mixin import _DeeployMixin +from extensions.business.deeploy.deeploy_job_mixin import _DeeployJobMixin __VER__ = '0.2.0' @@ -51,6 +52,8 @@ # our overwritten props 'PROCESS_DELAY' : 10, + 'JOB_RESTART_DELAY' : 100, + 'CHECK_ALL_JOBS_EVERY' : 6, # check all active jobs every N process cycles # Plugin Sleep period in case of an error. 'SLEEP_PERIOD' : 30, @@ -62,7 +65,7 @@ 'MAX_THRESHOLD_CLOSE_JOB' : 250, } -class ChainDistMonitorPlugin(BasePlugin, _DeeployMixin): +class ChainDistMonitorPlugin(BasePlugin, _DeeployMixin, _DeeployJobMixin): def Pd(self, s, *args, verbosity=0, **kwargs): """ @@ -77,6 +80,7 @@ def Pd(self, s, *args, verbosity=0, **kwargs): def on_init(self): self.epochs_closed = {} self.jobs_to_close = {} + self.iterations_till_all_jobs_check = self.cfg_check_all_jobs_every self.chainstore_hset( hkey='chain_dist_monitor', key=self.node_addr, @@ -88,7 +92,7 @@ def on_init(self): return - def check_all_jobs(self): + def check_unvalidated_jobs(self): # check if there are any jobs that need to be validated bc.web3_get_unvalidated_jobs() (returns PENDING or IN-CHANGE jobs) # for each unvalidated job: # get all running apps via netmon.network_known_apps @@ -198,6 +202,119 @@ def check_closable_jobs(self): #endif #endif return + + def check_all_active_jobs(self): + # check all active jobs every N process cycles + self.iterations_till_all_jobs_check -= 1 + if self.iterations_till_all_jobs_check > 0: + return + self.iterations_till_all_jobs_check = self.cfg_check_all_jobs_every + + def normalize_nodes(nodes): + result = set() + if not nodes: + return result + for node in nodes: + if not node: + continue + if isinstance(node, str) and node.startswith('0xai_'): + result.add(self.bc.node_addr_to_eth_addr(node)) + else: + result.add(node) + return result + + active_jobs = self.bc.get_all_active_jobs() or [] + for job in active_jobs: + job_id = job.get('jobId') + cstore_pipeline = self.get_job_pipeline_from_cstore(job_id=job_id) + if cstore_pipeline is None: + # probably not warmed up yet - skip + continue + discovered_plugins = self._discover_plugin_instances(job_id=job_id) + + blockchain_nodes = normalize_nodes(job.get('activeNodes')) + if len(blockchain_nodes) == 0: + # no active nodes on chain, probably not started yet - skip + continue + + number_of_nodes_requested = job.get('numberOfNodesRequested') + if isinstance(number_of_nodes_requested, str): + try: + number_of_nodes_requested = int(number_of_nodes_requested) + except Exception: + number_of_nodes_requested = None + if isinstance(number_of_nodes_requested, int) and len(blockchain_nodes) > number_of_nodes_requested: + # probably scale-up in progress - skip + continue + + cstore_nodes = normalize_nodes(cstore_pipeline.get('DEEPLOY_SPECS', {}).get('current_target_nodes', [])) #TODO use DEEPLOY_KEYS constant + online_nodes = normalize_nodes([plugin.get('NODE') for plugin in discovered_plugins]) + + if blockchain_nodes == cstore_nodes == online_nodes: + continue + #endif + if blockchain_nodes != cstore_nodes: + self.P(f"Mismatch between blockchain and cstore nodes for job id {job_id}.", color='r') + #endif + self.P(f"Blockchain nodes: {blockchain_nodes}, CStore nodes: {cstore_nodes}, Online nodes: {online_nodes}", verbosity=2) + + if blockchain_nodes != online_nodes: + # Check for extra nodes that should not be running the job + extra_nodes = online_nodes - blockchain_nodes + if len(extra_nodes): + extra_nodes = sorted(extra_nodes) + self.Pd(f"Unexpected online nodes for job id {job_id}: {extra_nodes}. Will close the jobs.", verbosity=3) + if hasattr(self.bc, 'eth_addr_list_to_internal_addr_list'): + online_extra_nodes_internal = self.bc.eth_addr_list_to_internal_addr_list(extra_nodes) + else: + online_extra_nodes_internal = [self.bc.eth_addr_to_internal_addr(node) for node in extra_nodes] + self.delete_pipeline_from_nodes(job_id=job_id, target_nodes=online_extra_nodes_internal) + #endif + + # Check for missing nodes that should be running the job + missing_nodes = blockchain_nodes - online_nodes + if len(missing_nodes): + cstore_current_value = self.chainstore_hget( + hkey='chain_dist_job_restart', + key=str(job_id), + ) + node_address = self.bc.address + if cstore_current_value is None: + self.P(f"Will try to restart job {job_id} on missing nodes in {self.cfg_job_restart_delay}s.") + self.chainstore_hset( + hkey='chain_dist_job_restart', + key=str(job_id), + value=f"{node_address}|{self.time()}" + ) + else: + # If someone already claimed the restart, check if it's the node itself or if the restart timed out (eg. the other oracle is down) + [node, time] = cstore_current_value.split('|') + time = float(time) + if node != node_address: + # If the other oracles has not completed the restart in time, we can take over + if time + (self.cfg_job_restart_delay * 2) < self.time(): + self.P(f"Will try to restart job {job_id} on missing nodes in {self.cfg_job_restart_delay}s. Previous node {node} timed out.") + self.chainstore_hset( + hkey='chain_dist_job_restart', + key=str(job_id), + value=f"{node_address}|{self.time()}" + ) + #endif + else: + # If the oracles has claimed the restart, check if it's time to do it + if time + self.cfg_job_restart_delay < self.time(): + self.P(f"Restarting job {job_id} on {len(missing_nodes)} missing nodes.") + #TODO (Vitalii) start job cstore_pipeline on len(missing_nodes) new nodes + self.chainstore_hset( + hkey='chain_dist_job_restart', + key=str(job_id), + value=None + ) + #endif + #endif + #endif cstore value + #endif missing nodes + #endif blockchain != online def maybe_update_liveness(self): # check if last update was more than 10 minutes ago @@ -214,9 +331,10 @@ def maybe_update_liveness(self): def process(self): try: self.maybe_update_liveness() - self.check_all_jobs() + self.check_unvalidated_jobs() self.check_closable_jobs() self.maybe_distribute_rewards() + self.check_all_active_jobs() except Exception as e: self.P(f"Exception during process:\n{self.trace_info()}\nSleeping for {self.cfg_sleep_period} seconds.", color='r') self.sleep(self.cfg_sleep_period) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index a9f3591e..b2349e76 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2713,9 +2713,9 @@ def check_running_pipelines_and_add_to_r1fs(self): self.P(f"Checked all job IDs.") return netmon_job_ids - def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, allow_missing=False, discovered_instances=None): + def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, target_nodes=None, allow_missing=False, discovered_instances=None): if discovered_instances is None: - discovered_instances = self._discover_plugin_instances(app_id=app_id, job_id=job_id, owner=owner) + discovered_instances = self._discover_plugin_instances(app_id=app_id, job_id=job_id, target_nodes=target_nodes, owner=owner) if len(discovered_instances) == 0: if allow_missing: @@ -2751,15 +2751,6 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i """ result = self.netmon.network_known_apps(target_nodes=target_nodes) - - # Count nodes and app instances - node_count = len(result) - total_pipelines = 0 - - for node, pipelines in result.items(): - total_pipelines += len(pipelines) - - self.Pd(f"Found {node_count} nodes with a total of {total_pipelines} pipelines") if owner is not None: filtered_result = self.defaultdict(dict) for node, apps in result.items():