# Source code for radical.entk.execman.mock.task_manager


__copyright__ = 'Copyright 2017-2018, http://radical.rutgers.edu'
__author__    = 'Vivek Balasubramanian <vivek.balasubramanian@rutgers.edu>'
__license__   = 'MIT'

import queue

import threading       as mt

from ...exceptions       import EnTKError
from ...                 import states, Task
from ..base.task_manager import Base_TaskManager


# pylint: disable=unused-argument
# ------------------------------------------------------------------------------
#
class TaskManager(Base_TaskManager):
    '''
    Mock Task Manager: receives tasks from the 'pending' queue and, instead of
    dispatching them to a real runtime system, immediately marks each task as
    completed and pushes it on to the 'completed' queue for other components
    of EnTK to process.

    :arguments:
        :sid:      session ID, passed through to the base task manager
        :rmgr:     (ResourceManager) object passed through to the base task
                   manager and to the worker threads
        :zmq_info: ZMQ connection information used to set up the queues

    Currently, EnTK is configured to work with one pending queue and one
    completed queue.
    '''

    # Seconds the task processing thread blocks while waiting for new work
    # before it re-checks the termination event.
    _TASK_QUEUE_POLL = 0.1

    # --------------------------------------------------------------------------
    #
    def __init__(self, sid, rmgr, zmq_info):
        '''
        Create the mock task manager; the runtime system is fixed to 'mock'.
        '''

        super().__init__(sid, rmgr, rts='mock', zmq_info=zmq_info)

        # thread running `_process_tasks`, created in `_tmgr`
        self._rts_runner = None
        self._zmq_info   = zmq_info

        self._log.info('Created task manager object: %s', self._uid)
        self._prof.prof('tmgr_create', uid=self._uid)

    # --------------------------------------------------------------------------
    #
    def _tmgr(self, uid, rmgr, zmq_info):
        '''
        **Purpose**: Method to be run by the tmgr thread. It sets up the ZMQ
        queues, starts a second thread running `_process_tasks`, and then
        polls the 'pending' queue until the termination event is set.
        Messages of type 'workload' are handed to the processing thread via a
        thread-local queue, messages of type 'rts' are ignored, and any other
        type is logged as an error.

        **Arguments**:
            :uid:      uid used for profiling events
            :rmgr:     resource manager, forwarded to `_process_tasks`
            :zmq_info: ZMQ connection information for `_setup_zmq`

        **Raises**:
            :EnTKError: on any exception other than `KeyboardInterrupt`
        '''

        try:

            self._setup_zmq(zmq_info)

            self._prof.prof('tmgr process started', uid=self._uid)
            self._log.info('Task Manager process started')

            # Queue for communication between threads of this process
            task_queue = queue.Queue()

            # Start second thread to receive tasks and push to RTS
            self._rts_runner = mt.Thread(target=self._process_tasks,
                                         args=(task_queue, rmgr))
            self._rts_runner.daemon = True
            self._rts_runner.start()

            self._prof.prof('tmgr infrastructure setup done', uid=uid)

            # Keep receiving messages for as long as we are supposed to run
            while not self._tmgr_terminate.is_set():

                try:
                    msgs = self._zmq_queue['get'].get_nowait(
                                                qname='pending', timeout=100)

                    if msgs:
                        for msg in msgs:

                            if msg['type'] == 'workload':
                                task_queue.put(msg['body'])

                            elif msg['type'] == 'rts':
                                pass

                            else:
                                self._log.error(
                                        'TMGR receiver wrong message type')

                except Exception as e:
                    self._log.exception('Error in task execution')
                    raise EnTKError(e) from e

            self._log.debug('Exited TMGR main loop')

        except KeyboardInterrupt:
            self._log.exception('Execution interrupted (probably by Ctrl+C), '
                                'cancel tmgr process gracefully...')
            raise

        except Exception as e:
            self._log.exception('%s failed', self._uid)
            raise EnTKError(e) from e

        finally:

            self._prof.prof('tmgr_term', uid=uid)

            if self._rts_runner:

                # The runner only leaves its loop once the termination event
                # is set.  If we got here because of an error the event is
                # still clear, and joining would block forever - so signal
                # termination before waiting for the runner.
                terminate = getattr(self, '_tmgr_terminate', None)
                if terminate is not None:
                    terminate.set()

                self._rts_runner.join()

            self._log.debug('TMGR RTS Runner joined')

            self._prof.close()
            self._log.debug('TMGR profile closed')

    # --------------------------------------------------------------------------
    #
    def _process_tasks(self, task_queue, rmgr):
        '''
        **Purpose**: The thread spawned by `_tmgr` runs this method. It
        receives task bulks from 'task_queue', advances every task to
        SUBMITTING, and then - this being the mock RTS - immediately advances
        every task to COMPLETED with exit code 0 and pushes it to the
        'completed' queue.

        **Arguments**:
            :task_queue: `queue.Queue` filled by `_tmgr`; each item is a list
                         of task dictionaries
            :rmgr:       resource manager (unused by the mock)

        **Raises**:
            :EnTKError: on any exception other than `KeyboardInterrupt`
        '''

        try:

            while not self._tmgr_terminate.is_set():

                # Block briefly instead of spinning on `get_nowait()`: the
                # thread stays idle while there is no work, but still notices
                # the termination event in a timely manner.
                try:
                    body = task_queue.get(timeout=self._TASK_QUEUE_POLL)

                except queue.Empty:
                    # Ignore, we don't always have new tasks to run
                    continue

                # every item taken off the queue is accounted for, even if it
                # turns out to be empty
                task_queue.task_done()

                if not body:
                    continue

                bulk_tasks = list()

                for msg in body:

                    task = Task(from_dict=msg)
                    bulk_tasks.append(task)
                    self._advance(task, 'Task', states.SUBMITTING,
                                  'tmgr-to-sync')

                # this mock RTS immediately completes all tasks
                for task in bulk_tasks:

                    task.exit_code = 0

                    self._advance(task, 'Task', states.COMPLETED,
                                  'cb-to-sync')

                    self._log.info('Pushed task %s with state %s to completed',
                                   task.uid, task.state)

                    tdict = task.as_dict()
                    self._zmq_queue['put'].put(qname='completed', msgs=[tdict])

        except KeyboardInterrupt:
            self._log.exception('Execution interrupted (probably by Ctrl+C), '
                                'cancel task processor gracefully...')
            raise

        except Exception as e:
            self._log.exception('%s failed', self._uid)
            raise EnTKError(e) from e

    # --------------------------------------------------------------------------
    #
    def start_manager(self):
        '''
        **Purpose**: Method to start the tmgr thread. The `_tmgr` method is
        not to be accessed directly; it is started in a separate thread using
        this method.

        **Returns**: `True` if the thread was started, `None` if a tmgr
        thread already exists (a warning is logged in that case).

        **Raises**:
            :EnTKError: if the thread could not be created or started; the
                        manager is terminated before the error is raised
        '''

        # pylint: disable=attribute-defined-outside-init, access-member-before-definition
        if self._tmgr_thread:
            self._log.warn('tmgr process already running!')
            return

        try:

            self._prof.prof('creating tmgr process', uid=self._uid)

            # fresh termination event for this run of the tmgr thread
            self._tmgr_terminate = mt.Event()
            self._tmgr_thread    = mt.Thread(target=self._tmgr,
                                             name='task-manager',
                                             args=(self._uid,
                                                   self._rmgr,
                                                   self._zmq_info))

            self._log.info('Starting task manager process')
            self._prof.prof('starting tmgr process', uid=self._uid)

            self._tmgr_thread.start()
            self._log.debug('tmgr thread %s', self._tmgr_thread.ident)

            return True

        except Exception as e:

            self._log.exception('Task manager not started')
            self.terminate_manager()
            raise EnTKError(e) from e
# ------------------------------------------------------------------------------
# pylint: enable=attribute-defined-outside-init, access-member-before-definition