9.2.1. Application Manager API

class radical.entk.AppManager(config_path=None, hostname=None, port=None, username=None, password=None, reattempts=None, resubmit_failed=None, autoterminate=None, write_workflow=None, rts=None, rmq_cleanup=None, rts_config=None, name=None)

An application manager is responsible for setting up the communication infrastructure and for instantiating the ResourceManager, TaskManager, and WFProcessor objects together with all their threads and processes. It is the master object running in the main process and is designed to recover from errors in all other objects, threads, and processes.

Arguments:
config_path:URL of the configuration file to be read by the AppManager
hostname:host on which the RabbitMQ server is running
port:port at which RabbitMQ can be accessed
username:username to log in to RabbitMQ
password:password to log in to RabbitMQ
reattempts:number of attempts to re-invoke any failed EnTK components
resubmit_failed:resubmit failed tasks (True/False)
autoterminate:terminate the resource reservation upon execution of all tasks of the first workflow (True/False)
write_workflow:write the workflow and its mapping to RTS entities to a file (post-termination)
rts:Specify RTS to use. Current options: ‘mock’, ‘radical.pilot’ (default if unspecified)
rmq_cleanup:Cleanup all queues created in RabbitMQ server for current execution (default is True)
rts_config:Configuration for the RTS, accepts {‘sandbox_cleanup’: True/False, ‘db_cleanup’: True/False} when RTS is RP
name:Name of the Application. It should be unique between executions. (default is randomly assigned)
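The arguments above can be collected as keyword arguments before constructing the manager. The following is a minimal sketch: every value is an illustrative assumption (a RabbitMQ server on the local machine with its stock port and guest account), and `build_app_manager` is a hypothetical helper, not part of EnTK. The import is deferred so the snippet loads without EnTK installed.

```python
# Keyword arguments mirroring the constructor signature documented above.
# All values are illustrative assumptions, not recommended settings.
appman_kwargs = {
    "hostname": "localhost",        # assumed: RabbitMQ runs on this machine
    "port": 5672,                   # default AMQP port used by RabbitMQ
    "username": "guest",            # assumed: RabbitMQ's stock local account
    "password": "guest",
    "reattempts": 3,                # retries for failed EnTK components
    "resubmit_failed": False,       # do not resubmit failed tasks
    "autoterminate": True,          # release resources after the first workflow
    "write_workflow": False,
    "rts": "radical.pilot",         # or "mock"
    "rmq_cleanup": True,            # delete this run's queues afterwards
    "rts_config": {"sandbox_cleanup": False, "db_cleanup": False},
    "name": "example-app-001",      # should be unique between executions
}


def build_app_manager(kwargs):
    """Hypothetical helper: construct an AppManager from keyword arguments."""
    # Deferred import: requires radical.entk and a reachable RabbitMQ server.
    from radical.entk import AppManager
    return AppManager(**kwargs)
```

Calling `build_app_manager(appman_kwargs)` needs a reachable RabbitMQ server; the dictionary alone can be inspected or stored without one.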
_cleanup_mqs()
_get_message_to_sync(mq_channel, qname)

Reads a message from the queue and exchanges the message to where it was published by update_task.

_read_config(config_path, hostname, port, username, password, reattempts, resubmit_failed, autoterminate, write_workflow, rts, rmq_cleanup, rts_config)
_run_workflow()
_setup_mqs()

Purpose: Set up the RabbitMQ system on the client side. We instantiate queue(s) ‘pendingq-’ for communication between the enqueuer thread and the task manager process. We instantiate queue(s) ‘completedq-’ for communication between the task manager and the dequeuer thread. We instantiate queue ‘sync-to-master’ for communication from the enqueuer/dequeuer/task_manager to the synchronizer thread. We instantiate queue ‘sync-ack’ for communication from the synchronizer thread to the enqueuer/dequeuer/task_manager.

Details: All queues are durable: Even if the RabbitMQ server goes down, the queues are saved to disk and can be retrieved. This also means that after an erroneous run the queues might still have unacknowledged messages and will contain messages from that run. Hence, in every new run, we first delete the queue and create a new one.
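The delete-then-recreate step described above can be sketched as follows. This is an illustration under stated assumptions, not EnTK's implementation: `queue_names`, `reset_queues`, and `FakeChannel` are hypothetical names, the numeric suffix appended to ‘pendingq-’ and ‘completedq-’ is assumed, and the channel is only expected to expose `queue_delete` and `queue_declare` in the style of a pika channel.

```python
def queue_names(num_pending=1, num_completed=1):
    """Return the queue names from the text (numeric suffixes are assumed)."""
    names = [f"pendingq-{i}" for i in range(1, num_pending + 1)]
    names += [f"completedq-{i}" for i in range(1, num_completed + 1)]
    names += ["sync-to-master", "sync-ack"]
    return names


def reset_queues(channel, names):
    """Delete each queue, then declare it again as durable."""
    for name in names:
        # Drop any queue (and stale messages) left over from an earlier run.
        channel.queue_delete(queue=name)
        # Recreate it as durable so it survives a broker restart.
        channel.queue_declare(queue=name, durable=True)


class FakeChannel:
    """Hypothetical stand-in that records calls instead of contacting RabbitMQ."""

    def __init__(self):
        self.calls = []

    def queue_delete(self, queue):
        self.calls.append(("delete", queue))

    def queue_declare(self, queue, durable=False):
        self.calls.append(("declare", queue, durable))


channel = FakeChannel()
reset_queues(channel, queue_names())
```

With a real broker, `channel` would be an open pika channel rather than the recording stub.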

_start_all_comps()
_synchronizer()
_synchronizer_work()
Purpose: Thread in the master process that keeps the workflow data structure in appmanager up to date. We receive only task objects from the task manager.

Details: Note that an acknowledgement of the type channel.basic_ack() is an acknowledgement to the server that the message was received. This is not to be confused with the Ack sent to the task_manager through the sync-ack queue.
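The two kinds of acknowledgement can be told apart in a small sketch. Everything here is assumed for illustration: the JSON message layout, the `handle_sync_message` function, and the `FakeChannel` stub are hypothetical, and the correlation ID is placed in the message body only to keep the example free of broker-specific property objects.

```python
import json
from types import SimpleNamespace


def handle_sync_message(channel, method_frame, corr_id, body, workflow_state):
    """Apply one task update, reply on 'sync-ack', then ack the delivery."""
    task = json.loads(body)
    # Keep the master's copy of the workflow state up to date.
    workflow_state[task["uid"]] = task["state"]
    # Application-level Ack: a new message sent to the task manager.
    reply = json.dumps({"ack": task["uid"], "corr_id": corr_id})
    channel.basic_publish(exchange="", routing_key="sync-ack", body=reply)
    # Broker-level ack: tells the server this delivery was received.
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


class FakeChannel:
    """Hypothetical stand-in that records publishes and acks."""

    def __init__(self):
        self.published = []
        self.acked = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


state = {}
channel = FakeChannel()
frame = SimpleNamespace(delivery_tag=7)
message = json.dumps({"uid": "task.0000", "state": "DONE"})
handle_sync_message(channel, frame, "corr-1", message, state)
```

The publish to ‘sync-ack’ is addressed to another EnTK component, while `basic_ack` is addressed to the RabbitMQ server itself.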
_update_task(msg, reply_to, corr_id, mq_channel, method_frame)
name

Name for the application manager. Allows the user to set the name of the application manager, as well as its session ID. This name must be unique across EnTK executions; otherwise an error is produced.

Getter:Returns the name of the application manager
Setter:Assigns the name of the application manager
Type:String
outputs
Getter:Return list of filenames that are to be staged out after execution
Setter:Assign a list of names of files that need to be staged from the remote machine
resource_desc

The resource description is a dictionary that holds information about the resource that will be used to execute a workflow.

The following keys are mandatory in all resource descriptions:
‘resource’ : Label of the resource that will be used.
‘runtime’ : Amount of time the workflow is expected to execute.
‘cores’ : Number of CPU cores.
Optional keys include:
‘project’ : The project that will be charged.
‘gpus’ : Number of GPU devices to be used by the workflow.
‘access_schema’ : The key of an access mechanism to use.
‘queue’ : The name of the job queue RE will use to execute the workflow
‘job_name’ : The name of the job.
Getter:Returns the resource description
Setter:Assigns a resource description
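A resource description built from the keys listed above might look like the sketch below. The values are illustrative assumptions (the units of ‘runtime’ are not stated in this section), and `check_resource_desc` is a hypothetical pre-flight helper rather than an EnTK function.

```python
MANDATORY_KEYS = ("resource", "runtime", "cores")


def check_resource_desc(desc):
    """Hypothetical check: raise ValueError if a mandatory key is missing."""
    missing = [key for key in MANDATORY_KEYS if key not in desc]
    if missing:
        raise ValueError(f"missing mandatory keys: {missing}")
    return desc


resource_desc = check_resource_desc({
    "resource": "local.localhost",  # illustrative resource label
    "runtime": 10,                  # units are not stated in this section
    "cores": 4,
    "gpus": 0,                      # optional key
})
```

The checked dictionary would then be passed to the `resource_desc` setter of the application manager.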
resource_terminate()
run()

Purpose: Run the application manager. Once the workflow and resource manager have been assigned, invoking this method sets up the communication infrastructure, submits a resource request, and then submits all the tasks.
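The order of operations implied above (assign a resource description and a workflow, then call `run()`) can be sketched with a duck-typed helper. `run_application` and `StubAppManager` are hypothetical names used only so the sketch runs without EnTK or RabbitMQ; the workflow value is a placeholder, since its concrete type is not described in this section.

```python
def run_application(appman, resource_desc, workflow):
    """Assign the resource description and workflow, then start execution."""
    appman.resource_desc = resource_desc  # documented setter
    appman.workflow = workflow            # documented setter
    appman.run()


class StubAppManager:
    """Hypothetical stand-in exposing the attributes used above."""

    def __init__(self):
        self.resource_desc = None
        self.workflow = None
        self.ran_with = None

    def run(self):
        # Mimic the precondition: both pieces must be assigned before run().
        if self.resource_desc is None or self.workflow is None:
            raise RuntimeError("assign resource_desc and workflow before run()")
        self.ran_with = (self.resource_desc, self.workflow)


stub = StubAppManager()
run_application(
    stub,
    {"resource": "local.localhost", "runtime": 10, "cores": 1},
    ["pipeline-placeholder"],  # placeholder for the real workflow object
)
```

With a real `AppManager` instance in place of the stub, the same helper applies unchanged, provided the RabbitMQ server and the target resource are reachable.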

shared_data
Getter:Return list of filenames that are shared between multiple tasks of the application
Setter:Assign a list of names of files that need to be staged to the remote machine
sid

Get the session ID of the current EnTK execution

Getter:Returns the session ID of the EnTK execution
Type:String
terminate()
workflow
Getter:Return the last workflow assigned for execution
Setter:Assign a new workflow to be executed
workflows
Getter:Return a list of workflows assigned for execution