Source code for radical.entk.execman.base.task_manager


__copyright__ = "Copyright 2017-2018, http://radical.rutgers.edu"
__author__    = "Vivek Balasubramanian <vivek.balasubramanian@rutgers.edu>"
__license__   = "MIT"


import os

import radical.utils as ru

from ...exceptions import EnTKError, EnTKTypeError

from .resource_manager import Base_ResourceManager


# ------------------------------------------------------------------------------
#
[docs]class Base_TaskManager(object): """ A Task Manager takes the responsibility of dispatching tasks it receives from a 'pending' queue for execution on to the available resources using a runtime system. Once the tasks have completed execution, they are pushed on to the completed queue for other components of EnTK to thread. :arguments: :rmgr: (ResourceManager) Object to be used to access the Pilot where the tasks can be submitted Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues can be varied for different throughput requirements at the cost of additional Memory and CPU consumption. """ # -------------------------------------------------------------------------- # def __init__(self, sid, rmgr, rts, zmq_info): if not isinstance(sid, str): raise EnTKTypeError(expected_type=str, actual_type=type(sid)) if not isinstance(rmgr, Base_ResourceManager): raise EnTKTypeError(expected_type=Base_ResourceManager, actual_type=type(rmgr)) if not isinstance(zmq_info, dict): raise EnTKTypeError(expected_type=dict, actual_type=type(zmq_info)) self._sid = sid self._rmgr = rmgr self._rts = rts # Utility parameters self._uid = ru.generate_id('task_manager.%(counter)04d', ru.ID_CUSTOM) self._path = os.getcwd() + '/' + self._sid name = 'radical.entk.%s' % self._uid self._log = ru.Logger (name, path=self._path) self._prof = ru.Profiler(name, path=self._path) self._dh = ru.DebugHelper(name=name) self._tmgr_thread = None self._tmgr_terminate = None self._zmq_queue = None self._zmq_info = zmq_info self._setup_zmq(zmq_info) # -------------------------------------------------------------------------- #
[docs] def _setup_zmq(self, zmq_info): self._prof.prof('zmq_setup_start', uid=self._uid) sid = self._sid self._zmq_queue = { 'put' : ru.zmq.Putter(sid, url=zmq_info['put'], path=sid), 'get' : ru.zmq.Getter(sid, url=zmq_info['get'], path=sid)} self._prof.prof('zmq_setup_stop', uid=self._uid)
# -------------------------------------------------------------------------- #
[docs] def _tmgr(self, uid, rmgr, zmq_info): """ **Purpose**: Method to be run by the tmgr thread. This method receives a Task from the 'pending' queue and submits it to the RTS. At all state transititons, they are synced (blocking) with the AppManager in the master thread. **Details**: The AppManager can re-invoke the tmgr thread with this function if the execution of the workflow is still incomplete. There is also population of a dictionary, placeholder_dict, which stores the path of each of the tasks on the remote machine. """ raise NotImplementedError('_tmgr() method not implemented in ' 'TaskManager for %s' % self._rts)
# -------------------------------------------------------------------------- #
[docs] def _sync_with_master(self, obj, obj_type, qname): body = {'object': obj.as_dict(), 'type' : obj_type} if obj_type == 'Task' : msg = obj.parent_stage['uid'] elif obj_type == 'Stage': msg = obj.parent_pipeline['uid'] else : msg = '' self._prof.prof('pub_sync', state=obj.state, uid=obj.uid, msg=msg) self._log.debug('%s (%s) to sync with amgr', obj.uid, obj.state) self._zmq_queue['put'].put(msgs=[body], qname=qname)
# -------------------------------------------------------------------------- #
[docs] def _advance(self, obj, obj_type, new_state, qname): old_state = obj.state try: obj.state = new_state if obj_type == 'Task' : msg = obj.parent_stage['uid'] elif obj_type == 'Stage': msg = obj.parent_pipeline['uid'] else : msg = None self._prof.prof('advance', uid=obj.uid, state=obj.state, msg=msg) self._log.info('Transition %s to %s', obj.uid, new_state) self._sync_with_master(obj, obj_type, qname) except Exception as ex: self._log.exception('Transition %s to state %s failed, error: %s', obj.uid, new_state, ex) obj.state = old_state self._sync_with_master(obj, obj_type, qname) raise EnTKError(ex) from ex
# -------------------------------------------------------------------------- #
[docs] def start_manager(self): """ **Purpose**: Method to start the tmgr thread. The tmgr function is not to be accessed directly. The function is started in a separate thread using this method. """ raise NotImplementedError('start_manager() method not implemented in ' 'TaskManager for %s' % self._rts)
# -------------------------------------------------------------------------- #
[docs] def terminate_manager(self): """ **Purpose**: Method to terminate the tmgr thread. This method is blocking as it waits for the tmgr thread to terminate (aka join). """ try: if self._tmgr_thread: self._log.debug('Trying to terminate task manager.') if self._tmgr_terminate is not None: if not self._tmgr_terminate.is_set(): self._tmgr_terminate.set() self._log.debug('TMGR terminate is set %s', self._tmgr_terminate.is_set()) if self.check_manager(): self._log.debug('TMGR thread is alive') self._tmgr_thread.join(30) self._log.debug('TMGR thread joined') self._tmgr_thread = None self._log.info('Task manager thread closed') self._prof.prof('tmgr_term', uid=self._uid) except Exception as ex: self._log.exception('Could not terminate task manager thread') raise EnTKError(ex) from ex
# -------------------------------------------------------------------------- #
[docs] def check_manager(self): """ **Purpose**: Check if the tmgr thread is alive and running """ # Return False if the thread does not exist if self._tmgr_thread: return self._tmgr_thread.is_alive() else: return False
# ------------------------------------------------------------------------------