RADICAL-Ensemble Toolkit
The Ensemble Toolkit is a Python library for developing and executing large-scale ensemble-based workflows. It is being developed by the RADICAL Research Group at Rutgers University. Ensemble Toolkit is released under the MIT License.
More details about the science enabled by EnTK can be found in the following publications:
Balasubramanian, Vivek, Matteo Turilli, Weiming Hu, Matthieu Lefebvre, Wenjie Lei, Guido Cervone, Jeroen Tromp, and Shantenu Jha. “Harnessing the Power of Many: Extensible Toolkit for Scalable Ensemble Applications.” 32nd IEEE International Parallel and Distributed Processing Symposium, 2018 (https://arxiv.org/abs/1710.08491)
Balasubramanian, Vivekanandan, Antons Treikalis, Ole Weidner, and Shantenu Jha. “Ensemble toolkit: Scalable and flexible execution of ensembles of tasks.” In Parallel Processing (ICPP), 2016 45th International Conference on, pp. 458-463. IEEE, 2016. (https://arxiv.org/abs/1602.00678v2)
Note
Please use the following citation to reference Ensemble Toolkit:
Balasubramanian, Vivekanandan, Antons Treikalis, Ole Weidner, and Shantenu Jha. “Ensemble toolkit: Scalable and flexible execution of ensembles of tasks.” In Parallel Processing (ICPP), 2016 45th International Conference on, pp. 458-463. IEEE, 2016.
Project Github Page: https://github.com/radical-cybertools/radical.entk
Mailing Lists
For users: https://groups.google.com/d/forum/ensemble-toolkit-users
For developers: https://groups.google.com/d/forum/ensemble-toolkit-dev
Introduction
Overview
The Ensemble Toolkit is a Python framework for developing and executing applications composed of multiple sets of tasks, also known as ensembles. Ensemble Toolkit was originally developed with ensemble-based applications in mind. As our understanding of the variety of workflows in scientific applications improved, we realized our approach needed to be more generic. Although our motivation remains ensemble-based applications, from EnTK 0.6 onwards any application whose task workflow can be expressed as a Directed Acyclic Graph (DAG) can be supported.
The Ensemble Toolkit has the following unique features: (i) abstractions that enable the expression of various task graphs, (ii) abstraction of resource management and task execution, (iii) fault tolerance as a first-order concern, and (iv) well-established runtime capabilities to enable efficient and dynamic usage of grid resources and supercomputers.
We will now discuss the high level design of Ensemble Toolkit in order to understand how an application is created and executed.
Design

Figure 1: High level design of Ensemble Toolkit
Ensemble Toolkit consists of several components that serve different purposes. There are three user-level components, namely Pipeline, Stage, and Task, which are used directly by the user to create an application by describing its task graph. We will soon take a look at how these can be used to create an application.
The Application Manager is an internal component that takes the workflow described by the user and converts it into a set of workloads, i.e., tasks with no unresolved dependencies, by parsing the workflow and identifying, at runtime, tasks with equivalent or no dependencies. The Application Manager also accepts the description of the resource request (resource label, walltime, cpus, gpus, user credentials) to be created.
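For illustration, here is a sketch of such a resource request description as a plain Python dictionary (the values are placeholders; optional keys such as gpus, project, or queue can be omitted or may be required depending on the target platform):

# A sketch of a resource request description handed to the Application Manager.
# Values below are placeholders; 'local.localhost' targets your local machine.
res_desc = {
    'resource': 'local.localhost',   # resource label known to the runtime system
    'walltime': 30,                  # walltime in minutes
    'cpus'    : 16,                  # number of CPU cores to request
    'gpus'    : 0,                   # optional: number of GPUs
    # 'project': 'ABC123',           # allocation/project id, required on most HPC systems
    # 'queue'  : 'normal',           # batch queue, if the platform requires one
}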
The Execution Manager is the last component in Ensemble Toolkit. It accepts the workloads prepared by the Application Manager and executes them on the specified resource using a Runtime system (RTS). Internally, it consists of two subcomponents: ResourceManager and TaskManager, that are responsible for the allocation, management, and deallocation of resources, and execution management of tasks, respectively. The Execution Manager is currently configured to use RADICAL Pilot (RP) as the runtime system, but can be extended to other RTS.
Ensemble Toolkit uses a runtime system to execute tasks on high performance computing (HPC) platforms. The runtime system is expected to manage the direct interactions with the various software and hardware layers of the HPC platforms, including their heterogeneity.
More details about how EnTK is designed and implemented can be found here.
Dependencies
Ensemble Toolkit uses RADICAL Pilot (RP) as the runtime system. RP currently targets only a set of high performance computing (HPC) systems (see here). RP can be extended to support more HPC systems by contacting the developers of RP/EnTK, or by users themselves by following this page.
EnTK also has profiling capabilities and uses Pandas dataframes to store the data. Users can use these dataframes to wrangle the data or directly plot the required fields.
Dependencies such as RP and Pandas are automatically installed when installing EnTK.
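As an illustration only, such profiles can also be loaded into Pandas directly. The sketch below assumes that, with profiling enabled (RADICAL_ENTK_PROFILE=True, see the Profiling section), the session directory contains *.prof files in CSV form; the file layout and column names are assumptions and may differ between versions, so prefer RADICAL Analytics for authoritative analysis.

import glob
import pandas as pd

# Assumption: profiles are CSV files named '*.prof' under the session directory;
# the column names below are illustrative and may differ between EnTK versions.
cols   = ['time', 'event', 'component', 'thread', 'uid', 'state', 'msg']
files  = glob.glob('re.session.*/*.prof')
frames = [pd.read_csv(f, names=cols, comment='#') for f in files]
prof   = pd.concat(frames, ignore_index=True)

# Example: count the number of recorded events per component
print(prof.groupby('component').size())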
Five steps to create an application
Use the Pipeline, Stage and Task components to create the workflow.
Create an Application Manager (Amgr) object with required parameters/configurations.
Describe the resource request to be created. Assign resource request description and workflow to the Amgr.
Run the Application Manager.
Sit back and relax!
Jump ahead to take a look at the step-by-step instructions for an example script here.
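Putting the five steps together, the skeleton of an EnTK application looks roughly as follows (a minimal sketch for local execution; the full, runnable version is discussed step by step in the Getting Started guide):

from radical.entk import Pipeline, Stage, Task, AppManager

# Step 1: describe the workflow with Pipeline, Stage and Task objects
t = Task()
t.executable = '/bin/echo'
t.arguments  = ['Hello World']

s = Stage()
s.add_tasks(t)

p = Pipeline()
p.add_stages(s)

# Step 2: create the Application Manager
appman = AppManager()

# Step 3: describe the resource request and assign it, together with the workflow
appman.resource_desc = {'resource': 'local.localhost', 'walltime': 10, 'cpus': 1}
appman.workflow      = set([p])

# Step 4: run the Application Manager (step 5: sit back and relax)
appman.run()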
Intended users
Ensemble Toolkit is completely Python based and requires familiarity with the Python language.
Our primary focus is to support domain scientists and enable them to execute their applications at scale on various HPC platforms. Some of our user groups include:
User Groups | Domain
---|---
University of Colorado, Denver | Biochemistry/Biophysics
Penn State University | Climate Science
Princeton University | Seismology
University College London | Biochemistry/Biophysics, Medicine
Rice University | Biochemistry/Biophysics
Stony Brook University | Polar Science
Northern Arizona University | Polar Science
Oak Ridge National Laboratory | Biochemistry/Biophysics
Ensemble Toolkit
The design and implementation of EnTK are iterative and driven by use cases. Use cases span several scientific domains, including Biomolecular Sciences, Material Sciences, and Earth Sciences. Users and developers collaborate to elicit requirements and to rapidly prototype solutions.
Design
We describe how applications are modeled using EnTK components, as well as the architecture and execution model of EnTK. We also specify the types of failures recognized by EnTK and how some of them are handled.
Application Model
We model an application by combining the Pipeline, Stage and Task (PST) components.

Figure 1: PST Model
We consider two Pythonic collections of objects, sets and lists, to describe the task graph. A set-based object represents entities that have no relative order and hence can execute independently. A list-based object represents entities that have a linear temporal order, i.e., entity ‘i’ can be operated on only after entity ‘i-1’.
In our PST model, we have the following objects:
Task: an abstraction of a computational task that contains information regarding an executable, its software environment, and its data dependencies.
Stage: a set of tasks without mutual dependencies, which can therefore be executed concurrently.
Pipeline: a list of stages where any stage ‘i’ can be executed only after stage ‘i–1’ has been executed.
When assigned to the Application Manager, the entire application can be expressed as a set of Pipelines. A graphical representation of an application is provided in Figure 1. Note how different Pipelines can have different numbers of Stages, and different Stages can have different numbers of Tasks.
By expressing an application as a set or list of such Pipelines, one can create any application whose task graph can be expressed as a DAG.
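As a small illustration of this model (a sketch consistent with the example scripts later in this guide), two Pipelines with different numbers of Stages and Tasks can be composed and handed to the Application Manager as a set:

from radical.entk import Pipeline, Stage, Task

def make_pipeline(n_stages, n_tasks):
    # Build one Pipeline with 'n_stages' Stages of 'n_tasks' Tasks each
    p = Pipeline()
    for _ in range(n_stages):
        s = Stage()
        for _ in range(n_tasks):
            t = Task()
            t.executable = '/bin/sleep'
            t.arguments  = ['1']
            s.add_tasks(t)
        p.add_stages(s)
    return p

# Pipelines in the set may differ in their number of Stages and Tasks
workflow = set([make_pipeline(2, 4), make_pipeline(3, 2)])

Such a set is then assigned to the Application Manager via its workflow attribute, as shown in the user guide.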
Architecture
EnTK sits between the user and an HPC platform, abstracting resource management and execution management from the user.

Figure 2: Ensemble Toolkit Architecture and Execution Model
Figure 2 shows the components (purple) and subcomponents (green) of EnTK, organized in three layers: API, Workflow Management, and Workload Management. The API layer enables users to codify PST descriptions. The Workflow Management layer retrieves information from the user about available HPC platforms, initializes EnTK, and holds the global state of the application during execution. The Workload Management layer acquires resources via the RTS. The Workflow Management layer has two components: AppManager and WFProcessor. AppManager uses the Synchronizer subcomponent to update the state of the application at runtime. WFProcessor uses the Enqueue and Dequeue subcomponents to queue and dequeue tasks to and from the Workload Management layer. The Workload Management layer uses ExecManager and its Rmgr, Emgr, RTS Callback, and Heartbeat subcomponents to acquire resources from HPC platforms and execute the application.
A defining feature of this architecture is the isolation of the RTS into a stand-alone subsystem. This enables composability of EnTK with diverse RTSs and, depending on their capabilities, multiple types of HPC platforms.
Execution Model
Once EnTK is fully initialized, WFProcessor initiates the execution by creating a local copy of the application description from AppManager and tagging tasks for execution. Enqueue pushes these tasks to the Pending queue (Fig. 2, 1). Emgr pulls tasks from the Pending queue (Fig. 2, 2) and executes them using a RTS (Fig. 2, 3). RTS Callback pushes tasks that have completed execution to the Done queue (Fig. 2, 4). Dequeue pulls completed tasks (Fig. 2, 5) and tags them as done, failed or canceled, depending on the return code of the RTS. Each component and subcomponent synchronizes state transitions of pipelines, stages and tasks with AppManager by pushing messages through dedicated queues (Fig. 2, 6). AppManager pulls these messages and updates the application states. AppManager then acknowledges the updates via dedicated queues (Fig. 2, 7). This messaging mechanism ensures that AppManager is always up-to-date with any state change, making it the only stateful component of EnTK.
Failure Model
We consider four main sources of failure: EnTK components, RTS, HPC platform, and task executables. All state updates in EnTK are transactional, hence any EnTK component that fails can be restarted at runtime without losing information about the ongoing execution. Both the RTS and the HPC platform are considered black boxes. Partial failures of their subcomponents at runtime are assumed to be handled locally. Platform-level failures are reported to EnTK indirectly, either as failed pilots or failed tasks. Both pilots and tasks can be restarted. Failures are logged and reported to the user at runtime for live or postmortem analysis.
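As a small sketch of acting on task failures from the application side (it assumes, as in the examples later in this guide, that the workflow was kept in a variable named pipelines and that tasks expose a state attribute with values from radical.entk.states):

import radical.entk as re

# After appman.run() returns, walk the workflow and report failed tasks.
# 'pipelines' is assumed to be the set of Pipeline objects assigned to the AppManager.
for pipeline in pipelines:
    for stage in pipeline.stages:
        for task in stage.tasks:
            if task.state == re.states.FAILED:
                print('task %s in stage %s of pipeline %s failed'
                      % (task.uid, stage.uid, pipeline.uid))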
Implementation
EnTK is implemented in Python and uses RADICAL-Pilot (RP) as its runtime system. RP is designed to execute ensemble applications via pilots. Pilots provide a multi-stage execution mechanism: Resources are acquired via a placeholder job and subsequently used to execute the application’s tasks. When a pilot is submitted to an HPC platform as a job, it waits in the platform’s queue until the requested resources become available. At that point, the platform’s scheduler bootstraps the job on the requested compute nodes.
You can view the class diagram and sequence diagram and more in the developer documentation.
Performance
Below we present the weak and strong scaling behavior of EnTK on the ORNL Titan machine.
Detailed description of the experiments can be found in this technical paper.

Figure 3: Weak scalability on Titan: 512, 1,024, 2,048, and 4,096 1-core tasks executed on the same amount of cores

Figure 4: Strong scalability on Titan: 8,192 1-core tasks are executed on 1,024, 2,048 and 4,096 cores
Installation
Installing Ensemble Toolkit
Installing Ensemble Toolkit using virtualenv
To install the Ensemble Toolkit, we need to create a virtual environment. Open a terminal and run:
virtualenv $HOME/ve-entk -p python3.7
The -p option indicates which Python version to use; Python 3.7 or newer is required.
Activate virtualenv by:
source $HOME/ve-entk/bin/activate
Note
Activated env name is indicated in the prompt like: (ve-entk) username@two:~$
Deactivate the virtualenv if you want to disengage; your Python won’t recognize EnTK after deactivation. To deactivate, run:
deactivate
We suggest using the released version of EnTK, which you can install by executing the following command in your virtualenv:
pip install radical.entk
To install a specific branch of EnTK, e.g., devel, instead of using the pip installation, you will need to clone the repository and check out that branch. You can do so using the following commands:
git clone https://github.com/radical-cybertools/radical.entk.git
cd radical.entk
git checkout <branch-name>
pip install .
You can check the version of Ensemble Toolkit with the
`radical-stack`
command-line tool. The currently installed version should
be printed.
radical-stack
python : 3.7.4
pythonpath :
virtualenv : /home/username/ve-entk
radical.entk : 1.0.0
radical.pilot : 1.0.0
radical.saga : 1.0.0
radical.utils : 1.0.0
Installing Ensemble Toolkit using Anaconda/Conda
Conda users can obtain Ensemble Toolkit from the conda-forge channel.
To install the Ensemble Toolkit, we need to create a conda environment. Open a terminal and run (assuming conda is on your PATH):
conda create -n conda-entk python=3.7 -c conda-forge -y
conda activate conda-entk
We suggest using the released version of EnTK, which you can install by executing the following command in your conda environment:
conda install radical.entk -c conda-forge
You can check the version of Ensemble Toolkit with the
`radical-stack`
command-line tool. The currently installed version should
be printed.
radical-stack
python : <path>/rct/bin/python3
pythonpath :
version : 3.9.2
virtualenv : rct
radical.entk : 1.6.0
radical.pilot : 1.6.3
radical.saga : 1.6.1
radical.utils : 1.6.2
Preparing the Environment
Ensemble Toolkit uses RADICAL Pilot as the runtime system. RADICAL Pilot can access HPC clusters remotely via SSH but it requires: (1) a MongoDB server; and (2) a properly set-up passwordless SSH environment.
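The MongoDB server is typically communicated to the runtime system through an environment variable; a minimal sketch, assuming RADICAL Pilot reads the URL from RADICAL_PILOT_DBURL (check the RADICAL Pilot documentation for your installed version):

import os

# Assumption: RADICAL Pilot reads the MongoDB URL from RADICAL_PILOT_DBURL;
# set it (here, or in your shell profile) before running an EnTK application.
os.environ['RADICAL_PILOT_DBURL'] = 'mongodb://user:password@hostname:27017/dbname'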
Setup passwordless SSH Access to HPC platforms
In order to set up passwordless access to another machine, you need to create a key pair on your local machine and paste the public key into the authorized_keys file on the remote machine.
This is a recommended tutorial for creating passwordless SSH access.
An easy way to set up SSH access to multiple remote machines is to create a ~/.ssh/config file. Suppose the URL used to access a specific machine is foo@machine.example.com. You can create an entry in this config file as follows:
# contents of $HOME/.ssh/config
Host machine1
HostName machine.example.com
User foo
Now you can log in to the machine with ssh machine1.
Source: http://nerderati.com/2011/03/17/simplify-your-life-with-an-ssh-config-file/
Troubleshooting
Missing virtualenv
If virtualenv is not installed on your system, you can try the following.
wget --no-check-certificate https://pypi.python.org/packages/source/v/virtualenv/virtualenv-16.7.9.tar.gz
tar xzf virtualenv-16.7.9.tar.gz
python virtualenv-16.7.9/virtualenv.py $HOME/ve-entk -p python3.7
source $HOME/ve-entk/bin/activate
User Guide
The following list of examples will guide you step by step in creating your own application. Please read both the descriptions on the website and the comments in the Python scripts.
Getting Started
In this section we will run through the Ensemble Toolkit API. We will develop an example application consisting of a simple bag of Tasks.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
Importing components from the Ensemble Toolkit Module
To create any application using Ensemble Toolkit, you need to import four components: Pipeline, Stage, Task, and AppManager. We have already discussed these components in the earlier sections.
1from radical.entk import Pipeline, Stage, Task, AppManager
Creating the workflow
We first create a Pipeline, Stage and Task object. Then we assign the ‘executable’ and ‘arguments’ for the Task. For this example, we will create one Pipeline consisting of one Stage that contains one Task.
In the snippet below, we create the Task, assign its name, executable, and arguments, and add it to the Stage.
22 t = Task()
23 t.name = 'my.first.task' # Assign a name to the task (optional, do not use ',' or '_')
24 t.executable = '/bin/echo' # Assign executable to the task
25 t.arguments = ['Hello World'] # Assign arguments for the task executable
26
27 # Add Task to the Stage
28 s.add_tasks(t)
Next, we add the Stage to the Pipeline and create the Application Manager object.
30 # Add Stage to the Pipeline
31 p.add_stages(s)
32
33 # Create Application Manager
34 appman = AppManager()
Now that we have a fully described Task, Stage, and Pipeline, we start describing the resource request: a dictionary with the mandatory keys resource, walltime, and cpus.
37 # resource, walltime, and cpus
38 # resource is 'local.localhost' to execute locally
39 res_dict = {
40
41 'resource': 'local.localhost',
Creating the AppManager
Now that our workflow has been created, we need to specify where it is to be executed. For this example, we will simply execute the workflow locally: we describe a resource request for 1 core for 10 minutes on localhost, i.e., your local machine, assign the resource request description and the workflow to the AppManager, and run our application.
42 'walltime': 10,
43 'cpus': 1
44 }
45
46 # Assign resource request description to the Application Manager
47 appman.resource_desc = res_dict
48
49 # Assign the workflow as a set or list of Pipelines to the Application Manager
50 # Note: The list order is not guaranteed to be preserved
51 appman.workflow = set([p])
52
53 # Run the Application Manager
54 appman.run()
Warning
If your system’s default Python is Anaconda Python, please change the resource entry in the resource dictionary above to
'resource': 'local.localhost_anaconda',
To run the script, simply execute the following from the command line:
python get_started.py
Warning
The first run may fail for different reasons, most of which are related to setting up the execution environment or requesting the correct resources. Upon failure, Python may incorrectly raise the exception KeyboardInterrupt. This may be confusing because it is reported even when no keyboard interrupt has been issued. Currently, we have not found a way to avoid raising that exception.
And that’s it! Those are all the steps in this example. You can generate more verbose output by setting the environment variables: `export RADICAL_LOG_TGT=radical.log; export RADICAL_LOG_LVL=DEBUG`.
After the execution of the example, you may want to check the output. Under your home folder, you will find a folder named radical.pilot.sandbox. In that folder, there will be a re.session.* folder and a ve.local.localhost folder. Inside re.session.*, there is a pilot.0000 folder, and in there a unit.000000 folder. In the unit folder, you will see several files, including unit.000000.out and unit.000000.err. The unit.000000.out file holds the messages from standard output and unit.000000.err holds the messages from standard error. The unit.000000.out file should contain a Hello World message.
Let’s look at the complete code for this example:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
if __name__ == '__main__':
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s = Stage()
# Create a Task object
t = Task()
t.name = 'my.first.task' # Assign a name to the task (optional, do not use ',' or '_')
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['Hello World'] # Assign arguments for the task executable
# Add Task to the Stage
s.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s)
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p])
# Run the Application Manager
appman.run()
Adding Tasks
In this section, we will take a look at how we can add more tasks to our base script from the Getting Started section.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
Below, you can see the code snippet that shows how you can create more Task objects and add them to the Stage using the add_tasks() method.
t.name = 'my.task' # Assign a name to the task (optional, do not use ',' or '_')
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s' % cnt] # Assign arguments for the task executable
# Add the Task to the Stage
s.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s)
# Create Application Manager
appman = AppManager()
python add_tasks.py
Note
An individual Task object must be created for each task in order to treat all items inside a Stage uniquely. For example, a for loop can be used to create multiple tasks, as long as a new Task object is created inside each iteration.
# Five tasks to be added
s = Stage()
task_dict = {
'cpu_reqs': {
'processes' : 10,
'threads_per_process': 1,
'process_type' : "MPI",
'thread_type' : "OpenMP"
}}
for i in range(5):
task_dict['name'] = "task-{}".format(i)
task_dict['arguments'] = ["file-{}".format(i)]
# Creating new Task object and adding to Stage at every iteration
s.add_tasks(Task(from_dict=task_dict))
print("Adding shared tasks. Total: {}".format(len(s.tasks)))
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
if __name__ == '__main__':
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s = Stage()
for cnt in range(10):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional, do not use ',' or '_')
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s' % cnt] # Assign arguments for the task executable
# Add the Task to the Stage
s.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s)
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p])
# Run the Application Manager
appman.run()
Adding Stages
In this section, we will take a look at how we can add more stages to our script from the Adding Tasks section.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
Below, you can see the code snippet that shows how you can add more Stages to a Pipeline. You simply create more Stage objects, populate them with Tasks, and add them to the Pipeline using the add_stages() method.
for cnt in range(10):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional)
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s in %s' % (cnt, s1.name)] # Assign arguments for the task executable
# Add the Task to the Stage
s1.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object
s2 = Stage()
s2.name = 'Stage.2'
for cnt in range(5):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional, do not use ',' or '_')
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s in %s' % (cnt, s2.name)] # Assign arguments for the task executable
# Add the Task to the Stage
s2.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
python add_stages.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
if __name__ == '__main__':
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s1 = Stage()
s1.name = 'Stage.1'
for cnt in range(10):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional)
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s in %s' % (cnt, s1.name)] # Assign arguments for the task executable
# Add the Task to the Stage
s1.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object
s2 = Stage()
s2.name = 'Stage.2'
for cnt in range(5):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional, do not use ',' or '_')
t.executable = '/bin/echo' # Assign executable to the task
t.arguments = ['I am task %s in %s' % (cnt, s2.name)] # Assign arguments for the task executable
# Add the Task to the Stage
s2.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p])
# Run the Application Manager
appman.run()
Adding Pipelines
In this section, we will take a look at how we can add more pipelines to our script from the Adding Stages section.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
Below, you can see the code snippet that shows how you can create a workflow with two Pipelines. You simply create more Pipeline objects, populate them with Stages and Tasks, create the workflow as a set of two Pipelines, and assign it to the Application Manager.
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
To keep the script shorter, we created a function that creates, populates and returns a Pipeline. The code snippet of this function is as follows.
for s_cnt in range(stages):
# Create a Stage object
s = Stage()
s.name = 'Stage.%s' % s_cnt
for t_cnt in range(5):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional)
t.executable = '/bin/echo' # Assign executable to the task
# Assign arguments for the task executable
t.arguments = ['I am task %s in %s in %s' % (t_cnt, s_cnt, name)]
# Add the Task to the Stage
s.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s)
return p
if __name__ == '__main__':
python add_pipelines.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
def generate_pipeline(name, stages):
# Create a Pipeline object
p = Pipeline()
p.name = name
for s_cnt in range(stages):
# Create a Stage object
s = Stage()
s.name = 'Stage.%s' % s_cnt
for t_cnt in range(5):
# Create a Task object
t = Task()
t.name = 'my.task' # Assign a name to the task (optional)
t.executable = '/bin/echo' # Assign executable to the task
# Assign arguments for the task executable
t.arguments = ['I am task %s in %s in %s' % (t_cnt, s_cnt, name)]
# Add the Task to the Stage
s.add_tasks(t)
# Add Stage to the Pipeline
p.add_stages(s)
return p
if __name__ == '__main__':
p1 = generate_pipeline(name='Pipeline 1', stages=1)
p2 = generate_pipeline(name='Pipeline 2', stages=2)
# Create Application Manager
appman = AppManager()
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p1, p2])
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Run the Application Manager
appman.run()
Adding Data
Data management is one of the important elements of any application. In this section, we will take a look at how we can manage data between tasks.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
In the following example, we will create a Pipeline of two Stages, each with one Task. In the first stage, we will create a file of size 1MB. In the next stage, we will perform a character count on the same file and write to another file. Finally, we will bring that output to the current location.
Since we already know how to create this workflow, we simply present the code snippet concerned with the data movement. The task in the second stage needs to perform two data movements: a) copy the data created in the first stage to its current directory and b) bring the output back to the directory where the script is executed. Below we present the statements that perform these operations.
s2.add_tasks(t2)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create Application Manager
appman = AppManager()
Intermediate data are accessible through a unique identifier of the form $Pipeline_%s_Stage_%s_Task_%s/{filename}, where each %s is replaced by the name of the corresponding pipeline, stage, and task. If no name is given, the .uid attribute can be used to locate files across tasks. The Task API documentation has more information about its use.
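For instance, when Pipelines, Stages, and Tasks are left unnamed, their auto-generated uids can be used to build the same reference (a sketch mirroring the Pipeline of Ensembles example later in this guide):

# When no names are assigned, the auto-generated uids identify the entities
t2.copy_input_data      = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt'
                           % (p.uid, s1.uid, t1.uid)]
# Bring the result of the current task back to the launch directory
t2.download_output_data = ['ccount.txt']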
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_LOG_LVL=DEBUG.
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
if __name__ == '__main__':
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s1 = Stage()
# Create a Task object which creates a file named 'output.txt' of size 1 MB
t1 = Task()
t1.executable = '/bin/bash'
t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']
# Add the Task to the Stage
s1.add_tasks(t1)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object
s2 = Stage()
s2.name = 'Stage.2'
# Create a Task object
t2 = Task()
t2.executable = '/bin/bash'
t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]
# Download the output of the current task to the current location
t2.download_output_data = ['ccount.txt']
# Add the Task to the Stage
s2.add_tasks(t2)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create Application Manager
appman = AppManager()
# Assign the workflow as a set or list of Pipelines to the Application Manager
appman.workflow = set([p])
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Run the Application Manager
appman.run()
Handling data from login node
The following example shows how to configure a task to fetch data at runtime, when compute nodes do not have internet access.
t = Task()
t.name = 'download-task'
t.executable = '/usr/bin/ssh'
t.arguments = ["<username>@<hostname>",
"'bash -l /path/to/download_script.sh <input1> <input2>'"]
t.download_output_data = ['STDOUT', 'STDERR']
t.cpu_reqs = {'cpu_processes': 1,
'cpu_process_type': None,
'cpu_threads': 1,
'cpu_thread_type': None}
Note
bash -l
makes the shell act as if it had been directly invoked
by logging in.
Note
Verify that the command ssh localhost hostname
works on the
login node without a password prompt. If you are asked for a
password, please create an ssh keypair with the command
ssh-keygen
. That should create two keys in ~/.ssh
, named
id_rsa
and id_rsa.pub
. Now, execute the following commands:
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 0600 $HOME/.ssh/authorized_keys
Changing Target Machine
All of our examples so far have been run locally. It’s time to run something on an HPC! One of the features of Ensemble Toolkit is that you can submit tasks to another machine remotely from your local machine. However, this has some requirements: you need passwordless SSH access to the target machine. If you don’t have such access, we discuss the setup here. You also need to confirm that RP and Ensemble Toolkit are supported on this machine. A list of supported machines and how to get support for new machines is discussed here.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
Once you have passwordless access to another machine, switching from one target machine to another is quite simple. We simply re-describe the resource dictionary that is assigned to the Application Manager. For example, in order to run on the XSEDE Comet cluster, we describe the resource dictionary as follows:
'project': 'unc100',
'schema': 'gsissh'
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Run the Application Manager
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.
python change_target.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
if __name__ == '__main__':
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s1 = Stage()
# Create a Task object which creates a file named 'output.txt' of size 1 MB
t1 = Task()
t1.executable = '/bin/bash'
t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']
# Add the Task to the Stage
s1.add_tasks(t1)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object
s2 = Stage()
s2.name = 'Stage 2'
# Create a Task object
t2 = Task()
t2.executable = '/bin/bash'
t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name,
s1.name, t1.name)]
# Download the output of the current task to the current location
t2.download_output_data = ['ccount.txt']
# Add the Task to the Stage
s2.add_tasks(t2)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create Application Manager
appman = AppManager()
# Assign the workflow as a set or list of Pipelines to the Application Manager
appman.workflow = set([p])
# Create a dictionary to describe our resource request for XSEDE Comet
res_dict = {
'resource': 'xsede.comet',
'walltime': 10,
'cpus': 16,
'project': 'unc100',
'schema': 'gsissh'
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Run the Application Manager
appman.run()
Profiling
EnTK can be configured to generate profiles by setting
RADICAL_ENTK_PROFILE=True
. Profiles are generated per component and
sub-component. These profiles can be read and analyzed by using
RADICAL Analytics (RA).
We describe profiling capabilities using RADICAL Analytics for EnTK via two examples that extract durations and timestamps.
The scripts and the data can be found in your virtualenv under share/radical.entk/analytics/scripts
or can be downloaded via the following links:
Untar the data and run either of the scripts. We recommend following the inline comments and output messages to get an understanding of RADICAL Analytics’ usage for EnTK.
More details on the capabilities of RADICAL Analytics can be found in its documentation.
Note
The current examples of RADICAL Analytics are configured for RADICAL Pilot but can be changed to EnTK by:
* Setting stype to ‘radical.entk’ when creating the RADICAL Analytics session
* Following the state model, event model, and sequence diagram to determine the EnTK probes to use in profiling
Extracting durations
1#!/usr/bin/env python
2__copyright__ = 'Copyright 2013-2018, http://radical.rutgers.edu'
3__license__ = 'MIT'
4
5
6import os
7import sys
8import glob
9import pprint
10import radical.utils as ru
11import radical.entk as re
12import radical.analytics as ra
13
14"""This example illustrates how to obtain durations for arbitrary (non-state)
15profile events. Modified from examples under RADICAL Analytics"""
16
17# ------------------------------------------------------------------------------
18#
19if __name__ == '__main__':
20
21 loc = './re.session.two.vivek.017759.0012'
22 src = os.path.dirname(loc)
23 sid = os.path.basename(loc)
24 session = ra.Session(src=src, sid = sid, stype='radical.entk')
25
26 # A formatting helper before starting...
27 def ppheader(message):
28 separator = '\n' + 78 * '-' + '\n'
29 print(separator + message + separator)
30
31 # First we look at the *event* model of our session. The event model is
32 # usually less stringent than the state model: not all events will always be
33 # available, events may have certain fields missing, they may be recorded
34 # multiple times, their meaning may slightly differ, depending on the taken
35 # code path. But in general, these are the events available, and their
36 # relative ordering.
37 ppheader("event models")
38 pprint.pprint(session.describe('event_model'))
39 pprint.pprint(session.describe('statistics'))
40
41 # Let's say that we want to see how long EnTK took to schedule, execute, and
42 # process completed tasks.
43
44 # We first filter our session to obtain only the task objects
45 tasks = session.filter(etype='task', inplace=False)
46 print('#tasks : %d' % len(tasks.get()))
47
48 # We use the 're.states.SCHEDULING' and 're.states.SUBMITTING' probes to find
49 # the time taken by EnTK to create and submit all tasks for execution
50 ppheader("Time spent to create and submit the tasks")
51 duration = tasks.duration(event=[{ru.EVENT: 'state',
52 ru.STATE: re.states.SCHEDULING},
53 {ru.EVENT: 'state',
54 ru.STATE: re.states.SUBMITTING}])
55 print('duration : %.2f' % duration)
56
57 # We use the 're.states.SUBMITTING' and 're.states.COMPLETED' probes to find
58 # the time taken by EnTK to execute all tasks
59 ppheader("Time spent to execute the tasks")
60 duration = tasks.duration(event=[{ru.EVENT: 'state',
61 ru.STATE: re.states.SUBMITTING},
62 {ru.EVENT: 'state',
63 ru.STATE: re.states.COMPLETED}])
64 print('duration : %.2f' % duration)
65
66 # We use the 're.states.COMPLETED' and 're.states.DONE' probes to find
67 # the time taken by EnTK to process all executed tasks
68 ppheader("Time spent to process executed tasks")
69 duration = tasks.duration(event=[{ru.EVENT: 'state',
70 ru.STATE: re.states.COMPLETED},
71 {ru.EVENT: 'state',
72 ru.STATE: re.states.DONE}])
73 print('duration : %.2f' % duration)
74
75 # Finally, we produce a list of the number of concurrent tasks between
76 # states 're.states.SUBMITTING' and 're.states.COMPLETED' over the course
77 # of the entire execution sampled every 10 seconds
78 ppheader("concurrent tasks in between SUBMITTING and EXECUTED states")
79 concurrency = tasks.concurrency(event=[{ru.EVENT: 'state',
80 ru.STATE: re.states.SUBMITTING},
81 {ru.EVENT: 'state',
82 ru.STATE: re.states.COMPLETED}],
83 sampling=10)
84 pprint.pprint(concurrency)
85
86
87# ------------------------------------------------------------------------------
Extracting timestamps
1#!/usr/bin/env python
2
3import os
4import pprint
5import radical.utils as ru
6import radical.entk as re
7import radical.analytics as ra
8
9__copyright__ = 'Copyright 2013-2018, http://radical.rutgers.edu'
10__license__ = 'MIT'
11
12"""
13This example illustrates the use of the method ra.Session.get().
14Modified from examples under RADICAL Analytics
15"""
16
17# ------------------------------------------------------------------------------
18#
19if __name__ == '__main__':
20
21 loc = './re.session.two.vivek.017759.0012'
22 src = os.path.dirname(loc)
23 sid = os.path.basename(loc)
24 session = ra.Session(src=src, sid=sid, stype='radical.entk')
25
26 # A formatting helper before starting...
27 def ppheader(message):
28 separator = '\n' + 78 * '-' + '\n'
29 print(separator + message + separator)
30
31 # and here we go. As seen in example 01, we use ra.Session.list() to get the
32 # name of all the types of entity of the session.
33 etypes = session.list('etype')
34 pprint.pprint(etypes)
35
36 # We limit ourselves to the type 'task'. We use the method
37 # ra.Session.get() to get all the objects in our session with etype 'task':
38 ppheader("properties of the entities with etype 'task'")
39 tasks = session.get(etype='task')
40 pprint.pprint(tasks)
41
42
43 # Mmmm, still a bit too many entities. We limit our analysis to a single
44 # task. We use ra.Session.get() to select all the objects in the
45 # session with etype 'task' and uid 'task.0000' and return them into a
46 # list:
47 ppheader("properties of the entities with etype 'task' and uid 'task.0000'")
48 task = session.get(etype='task', uid='task.0000')
49 pprint.pprint(task)
50
51
52 # We may want also to look into the states of this task:
53 ppheader("states of the entities with uid 'task.0000'")
54 states = task[0].states
55 pprint.pprint(states)
56
57 # and extract the state we need. For example, the state 'SCHEDULED', that
58 # indicates that the task has been scheduled. To refer to the state 'SCHEDULED',
59 # and to all the other states of RADICAL-Pilot, we use the re.states.SCHEDULED property
60 # that guarantees type checking.
61 ppheader("Properties of the state re.SCHEDULED of the entities with uid 'task.0000'")
62 state = task[0].states[re.states.SCHEDULED]
63 pprint.pprint(state)
64
65 # Finally, we extract a property we need from this state. For example, the
66 # timestamp of when the task has been created, i.e., the property 'time' of
67 # the state SCHEDULED:
68 ppheader("Property 'time' of the state re.states.SCHEDULED of the entities with uid 'task.000000'")
69 timestamp = task[0].states[re.states.SCHEDULED][ru.TIME]
70 pprint.pprint(timestamp)
71
72 # ra.Session.get() can also be used to get all the entities in our
73 # session that have a specific state. For example, the following gets all
74 # the types of entity that have the state 'SCHEDULED':
75 ppheader("Entities with state re.states.SCHEDULED")
76 entities = session.get(state=re.states.SCHEDULED)
77 pprint.pprint(entities)
78
79 # We can then print the timestamp of the state 'SCHEDULED' for all the entities
80 # having that state by using something like:
81 ppheader("Timestamp of all the entities with state re.states.SCHEDULED")
82 timestamps = [entity.states[re.states.SCHEDULED][ru.TIME] for entity in entities]
83 pprint.pprint(timestamps)
84
85 # We can also create tailored data structures for our analysis. For
86 # example, using tuples to name entities, state, and timestamp:
87 ppheader("Named entities with state re.states.SCHEDULED and its timestamp")
88 named_timestamps = [(entity.uid,
89 entity.states[re.states.SCHEDULED][ru.STATE],
90 entity.states[re.states.SCHEDULED][ru.TIME]) for entity in entities]
91 pprint.pprint(named_timestamps)
Examples
The following is a list of common application structures. Please read both the descriptions on the website and the comments in the Python scripts.
Pipeline of Ensembles
One of the most common execution patterns consists of one Pipeline with multiple Stages where each Stage consists of several Tasks. We call this a Pipeline of Ensembles. A pictorial representation of this pattern is provided below.

Figure 1: Pipeline of Ensembles
We encourage you to take a look at the Ensemble of Pipelines example on the next page and compare it with the above pattern.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
In the following example, we create one Pipeline with three Stages. The Task in the first Stage creates a file. There are 30 Tasks in the second Stage that each perform a character count on that file. The 30 Tasks in the third Stage each perform a checksum on the output of the corresponding Task from the second Stage.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.
python poe.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
# ------------------------------------------------------------------------------
#
def generate_pipeline():
# Create a Pipeline object
p = Pipeline()
p.name = 'p1'
# Create a Stage object
s1 = Stage()
s1.name = 's1'
# Create a Task object which creates a file named 'output.txt' of size 1 MB
t1 = Task()
t1.name = 't1'
t1.executable = '/bin/bash'
t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']
# Add the Task to the Stage
s1.add_tasks(t1)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object to hold character count tasks
s2 = Stage()
s2.name = 's2'
s2_task_uids = []
for cnt in range(30):
# Create a Task object
t2 = Task()
t2.name = 't%s' % (cnt + 1)
t2.executable = '/bin/bash'
t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' %
(p.uid, s1.uid, t1.uid)]
# Add the Task to the Stage
s2.add_tasks(t2)
s2_task_uids.append(t2.uid)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create another Stage object to hold checksum tasks
s3 = Stage()
s3.name = 's3'
for cnt in range(30):
# Create a Task object
t3 = Task()
t3.name = 't%s' % (cnt + 1)
t3.executable = '/bin/bash'
t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
# Copy data from the task in the first stage to the current task's location
t3.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/ccount.txt' %
(p.uid, s2.uid, s2_task_uids[cnt])]
# Download the output of the current task to the current location
t3.download_output_data = ['chksum.txt > chksum_%s.txt' % cnt]
# Add the Task to the Stage
s3.add_tasks(t3)
# Add Stage to the Pipeline
p.add_stages(s3)
return p
# ------------------------------------------------------------------------------
#
if __name__ == '__main__':
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus':2
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([generate_pipeline()])
# Run the Application Manager
appman.run()
# ------------------------------------------------------------------------------
Ensemble of Pipelines
One of the most common execution patterns consists of multiple concurrent Pipelines with multiple Stages, where each Stage consists of several Tasks. We call this an Ensemble of Pipelines. A pictorial representation of this pattern is provided below.

Figure 1: Ensemble of Pipelines
We encourage you to take a look at the Pipeline of Ensembles example on the previous page and compare it with the above pattern.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
In the following example, we create 10 Pipelines, each with three Stages. The Task in the first Stage creates a file. The Task in the second Stage performs a character count on that file. The Task in the third Stage performs a checksum on the output of the Task from the second stage.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.
python eop.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python
from radical.entk import Pipeline, Stage, Task, AppManager
import os
# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
os.environ['RADICAL_ENTK_REPORT'] = 'True'
def generate_pipeline():
# Create a Pipeline object
p = Pipeline()
# Create a Stage object
s1 = Stage()
# Create a Task object which creates a file named 'output.txt' of size 1 MB
t1 = Task()
t1.executable = '/bin/bash'
t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']
# Add the Task to the Stage
s1.add_tasks(t1)
# Add Stage to the Pipeline
p.add_stages(s1)
# Create another Stage object to hold character count tasks
s2 = Stage()
# Create a Task object
t2 = Task()
t2.executable = '/bin/bash'
t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.uid,
s1.uid, t1.uid)]
# Add the Task to the Stage
s2.add_tasks(t2)
# Add Stage to the Pipeline
p.add_stages(s2)
# Create another Stage object to hold checksum tasks
s3 = Stage()
# Create a Task object
t3 = Task()
t3.executable = '/bin/bash'
t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
# Copy data from the task in the first stage to the current task's location
t3.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/ccount.txt' % (p.uid,
s2.uid, t2.uid)]
# Download the output of the current task to the current location
t3.download_output_data = ['chksum.txt > chksum_%s.txt' % p.uid]
# Add the Task to the Stage
s3.add_tasks(t3)
# Add Stage to the Pipeline
p.add_stages(s3)
return p
if __name__ == '__main__':
pipelines = []
for cnt in range(10):
pipelines.append(generate_pipeline())
# Create Application Manager
appman = AppManager()
# Create a dictionary to describe the three mandatory keys:
# resource, walltime, and cpus
# resource is 'local.localhost' to execute locally
res_dict = {
'resource': 'local.localhost',
'walltime': 10,
'cpus': 1
}
# Assign resource request description to the Application Manager
appman.resource_desc = res_dict
# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set(pipelines)
# Run the Application Manager
appman.run()
Sequence of Workflows
Another common execution pattern consists of sequential workflows in the same session, each with multiple concurrent Pipelines with multiple Stages, where each Stage consists of several Tasks. We call this a Sequence of Workflows.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
In the following example, we create 2 sequential workflows, each with 2 Pipelines and 3 Stages per Pipeline. For demonstration purposes, each Task does nothing but “sleep” for 3 seconds. The example suggests starting AppManager with autoterminate=False and using appman.terminate() once all pipelines are finished. This allows you to use the same application manager for the second workflow.
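In isolation, the pattern looks like the following sketch (res_dict and the pipeline lists are placeholders; the autoterminate argument is used as in the complete script below):

# Keep the resource allocation alive across multiple workflows
appman = AppManager(autoterminate=False)
appman.resource_desc = res_dict          # placeholder: resource dictionary as usual

# First workflow
appman.workflow = set(first_pipelines)   # placeholder: first list of Pipelines
appman.run()

# Second workflow reuses the same AppManager and resources
appman.workflow = set(second_pipelines)  # placeholder: second list of Pipelines
appman.run()

# Release the resources once all workflows have finished
appman.terminate()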
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.
python sow.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager


def generate_pipeline():

    p = Pipeline()

    s1 = Stage()
    t1 = Task()
    t1.executable = '/bin/sleep'
    t1.arguments  = ['3']
    s1.add_tasks(t1)
    p.add_stages(s1)

    s2 = Stage()
    t2 = Task()
    t2.executable = '/bin/sleep'
    t2.arguments  = ['3']
    s2.add_tasks(t2)
    p.add_stages(s2)

    s3 = Stage()
    t3 = Task()
    t3.executable = '/bin/sleep'
    t3.arguments  = ['3']
    s3.add_tasks(t3)
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    # autoterminate=False lets us reuse the same AppManager for the second
    # workflow; we call appman.terminate() explicitly once both are done
    appman = AppManager(autoterminate=False)

    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus'    : 8
    }
    appman.resource_desc = res_dict

    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()
    print('1 ===================================================')

    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()
    print('2 ===================================================')

    appman.terminate()
    print('t ===================================================')
Advanced Examples
The following is a list of common application structures. Please read both the descriptions on this page and the comments in the Python scripts.
Adaptive applications: Task-count
We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.
For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.
CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4

def func_post():

    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        ...

s = Stage()
s.post_exec = func_post
In the following example, we initially create 1 Pipeline with one Stage. There are 10 tasks in the first Stage, each running ‘sleep 30’. After the Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the number of new stages added is less than 4. If yes, we add a new Stage with similar tasks as before to the Pipeline. If 4 stages have already been added, no more stages are added.
python adapt_tc.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4


def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            s = Stage()
            for i in range(10):
                t = Task()
                t.executable = '/bin/sleep'
                t.arguments  = ['30']
                s.add_tasks(t)
            # Add post-exec to the Stage
            s.post_exec = func_post
            p.add_stages(s)

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    for i in range(10):
        t1 = Task()
        t1.executable = 'sleep'
        t1.arguments  = ['30']
        # Add the Task to the Stage
        s1.add_tasks(t1)

    # Add post-exec to the Stage
    s1.post_exec = func_post

    # Add Stage to the Pipeline
    p.add_stages(s1)

    return p


if __name__ == '__main__':

    # Create a dictionary describing the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 15,
        'cpus'    : 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()
Adaptive applications: Task-order
We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.
For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.
CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4

def func_post():

    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        ...

s = Stage()
s.post_exec = func_post
In the following example, we create 1 Pipeline with five stages. There are 10 tasks in each Stage that each run ‘sleep 30’. After a Stage is DONE (i.e. all tasks of the Stage have completed execution), a condition is evaluated that checks whether the current stage is less than the max number of stages in the Pipeline. If yes, then the remaining stages are shuffled. If not, no operation is performed.
python adapt_to.py
Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os
from random import shuffle

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4
CUR_TASKS     = 10


def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            shuffle(p.stages[CUR_NEW_STAGE:])
        else:
            print('Done')

    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE + 1):

        # Create a Stage object
        s1 = Stage()

        for i in range(CUR_TASKS):
            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments  = ['30']
            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p


if __name__ == '__main__':

    # Create a dictionary describing the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 15,
        'cpus'    : 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()
Adaptive applications: Task-attribute
We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.
For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.
CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4

def func_post():

    global CUR_NEW_STAGE, MAX_NEW_STAGE

    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        CUR_NEW_STAGE += 1
        s = Stage()
        for i in range(10):
            t = Task()
            t.executable = '/bin/sleep'
            t.arguments = ['30']
            s.add_tasks(t)
        s.post_exec = func_post
        p.add_stages(s)

s1.post_exec = func_post
In the following example, we create 1 Pipeline with five stages. There are 10 tasks in each Stage, each running ‘sleep 30’. After a Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the current stage is less than the max number of stages in the Pipeline. If yes, then the arguments of the tasks of the next Stage are modified. If not, no operation is performed, and the pipeline will terminate.
python adapt_ta.py
Let’s take a look at the complete code in the example. Note: You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
A look at the complete code in this section:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os
from random import randint

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4
CUR_TASKS     = 10


def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        # only modify the next stage if one still exists in the pipeline
        if CUR_NEW_STAGE < MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            for t in p.stages[CUR_NEW_STAGE].tasks:
                dur = randint(10, 30)
                t.arguments = [str(dur)]
        else:
            print('Done')

    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE + 1):

        # Create a Stage object
        s1 = Stage()

        for _ in range(CUR_TASKS):
            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments  = ['30']
            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p


if __name__ == '__main__':

    # Create a dictionary describing the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 15,
        'cpus'    : 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()
API Reference for Users
Application Creation components API
Pipeline API
- class radical.entk.Pipeline[source]
A pipeline represents a collection of objects that have a linear temporal execution order. In this case, a pipeline consists of multiple ‘Stage’ objects. Each Stage_i can execute only after all stages up to Stage_(i-1) have completed execution.
- add_stages(value)[source]
Appends stages to the current Pipeline
- Argument
List of Stage objects
- as_dict()[source]
Convert current Pipeline (i.e. its attributes) into a dictionary
- Returns
python dictionary
- property completed
Returns whether the Pipeline has completed
- Returns
Boolean
- property current_stage
Returns the current stage being executed
- Returns
Integer
- from_dict(d)[source]
Create a Pipeline from a dictionary. The change is made in place.
- Argument
python dictionary
- Returns
None
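As a hedged illustration of how as_dict() and from_dict() fit together, the sketch below serializes a pipeline’s attributes and re-initializes another Pipeline object from them; per the descriptions above, only the pipeline’s attributes are converted, and the name used here is illustrative.
from radical.entk import Pipeline

p1 = Pipeline()
p1.name = 'pipe.analysis'          # illustrative name

# Serialize the pipeline's attributes into a plain dictionary
p_dict = p1.as_dict()

# Re-initialize another Pipeline in place from that dictionary
p2 = Pipeline()
p2.from_dict(p_dict)

assert p2.name == p1.name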
- property lock
Returns the lock over the current Pipeline
- Returns
Lock object
- property luid
Unique ID of the current pipeline (fully qualified). For the pipeline class, this is an alias to uid.
Example
>>> pipeline.luid
pipe.0001
- Getter
Returns the fully qualified uid of the current pipeline
- Type
String
- property name
Name of the pipeline useful for bookkeeping and to refer to this pipeline while data staging. Do not use a ‘,’ or ‘_’ in an object’s name.
- Getter
Returns the name of the pipeline
- Setter
Assigns the name of the pipeline
- Type
String
- resume()[source]
Continue execution of paused stages and tasks.
The resume() method can only be called on a suspended pipeline, an exception will be raised if that condition is not met.
The state of a resumed pipeline will be set to the state the pipeline had before suspension.
- property stages
Stages of the pipeline
- Getter
Returns the stages in the current Pipeline
- Setter
Assigns the stages to the current Pipeline
- Type
List
- property state
Current state of the pipeline
- Getter
Returns the state of the current pipeline
- Type
String
- property state_history
Returns a list of the states obtained in temporal order
- Returns
list
- suspend()[source]
Pause execution of the pipeline: stages and tasks that are executing will continue to execute, but no new stages and tasks will be eligible for execution until resume() is called.
The suspend() method cannot be called on a suspended or completed pipeline; doing so will result in an exception.
The state of the pipeline will be set to SUSPENDED.
- property uid
Unique ID of the current pipeline
- Getter
Returns the unique id of the current pipeline
- Type
String
Stage API
- class radical.entk.Stage[source]
A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.
- add_tasks(value)[source]
Adds task(s) to a Stage using a set union operation. Every Task element is unique; no duplicates are allowed. An existing Task won’t be added again but will be updated with changes.
- Argument
iterable task object
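For instance, tasks can be added one at a time or as an iterable of Task objects; a minimal sketch:
from radical.entk import Stage, Task

s = Stage()

# Add a single task
t0 = Task()
t0.executable = '/bin/date'
s.add_tasks(t0)

# Add several tasks at once from an iterable
tasks = list()
for _ in range(3):
    t = Task()
    t.executable = '/bin/date'
    tasks.append(t)
s.add_tasks(tasks)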
- as_dict()[source]
Convert current Stage into a dictionary
- Returns
python dictionary
- from_dict(d)[source]
Create a Stage from a dictionary. The change is made in place.
- Argument
python dictionary
- Returns
None
- property luid
Unique ID of the current stage (fully qualified).
Example
>>> stage.luid
pipe.0001.stage.0004
- Getter
Returns the fully qualified uid of the current stage
- Type
String
- property name
Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.
- Getter
Returns the name of the current stage
- Setter
Assigns the name of the current stage
- Type
String
- property parent_pipeline
- Getter
Returns the pipeline this stage belongs to
- Setter
Assigns the pipeline uid this stage belongs to
- property post_exec
The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished.
Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon.
Example:
s1.post_exec = func_post

def func_post():
    if condition is met:
        s = Stage()
        t = Task()
        t.executable = '/bin/sleep'
        t.arguments = ['30']
        s.add_tasks(t)
        p.add_stages(s)
    else:
        # do nothing
        pass
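As noted above, a post_exec callback that resumes suspended pipelines must return a list with the IDs of those pipelines. A minimal sketch of that pattern, where p_other is a pipeline that was suspended earlier and should_resume() is a hypothetical application-level predicate:
def func_post():

    resumed = list()

    if should_resume():             # hypothetical application logic
        p_other.resume()            # p_other was suspended earlier via p_other.suspend()
        resumed.append(p_other.uid)

    # Return the uids of the pipelines resumed in this callback
    return resumed

s1.post_exec = func_post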
- property state
Current state of the stage
- Getter
Returns the state of the current stage
- Type
String
- property state_history
Returns a list of the states obtained in temporal order
- Returns
list
- property tasks
Tasks of the stage
- Getter
Returns all the tasks of the current stage
- Setter
Assigns tasks to the current stage
- Type
set of Tasks
- property uid
Unique ID of the current stage
- Getter
Returns the unique id of the current stage
- Type
String
Task API
- class radical.entk.Task(from_dict=None)[source]
A Task is an abstraction of a computational unit. In this case, a Task consists of its executable along with its required software environment, files to be staged as input and output.
At the user level a Task is populated by assigning attributes. Internally, an empty Task is created and populated using the from_dict function. This avoids creating Tasks with a new uid, since new uids offset the uid count file in radical.utils and can affect profiling if not handled carefully.
- uid
[type: str | default: “”] A unique ID for the task. This attribute is optional, a unique ID will be assigned by RE if the field is not set, and cannot be re-assigned (immutable attribute).
- name
[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads. Such characters as ‘,’ or ‘_’ are not allowed in a name.
- state
[type: str | default: “DESCRIBED”] Current state of the task, its initial value re.states.INITIAL is set during the task initialization, and this value is also appended into state_history attribute.
- state_history
[type: list | default: [“DESCRIBED”]] List of the states obtained in temporal order. Every time new state is set, it is appended to state_history automatically.
- executable
[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.
- arguments
[type: list | default: []] List of arguments to be supplied to the executable.
- environment
[type: dict | default: {}] Environment variables to set in the
environment before the execution process.
- sandbox
[type: str | default: “”] This specifies the working directory of the task. By default, the task’s uid is used as a sandbox for the task.
- pre_launch
[type: list | default: []] List of actions (shell commands) to perform before the task is launched.
Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of resources! Deviating from that rule will likely result in reduced overall throughput.
- post_launch
[type: list | default: []] List of actions (shell commands) to perform after the task finishes.
The same remarks as on pre_launch apply
- pre_exec
[type: list | default: []] List of actions (shell commands) to perform after task is launched, but before rank(s) starts execution.
The same remarks as on pre_launch apply
- post_exec
[type: list | default: []] List of actions (shell commands) to perform right after rank(s) finishes.
The same remarks as on pre_launch apply
- cpu_reqs
[type: CpuReqs | default: CpuReqs()] The CPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.
The expected format is dict-like:
task.cpu_reqs = {'cpu_processes'    : X,
                 'cpu_process_type' : None/'MPI',
                 'cpu_threads'      : Y,
                 'cpu_thread_type'  : None/'OpenMP'}
This description means that the Task is going to spawn X processes and Y threads per each of these processes to run on CPUs. Hence, the total number of cpus required by the Task is X * Y for all the processes and threads to execute concurrently.
By default, 1 CPU process and 1 CPU thread per process are requested.
- gpu_reqs
[type: GpuReqs | default: GpuReqs()] The GPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.
The expected format is dict-like:
task.gpu_reqs = {'gpu_processes'    : X,
                 'gpu_process_type' : None/'CUDA'/'ROCm',
                 'gpu_threads'      : Y,
                 'gpu_thread_type'  : None}
This description means that each rank of the task is going to use X GPUs with Y GPU-threads. By default, 0 GPUs are requested.
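For example, a task that runs a hypothetical 4-rank MPI executable with 2 OpenMP threads per rank and one GPU per rank could be described roughly as follows (the executable name and all values are illustrative):
from radical.entk import Task

t = Task()
t.executable = 'my_mpi_app'                       # hypothetical executable on $PATH
t.cpu_reqs   = {'cpu_processes'    : 4,
                'cpu_process_type' : 'MPI',
                'cpu_threads'      : 2,
                'cpu_thread_type'  : 'OpenMP'}
t.gpu_reqs   = {'gpu_processes'    : 1,
                'gpu_process_type' : 'CUDA',
                'gpu_threads'      : 1,
                'gpu_thread_type'  : None}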
- lfs_per_process
[type: int | default: 0] Local File Storage per process - amount of space (MB) required on the local file system of the node by the task.
- mem_per_process
[type: int | default: 0] Amount of memory required by the task.
- upload_input_data
[type: list | default: []] List of file names required to be transferred from a local client storage to the location of the task or data staging area before task starts.
- copy_input_data
[type: list | default: []] List of file names required to be copied from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
Example of a file location format:
$Pipeline_%s_Stage_%s_Task_%s, where %s is replaced by the entity name/uid
# output.txt is copied from the t1 task to the current task sandbox
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]
- link_input_data
[type: list | default: []] List of file names required to be symlinked from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
- move_input_data
[type: list | default: []] List of file names required to be moved from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
- copy_output_data
[type: list | default: []] List of file names required to be copied from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
Example of defining data to be copied:
# results.txt will be copied to a data staging area $SHARED
t.copy_output_data = ['results.txt > $SHARED/results.txt']
- link_output_data
[type: list | default: []] List of file names required to be symlinked from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
- move_output_data
[type: list | default: []] List of file names required to be moved from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
- download_output_data
[type: list | default: []] List of file names required to be downloaded from a current task to a local client storage space when a task is finished.
Example of defining data to be downloaded:
# results.txt is transferred to a local client storage space
t.download_output_data = ['results.txt']
- stdout
[type: str | default: “”] The name of the file to store stdout. If not set then the name of the following format will be used: <uid>.out.
- stderr
[type: str | default: “”] The name of the file to store stderr. If not set then the name of the following format will be used: <uid>.err.
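As an illustration of the sandbox, stdout and stderr attributes described above, a task's working directory and output file names can be set explicitly (all names are illustrative):
from radical.entk import Task

t = Task()
t.name       = 'task.render'       # no ',' or '_' allowed in names
t.executable = '/bin/echo'
t.arguments  = ['hello']
t.sandbox    = 'render.0000'       # working directory instead of the default uid
t.stdout     = 'render.out'        # instead of <uid>.out
t.stderr     = 'render.err'        # instead of <uid>.err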
- stage_on_error
[type: bool | default: False] Flag to allow staging out data if task got failed (output staging is normally skipped on FAILED or CANCELED tasks).
- exit_code
[type: int | default: None] Get the exit code for finished tasks: 0 - for successful tasks; 1 - for failed tasks.
- exception
[type: str | default: None] Get the representation of the exception which caused the task to fail.
- exception_detail
[type: str | default: None] Get additional details (traceback or error messages) to the exception which caused this task to fail.
- path
[type: str | default: “”] Get the path of the task on the remote machine. Useful to reference files generated in the current task.
- tags
[type: dict | default: None] The tags for the task that can be used while scheduling by the RTS (configuration specific tags, which influence task scheduling and execution, e.g., tasks co-location).
- rts_uid
[type: str | default: None] Unique RTS ID of the current task.
- parent_stage
[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the stage, which contains the current task.
- parent_pipeline
[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the pipeline, which contains the current task.
- annotations
[type: Annotations | default: None] Annotations to describe task’s input and output files, and sets dependencies between tasks.
Read-only attributes
- property luid
[type: str] Unique ID of the current task (fully qualified).
>>> task.luid
pipe.0001.stage.0004.task.0234
- annotate(inputs: Optional[Union[Dict, List, str]] = None, outputs: Optional[Union[List, str]] = None) None [source]
Adds dataflow annotations with provided input and output files, and defines dependencies between tasks.
- inputs
List of input files. If a file is produced by the previously executed task, then the corresponding input element is provided as a dictionary with the task instance as a key. Example: inputs=[‘file1’, {task0: ‘file2’, task1: [‘file2’]}]
- Type
list, optional
- outputs
List of produced/generated files.
- Type
list, optional
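A minimal sketch of annotate(), following the input/output format described above, where the file consumed by the second task is declared as being produced by the first (file names are illustrative):
from radical.entk import Task

t1 = Task()
t1.executable = '/bin/bash'
t1.arguments  = ['-c', 'echo data > output.txt']
t1.annotate(outputs=['output.txt'])

t2 = Task()
t2.executable = '/bin/bash'
t2.arguments  = ['-c', 'wc -l output.txt > wc.txt']
t2.annotate(inputs=[{t1: ['output.txt']}], outputs=['wc.txt'])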
- from_dict(d)[source]
Re-initialization, resets all attributes with provided input data.
- property luid
Unique ID of the current task (fully qualified).
Example
>>> task.luid
pipe.0001.stage.0004.task.123456
- Luid
Returns the fully qualified uid of the current task
- Type
str
Application Manager API
- class radical.entk.AppManager(config_path=None, reattempts=None, resubmit_failed=None, autoterminate=None, write_workflow=None, rts=None, rts_config=None, name=None, base_path=None, **kwargs)[source]
An application manager takes the responsibility of setting up the communication infrastructure, instantiates the ResourceManager, TaskManager, WFProcessor objects and all their threads and processes. This is the Master object running in the main process and is designed to recover from errors from all other objects, threads and processes.
- Arguments
- config_path
Url to config path to be read for AppManager
- reattempts
number of attempts to re-invoke any failed EnTK components
- resubmit_failed
resubmit failed tasks (True/False)
- autoterminate
terminate resource reservation upon execution of all tasks of first workflow (True/False)
- write_workflow
write workflow and mapping to rts entities to a file (post-termination)
- rts
Specify RTS to use. Current options: ‘mock’, ‘radical.pilot’ (default if unspecified)
- rts_config
Configuration for the RTS, accepts {‘sandbox_cleanup’: True/False,’db_cleanup’: True/False} when RTS is RP
- name
Name of the Application. It should be unique between executions. (default is randomly assigned)
- base_path
base path of logger and profiler with a session id, None (default) will use a current working directory
- property name
Name for the application manager. Allows the user to setup the name of the application manager, as well as its session ID. This name should be unique between different EnTK executions, otherwise it will produce an error.
- Getter
Returns the name of the application manager
- Setter
Assigns the name of the application manager
- Type
String
- property outputs
- Getter
Return the list of filenames that are to be staged out after execution
- Setter
Assign a list of names of files that need to be staged from the remote machine
- property resource_desc
The resource description is a dictionary that holds information about the resource(s) that will be used to execute the workflow.
- The following keys are mandatory in all resource descriptions:
‘resource’ : Label of the platform with resources.
‘walltime’ : Amount of time the workflow is expected to run.
‘cpus’ : Number of CPU cores/threads.
- Optional keys include:
‘project’ : The project that will be charged.
‘gpus’ : Number of GPUs to be used by the workflow.
‘access_schema’ : The key of an access mechanism to use.
‘queue’ : The name of the batch queue.
‘job_name’ : The specific name of a batch job.
- Getter
Returns the resource description
- Setter
Assigns a resource description
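For example, a resource description with the mandatory keys and a few of the optional ones might look as follows; the values mirror the illustrative ones used elsewhere in this guide and depend on the target platform:
from radical.entk import AppManager

appman = AppManager()
appman.resource_desc = {
    'resource'      : 'xsede.stampede',    # platform label
    'walltime'      : 120,                 # expected runtime in minutes
    'cpus'          : 64,
    # optional keys
    'gpus'          : 0,
    'project'       : 'TG-abcxyz',
    'queue'         : 'abc',
    'access_schema' : 'ssh'
}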
- run()[source]
Purpose: Run the application manager once the workflow and resource manager have been assigned. Invoking this method sets up the communication infrastructure, submits a resource request, and then submits all the tasks.
- property services
- Getter
Returns the list of tasks used to start “global” services
- Setter
Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution
- property shared_data
- Getter
Return the list of filenames that are shared between multiple tasks of the application
- Setter
Assign a list of names of files that need to be staged to the remote machine
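A hedged sketch of how shared_data is commonly combined with the $SHARED placeholder used in the task staging attributes (file names are illustrative):
from radical.entk import AppManager, Task

appman = AppManager()

# 'input.dat' is staged once to the shared data staging area ...
appman.shared_data = ['input.dat']

# ... and individual tasks link it from $SHARED into their own sandbox
t = Task()
t.executable      = '/bin/cat'
t.arguments       = ['input.dat']
t.link_input_data = ['$SHARED/input.dat']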
- property sid
Get the session ID of the current EnTK execution
- Getter
Returns the session ID of the EnTK execution
- Type
String
- property workflow
- Getter
Return the last workflow assigned for execution
- Setter
Assign a new workflow to be executed
- property workflows
Return a list of workflows assigned for execution
- Type
getter
Exceptions
This section documents the exception types used by Ensemble Toolkit. The list is expected to grow and is not complete.
- exception radical.entk.exceptions.EnTKError[source]
EnTKError is the generic exception type used by EnTK.
- exception radical.entk.exceptions.EnTKMissingError(obj, missing_attribute)[source]
EnTKMissingError is raised when a mandatory attribute is left unassigned by the user.
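Since EnTKError is the generic exception type, application scripts can wrap the run in a try/except block; a minimal sketch:
from radical.entk import AppManager
from radical.entk.exceptions import EnTKError

appman = AppManager()
appman.resource_desc = {'resource': 'local.localhost',
                        'walltime': 10,
                        'cpus'    : 1}
# ... assign appman.workflow here ...

try:
    appman.run()
except EnTKError as e:
    # generic EnTK failure, e.g. a validation or runtime error
    print('EnTK error: %s' % e)
    raise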
Developer Documentation
UML diagrams and other resources
This page presents the UML diagrams for the Ensemble Toolkit. Additional information to help understand the design of Ensemble Toolkit is also presented for interested developers.
Class Diagram
The following document describes the classes in EnTK, their data members and functional members.
[Figure: EnTK class diagram]
Sequence Diagram
The interaction of these modules for one successful run of an application is described in the following figure:
[Figure: EnTK sequence diagram]
State Diagram
The stateful objects Pipeline, Stage and Task undergo several state transitions during the execution of a workflow. We document them in the following figure:
[Figure: EnTK state diagram]
Events Recorded
Following is a list of events recorded in each module (in temporal order). These events can be used to profile durations between two events or a state and an event.
Resource Manager
create rmgr
validating rdesc
rdesc validated
populating rmgr
rmgr populated
rmgr created
creating rreq
rreq created
rreq submitted
resource active
canceling resource allocation
resource allocation canceled
App Manager
create amgr
amgr created
assigning workflow
validating workflow
workflow validated
amgr run started
init mqs setup
mqs setup done
init rreq submission
starting synchronizer thread
creating wfp obj
creating tmgr obj
synchronizer thread started
start termination
terminating synchronizer
termination done
WFprocessor
create wfp obj
wfp obj created
creating wfp process
starting wfp process
wfp process started
creating dequeue-thread
starting dequeue-thread
creating enqueue-thread
starting enqueue-thread
dequeue-thread started
enqueue-thread started
terminating dequeue-thread
terminating enqueue-thread
termination done
terminating wfp process
wfp process terminated
TaskManager
create tmgr obj
tmgr obj created
creating heartbeat thread
starting heartbeat thread
heartbeat thread started
creating tmgr process
starting tmgr process
tmgr process started
tmgr infrastructure setup done
cud from task - create
cud from task - done
task from cu - create
task from cu - done
terminating tmgr process
tmgr process terminated
API Reference for Developers
Application Manager API
- class radical.entk.AppManager(config_path=None, reattempts=None, resubmit_failed=None, autoterminate=None, write_workflow=None, rts=None, rts_config=None, name=None, base_path=None, **kwargs)[source]
An application manager takes the responsibility of setting up the communication infrastructure, instantiates the ResourceManager, TaskManager, WFProcessor objects and all their threads and processes. This is the Master object running in the main process and is designed to recover from errors from all other objects, threads and processes.
- Arguments
- config_path
Url to config path to be read for AppManager
- reattempts
number of attempts to re-invoke any failed EnTK components
- resubmit_failed
resubmit failed tasks (True/False)
- autoterminate
terminate resource reservation upon execution of all tasks of first workflow (True/False)
- write_workflow
write workflow and mapping to rts entities to a file (post-termination)
- rts
Specify RTS to use. Current options: ‘mock’, ‘radical.pilot’ (default if unspecified)
- rts_config
Configuration for the RTS, accepts {‘sandbox_cleanup’: True/False,’db_cleanup’: True/False} when RTS is RP
- name
Name of the Application. It should be unique between executions. (default is randomly assigned)
- base_path
base path of logger and profiler with a session id, None (default) will use a current working directory
- _get_message_to_sync(qname)[source]
Reads a message from the queue, and exchange the message to where it was published by update_task
- _read_config(config_path, reattempts, resubmit_failed, autoterminate, write_workflow, rts, rts_config, base_path)[source]
- _setup_zmq()[source]
Purpose: Setup ZMQ system on the client side. We instantiate queue(s) ‘pendingq-’ for communication between the enqueuer thread and the task manager process. We instantiate queue(s) ‘completedq-’ for communication between the task manager and dequeuer thread. We instantiate queue ‘sync-to-master’ for communication from enqueuer/dequeuer/task_manager to the synchronizer thread. We instantiate queue ‘sync-ack’ for communication from synchronizer thread to enqueuer/dequeuer/task_manager.
- _submit_rts_tmgr(rts_info)[source]
Purpose: Update the runtime system information in the task manager
- _synchronizer_work()[source]
- Purpose: Thread in the master process to keep the workflow data
structure in appmanager up to date. We receive only tasks objects from the task manager.
- Details: Important to note that acknowledgements of the type
channel.basic_ack() is an acknowledgement to the server that the msg was received. This is not to be confused with the Ack sent to the task_manager through the sync-ack queue.
- property name
Name for the application manager. Allows the user to setup the name of the application manager, as well as its session ID. This name should be unique between different EnTK executions, otherwise it will produce an error.
- Getter
Returns the name of the application manager
- Setter
Assigns the name of the application manager
- Type
String
- property outputs
- Getter
Return the list of filenames that are to be staged out after execution
- Setter
Assign a list of names of files that need to be staged from the remote machine
- property resource_desc
The resource description is a dictionary that holds information about the resource(s) that will be used to execute the workflow.
- The following keys are mandatory in all resource descriptions:
‘resource’ : Label of the platform with resources.
‘walltime’ : Amount of time the workflow is expected to run.
‘cpus’ : Number of CPU cores/threads.
- Optional keys include:
‘project’ : The project that will be charged.
‘gpus’ : Number of GPUs to be used by the workflow.
‘access_schema’ : The key of an access mechanism to use.
‘queue’ : The name of the batch queue.
‘job_name’ : The specific name of a batch job.
- Getter
Returns the resource description
- Setter
Assigns a resource description
- run()[source]
Purpose: Run the application manager once the workflow and resource manager have been assigned. Invoking this method sets up the communication infrastructure, submits a resource request, and then submits all the tasks.
- property services
- Getter
Returns the list of tasks used to start “global” services
- Setter
Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution
- property shared_data
- Getter
Return the list of filenames that are shared between multiple tasks of the application
- Setter
Assign a list of names of files that need to be staged to the remote machine
- property sid
Get the session ID of the current EnTK execution
- Getter
Returns the session ID of the EnTK execution
- Type
String
- property workflow
- Getter
Return the last workflow assigned for execution
- Setter
Assign a new workflow to be executed
- property workflows
Return a list of workflows assigned for execution
- Type
getter
Pipeline API
- class radical.entk.Pipeline[source]
A pipeline represents a collection of objects that have a linear temporal execution order. In this case, a pipeline consists of multiple ‘Stage’ objects. Each Stage_i can execute only after all stages up to Stage_(i-1) have completed execution.
- _validate()[source]
Purpose: Validate that the state of the current Pipeline is ‘DESCRIBED’ (user has not meddled with it). Also validate that the current Pipeline contains Stages.
- classmethod _validate_entities(stages)[source]
Purpose: Validate whether the argument ‘stages’ is of list of Stage objects
- Argument
list of Stage objects
- as_dict()[source]
Convert current Pipeline (i.e. its attributes) into a dictionary
- Returns
python dictionary
- property completed
Returns whether the Pipeline has completed
- Returns
Boolean
- property current_stage
Returns the current stage being executed
- Returns
Integer
- from_dict(d)[source]
Create a Pipeline from a dictionary. The change is made in place.
- Argument
python dictionary
- Returns
None
- property lock
Returns the lock over the current Pipeline
- Returns
Lock object
- property luid
Unique ID of the current pipeline (fully qualified). For the pipeline class, this is an alias to uid.
Example
>>> pipeline.luid
pipe.0001
- Getter
Returns the fully qualified uid of the current pipeline
- Type
String
- property name
Name of the pipeline useful for bookkeeping and to refer to this pipeline while data staging. Do not use a ‘,’ or ‘_’ in an object’s name.
- Getter
Returns the name of the pipeline
- Setter
Assigns the name of the pipeline
- Type
String
- resume()[source]
Continue execution of paused stages and tasks.
The resume() method can only be called on a suspended pipeline, an exception will be raised if that condition is not met.
The state of a resumed pipeline will be set to the state the pipeline had before suspension.
- property stages
Stages of the pipeline
- Getter
Returns the stages in the current Pipeline
- Setter
Assigns the stages to the current Pipeline
- Type
List
- property state
Current state of the pipeline
- Getter
Returns the state of the current pipeline
- Type
String
- property state_history
Returns a list of the states obtained in temporal order
- Returns
list
- suspend()[source]
Pause execution of the pipeline: stages and tasks that are executing will continue to execute, but no new stages and tasks will be eligible for execution until resume() is called.
The suspend() method cannot be called on a suspended or completed pipeline; doing so will result in an exception.
The state of the pipeline will be set to SUSPENDED.
- property uid
Unique ID of the current pipeline
- Getter
Returns the unique id of the current pipeline
- Type
String
Base ResourceManager API
- class radical.entk.execman.base.resource_manager.Base_ResourceManager(resource_desc, sid, rts, rts_config)[source]
A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs.
- Arguments
- resource_desc
dictionary with details of the resource request and access credentials of the user
- example
resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'gpus'          : 0,            # optional
    'project'       : 'TG-abcxyz',
    'memory'        : '16000',      # optional
    'queue'         : 'abc',        # optional
    'access_schema' : 'ssh',        # optional
    'job_name'      : 'test_job'    # optional
}
- _populate()[source]
- Purpose: Populate the ResourceManager class with the validated
resource description
- _terminate_resource_request()[source]
- Purpose: Cancel resource request by terminating any reservation
on any acquired resources or resources pending acquisition
- property access_schema
Return user specified access schema – ‘ssh’ or ‘gsissh’ or None
- Type
getter
- property cpus
Return user specified number of cpus
- Type
getter
- property gpus
Return user specified number of gpus
- Type
getter
- property job_name
Return user specified job_name
- Type
getter
- property memory
Return user specified amount of memory
- Type
getter
- property outputs
- Getter
Return the list of files to be staged from the remote machine after execution
- Setter
Assign a list of names of files that need to be staged from the remote machine
- property project
Return user specified project ID
- Type
getter
- property queue
Return user specified resource queue to be used
- Type
getter
- property resource
Return user specified resource name
- Type
getter
- property services
- Getter
Returns the list of tasks used to start “global” services
- Setter
Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution
- property shared_data
- Getter
Return the list of files to be staged to the remote machine that are common to multiple tasks
- Setter
Assign a list of names of files that need to be accessible to tasks
- property walltime
Return user specified walltime
- Type
getter
RP ResourceManager API
- class radical.entk.execman.rp.resource_manager.ResourceManager(resource_desc, sid, rts_config)[source]
A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs. This ResourceManager uses the RADICAL Pilot as the underlying runtime system.
- Arguments
- resource_desc
dictionary with details of the resource request and access credentials of the user
- example
resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'project'       : 'TG-abcxyz',
    'queue'         : 'abc',   # optional
    'access_schema' : 'ssh'    # optional
}
- property pilot
Return the submitted Pilot
- Type
getter
- property pmgr
Return the Radical Pilot manager currently being used
- Type
getter
- property session
Return the Radical Pilot session currently being used
- Type
getter
Dummy ResourceManager API
- class radical.entk.execman.mock.resource_manager.ResourceManager(resource_desc, sid, rts_config)[source]
A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs. This ResourceManager mocks an implementation by doing nothing; it is only usable for testing.
- Arguments
- resource_desc
dictionary with details of the resource request and access credentials of the user
- example
resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'project'       : 'TG-abcxyz',
    'queue'         : 'abc',   # optional
    'access_schema' : 'ssh'    # optional
}
Stage API
- class radical.entk.Stage[source]
A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.
- _check_stage_complete()[source]
- Purpose: Check if all tasks of the current stage have completed, i.e.,
are in either DONE or FAILED state.
- _set_tasks_state(value)[source]
Purpose: Set state of all tasks of the current stage.
- Arguments
String
- _validate()[source]
- Purpose: Validate that the state of the current Stage is ‘DESCRIBED’
(user has not meddled with it). Also validate that the current Stage contains Tasks
- classmethod _validate_entities(tasks)[source]
- Purpose: Validate whether the ‘tasks’ is of type set. Validate the
description of each Task.
- add_tasks(value)[source]
Adds task(s) to a Stage using a set union operation. Every Task element is unique; no duplicates are allowed. An existing Task won’t be added again but will be updated with changes.
- Argument
iterable task object
- from_dict(d)[source]
Create a Stage from a dictionary. The change is made in place.
- Argument
python dictionary
- Returns
None
- property luid
Unique ID of the current stage (fully qualified).
Example
>>> stage.luid
pipe.0001.stage.0004
- Getter
Returns the fully qualified uid of the current stage
- Type
String
- property name
Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.
- Getter
Returns the name of the current stage
- Setter
Assigns the name of the current stage
- Type
String
- property parent_pipeline
- Getter
Returns the pipeline this stage belongs to
- Setter
Assigns the pipeline uid this stage belongs to
- property post_exec
The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished.
Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon.
Example:
s1.post_exec = func_post

def func_post():
    if condition is met:
        s = Stage()
        t = Task()
        t.executable = '/bin/sleep'
        t.arguments = ['30']
        s.add_tasks(t)
        p.add_stages(s)
    else:
        # do nothing
        pass
- property state
Current state of the stage
- Getter
Returns the state of the current stage
- Type
String
- property state_history
Returns a list of the states obtained in temporal order
- Returns
list
- property tasks
Tasks of the stage
- Getter
Returns all the tasks of the current stage
- Setter
Assigns tasks to the current stage
- Type
set of Tasks
- property uid
Unique ID of the current stage
- Getter
Returns the unique id of the current stage
- Type
String
Task API
- class radical.entk.Task(from_dict=None)[source]
A Task is an abstraction of a computational unit. In this case, a Task consists of its executable along with its required software environment, files to be staged as input and output.
At the user level a Task is populated by assigning attributes. Internally, an empty Task is created and populated using the from_dict function. This avoids creating Tasks with a new uid, since new uids offset the uid count file in radical.utils and can affect profiling if not handled carefully.
- uid
[type: str | default: “”] A unique ID for the task. This attribute is optional, a unique ID will be assigned by RE if the field is not set, and cannot be re-assigned (immutable attribute).
- name
[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads. Such characters as ‘,’ or ‘_’ are not allowed in a name.
- state
[type: str | default: “DESCRIBED”] Current state of the task, its initial value re.states.INITIAL is set during the task initialization, and this value is also appended into state_history attribute.
- state_history
[type: list | default: [“DESCRIBED”]] List of the states obtained in temporal order. Every time new state is set, it is appended to state_history automatically.
- executable
[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.
- arguments
[type: list | default: []] List of arguments to be supplied to the executable.
- environment
[type: dict | default: {}] Environment variables to set in the
environment before the execution process.
- sandbox
[type: str | default: “”] This specifies the working directory of the task. By default, the task’s uid is used as a sandbox for the task.
- pre_launch
[type: list | default: []] List of actions (shell commands) to perform before the task is launched.
Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of resources! Deviating from that rule will likely result in reduced overall throughput.
- post_launch
[type: list | default: []] List of actions (shell commands) to perform after the task finishes.
The same remarks as on pre_launch apply
- pre_exec
[type: list | default: []] List of actions (shell commands) to perform after task is launched, but before rank(s) starts execution.
The same remarks as on pre_launch apply
- post_exec
[type: list | default: []] List of actions (shell commands) to perform right after rank(s) finishes.
The same remarks as on pre_launch apply
- cpu_reqs
[type: CpuReqs | default: CpuReqs()] The CPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.
The expected format is dict-like:
task.cpu_reqs = {'cpu_processes'    : X,
                 'cpu_process_type' : None/'MPI',
                 'cpu_threads'      : Y,
                 'cpu_thread_type'  : None/'OpenMP'}
This description means that the Task is going to spawn X processes and Y threads per each of these processes to run on CPUs. Hence, the total number of cpus required by the Task is X * Y for all the processes and threads to execute concurrently.
By default, 1 CPU process and 1 CPU thread per process are requested.
- gpu_reqs
[type: GpuReqs | default: GpuReqs()] The GPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.
The expected format is dict-like:
task.gpu_reqs = {'gpu_processes'    : X,
                 'gpu_process_type' : None/'CUDA'/'ROCm',
                 'gpu_threads'      : Y,
                 'gpu_thread_type'  : None}
This description means that each rank of the task is going to use X GPUs with Y GPU-threads. By default, 0 GPUs are requested.
- lfs_per_process
[type: int | default: 0] Local File Storage per process - amount of space (MB) required on the local file system of the node by the task.
- mem_per_process
[type: int | default: 0] Amount of memory required by the task.
- upload_input_data
[type: list | default: []] List of file names required to be transferred from a local client storage to the location of the task or data staging area before task starts.
- copy_input_data
[type: list | default: []] List of file names required to be copied from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
Example of a file location format:
$Pipeline_%s_Stage_%s_Task_%s, where %s is replaced by the entity name/uid
# output.txt is copied from the t1 task to the current task sandbox
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]
- link_input_data
[type: list | default: []] List of file names required to be symlinked from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
- move_input_data
[type: list | default: []] List of file names required to be moved from another task-/pilot-sandbox to a current task (or data staging area) before task starts.
- copy_output_data
[type: list | default: []] List of file names required to be copied from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
Example of defining data to be copied:
# results.txt will be copied to a data staging area $SHARED
t.copy_output_data = ['results.txt > $SHARED/results.txt']
- link_output_data
[type: list | default: []] List of file names required to be symlinked from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
- move_output_data
[type: list | default: []] List of file names required to be moved from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.
- download_output_data
[type: list | default: []] List of file names required to be downloaded from a current task to a local client storage space when a task is finished.
Example of defining data to be downloaded:
# results.txt is transferred to a local client storage space
t.download_output_data = ['results.txt']
- stdout
[type: str | default: “”] The name of the file to store stdout. If not set then the name of the following format will be used: <uid>.out.
- stderr
[type: str | default: “”] The name of the file to store stderr. If not set then the name of the following format will be used: <uid>.err.
- stage_on_error
[type: bool | default: False] Flag to allow staging out data if task got failed (output staging is normally skipped on FAILED or CANCELED tasks).
- exit_code
[type: int | default: None] Get the exit code for finished tasks: 0 - for successful tasks; 1 - for failed tasks.
- exception
[type: str | default: None] Get the representation of the exception which caused the task to fail.
- exception_detail
[type: str | default: None] Get additional details (traceback or error messages) to the exception which caused this task to fail.
- path
[type: str | default: “”] Get the path of the task on the remote machine. Useful to reference files generated in the current task.
- tags
[type: dict | default: None] The tags for the task that can be used while scheduling by the RTS (configuration specific tags, which influence task scheduling and execution, e.g., tasks co-location).
- rts_uid
[type: str | default: None] Unique RTS ID of the current task.
- parent_stage
[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the stage, which contains the current task.
- parent_pipeline
[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the pipeline, which contains the current task.
- annotations
[type: Annotations | default: None] Annotations to describe task’s input and output files, and sets dependencies between tasks.
Read-only attributes
- property luid
[type: str] Unique ID of the current task (fully qualified).
>>> task.luid
pipe.0001.stage.0004.task.0234
- _cast = True
- _check = True
- _defaults = {'arguments': [], 'copy_input_data': [], 'copy_output_data': [], 'cpu_reqs': {'cpu_process_type': None, 'cpu_processes': 1, 'cpu_thread_type': None, 'cpu_threads': 1}, 'download_output_data': [], 'environment': {}, 'exception': None, 'exception_detail': None, 'executable': '', 'exit_code': None, 'gpu_reqs': {'gpu_process_type': None, 'gpu_processes': 0.0, 'gpu_thread_type': None, 'gpu_threads': 1}, 'lfs_per_process': 0, 'link_input_data': [], 'link_output_data': [], 'mem_per_process': 0, 'move_input_data': [], 'move_output_data': [], 'name': '', 'parent_pipeline': {'name': None, 'uid': None}, 'parent_stage': {'name': None, 'uid': None}, 'path': '', 'post_exec': [], 'post_launch': [], 'pre_exec': [], 'pre_launch': [], 'rts_uid': None, 'sandbox': '', 'stage_on_error': False, 'state': '', 'state_history': [], 'stderr': '', 'stdout': '', 'tags': None, 'uid': '', 'upload_input_data': []}
- _schema = {'annotations': <class 'radical.entk.task.Annotations'>, 'arguments': [<class 'str'>], 'copy_input_data': [<class 'str'>], 'copy_output_data': [<class 'str'>], 'cpu_reqs': <class 'radical.entk.task.CpuReqs'>, 'download_output_data': [<class 'str'>], 'environment': {<class 'str'>: <class 'str'>}, 'exception': <class 'str'>, 'exception_detail': <class 'str'>, 'executable': <class 'str'>, 'exit_code': <class 'int'>, 'gpu_reqs': <class 'radical.entk.task.GpuReqs'>, 'lfs_per_process': <class 'int'>, 'link_input_data': [<class 'str'>], 'link_output_data': [<class 'str'>], 'mem_per_process': <class 'int'>, 'move_input_data': [<class 'str'>], 'move_output_data': [<class 'str'>], 'name': <class 'str'>, 'parent_pipeline': {<class 'str'>: None}, 'parent_stage': {<class 'str'>: None}, 'path': <class 'str'>, 'post_exec': [<class 'str'>], 'post_launch': [<class 'str'>], 'pre_exec': [<class 'str'>], 'pre_launch': [<class 'str'>], 'rts_uid': <class 'str'>, 'sandbox': <class 'str'>, 'stage_on_error': <class 'bool'>, 'state': <class 'str'>, 'state_history': [<class 'str'>], 'stderr': <class 'str'>, 'stdout': <class 'str'>, 'tags': {<class 'str'>: None}, 'uid': <class 'str'>, 'upload_input_data': [<class 'str'>]}
- _self_default = False
- _uids = []
- _validate()[source]
Purpose: Validate that the state of the task is ‘DESCRIBED’ and that an executable has been specified for the task.
- annotate(inputs: Optional[Union[Dict, List, str]] = None, outputs: Optional[Union[List, str]] = None) None [source]
Adds dataflow annotations with provided input and output files, and defines dependencies between tasks.
- inputs
List of input files. If a file is produced by the previously executed task, then the corresponding input element is provided as a dictionary with the task instance as a key. Example: inputs=[‘file1’, {task0: ‘file2’, task1: [‘file2’]}]
- Type
list, optional
- outputs
List of produced/generated files.
- Type
list, optional
- property luid
Unique ID of the current task (fully qualified).
Example
>>> task.luid
pipe.0001.stage.0004.task.123456
- Luid
Returns the fully qualified uid of the current task
- Type
str
Base TaskManager API
- class radical.entk.execman.base.task_manager.Base_TaskManager(sid, rmgr, rts, zmq_info)[source]
A Task Manager takes the responsibility of dispatching tasks it receives from a ‘pending’ queue for execution on to the available resources using a runtime system. Once the tasks have completed execution, they are pushed on to the completed queue for other components of EnTK to process.
- Arguments
- rmgr
(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted
Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues can be varied for different throughput requirements at the cost of additional Memory and CPU consumption.
- _tmgr(uid, rmgr, zmq_info)[source]
- Purpose: Method to be run by the tmgr thread. This method receives
a Task from the ‘pending’ queue and submits it to the RTS. At all state transitions, tasks are synced (blocking) with the AppManager in the master thread.
- Details: The AppManager can re-invoke the tmgr thread with this
function if the execution of the workflow is still incomplete. There is also population of a dictionary, placeholder_dict, which stores the path of each of the tasks on the remote machine.
RP TaskManager API
- class radical.entk.execman.rp.task_manager.TaskManager(sid, rmgr, zmq_info)[source]
A Task Manager takes the responsibility of dispatching tasks it receives from a queue for execution on to the available resources using a runtime system. In this case, the runtime system being used is RADICAL Pilot. Once the tasks have completed execution, they are pushed on to another queue for other components of EnTK to access.
- Arguments
- rmgr
(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted
Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues may be varied to meet different throughput requirements, at the cost of additional memory and CPU consumption.
- _process_tasks(task_queue, tmgr)[source]
- Purpose: The new thread spawned by the main tmgr thread invokes this function. It receives tasks from ‘task_queue’ and submits them to the RADICAL Pilot RTS.
- _tmgr(uid, rmgr, zmq_info)[source]
- Purpose: This method has two purposes: it receives tasks from the ‘pending’ queue, and it starts a new thread that processes these tasks and submits them to the RTS.
The new thread is responsible for pushing completed tasks (returned by the RTS) to the dequeueing queue. It also converts Tasks into TDs and CUs into (partially described) Tasks; this conversion is necessary since the current RTS is RADICAL Pilot. Once Tasks are recovered from a CU, they are pushed to the completed queue. At all state transitions, they are synced (blocking) with the AppManager in the master process.
- Details: The AppManager can re-invoke the tmgr thread with this function if the execution of the workflow is still incomplete. This method also populates a dictionary, placeholders, which stores the path of each task on the remote machine.
Dummy TaskManager API
- class radical.entk.execman.mock.task_manager.TaskManager(sid, rmgr, zmq_info)[source]
A Task Manager is responsible for dispatching the tasks it receives from a ‘pending’ queue onto the available resources using a runtime system. Once the tasks have completed execution, they are pushed onto the completed queue for other components of EnTK to process.
- Arguments
- rmgr
(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted
Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues may be varied to meet different throughput requirements, at the cost of additional memory and CPU consumption.
- _process_tasks(task_queue, rmgr)[source]
- Purpose: The new thread spawned by the main tmgr process invokes this function. It receives tasks from ‘task_queue’ and submits them to the RTS.
- _tmgr(uid, rmgr, zmq_info)[source]
- Purpose: Method to be run by the tmgr process. This method receives a Task from the ‘pending’ queue and submits it to the RTS. Currently, it also converts Tasks into CUDs and CUs into (partially described) Tasks; this conversion is necessary since the current RTS is RADICAL Pilot. Once Tasks are recovered from a CU, they are pushed to the completed queue. At all state transitions, they are synced (blocking) with the AppManager in the master process.
- Details: The AppManager can re-invoke the tmgr process with this function if the execution of the workflow is still incomplete. This method also populates a dictionary, placeholders, which stores the path of each task on the remote machine.
WFProcessor API
- class radical.entk.appman.wfprocessor.WFprocessor(sid, workflow, resubmit_failed, zmq_info)[source]
A WFprocessor (workflow processor) takes responsibility for dispatching tasks from the various pipelines of the workflow, according to their relative order, to the TaskManager. All state updates are reflected in the AppManager, since both components operate on a reference to the same workflow object. The WFprocessor also retrieves completed tasks from the TaskManager and updates the states of the corresponding Pipelines, Stages and Tasks (PST) accordingly.
- Arguments
- sid
(str) session id used by the profiler and loggers
- workflow
(set) Reference to the AppManager’s workflow
- resubmit_failed
(bool) True if failed tasks should be resubmitted
- zmq_info
(dict) zmq queue addresses
- _dequeue()[source]
Purpose: This is the function that is run in the dequeue thread. This function extracts Tasks from the completed queues and updates the workflow.
- _enqueue()[source]
Purpose: This is the function that is run in the enqueue thread. It extracts Tasks from the workflow stored in the WFprocessor object and pushes them to the queues in the pending_q list (a simplified sketch of this pattern appears at the end of this class description).
- _execute_post_exec(pipe, stage)[source]
Purpose: This method executes the post_exec step of a stage for a pipeline.
- reset_workflow()[source]
When a component is restarted, all tasks that did not finish are reset to their initial state so that they are scheduled again for execution.
- start_processor()[source]
Purpose: Method to start the wfp process. The wfp function is not to be accessed directly. The function is started in a separate process using this method.
- terminate_processor()[source]
Purpose: Method to terminate the wfp process. This method is blocking as it waits for the wfp process to terminate (aka join).
- property workflow
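A much-simplified sketch of the enqueue side of this pattern: walk the pipelines of a workflow and push the tasks of each pipeline's current stage to a pending queue. The data structures below are illustrative stand-ins, not EnTK's internal representation.
import queue

def enqueue_once(workflow, pending_q):
    # Illustrative: push the schedulable tasks of each pipeline's current stage.
    for pipeline in workflow:
        stage = pipeline['stages'][pipeline['current_stage']]
        for task in stage['tasks']:
            if task['state'] == 'SCHEDULED':
                pending_q.put(task)
                task['state'] = 'SUBMITTED'

# usage with a toy single-pipeline, single-stage workflow
wf = [{'current_stage': 0,
       'stages': [{'tasks': [{'uid': 'task.0000', 'state': 'SCHEDULED'},
                             {'uid': 'task.0001', 'state': 'SCHEDULED'}]}]}]
pq = queue.Queue()
enqueue_once(wf, pq)
print(pq.qsize())    # 2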
Unit tests & Integration tests
All tests are run on Travis CI. Tests are run using pytest; test coverage is measured using coverage and reported using codecov.
Unit tests verify the functionality of the basic methods and functions offered by EnTK. We strive to create and include a new test for every new feature offered by EnTK.
Integration tests verify the correct communication between EnTK components, and between EnTK and the packages and services it uses, such as RADICAL-Pilot.
Writing tests for EnTK requires following the RADICAL test coding guidelines. An example can be found here.
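As an illustration of the kind of unit test meant here, the following is a minimal, hypothetical pytest-style test of the Task API documented above; the test name and values are placeholders and the test is not taken from the EnTK repository.
from radical.entk import Task

def test_task_attributes():
    # Construct a task and check that the documented attributes round-trip.
    t = Task()
    t.name       = 'task.0000'
    t.executable = '/bin/echo'
    t.arguments  = ['hello', 'world']

    assert t.name       == 'task.0000'
    assert t.executable == '/bin/echo'
    assert t.arguments  == ['hello', 'world']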