Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 121 additions & 3 deletions extensions/business/chain_dist/chain_dist_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"""
from naeural_core.business.base import BasePluginExecutor as BasePlugin
from extensions.business.deeploy.deeploy_mixin import _DeeployMixin
from extensions.business.deeploy.deeploy_job_mixin import _DeeployJobMixin


__VER__ = '0.2.0'
Expand All @@ -51,6 +52,8 @@

# our overwritten props
'PROCESS_DELAY' : 10,
'JOB_RESTART_DELAY' : 100,
'CHECK_ALL_JOBS_EVERY' : 6, # check all active jobs every N process cycles

# Plugin Sleep period in case of an error.
'SLEEP_PERIOD' : 30,
Expand All @@ -62,7 +65,7 @@
'MAX_THRESHOLD_CLOSE_JOB' : 250,
}

class ChainDistMonitorPlugin(BasePlugin, _DeeployMixin):
class ChainDistMonitorPlugin(BasePlugin, _DeeployMixin, _DeeployJobMixin):

def Pd(self, s, *args, verbosity=0, **kwargs):
"""
Expand All @@ -77,6 +80,7 @@ def Pd(self, s, *args, verbosity=0, **kwargs):
def on_init(self):
self.epochs_closed = {}
self.jobs_to_close = {}
self.iterations_till_all_jobs_check = self.cfg_check_all_jobs_every
self.chainstore_hset(
hkey='chain_dist_monitor',
key=self.node_addr,
Expand All @@ -88,7 +92,7 @@ def on_init(self):
return


def check_all_jobs(self):
def check_unvalidated_jobs(self):
# check if there are any jobs that need to be validated bc.web3_get_unvalidated_jobs() (returns PENDING or IN-CHANGE jobs)
# for each unvalidated job:
# get all running apps via netmon.network_known_apps
Expand Down Expand Up @@ -198,6 +202,119 @@ def check_closable_jobs(self):
#endif
#endif
return

def check_all_active_jobs(self):
# check all active jobs every N process cycles
self.iterations_till_all_jobs_check -= 1
if self.iterations_till_all_jobs_check > 0:
return
self.iterations_till_all_jobs_check = self.cfg_check_all_jobs_every

def normalize_nodes(nodes):
result = set()
if not nodes:
return result
for node in nodes:
if not node:
continue
if isinstance(node, str) and node.startswith('0xai_'):
result.add(self.bc.node_addr_to_eth_addr(node))
else:
result.add(node)
return result

active_jobs = self.bc.get_all_active_jobs() or []
for job in active_jobs:
job_id = job.get('jobId')
cstore_pipeline = self.get_job_pipeline_from_cstore(job_id=job_id)
if cstore_pipeline is None:
# probably not warmed up yet - skip
continue
discovered_plugins = self._discover_plugin_instances(job_id=job_id)

blockchain_nodes = normalize_nodes(job.get('activeNodes'))
if len(blockchain_nodes) == 0:
# no active nodes on chain, probably not started yet - skip
continue

number_of_nodes_requested = job.get('numberOfNodesRequested')
if isinstance(number_of_nodes_requested, str):
try:
number_of_nodes_requested = int(number_of_nodes_requested)
except Exception:
number_of_nodes_requested = None
if isinstance(number_of_nodes_requested, int) and len(blockchain_nodes) > number_of_nodes_requested:
# probably scale-up in progress - skip
continue

cstore_nodes = normalize_nodes(cstore_pipeline.get('DEEPLOY_SPECS', {}).get('current_target_nodes', [])) #TODO use DEEPLOY_KEYS constant
online_nodes = normalize_nodes([plugin.get('NODE') for plugin in discovered_plugins])

if blockchain_nodes == cstore_nodes == online_nodes:
continue
#endif
if blockchain_nodes != cstore_nodes:
self.P(f"Mismatch between blockchain and cstore nodes for job id {job_id}.", color='r')
#endif
self.P(f"Blockchain nodes: {blockchain_nodes}, CStore nodes: {cstore_nodes}, Online nodes: {online_nodes}", verbosity=2)

if blockchain_nodes != online_nodes:
# Check for extra nodes that should not be running the job
extra_nodes = online_nodes - blockchain_nodes
if len(extra_nodes):
extra_nodes = sorted(extra_nodes)
self.Pd(f"Unexpected online nodes for job id {job_id}: {extra_nodes}. Will close the jobs.", verbosity=3)
if hasattr(self.bc, 'eth_addr_list_to_internal_addr_list'):
online_extra_nodes_internal = self.bc.eth_addr_list_to_internal_addr_list(extra_nodes)
else:
online_extra_nodes_internal = [self.bc.eth_addr_to_internal_addr(node) for node in extra_nodes]
self.delete_pipeline_from_nodes(job_id=job_id, target_nodes=online_extra_nodes_internal)
#endif

# Check for missing nodes that should be running the job
missing_nodes = blockchain_nodes - online_nodes
if len(missing_nodes):
cstore_current_value = self.chainstore_hget(
hkey='chain_dist_job_restart',
key=str(job_id),
)
node_address = self.bc.address
if cstore_current_value is None:
self.P(f"Will try to restart job {job_id} on missing nodes in {self.cfg_job_restart_delay}s.")
self.chainstore_hset(
hkey='chain_dist_job_restart',
key=str(job_id),
value=f"{node_address}|{self.time()}"
)
else:
# If someone already claimed the restart, check if it's the node itself or if the restart timed out (eg. the other oracle is down)
[node, time] = cstore_current_value.split('|')
time = float(time)
if node != node_address:
# If the other oracles has not completed the restart in time, we can take over
if time + (self.cfg_job_restart_delay * 2) < self.time():
self.P(f"Will try to restart job {job_id} on missing nodes in {self.cfg_job_restart_delay}s. Previous node {node} timed out.")
self.chainstore_hset(
hkey='chain_dist_job_restart',
key=str(job_id),
value=f"{node_address}|{self.time()}"
)
#endif
else:
# If the oracles has claimed the restart, check if it's time to do it
if time + self.cfg_job_restart_delay < self.time():
self.P(f"Restarting job {job_id} on {len(missing_nodes)} missing nodes.")
#TODO (Vitalii) start job cstore_pipeline on len(missing_nodes) new nodes
self.chainstore_hset(
hkey='chain_dist_job_restart',
key=str(job_id),
value=None
)
#endif
#endif
#endif cstore value
#endif missing nodes
#endif blockchain != online

def maybe_update_liveness(self):
# check if last update was more than 10 minutes ago
Expand All @@ -214,9 +331,10 @@ def maybe_update_liveness(self):
def process(self):
try:
self.maybe_update_liveness()
self.check_all_jobs()
self.check_unvalidated_jobs()
self.check_closable_jobs()
self.maybe_distribute_rewards()
self.check_all_active_jobs()
except Exception as e:
self.P(f"Exception during process:\n{self.trace_info()}\nSleeping for {self.cfg_sleep_period} seconds.", color='r')
self.sleep(self.cfg_sleep_period)
Expand Down
13 changes: 2 additions & 11 deletions extensions/business/deeploy/deeploy_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2713,9 +2713,9 @@ def check_running_pipelines_and_add_to_r1fs(self):
self.P(f"Checked all job IDs.")
return netmon_job_ids

def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, allow_missing=False, discovered_instances=None):
def delete_pipeline_from_nodes(self, app_id=None, job_id=None, owner=None, target_nodes=None, allow_missing=False, discovered_instances=None):
if discovered_instances is None:
discovered_instances = self._discover_plugin_instances(app_id=app_id, job_id=job_id, owner=owner)
discovered_instances = self._discover_plugin_instances(app_id=app_id, job_id=job_id, target_nodes=target_nodes, owner=owner)

if len(discovered_instances) == 0:
if allow_missing:
Expand Down Expand Up @@ -2751,15 +2751,6 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i

"""
result = self.netmon.network_known_apps(target_nodes=target_nodes)

# Count nodes and app instances
node_count = len(result)
total_pipelines = 0

for node, pipelines in result.items():
total_pipelines += len(pipelines)

self.Pd(f"Found {node_count} nodes with a total of {total_pipelines} pipelines")
if owner is not None:
filtered_result = self.defaultdict(dict)
for node, apps in result.items():
Expand Down