Source code for radical.entk.stage

# pylint: disable=protected-access

__copyright__ = 'Copyright 2014-2020, http://radical.rutgers.edu'
__license__   = 'MIT'


import radical.utils as ru

from string      import punctuation
from .exceptions import EnTKValueError, EnTKTypeError
from .exceptions import EnTKMissingError, EnTKError
from .constants  import NAME_MESSAGE
from .task       import Task
from .states     import INITIAL, DONE, FAILED
from .states     import _stage_state_values, state_numbers


[docs]class Stage(object): """ A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of 'Task' objects. All tasks of the same stage may execute concurrently. """ def __init__(self): self._uid = ru.generate_id('stage.%(counter)04d', ru.ID_CUSTOM) self._name = None self._tasks = set() self._state = INITIAL # Keep track of attained self._state_history = [INITIAL] # To change states self._task_count = len(self._tasks) # Pipeline this stage belongs to self._p_pipeline = {'uid': None, 'name': None} self._post_exec = None # -------------------------------------------------------------------------- # Getter functions # -------------------------------------------------------------------------- @property def name(self): """ Name of the stage. Do not use a ',' or '_' in an object's name. :getter: Returns the name of the current stage :setter: Assigns the name of the current stage :type: String """ return self._name @property def tasks(self): """ Tasks of the stage :getter: Returns all the tasks of the current stage :setter: Assigns tasks to the current stage :type: set of Tasks """ return self._tasks @property def state(self): """ Current state of the stage :getter: Returns the state of the current stage :type: String """ return self._state @property def parent_pipeline(self): """ :getter: Returns the pipeline this stage belongs to :setter: Assigns the pipeline uid this stage belongs to """ return self._p_pipeline @property def uid(self): """ Unique ID of the current stage :getter: Returns the unique id of the current stage :type: String """ return self._uid @property def luid(self): """ Unique ID of the current stage (fully qualified). example: >>> stage.luid pipe.0001.stage.0004 :getter: Returns the fully qualified uid of the current stage :type: String """ p_elem = self.parent_pipeline.get('name') if not p_elem: p_elem = self.parent_pipeline['uid'] s_elem = self.name if not s_elem: s_elem = self.uid return '%s.%s' % (p_elem, s_elem) @property def state_history(self): """ Returns a list of the states obtained in temporal order :return: list """ return self._state_history @property def post_exec(self): ''' The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished. Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon. Example: s1.post_exec = func_post def func_post(): if condition is met: s = Stage() t = Task() t.executable = '/bin/sleep' t.arguments = ['30'] s.add_tasks(t) p.add_stages(s) else: # do nothing pass ''' return self._post_exec # -------------------------------------------------------------------------- # Setter functions # -------------------------------------------------------------------------- @name.setter def name(self, value): invalid_symbols = punctuation.replace('.','') if not isinstance(value, str): raise EnTKTypeError(expected_type=str, actual_type=type(value)) if any(symbol in value for symbol in invalid_symbols): raise EnTKValueError(obj=self._uid, attribute='name', actual_value=value, expected_value=NAME_MESSAGE) self._name = value @tasks.setter def tasks(self, value): self._tasks = self._validate_entities(value) self._task_count = len(self._tasks) @parent_pipeline.setter def parent_pipeline(self, value): if isinstance(value, dict): self._p_pipeline = value else: raise EnTKTypeError(expected_type=dict, actual_type=type(value)) @state.setter def state(self, value): if isinstance(value, str): if value in list(_stage_state_values.keys()): self._state = value self._state_history.append(value) else: raise EnTKValueError(obj=self._uid, attribute='state', expected_value=list(_stage_state_values.keys()), actual_value=value) else: raise EnTKTypeError(expected_type=str, actual_type=type(value)) @post_exec.setter def post_exec(self, value): if not callable(value): raise EnTKTypeError(entity='stage %s branch' % self._uid, expected_type='callable', actual_type=type(value)) self._post_exec = value # -------------------------------------------------------------------------- # Public methods #
[docs] def add_tasks(self, value): """ Adds task(s) to a Stage by using a set union operation. Every Task element is unique, no duplicate is allowed. Existing Task won't be added but updated with changes. :argument: iterable task object """ tasks = self._validate_entities(value) self._tasks.update(tasks) self._task_count = len(self._tasks) for task in self._tasks: task.parent_stage['uid'] = self._uid task.parent_stage['name'] = self._name task.parent_pipeline['uid'] = self.parent_pipeline['uid'] task.parent_pipeline['name'] = self.parent_pipeline['name']
[docs] def as_dict(self): """ Convert current Stage into a dictionary :return: python dictionary """ stage_desc_as_dict = { 'uid': self._uid, 'name': self._name, 'state': self._state, 'state_history': self._state_history, 'parent_pipeline': self._p_pipeline } return stage_desc_as_dict
[docs] def from_dict(self, d): """ Create a Stage from a dictionary. The change is in inplace. :argument: python dictionary :return: None """ if 'uid' in d: if d['uid']: self._uid = d['uid'] if 'name' in d: if d['name']: invalid_symbols = punctuation.replace('.','') if not isinstance(d['name'], str): raise EnTKTypeError(expected_type=str, actual_type=type(d['name'])) if any(symbol in d['name'] for symbol in invalid_symbols): raise EnTKValueError(obj=self._uid, attribute='name', actual_value=d['name'], expected_value=NAME_MESSAGE) self._name = d['name'] if 'state' in d: if isinstance(d['state'], str) or isinstance(d['state'], str): if d['state'] in list(_stage_state_values.keys()): self._state = d['state'] else: value = d['state'] raise EnTKValueError(obj=self._uid, attribute='state', expected_value=list(_stage_state_values.keys()), actual_value=value) else: raise EnTKTypeError(entity='state', expected_type=str, actual_type=type(d['state'])) else: self._state = INITIAL if 'state_history' in d: if isinstance(d['state_history'], list): self._state_history = d['state_history'] else: raise EnTKTypeError(entity='state_history', expected_type=list, actual_type=type(d['state_history'])) if 'parent_pipeline' in d: if isinstance(d['parent_pipeline'], dict): self._p_pipeline = d['parent_pipeline'] else: raise EnTKTypeError(entity='parent_pipeline', expected_type=dict, actual_type=type(d['parent_pipeline']))
# -------------------------------------------------------------------------- # Private methods # --------------------------------------------------------------------------
[docs] def _set_tasks_state(self, value): """ Purpose: Set state of all tasks of the current stage. :arguments: String """ if value not in list(state_numbers.keys()): raise EnTKValueError(obj=self._uid, attribute='set_tasks_state', expected_value=list(state_numbers.keys()), actual_value=value) for task in self._tasks: task.state = value
[docs] def _check_stage_complete(self): """ Purpose: Check if all tasks of the current stage have completed, i.e., are in either DONE or FAILED state. """ try: for task in self._tasks: if task.state not in [DONE, FAILED]: return False return True except Exception as ex: raise EnTKError(ex) from ex
[docs] @classmethod def _validate_entities(cls, tasks): """ Purpose: Validate whether the 'tasks' is of type set. Validate the description of each Task. """ if not tasks: raise EnTKTypeError(expected_type=Task, actual_type=type(tasks)) if not isinstance(tasks, set): if not isinstance(tasks, list): tasks = set([tasks]) else: tasks = set(tasks) for t in tasks: if not isinstance(t, Task): raise EnTKTypeError(expected_type=Task, actual_type=type(t)) return tasks
[docs] def _validate(self): """ Purpose: Validate that the state of the current Stage is 'DESCRIBED' (user has not meddled with it). Also validate that the current Stage contains Tasks """ if self._state is not INITIAL: raise EnTKValueError(obj=self._uid, attribute='state', expected_value=INITIAL, actual_value=self._state) if not self._tasks: raise EnTKMissingError(obj=self._uid, missing_attribute='tasks') for task in self._tasks: task._validate()
# ------------------------------------------------------------------------------