From 71b8eaa3bd13a3fabb63fc05b08d6149d67c3d01 Mon Sep 17 00:00:00 2001 From: Anna Melekhova Date: Fri, 3 Mar 2017 14:01:23 +0300 Subject: [PATCH 1/4] Ttl cleanup starter --- src/cocaine-app/jobs/ttl_cleanup.py | 28 +++++ src/cocaine-app/sched/ttl_cleanup_starter.py | 117 +++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 src/cocaine-app/sched/ttl_cleanup_starter.py diff --git a/src/cocaine-app/jobs/ttl_cleanup.py b/src/cocaine-app/jobs/ttl_cleanup.py index 519decd7..dc295a8a 100644 --- a/src/cocaine-app/jobs/ttl_cleanup.py +++ b/src/cocaine-app/jobs/ttl_cleanup.py @@ -104,3 +104,31 @@ def _involved_couples(self): def on_complete(self, processor): couple = str(storage.groups[self.iter_group].couple) processor.planner.update_cleanup_ts(couple, time.time()) + + @staticmethod + def report_resources(params): + """ + Report resources supposed usage for specified params + :param params: params to be passed on creating the job instance + :return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}} + """ + + # XXX: this code duplicates 'set_resources', 'involved_groups' methods but this duplication is chose + # to minimize changes to test + res = {} + + iter_group = params['iter_group'] + + if iter_group not in storage.groups: + raise ValueError("Invalid params {}".format(iter_group)) + + couple = storage.groups[iter_group].couple + nb = storage.groups[iter_group].node_backends[0] + res['resources'] = {} + res['resources'][Job.RESOURCE_HOST_IN] = [] + res['resources'][Job.RESOURCE_FS] = [] + res['resources'][Job.RESOURCE_HOST_IN].append(nb.node.host.addr) + res['resources'][Job.RESOURCE_FS].append((nb.node.host.addr, str(nb.fs.fsid))) + res['groups'] = [g.group_id for g in couple.groups] + + return res diff --git a/src/cocaine-app/sched/ttl_cleanup_starter.py b/src/cocaine-app/sched/ttl_cleanup_starter.py new file mode 100644 index 00000000..7af132c5 --- /dev/null +++ b/src/cocaine-app/sched/ttl_cleanup_starter.py @@ -0,0 +1,117 @@ 
+import logging + +from infrastructure import infrastructure +import jobs +from mastermind_core.config import config +import storage +import time +import datetime +from yt_worker import YqlWrapper + + +logger = logging.getLogger('mm.sched.ttl_cleanup') + +class TtlCleanupStarter(object): + def __init__(self, scheduler): + scheduler.register_periodic_func(self._do_ttl_cleanup, 60*15, starter_name="ttl_cleanup") + self.params = config.get('scheduler', {}) + self.scheduler = scheduler + + def _get_yt_stat(self): + """ + Extract statistics from YT logs + :return: list of couples ids with expired records volume above specified + """ + try: + # Configure parameters for work with YT + yt_cluster = self.params.get('ttl_cleanup', {}).get('yt_cluster', "") + yt_token = self.params.get('ttl_cleanup', {}).get('yt_token', "") + yt_attempts = self.params.get('ttl_cleanup', {}).get('yt_attempts', 3) + yt_delay = self.params.get('ttl_cleanup', {}).get('yt_delay', 10) + + yt_wrapper = YqlWrapper(cluster=yt_cluster, token=yt_token, attempts=yt_attempts, delay=yt_delay) + + aggregation_table = self.params.get('ttl_cleanup', {}).get('aggregation_table', "") + base_table = self.params.get('ttl_cleanup', {}).get('tskv_log_table', "") + expired_threshold = self.params.get('ttl_cleanup', {}).get('ttl_threshold', 10 * float(1024 ** 3)) # 10GB + + yt_wrapper.prepare_aggregate_for_yesterday(base_table, aggregation_table) + + couple_list = yt_wrapper.request_expired_stat(aggregation_table, expired_threshold) + logger.info("YT request has completed") + return couple_list + except: + logger.exception("Work with YQL failed") + return [] + + def _get_idle_groups(self, days_of_idle): + """ + Iterates all over couples. 
Find couples where ttl_cleanup hasn't run for more than 'days of idle' + :param days_of_idle: how long the group could be idle + :return: list of groups[0] from couples + """ + + idle_groups = [] + + # the epoch time when executed jobs are considered meaningful + idleness_threshold = time.time() - datetime.timedelta(days=days_of_idle).total_seconds() + + couples_data = self.scheduler.get_history() + # couples data is not sorted, so we need to iterate through it all. + # But it may be faster then creation of a sorted representation + + for couple_id, couple_data in couples_data.iteritems(): + + # if couple_data doesn't contain cleanup_ts field then cleanup_ts has never been run on this couple + # and None < idleness_threshold + ts = couple_data.get('ttl_cleanup_ts') + if ts > idleness_threshold: + continue + + # couple has format "gr0:gr1:...:grN". We are interested only in the group #0 + idle_groups.append(int(couple_id.split(":")[0])) + + return idle_groups + + def _do_ttl_cleanup(self): + logger.info('Run ttl cleanup') + + job_params = [] + + allowed_idleness_period = config.get('jobs', {}).get('ttl_cleanup_job', {}).get( + 'max_idle_days', 270) + + # get couples where ttl_cleanup wasn't run for long time (or never) + time_group_list = self._get_idle_groups(days_of_idle=allowed_idleness_period) + + # get information from mds-proxy Yt logs + yt_group_list = self._get_yt_stat() + + # remove dups + couple_list = set(yt_group_list + time_group_list) + + for couple in couple_list: + + iter_group = couple # in tskv coupld id is actually group[0] from couple id + if iter_group not in storage.groups: + logger.error("Not valid group is extracted from aggregation log {}".format(iter_group)) + continue + iter_group = storage.groups[iter_group] + if not iter_group.couple: + logger.error("Iter group is uncoupled {}".format(str(iter_group))) + continue + + job_params.append( + { + 'iter_group': iter_group.group_id, + 'couple': str(iter_group.couple), + 'namespace': 
iter_group.couple.namespace.id, + 'batch_size': None, # get from config + 'attempts': None, # get from config + 'nproc': None, # get from config + 'wait_timeout': None, # get from config + 'dry_run': False + }) + + sched_params = self.params.get('ttl_cleanup') + self.scheduler.create_jobs(jobs.JobTypes.TYPE_TTL_CLEANUP_JOB, job_params, sched_params) From e8fafd8a5ad1e8e448babdc7f98346563414c6fc Mon Sep 17 00:00:00 2001 From: Anna Melekhova Date: Fri, 3 Mar 2017 14:01:45 +0300 Subject: [PATCH 2/4] Starter initialization --- src/cocaine-app/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/cocaine-app/__init__.py b/src/cocaine-app/__init__.py index 13679a15..4e7a60ed 100755 --- a/src/cocaine-app/__init__.py +++ b/src/cocaine-app/__init__.py @@ -213,7 +213,14 @@ def init_minions(): def init_smart_scheduler(job_processor): + from sched.defrag_starter import DefragStarter + from sched.recover_starter import RecoveryStarter + from sched.ttl_cleanup_starter import TtlCleanupStarter + scheduler = Scheduler(meta_db, job_processor) + defrag_starter = DefragStarter(job_processor, scheduler) + recovery_starter = RecoveryStarter(job_processor, scheduler) + ttl_cleanup_starter = TtlCleanupStarter(scheduler) return scheduler From f41705a05d90ab0c2a732f65fd5da6552bd1e08e Mon Sep 17 00:00:00 2001 From: Anna Melekhova Date: Fri, 3 Mar 2017 14:03:44 +0300 Subject: [PATCH 3/4] Scheduler technical debts --- src/cocaine-app/sched/__init__.py | 113 ++++++++++++++++++------------ 1 file changed, 69 insertions(+), 44 deletions(-) diff --git a/src/cocaine-app/sched/__init__.py b/src/cocaine-app/sched/__init__.py index 12bfca24..64609175 100644 --- a/src/cocaine-app/sched/__init__.py +++ b/src/cocaine-app/sched/__init__.py @@ -204,7 +204,7 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand): for job in existing_jobs: if self.job_processor.JOB_PRIORITIES[job.type] >= self.job_processor.JOB_PRIORITIES[job_type] and not force: - logger.info('Cannot stop job 
{} type {} since it has >= priority and no force'.format(job.id, job.type)) + logger.debug('Cannot stop job {} type {} since it has >= priority and no force'.format(job.id, job.type)) continue # The time has passed since self.res was built. existing_jobs have more actual statuses @@ -219,15 +219,15 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand): # These jobs could be cancelled, but there is a potential race between making decision # and actual job status. Since we are prohibited to cancel running jobs of not STOP_ALLOWED_TYPES # skip cancellation for a while. That would be fixed as soon as job queries would be introduced - logger.info('Job was not running {} of type {}'.format(job.id, job.type)) + logger.debug('Job was not running {} of type {}'.format(job.id, job.type)) - logger.info('Cannot stop job {} of type {}'.format(job.id, job.type)) + logger.debug('Cannot stop job {} of type {}'.format(job.id, job.type)) continue cancellable_jobs_ids.append(job.id) if len(cancellable_jobs_ids) == 0 and len(completed_jobs_ids) == 0: - logger.info("No jobs to cancel while {} are blocking".format(len(job_ids))) + logger.info("No jobs to cancel while ({}) are blocking".format(job_ids)) return False logger.info("Analyzing demand {} while cancellable_jobs are {}".format(demand, cancellable_jobs_ids)) @@ -260,10 +260,9 @@ def cancel_crossing_jobs(self, job_type, sched_params, demand): logger.info("Successfully cancelled {}".format(cancellable_jobs_ids)) return True - # TODO: rename process_jobs into create_jobs - def process_jobs(self, job_type, jobs_param_list, sched_params): + def create_jobs(self, job_type, jobs_param_list, sched_params): """ - Scheduler creates jobs with specified params of specified type. + Scheduler creates jobs of the specified type with the specified params . :param job_type: job type :param jobs_param_list: a list of dictionaries with params for jobs created. 
A number of job created <= len(jobs_param_list) @@ -272,18 +271,16 @@ def process_jobs(self, job_type, jobs_param_list, sched_params): For example, one may want to increase priority or decrease max_deadline_time :return: a number of generated jobs """ + created_jobs = [] self.update_resource_stat() - max_jobs = sched_params.get('max_executing_jobs') - if max_jobs: - job_count = self.job_count[job_type] - if job_count >= max_jobs: - logger.info('Found {} unfinished jobs (>= {}) of {} type'.format(job_count, max_jobs, job_type)) - return 0 - max_jobs -= job_count - - created_job = 0 + max_jobs = sched_params.get('max_executing_jobs', 100) + job_count = self.job_count[job_type] + if job_count >= max_jobs: + logger.info('Found {} unfinished jobs (>= {}) of {} type'.format(job_count, max_jobs, job_type)) + return created_jobs + max_jobs -= job_count # common param for all types of jobs need_approving = not sched_params.get('autoapprove', False) @@ -292,7 +289,7 @@ def process_jobs(self, job_type, jobs_param_list, sched_params): if not hasattr(job_class, 'report_resources'): logger.error("Couldn't schedule the job {}. 
Add static report_resources function".format(job_type)) - return 0 + return created_jobs job_report_resources = job_class.report_resources @@ -323,54 +320,60 @@ def process_jobs(self, job_type, jobs_param_list, sched_params): logger.exception("Creating job {} with params {} has excepted".format(job_type, job_param)) continue - created_job += 1 - if max_jobs and created_job >= max_jobs: - return created_job + max_jobs -= 1 + if max_jobs == 0: + return created_jobs + created_jobs.append(job) self.add_to_resource_stat(res, job.id) # do not update self.job_count, since they would be rewritten on next update & they don't influence calculations - # TODO: return a list of created jobs - return created_job + return created_jobs def get_history(self): - if len(self.history_data) != len(storage.couples): + if len(self.history_data) != len(storage.groupsets.replicas): self.sync_history() return self.history_data def sync_history(self): + """ + Sync mongo representation with storage.groupsets (statistical representation), then update + internal object representation + :return: + """ cursor = self.collection.find() logger.info('Sync recover data is required: {} records/{} couples, cursor {}'.format( - len(self.history_data), len(storage.couples), cursor.count())) + len(self.history_data), len(storage.groupsets.replicas), cursor.count())) - recover_data_couples = set() - history = {} + recover_data_groupsets = set() + history = defaultdict(dict) for data_record in cursor: couple_str = data_record['couple'] - history[couple_str] = {'recover_ts': data_record.get('recover_ts')} - recover_data_couples.add(couple_str) + history[couple_str] = {'recover_ts': data_record.get('recover_ts'), + 'ttl_cleanup_ts': data_record.get('ttl_cleanup_ts')} + recover_data_groupsets.add(couple_str) ts = int(time.time()) - # XXX: rework, it is too expensive - storage_couples = set(str(c) for c in storage.couples.keys()) - add_couples = list(storage_couples - recover_data_couples) - remove_couples = 
list(recover_data_couples - storage_couples) + storage_groupsets = set(str(c) for c in storage.groupsets.replicas.keys()) + add_groupsets = list(storage_groupsets - recover_data_groupsets) + remove_groupsets = list(recover_data_groupsets - storage_groupsets) - logger.info('Couples to add {}, couple to remove {}'.format(add_couples, remove_couples)) + logger.info('Couples to add {}, couple to remove {}'.format(add_groupsets, remove_groupsets)) offset = 0 OP_SIZE = 200 - while offset < len(add_couples): + while offset < len(add_groupsets): bulk_op = self.collection.initialize_unordered_bulk_op() - bulk_add_couples = add_couples[offset:offset + OP_SIZE] + bulk_add_couples = add_groupsets[offset:offset + OP_SIZE] for couple in bulk_add_couples: - bulk_op.insert({'couple': couple, 'recover_ts': ts}) + # these couples are new. No need to cleanup or recover + bulk_op.insert({'couple': couple, 'recover_ts': ts, 'ttl_cleanup_ts': ts}) res = bulk_op.execute() if res['nInserted'] != len(bulk_add_couples): raise ValueError('Failed to add couples recover data: {}/{} ({})'.format( @@ -378,9 +381,9 @@ def sync_history(self): offset += res['nInserted'] offset = 0 - while offset < len(remove_couples): + while offset < len(remove_groupsets): bulk_op = self.collection.initialize_unordered_bulk_op() - bulk_remove_couples = remove_couples[offset:offset + OP_SIZE] + bulk_remove_couples = remove_groupsets[offset:offset + OP_SIZE] bulk_op.find({'couple': {'$in': bulk_remove_couples}}).remove() res = bulk_op.execute() if res['nRemoved'] != len(bulk_remove_couples): @@ -390,15 +393,37 @@ def sync_history(self): self.history_data = history - def update_recover_ts(self, couple_id, ts): - ts = int(ts) + + def update_historic_ts(self, couple_id, recover_ts=None, cleanup_ts=None): + """ + Update records in mongo with corresponding times + :param couple_id: couple_id + :param recover_ts: epoch time if we need to update recover_ts time else None + :param cleanup_ts: epoch time if we need to update 
ttl_cleanup time else None + """ + + updated_values = {} + if recover_ts: + updated_values['recover_ts'] = int(recover_ts) + if cleanup_ts: + updated_values['ttl_cleanup_ts'] = int(cleanup_ts) + + if len(updated_values) == 0: + return + res = self.collection.update( {'couple': couple_id}, - {'couple': couple_id, 'recover_ts': ts}, + {"$set": updated_values}, upsert=True) if res['ok'] != 1: - logger.error('Unexpected mongo response during recover ts update: {}'.format(res)) - raise RuntimeError('Mongo operation result: {}'.format(res['ok'])) + logger.error('Unexpected mongo response during update of historic data: {0}'.format(res)) + raise RuntimeError('Mongo operation result: {0}'.format(res['ok'])) # update cached representation - self.history_data[couple_id] = {'recover_ts': ts} \ No newline at end of file + self.history_data[couple_id].update(updated_values) + + def update_recover_ts(self, couple_id, ts): + self.update_historic_ts(couple_id, recover_ts=ts) + + def update_cleanup_ts(self, couple_id, ts): + self.update_historic_ts(couple_id, cleanup_ts=ts) From f4e9557afa71c08761dc6861e5dad4fd57794a49 Mon Sep 17 00:00:00 2001 From: Anna Melekhova Date: Sun, 12 Mar 2017 17:07:17 +0300 Subject: [PATCH 4/4] Introduce new resource model Each job that has starters for it within mm-sched already supported a static report_resources function. But now the model of resource description is going to change 1) report_resources return a dict [(res_type, res_id)] = consumption_level instead of dict[res_type] = list of res_ids 2) report_resources for jobs analyzes config with resource_limits on its own thus it became possible to maintain different roles with different resource estimation. 
For instance, different nodes used by the job may have different cpu/network usage and now it could be described more precisely 3) config section describing new resource model is located under sched key sched:{ : { resource_limits:{}}} --- src/cocaine-app/jobs/couple_defrag.py | 21 ++++++++++------- src/cocaine-app/jobs/recover_dc.py | 28 ++++++++++++---------- src/cocaine-app/jobs/ttl_cleanup.py | 34 ++++++++++++++++++++------- src/cocaine-app/sched/__init__.py | 3 +-- 4 files changed, 54 insertions(+), 32 deletions(-) diff --git a/src/cocaine-app/jobs/couple_defrag.py b/src/cocaine-app/jobs/couple_defrag.py index 14703a03..b242c9a5 100644 --- a/src/cocaine-app/jobs/couple_defrag.py +++ b/src/cocaine-app/jobs/couple_defrag.py @@ -124,19 +124,22 @@ def report_resources(params): """ Report resources supposed usage for specified params :param params: params to be passed on creating the job instance - :return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}} + :return: a dict where keys are tuples (res_type, group) or (res_type, node_addr) or (res_type, node_addr, fs_id) + and values are consumption level """ # XXX: this code duplicates 'set_resources', 'involved_groups' methods but this duplication is chose # to minimize changes to test res = {} + from mastermind_core.config import config + config_params = config.get('scheduler', {}).get('couple_defrag', {}) + resource_limits = config_params.get("resource_limits", {}) # common, no specific roles + couple = params.get('couple', '') - res['groups'] = [int(gid) for gid in couple.split(':')] - res['resources'] = { - Job.RESOURCE_FS: [], - Job.RESOURCE_CPU: [], - } + for gid in couple.split(':'): + res[("group", int(gid))] = 100 # Lock groups for a while + couples = (storage.cache_couples if params.get('is_cache_couple', False) else storage.replicas_groupsets) @@ -145,7 +148,7 @@ def report_resources(params): couple = couples[couple] for g in couple.groups: - res['resources'][Job.RESOURCE_FS].append( - 
(g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid))) - res['resources'][Job.RESOURCE_CPU].append(g.node_backends[0].node.host.addr) + res[(Job.RESOURCE_FS, g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid))] = 100 + res[(Job.RESOURCE_CPU, g.node_backends[0].node.host.addr)] = resource_limits.get(Job.RESOURCE_CPU, 25) + return res \ No newline at end of file diff --git a/src/cocaine-app/jobs/recover_dc.py b/src/cocaine-app/jobs/recover_dc.py index ad60ba70..cac5545b 100644 --- a/src/cocaine-app/jobs/recover_dc.py +++ b/src/cocaine-app/jobs/recover_dc.py @@ -167,32 +167,34 @@ def report_resources(params): """ Report resources supposed usage for specified params :param params: params to be passed on creating the job instance - :return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}} + :return: a dict where keys are tuples (res_type, group) or (res_type, node_addr) or (res_type, node_addr, fs_id) + and values are consumption level """ # XXX: this code duplicates 'set_resources', 'involved_groups' methods but this duplication is chose # to minimize changes to test res = {} + groups = [] + + from mastermind_core.config import config + config_params = config.get('scheduler', {}).get('recover_dc', {}) + resource_limits = config_params.get("resource_limits", {}) # common, no specific roles couple = params.get('couple') if couple: - res['groups'] = [int(gid) for gid in couple.split(':')] + groups = [int(gid) for gid in couple.split(':')] else: group_id = params.get('group') group = storage.groups[group_id] couple = group.couple - res['groups'] = [g.group_id for g in couple.groups] - - res['resources'] = { - Job.RESOURCE_HOST_IN: [], - Job.RESOURCE_HOST_OUT: [], - Job.RESOURCE_FS: [], - } + groups = [g.group_id for g in couple.groups] - for group_id in res['groups']: + for group_id in groups: + res[("group", group_id)] = 100 g = storage.groups[group_id] - res['resources'][Job.RESOURCE_HOST_IN].append(g.node_backends[0].node.host.addr) - 
res['resources'][Job.RESOURCE_HOST_OUT].append(g.node_backends[0].node.host.addr) - res['resources'][Job.RESOURCE_FS].append((g.node_backends[0].node.host.addr, str(g.node_backends[0].fs.fsid))) + nb = g.node_backends[0] + res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 20) + res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 20) + res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = 100 return res diff --git a/src/cocaine-app/jobs/ttl_cleanup.py b/src/cocaine-app/jobs/ttl_cleanup.py index dc295a8a..dff1af2c 100644 --- a/src/cocaine-app/jobs/ttl_cleanup.py +++ b/src/cocaine-app/jobs/ttl_cleanup.py @@ -20,7 +20,7 @@ class TtlCleanupJob(Job): 'nproc', 'wait_timeout', 'dry_run', - 'remove_all_older', + 'remove_all_older', 'remove_permanent_older', 'resources' ) @@ -110,13 +110,17 @@ def report_resources(params): """ Report resources supposed usage for specified params :param params: params to be passed on creating the job instance - :return: dict={'groups':[], 'resources':{ Job.RESOURCE_HOST_IN: [], etc}} + :return: a dict where keys are tuples (res_type, group) or (res_type, node_addr) or (res_type, node_addr, fs_id) + and values are consumption level """ # XXX: this code duplicates 'set_resources', 'involved_groups' methods but this duplication is chose # to minimize changes to test res = {} + from mastermind_core.config import config + config_params = config.get('scheduler', {}).get('ttl_cleanup', {}) + iter_group = params['iter_group'] if iter_group not in storage.groups: @@ -124,11 +128,25 @@ def report_resources(params): couple = storage.groups[iter_group].couple nb = storage.groups[iter_group].node_backends[0] - res['resources'] = {} - res['resources'][Job.RESOURCE_HOST_IN] = [] - res['resources'][Job.RESOURCE_FS] = [] - res['resources'][Job.RESOURCE_HOST_IN].append(nb.node.host.addr) - res['resources'][Job.RESOURCE_FS].append((nb.node.host.addr, str(nb.fs.fsid))) - 
res['groups'] = [g.group_id for g in couple.groups] + + for g in couple.groups: + res[("group", g.group_id)] = 100 # lock all groups + + resource_limits = config_params.get("resource_limits", {}) + res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 20) + res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 2) + res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = \ + resource_limits.get(Job.RESOURCE_FS, 100) # lock the entire disk + + resource_limits = config_params.get("resource_limits_neighbour", {}) + for g in couple.groups: + # skip iter group - it is already configured within resource_limits_main + if g.group_id == iter_group: + continue + nb = g.node_backends[0] + res[(Job.RESOURCE_HOST_IN, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_IN, 2) + res[(Job.RESOURCE_HOST_OUT, nb.node.host.addr)] = resource_limits.get(Job.RESOURCE_HOST_OUT, 5) + res[(Job.RESOURCE_FS, nb.node.host.addr, str(nb.fs.fsid))] = \ + resource_limits.get(Job.RESOURCE_FS, 100) # lock the entire disk return res diff --git a/src/cocaine-app/sched/__init__.py b/src/cocaine-app/sched/__init__.py index 64609175..2f1deae7 100644 --- a/src/cocaine-app/sched/__init__.py +++ b/src/cocaine-app/sched/__init__.py @@ -296,8 +296,7 @@ def create_jobs(self, job_type, jobs_param_list, sched_params): for job_param in jobs_param_list: # Get resource demand and verify whether run is possible - old_res = job_report_resources(job_param) - res = self.convert_resource_representation(old_res['resources'], old_res['groups'], job_type) + res = job_report_resources(job_param) if not self.cancel_crossing_jobs(job_type, sched_params, res): continue