# Source code for radical.entk.execman.base.resource_manager


__copyright__ = "Copyright 2017-2018, http://radical.rutgers.edu"
__author__    = "Vivek Balasubramanian <vivek.balasubramaniana@rutgers.edu>"
__license__   = "MIT"

import os

import radical.utils as ru

from ...exceptions import EnTKMissingError, EnTKTypeError, EnTKError


# ------------------------------------------------------------------------------
#
class Base_ResourceManager(object):
    """
    A resource manager takes the responsibility of placing resource requests
    on different, possibly multiple, DCIs.

    This is the RTS-agnostic base class: it stores and validates the resource
    description, while the RTS-specific methods raise ``NotImplementedError``
    and are meant to be overridden by subclasses.

    :arguments:
        :resource_desc: dictionary with details of the resource request and
                        access credentials of the user
        :sid:           session id (string); used to build the directory path
                        handed to the logger and the profiler
        :rts:           name of the runtime system; used here only in error
                        messages
        :rts_config:    runtime system configuration; must be a dictionary
                        (checked in ``_validate_resource_desc()``)

    :example:

        resource_desc = {
            'resource'      : 'xsede.stampede',
            'walltime'      : 120,
            'cpus'          : 64,
            'gpus'          : 0,            # optional
            'project'       : 'TG-abcxyz',  # optional
            'memory'        : 16000,        # optional
            'queue'         : 'abc',        # optional
            'access_schema' : 'ssh',        # optional
            'job_name'      : 'test_job'    # optional
        }

    :raises: ``EnTKTypeError`` if ``resource_desc`` is not a dictionary
    """

    # --------------------------------------------------------------------------
    #
    def __init__(self, resource_desc, sid, rts, rts_config):

        if not isinstance(resource_desc, dict):
            raise EnTKTypeError(expected_type=dict,
                                actual_type=type(resource_desc))

        self._resource_desc = resource_desc
        self._sid           = sid
        self._rts           = rts
        self._rts_config    = rts_config

        # Resource reservation related parameters. These are defaults only:
        # the user supplied values are copied in by `_populate()` after the
        # description passed `_validate_resource_desc()`.
        self._resource      = None
        self._walltime      = None
        self._cpus          = 1
        self._gpus          = 0
        self._memory        = 0
        self._project       = None
        self._access_schema = None
        self._queue         = None
        self._job_name      = None
        self._services      = []
        self._validated     = False

        # Utility parameters
        self._uid  = ru.generate_id('resource_manager.%(counter)04d',
                                    ru.ID_CUSTOM)
        self._path = os.getcwd() + '/' + self._sid

        name = 'radical.entk.%s' % self._uid
        self._logger = ru.Logger(name, path=self._path)
        self._prof   = ru.Profiler(name, path=self._path)

        self._shared_data = []
        self._outputs     = None

    # --------------------------------------------------------------------------
    #
    @property
    def resource(self):
        """
        :getter: Return user specified resource name
        """
        return self._resource

    @property
    def walltime(self):
        """
        :getter: Return user specified walltime
        """
        return self._walltime

    @property
    def cpus(self):
        """
        :getter: Return user specified number of cpus
        """
        return self._cpus

    @property
    def memory(self):
        """
        :getter: Return user specified amount of memory
        """
        return self._memory

    @property
    def gpus(self):
        """
        :getter: Return user specified number of gpus
        """
        return self._gpus

    @property
    def project(self):
        """
        :getter: Return user specified project ID
        """
        return self._project

    @property
    def access_schema(self):
        """
        :getter: Return user specified access schema -- 'ssh' or 'gsissh' or
                 None
        """
        return self._access_schema

    @property
    def queue(self):
        """
        :getter: Return user specified resource queue to be used
        """
        return self._queue

    @property
    def job_name(self):
        """
        :getter: Return user specified job_name
        """
        return self._job_name

    @property
    def services(self):
        """
        :getter: Returns the list of tasks used to start "global" services
        :setter: Assigns a list of service tasks, which are launched before
                 any stage starts and run during the whole workflow execution
        """
        return self._services

    @property
    def shared_data(self):
        """
        :getter: list of files to be staged to remote and that are common to
                 multiple tasks
        :setter: Assign a list of names of files that need to be accessible to
                 tasks
        """
        return self._shared_data

    @property
    def outputs(self):
        """
        :getter: list of files to be staged from remote after execution
        :setter: Assign a list of names of files that need to be staged from
                 the remote machine
        """
        return self._outputs

    # --------------------------------------------------------------------------
    # Setter functions
    #
    # NOTE: the setters store the given value as-is, without validation.
    #
    @shared_data.setter
    def shared_data(self, data_list):
        self._shared_data = data_list

    @outputs.setter
    def outputs(self, data):
        self._outputs = data

    @services.setter
    def services(self, tasks):
        self._services = tasks

    # --------------------------------------------------------------------------
    #
    def get_resource_allocation_state(self):
        """
        **Purpose**: Get the state of the resource allocation

        :raises: ``NotImplementedError`` -- to be overridden by the
                 RTS-specific resource manager
        """
        raise NotImplementedError('get_resource_allocation_state() method not '
                                  'implemented in ResourceManager for %s'
                                  % self._rts)

    # --------------------------------------------------------------------------
    #
    def get_completed_states(self):
        """
        **Purpose**: Return the resource allocation states that count as
        completed (presumed from the method name -- the actual semantics are
        defined by the RTS-specific resource manager)

        :raises: ``NotImplementedError`` -- to be overridden by the
                 RTS-specific resource manager
        """
        raise NotImplementedError('get_completed_states() method not '
                                  'implemented in ResourceManager for %s'
                                  % self._rts)

    # --------------------------------------------------------------------------
    #
    def get_rts_info(self):
        """
        **Purpose**: Return the RTS information as a dict.

        :raises: ``NotImplementedError`` -- to be overridden by the
                 RTS-specific resource manager
        """
        raise NotImplementedError('get_rts_info() method not implemented '
                                  'in ResourceManager for %s' % self._rts)

    # --------------------------------------------------------------------------
    #
    def _validate_resource_desc(self):
        """
        **Purpose**: Validate the provided resource description

        The keys 'resource', 'walltime' and 'cpus' are mandatory. All other
        known keys are optional, but are type checked when present. The RTS
        configuration is checked to be a dictionary.

        :returns: True once the description is validated (the instance is
                  also flagged as validated, which `_populate()` requires)
        :raises:  ``EnTKMissingError`` if a mandatory key is missing,
                  ``EnTKTypeError`` if a value or the RTS configuration has
                  the wrong type
        """

        self._prof.prof('rdesc_validate', uid=self._uid)
        self._logger.debug('Validating resource description')

        rdesc = self._resource_desc

        # Presence of all mandatory keys is checked before any type check,
        # so a missing key is always reported ahead of a type problem.
        for key in ('resource', 'walltime', 'cpus'):
            if key not in rdesc:
                raise EnTKMissingError(obj='resource description',
                                       missing_attribute=key)

        # Expected type per key, in the order in which they are checked.
        # 'job_name' is checked like the other optional string entries,
        # since `_populate()` stores it the same way.
        expected_types = (('resource',      str),
                          ('walltime',      int),
                          ('cpus',          int),
                          ('memory',        int),
                          ('gpus',          int),
                          ('project',       str),
                          ('access_schema', str),
                          ('queue',         str),
                          ('job_name',      str))

        for key, expected in expected_types:
            if key in rdesc and not isinstance(rdesc[key], expected):
                raise EnTKTypeError(expected_type=expected,
                                    actual_type=type(rdesc[key]))

        if not isinstance(self._rts_config, dict):
            raise EnTKTypeError(expected_type=dict,
                                actual_type=type(self._rts_config))

        self._logger.info('Resource description validated')
        self._prof.prof('rdesc_valid', uid=self._uid)

        self._validated = True

        return self._validated

    # --------------------------------------------------------------------------
    #
    def _populate(self):
        """
        **Purpose**: Populate the ResourceManager class with the validated
        resource description

        Mandatory entries are copied as they are, missing optional entries
        fall back to 0 ('memory', 'gpus') or None (all others).

        :raises: ``EnTKError`` if the resource description was not validated
                 before, i.e. `_validate_resource_desc()` did not succeed
        """

        if not self._validated:
            raise EnTKError('Resource description not validated')

        self._prof.prof('populating rmgr', uid=self._uid)
        self._logger.debug('Populating resource manager object')

        rdesc = self._resource_desc

        self._resource      = rdesc['resource']
        self._walltime      = rdesc['walltime']
        self._cpus          = rdesc['cpus']
        self._memory        = rdesc.get('memory', 0)
        self._gpus          = rdesc.get('gpus', 0)
        self._project       = rdesc.get('project', None)
        self._access_schema = rdesc.get('access_schema', None)
        self._queue         = rdesc.get('queue', None)
        self._job_name      = rdesc.get('job_name', None)

        self._logger.debug('Resource manager population successful')
        self._prof.prof('rmgr populated', uid=self._uid)

    # --------------------------------------------------------------------------
    #
    def submit_resource_request(self):
        """
        **Purpose**: Submit resource request per provided description

        :raises: ``NotImplementedError`` -- to be overridden by the
                 RTS-specific resource manager
        """
        raise NotImplementedError('submit_resource_request() method not '
                                  'implemented in ResourceManager for %s'
                                  % self._rts)

    # --------------------------------------------------------------------------
    #
    def _terminate_resource_request(self):
        """
        **Purpose**: Cancel resource request by terminating any reservation on
        any acquired resources or resources pending acquisition

        :raises: ``NotImplementedError`` -- to be overridden by the
                 RTS-specific resource manager
        """
        raise NotImplementedError('_terminate_resource_request() method not '
                                  'implemented in ResourceManager for %s'
                                  % self._rts)
# ------------------------------------------------------------------------------