7 changes: 7 additions & 0 deletions src/cocaine-app/__init__.py
@@ -213,7 +213,14 @@ def init_minions():


def init_smart_scheduler(job_processor):
from sched.defrag_starter import DefragStarter
from sched.recover_starter import RecoveryStarter
from sched.ttl_cleanup_starter import TtlCleanupStarter

scheduler = Scheduler(meta_db, job_processor)
defrag_starter = DefragStarter(job_processor, scheduler)
recovery_starter = RecoveryStarter(job_processor, scheduler)
ttl_cleanup_starter = TtlCleanupStarter(scheduler)
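# NOTE: the starter instances are presumably self-registering (wiring their periodic
# tasks into the scheduler / job processor on construction), which would explain why
# only the scheduler itself is returned; this is an assumption based on the unused references.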
return scheduler


21 changes: 12 additions & 9 deletions src/cocaine-app/jobs/couple_defrag.py
@@ -124,19 +124,22 @@ def report_resources(params):
"""
Report the supposed resource usage for the specified params
:param params: params to be passed when creating the job instance
:return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}}
:return: a dict whose keys are tuples (res_type, group), (res_type, node_addr) or (res_type, node_addr, fs_id)
and whose values are consumption levels
"""

# XXX: this code duplicates the 'set_resources' and 'involved_groups' methods, but the duplication was chosen
# to minimize changes to tests
res = {}

from mastermind_core.config import config
config_params = config.get('scheduler', {}).get('couple_defrag', {})
resource_limits = config_params.get("resource_limits", {}) # common, no specific roles

couple = params.get('couple', '')
res['groups'] = [int(gid) for gid in couple.split(':')]
res['resources'] = {
Job.RESOURCE_FS: [],
Job.RESOURCE_CPU: [],
}
for gid in couple.split(':'):
res[("group", int(gid))] = 100 # Lock groups for a while

couples = (storage.cache_couples
if params.get('is_cache_couple', False) else
storage.replicas_groupsets)
@@ -145,7 +148,7 @@ def report_resources(params):
couple = couples[couple]

for g in couple.groups:
res['resources'][Job.RESOURCE_FS].append(
(g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid)))
res['resources'][Job.RESOURCE_CPU].append(g.node_backends[0].node.host.addr)
res[(Job.RESOURCE_FS, g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid))] = 100
res[(Job.RESOURCE_CPU, g.node_backends[0].node.host.addr)] = resource_limits.get(Job.RESOURCE_CPU, 25)

return res
28 changes: 15 additions & 13 deletions src/cocaine-app/jobs/recover_dc.py
@@ -167,32 +167,34 @@ def report_resources(params):
"""
Report the supposed resource usage for the specified params
:param params: params to be passed when creating the job instance
:return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}}
:return: a dict whose keys are tuples (res_type, group), (res_type, node_addr) or (res_type, node_addr, fs_id)
and whose values are consumption levels
"""

# XXX: this code duplicates the 'set_resources' and 'involved_groups' methods, but the duplication was chosen
# to minimize changes to tests
res = {}
groups = []

from mastermind_core.config import config
config_params = config.get('scheduler', {}).get('recover_dc', {})
resource_limits = config_params.get("resource_limits", {}) # common, no specific roles

couple = params.get('couple')
if couple:
res['groups'] = [int(gid) for gid in couple.split(':')]
groups = [int(gid) for gid in couple.split(':')]
else:
group_id = params.get('group')
group = storage.groups[group_id]
couple = group.couple
res['groups'] = [g.group_id for g in couple.groups]

res['resources'] = {
Job.RESOURCE_HOST_IN: [],
Job.RESOURCE_HOST_OUT: [],
Job.RESOURCE_FS: [],
}
groups = [g.group_id for g in couple.groups]

for group_id in res['groups']:
for group_id in groups:
res[("Group", group_id)] = 100
g = storage.groups[group_id]
res['resources'][Job.RESOURCE_HOST_IN].append(g.node_backends[0].node.host.addr)
res['resources'][Job.RESOURCE_HOST_OUT].append(g.node_backends[0].node.host.addr)
res['resources'][Job.RESOURCE_FS].append((g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid)))
nb = g.node_backends[0]
res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 20)
res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 20)
res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = 100

return res
48 changes: 47 additions & 1 deletion src/cocaine-app/jobs/ttl_cleanup.py
@@ -20,7 +20,7 @@ class TtlCleanupJob(Job):
'nproc',
'wait_timeout',
'dry_run',
'remove_all_older',
'remove_all_older',
'remove_permanent_older',
'resources'
)
@@ -104,3 +104,49 @@ def _involved_couples(self):
def on_complete(self, processor):
couple = str(storage.groups[self.iter_group].couple)
processor.planner.update_cleanup_ts(couple, time.time())

@staticmethod
def report_resources(params):
"""
Report the supposed resource usage for the specified params
:param params: params to be passed when creating the job instance
:return: a dict whose keys are tuples (res_type, group), (res_type, node_addr) or (res_type, node_addr, fs_id)
and whose values are consumption levels
"""

# XXX: this code duplicates the 'set_resources' and 'involved_groups' methods, but the duplication was chosen
# to minimize changes to tests
res = {}

from mastermind_core.config import config
config_params = config.get('scheduler', {}).get('ttl_cleanup', {})
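# Sketch of the assumed config section (key names are taken from this function; the inner
# keys are the Job.RESOURCE_* constants and every number below is a hypothetical example):
#   "scheduler": {
#       "ttl_cleanup": {
#           "resource_limits": {<Job.RESOURCE_HOST_IN>: 20, <Job.RESOURCE_HOST_OUT>: 2},
#           "resource_limits_neighbour": {<Job.RESOURCE_HOST_IN>: 2, <Job.RESOURCE_HOST_OUT>: 5}
#       }
#   }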

iter_group = params['iter_group']

if iter_group not in storage.groups:
raise ValueError("Invalid params {}".format(iter_group))

couple = storage.groups[iter_group].couple
nb = storage.groups[iter_group].node_backends[0]

for g in couple.groups:
res[("Group", g)] = 100 # lock all groups

resource_limits = config_params.get("resource_limits", {})
res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 20)
res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 2)
res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = \
resource_limits.get(Job.RESOURCE_FS, 100) # lock the entire disk

resource_limits = config_params.get("resource_limits_neighbour", {})
for g in couple.groups:
# skip the iter group - it is already configured by the main resource_limits above
if g.group_id == iter_group:
continue
nb = g.node_backends[0]
res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 2)
res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 5)
res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = \
resource_limits.get(Job.RESOURCE_FS, 100) # lock the entire disk

return res
116 changes: 70 additions & 46 deletions src/cocaine-app/sched/__init__.py
@@ -204,7 +204,7 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand):
for job in existing_jobs:

if self.job_processor.JOB_PRIORITIES[job.type] >= self.job_processor.JOB_PRIORITIES[job_type] and not force:
logger.info('Cannot stop job {} type {} since it has >= priority and no force'.format(job.id, job.type))
logger.debug('Cannot stop job {} type {} since it has >= priority and no force'.format(job.id, job.type))
continue

# Time has passed since self.res was built; existing_jobs carry more up-to-date statuses
@@ -219,15 +219,15 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand):
# These jobs could be cancelled, but there is a potential race between making the decision
# and the actual job status. Since we are not allowed to cancel running jobs outside STOP_ALLOWED_TYPES,
# skip cancellation for now. This will be fixed as soon as job queries are introduced
logger.info('Job was not running {} of type {}'.format(job.id, job.type))
logger.debug('Job was not running {} of type {}'.format(job.id, job.type))

logger.info('Cannot stop job {} of type {}'.format(job.id, job.type))
logger.debug('Cannot stop job {} of type {}'.format(job.id, job.type))
continue

cancellable_jobs_ids.append(job.id)

if len(cancellable_jobs_ids) == 0 and len(completed_jobs_ids) == 0:
logger.info("No jobs to cancel while {} are blocking".format(len(job_ids)))
logger.info("No jobs to cancel while ({}) are blocking".format(job_ids))
return False

logger.info("Analyzing demand {} while cancellable_jobs are {}".format(demand, cancellable_jobs_ids))
@@ -260,10 +260,9 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand):
logger.info("Successfully cancelled {}".format(cancellable_jobs_ids))
return True

# TODO: rename process_jobs into create_jobs
def process_jobs(self, job_type, jobs_param_list, sched_params):
def create_jobs(self, job_type, jobs_param_list, sched_params):
"""
Scheduler creates jobs with specified params of specified type.
Scheduler creates jobs of the specified type with the specified params.
:param job_type: job type
:param jobs_param_list: a list of dictionaries with params for the jobs to create.
The number of jobs created is <= len(jobs_param_list)
@@ -272,18 +271,16 @@ def process_jobs(self, job_type, jobs_param_list, sched_params):
For example, one may want to increase priority or decrease max_deadline_time
:return: a list of created jobs
"""
created_jobs = []

self.update_resource_stat()

max_jobs = sched_params.get('max_executing_jobs')
if max_jobs:
job_count = self.job_count[job_type]
if job_count >= max_jobs:
logger.info('Found {} unfinished jobs (>= {}) of {} type'.format(job_count, max_jobs, job_type))
return 0
max_jobs -= job_count

created_job = 0
max_jobs = sched_params.get('max_executing_jobs', 100)
job_count = self.job_count[job_type]
if job_count >= max_jobs:
logger.info('Found {} unfinished jobs (>= {}) of {} type'.format(job_count, max_jobs, job_type))
return created_jobs
max_jobs -= job_count
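# max_jobs now holds the remaining job budget for this scheduling pass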

# common param for all types of jobs
need_approving = not sched_params.get('autoapprove', False)
@@ -292,15 +289,14 @@ def process_jobs(self, job_type, jobs_param_list, sched_params):

if not hasattr(job_class, 'report_resources'):
logger.error("Couldn't schedule the job {}. Add static report_resources function".format(job_type))
return 0
return created_jobs

job_report_resources = job_class.report_resources

for job_param in jobs_param_list:

# Get resource demand and verify whether run is possible
old_res = job_report_resources(job_param)
res = self.convert_resource_representation(old_res['resources'], old_res['groups'], job_type)
res = job_report_resources(job_param)

if not self.cancel_crossing_jobs(job_type, sched_params, res):
continue
@@ -323,64 +319,70 @@ def process_jobs(self, job_type, jobs_param_list, sched_params):
logger.exception("Creating job {} with params {} has excepted".format(job_type, job_param))
continue

created_job += 1
if max_jobs and created_job >= max_jobs:
return created_job
created_jobs.append(job)
self.add_to_resource_stat(res, job.id)

max_jobs -= 1
if max_jobs == 0:
return created_jobs

# do not update self.job_count, since they would be rewritten on next update & they don't influence calculations

# TODO: return a list of created jobs
return created_job
return created_jobs

def get_history(self):

if len(self.history_data) != len(storage.couples):
if len(self.history_data) != len(storage.groupsets.replicas):
self.sync_history()
return self.history_data

def sync_history(self):
"""
Sync mongo representation with storage.groupsets (statistical representation), then update
internal object representation
:return:
"""

cursor = self.collection.find()

logger.info('Sync recover data is required: {} records/{} couples, cursor {}'.format(
len(self.history_data), len(storage.couples), cursor.count()))
len(self.history_data), len(storage.groupsets.replicas), cursor.count()))

recover_data_couples = set()
history = {}
recover_data_groupsets = set()
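# a defaultdict lets update_historic_ts below update a couple that has no cached entry yet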
history = defaultdict(dict)

for data_record in cursor:
couple_str = data_record['couple']
history[couple_str] = {'recover_ts': data_record.get('recover_ts')}
recover_data_couples.add(couple_str)
history[couple_str] = {'recover_ts': data_record.get('recover_ts'),
'ttl_cleanup_ts': data_record.get('ttl_cleanup_ts')}
recover_data_groupsets.add(couple_str)

ts = int(time.time())

# XXX: rework, it is too expensive
storage_couples = set(str(c) for c in storage.couples.keys())
add_couples = list(storage_couples - recover_data_couples)
remove_couples = list(recover_data_couples - storage_couples)
storage_groupsets = set(str(c) for c in storage.groupsets.replicas.keys())
add_groupsets = list(storage_groupsets - recover_data_groupsets)
remove_groupsets = list(recover_data_groupsets - storage_groupsets)

logger.info('Couples to add {}, couple to remove {}'.format(add_couples, remove_couples))
logger.info('Couples to add {}, couples to remove {}'.format(add_groupsets, remove_groupsets))

offset = 0
OP_SIZE = 200
while offset < len(add_couples):
while offset < len(add_groupsets):
bulk_op = self.collection.initialize_unordered_bulk_op()
bulk_add_couples = add_couples[offset:offset + OP_SIZE]
bulk_add_couples = add_groupsets[offset:offset + OP_SIZE]
for couple in bulk_add_couples:
bulk_op.insert({'couple': couple, 'recover_ts': ts})
# these couples are new. No need to cleanup or recover
bulk_op.insert({'couple': couple, 'recover_ts': ts, 'ttl_cleanup_ts': ts})
res = bulk_op.execute()
if res['nInserted'] != len(bulk_add_couples):
raise ValueError('Failed to add couples recover data: {}/{} ({})'.format(
res['nInserted'], len(bulk_add_couples), res))
offset += res['nInserted']

offset = 0
while offset < len(remove_couples):
while offset < len(remove_groupsets):
bulk_op = self.collection.initialize_unordered_bulk_op()
bulk_remove_couples = remove_couples[offset:offset + OP_SIZE]
bulk_remove_couples = remove_groupsets[offset:offset + OP_SIZE]
bulk_op.find({'couple': {'$in': bulk_remove_couples}}).remove()
res = bulk_op.execute()
if res['nRemoved'] != len(bulk_remove_couples):
@@ -390,15 +392,37 @@ def sync_history(self):

self.history_data = history

def update_recover_ts(self, couple_id, ts):
ts = int(ts)

def update_historic_ts(self, couple_id, recover_ts=None, cleanup_ts=None):
"""
Update the mongo record of the given couple with the corresponding timestamps
:param couple_id: couple id
:param recover_ts: epoch time to store as recover_ts, or None to leave it unchanged
:param cleanup_ts: epoch time to store as ttl_cleanup_ts, or None to leave it unchanged
"""

updated_values = {}
if recover_ts:
updated_values['recover_ts'] = int(recover_ts)
if cleanup_ts:
updated_values['ttl_cleanup_ts'] = int(cleanup_ts)

if len(updated_values) == 0:
return

res = self.collection.update(
{'couple': couple_id},
{'couple': couple_id, 'recover_ts': ts},
{"$set": updated_values},
upsert=True)
if res['ok'] != 1:
logger.error('Unexpected mongo response during recover ts update: {}'.format(res))
raise RuntimeError('Mongo operation result: {}'.format(res['ok']))
logger.error('Unexpected mongo response during update of historic data: {0}'.format(res))
raise RuntimeError('Mongo operation result: {0}'.format(res['ok']))

# update cached representation
self.history_data[couple_id] = {'recover_ts': ts}
self.history_data[couple_id].update(updated_values)

def update_recover_ts(self, couple_id, ts):
self.update_historic_ts(couple_id, recover_ts=ts)

def update_cleanup_ts(self, couple_id, ts):
self.update_historic_ts(couple_id, cleanup_ts=ts)
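# As used in TtlCleanupJob.on_complete above: update_cleanup_ts(couple, time.time())
# is shorthand for update_historic_ts(couple, cleanup_ts=time.time()).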