RADICAL-Ensemble Toolkit

The Ensemble Toolkit is a Python library for developing and executing large-scale ensemble-based workflows. It is being developed by the RADICAL Research Group at Rutgers University. Ensemble Toolkit is released under the MIT License.

More details about the science enabled by EnTK can be found in the following publications:

  • Balasubramanian, Vivek, Matteo Turilli, Weiming Hu, Matthieu Lefebvre, Wenjie Lei, Guido Cervone, Jeroen Tromp, and Shantenu Jha. “Harnessing the Power of Many: Extensible Toolkit for Scalable Ensemble Applications.” 32nd IEEE International Parallel and Distributed Processing Symposium, 2018 (https://arxiv.org/abs/1710.08491)

  • Balasubramanian, Vivekanandan, Antons Treikalis, Ole Weidner, and Shantenu Jha. “Ensemble toolkit: Scalable and flexible execution of ensembles of tasks.” In Parallel Processing (ICPP), 2016 45th International Conference on, pp. 458-463. IEEE, 2016. (https://arxiv.org/abs/1602.00678v2)

Note

Please use the following to reference Ensemble Toolkit

Balasubramanian, Vivekanandan, Antons Treikalis, Ole Weidner, and Shantenu Jha. “Ensemble toolkit: Scalable and flexible execution of ensembles of tasks.” In Parallel Processing (ICPP), 2016 45th International Conference on, pp. 458-463. IEEE, 2016.

Project Github Page: https://github.com/radical-cybertools/radical.entk


Introduction

Overview

The Ensemble Toolkit is a Python framework for developing and executing applications composed of multiple sets of tasks, also known as ensembles. Ensemble Toolkit was originally developed with ensemble-based applications in mind. As our understanding of the variety of workflows in scientific applications improved, we realized that our approach needed to be more generic. Although our motivation remains ensemble-based applications, from EnTK 0.6 onwards any application whose task workflow can be expressed as a Directed Acyclic Graph (DAG) can be supported.

The Ensemble Toolkit has the following unique features: (i) abstractions that enable the expression of various task graphs, (ii) abstraction of resource management and task execution, (iii) fault tolerance as a first-order concern, and (iv) well-established runtime capabilities to enable efficient and dynamic usage of grid resources and supercomputers.

We will now discuss the high level design of Ensemble Toolkit in order to understand how an application is created and executed.

Design

Ensemble Toolkit Design - high level

Figure 1: High level design of Ensemble Toolkit

Ensemble Toolkit consists of several components that serve different purposes. Three user-level components, namely Pipeline, Stage and Task, are used directly by the user to create the application by describing its task graph. We will soon take a look at how these can be used to create an application.

The Application Manager is an internal component that takes the workflow described by the user and converts it into a set of workloads, i.e., tasks with no unresolved dependencies, by parsing the workflow and identifying, at runtime, tasks with equivalent or no dependencies. The Application Manager also accepts the description of the resource request (resource label, walltime, cpus, gpus, user credentials) to be created.
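
For illustration, a resource request in EnTK is a plain Python dictionary; the sketch below follows the keys listed above and used later in this guide, but the resource label, project and queue values are placeholders that depend on the target machine and the user's allocation.

# A hypothetical resource request: 32 cores and 4 GPUs for 60 minutes.
# 'xyz.supercomputer', 'ABC123' and 'batch' are placeholders.
res_desc = {
    'resource': 'xyz.supercomputer',   # resource label known to the runtime system
    'walltime': 60,                    # minutes
    'cpus'    : 32,
    'gpus'    : 4,
    'project' : 'ABC123',              # allocation to charge
    'queue'   : 'batch'                # batch queue to submit to
}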

The Execution Manager is the last component in Ensemble Toolkit. It accepts the workloads prepared by the Application Manager and executes them on the specified resource using a runtime system (RTS). Internally, it consists of two subcomponents, ResourceManager and TaskManager, which are responsible for the allocation, management, and deallocation of resources, and for the execution management of tasks, respectively. The Execution Manager is currently configured to use RADICAL Pilot (RP) as the runtime system, but can be extended to other RTSs.

Ensemble Toolkit uses a runtime system to execute tasks on high performance computing (HPC) platforms. The runtime system is expected to manage the direct interactions with the various software and hardware layers of the HPC platforms, including their heterogeneities.

More details about how EnTK is designed and implemented can be found here.

Dependencies

Ensemble Toolkit uses RADICAL Pilot (RP) as the runtime system. RP currently targets a specific set of high performance computing (HPC) systems (see here). RP can be extended to support more HPC systems by contacting the developers of RP/EnTK, or by the users themselves by following this page.

EnTK also has profiling capabilities and uses Pandas dataframes to store the data. Users can use these dataframes to wrangle the data or directly plot the required fields.

Dependencies such as RP and Pandas are automatically installed when installing EnTK.

Five steps to create an application

  1. Use the Pipeline, Stage and Task components to create the workflow.

  2. Create an Application Manager (Amgr) object with required parameters/configurations.

  3. Describe the resource request to be created. Assign resource request description and workflow to the Amgr.

  4. Run the Application Manager.

  5. Sit back and relax!

Jump ahead to take a look at the step-by-step instructions for an example script here.
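
As a preview, a minimal sketch of these five steps is shown below. It is the same application that the Getting Started section develops step by step; 'local.localhost' executes the workflow on your own machine.

from radical.entk import Pipeline, Stage, Task, AppManager

# 1. Create the workflow: one Pipeline with one Stage holding one Task
t = Task()
t.executable = '/bin/echo'
t.arguments  = ['Hello World']

s = Stage()
s.add_tasks(t)

p = Pipeline()
p.add_stages(s)

# 2. Create an Application Manager
appman = AppManager()

# 3. Describe the resource request and assign it, with the workflow, to the Amgr
appman.resource_desc = {'resource': 'local.localhost', 'walltime': 10, 'cpus': 1}
appman.workflow      = set([p])

# 4. Run the Application Manager (5. sit back and relax)
appman.run()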

Intended users

Ensemble Toolkit is completely Python based and requires familiarity with the Python language.

Our primary focus is to support domain scientists and enable them to execute their applications at scale on various HPC platforms. Some of our user groups include:

User Groups                      Domain
-----------                      ------
University of Colorado, Denver   Biochemistry / Biophysics
Penn State University            Climate Science
Princeton University             Seismology
University College London        Biochemistry / Biophysics, Medicine
Rice University                  Biochemistry / Biophysics
Stony Brook University           Polar Science
Northern Arizona University      Polar Science
Oak Ridge National Laboratory    Biochemistry / Biophysics

Ensemble Toolkit

The design and implementation of EnTK are iterative and driven by use cases. Use cases span several scientific domains, including Biomolecular Sciences, Material Sciences, and Earth Sciences. Users and developers collaborate to elicit requirements and to rapidly prototype solutions.

Design

We describe how applications are modeled using EnTK components, and present the architecture and execution model of EnTK. We also specify the types of failures recognized by EnTK and how some of them are handled.

Application Model

We model an application by combining the Pipeline, Stage and Task (PST) components.

Ensemble Toolkit PST Model

Figure 1: PST Model

We use two Pythonic collections of objects, sets and lists, to describe the task graph. A set-based object represents entities that have no relative order and hence can execute independently. A list-based object represents entities that have a linear temporal order, i.e., entity 'i' can be operated on only after entity 'i-1'.

In our PST model, we have the following objects:

  • Task: an abstraction of a computational task that contains information regarding an executable, its software environment and its data dependencies.

  • Stage: a set of tasks without mutual dependencies that can therefore be executed concurrently.

  • Pipeline: a list of stages where any stage ‘i’ can be executed only after stage ‘i–1’ has been executed.

When assigned to the Application Manager, the entire application can be expressed as a set of Pipelines. A graphical representation of an application is provided in Figure 1. Note how different Pipelines can have different numbers of Stages, and different Stages can have different numbers of Tasks.

By expressing an application as a set or list of such Pipelines, one can create any application whose task graph can be expressed as a DAG.
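
As a sketch of the PST model, the snippet below builds a workflow of two Pipelines with different numbers of Stages and Tasks. /bin/date is just a placeholder executable, and the resource request and AppManager are omitted here; they are covered in the following sections.

from radical.entk import Pipeline, Stage, Task

def make_pipeline(n_stages, n_tasks):
    p = Pipeline()
    for _ in range(n_stages):
        s = Stage()                     # Tasks in a Stage can run concurrently
        for _ in range(n_tasks):
            t = Task()
            t.executable = '/bin/date'  # placeholder executable
            s.add_tasks(t)
        p.add_stages(s)                 # Stages in a Pipeline run in order
    return p

# Pipelines have no relative order, so the workflow is expressed as a set
workflow = set([make_pipeline(2, 4), make_pipeline(3, 2)])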

Architecture

EnTK sits between the user and an HPC platform, abstracting resource management and execution management from the user.

Ensemble Toolkit Architecture and Execution Model

Figure 2: Ensemble Toolkit Architecture and Execution Model

Figure 2 shows the components (purple) and subcomponents (green) of EnTK, organized in three layers: API, Workflow Management, and Workload Management. The API layer enables users to codify PST descriptions. The Workflow Management layer retrieves information from the user about available HPC platforms, initializes EnTK, and holds the global state of the application during execution. The Workload Management layer acquires resources via the RTS. The Workflow Management layer has two components: AppManager and WFProcessor. AppManager uses the Synchronizer subcomponent to update the state of the application at runtime. WFProcessor uses the Enqueue and Dequeue subcomponents to queue and dequeue tasks from the Workload Management layer. The Workload Management layer uses ExecManager and its Rmgr, Emgr, RTS Callback, and Heartbeat subcomponents to acquire resources from HPC platforms and execute the application.

A distinctive feature of this architecture is the isolation of the RTS into a stand-alone subsystem. This enables the composability of EnTK with diverse RTSs and, depending on their capabilities, multiple types of HPC platforms.

Execution Model

Once EnTK is fully initialized, WFProcessor initiates the execution by creating a local copy of the application description from AppManager and tagging tasks for execution. Enqueue pushes these tasks to the Pending queue (Fig. 2, 1). Emgr pulls tasks from the Pending queue (Fig. 2, 2) and executes them using a RTS (Fig. 2, 3). RTS Callback pushes tasks that have completed execution to the Done queue (Fig. 2, 4). Dequeue pulls completed tasks (Fig. 2, 5) and tags them as done, failed or canceled, depending on the return code of the RTS. Each component and subcomponent synchronizes state transitions of pipelines, stages and tasks with AppManager by pushing messages through dedicated queues (Fig. 2, 6). AppManager pulls these messages and updates the application states. AppManager then acknowledges the updates via dedicated queues (Fig. 2, 7). This messaging mechanism ensures that AppManager is always up-to-date with any state change, making it the only stateful component of EnTK.

Failure Model

We consider four main sources of failure: EnTK components, RTS, HPC platform, and task executables. All state updates in EnTK are transactional, hence any EnTK component that fails can be restarted at runtime without losing information about ongoing execution. Both the RTS and the HPC platform are considered black boxes. Partial failures of their subcomponents at runtime are assumed to be handled locally. Platform-level failures are reported to EnTK indirectly, either as failed pilots or failed tasks. Both pilots and tasks can be restarted. Failures are logged and reported to the user at runtime for live or postmortem analysis.

Implementation

EnTK is implemented in Python and uses RADICAL-Pilot (RP) as its runtime system. RP is designed to execute ensemble applications via pilots. Pilots provide a multi-stage execution mechanism: Resources are acquired via a placeholder job and subsequently used to execute the application’s tasks. When a pilot is submitted to an HPC platform as a job, it waits in the platform’s queue until the requested resources become available. At that point, the platform’s scheduler bootstraps the job on the requested compute nodes.

You can view the class diagram and sequence diagram and more in the developer documentation.

Performance

Below we present the weak and strong scaling behavior of EnTK on the ORNL Titan machine.

Detailed description of the experiments can be found in this technical paper.

Weak Scaling experiments from Titan

Figure 3: Weak scalability on Titan: 512, 1,024, 2,048, and 4,096 1-core tasks executed on the same amount of cores

Strong Scaling experiments from Titan

Figure 4: Strong scalability on Titan: 8,192 1-core tasks are executed on 1,024, 2,048 and 4,096 cores

Installation

Installing Ensemble Toolkit

Installing Ensemble Toolkit using virtualenv

To install the Ensemble Toolkit, we need to create a virtual environment. Open a terminal and run:

virtualenv $HOME/ve-entk -p python3.7
  • The -p option indicates which Python version to use; Python 3.7 or newer is required

Activate virtualenv by:

source $HOME/ve-entk/bin/activate

Note

Activated env name is indicated in the prompt like: (ve-entk) username@two:~$

To disengage, deactivate the virtualenv. Note that Python will not recognize EnTK after deactivation. To deactivate, run:

deactivate

We suggest using the released version of EnTK, which you can install by executing the following command in your virtualenv:

pip install radical.entk

To install a specific branch of EnTK, e.g., devel, instead of installing from PyPI, clone the repository and check out that branch:

git clone https://github.com/radical-cybertools/radical.entk.git
cd radical.entk
git checkout <branch-name>
pip install .

You can check the version of Ensemble Toolkit with the `radical-stack` command-line tool. The currently installed version should be printed.

radical-stack

  python               : 3.7.4
  pythonpath           :
  virtualenv           : /home/username/ve-entk

  radical.entk         : 1.0.0
  radical.pilot        : 1.0.0
  radical.saga         : 1.0.0
  radical.utils        : 1.0.0

Installing Ensemble Toolkit using Anaconda/Conda

Conda users can obtain Ensemble Toolkit from the conda-forge channel. To install the Ensemble Toolkit, we need to create a conda environment. Open a terminal and run (assuming conda is in your PATH):

conda create -n conda-entk python=3.7 -c conda-forge -y
conda activate conda-entk

We suggest using the released version of EnTK, which you can install by executing the following command in your conda environment:

conda install radical.entk -c conda-forge

You can check the version of Ensemble Toolkit with the `radical-stack` command-line tool. The currently installed version should be printed.

radical-stack

  python               : <path>/rct/bin/python3
  pythonpath           :
  version              : 3.9.2
  virtualenv           : rct
  radical.entk         : 1.6.0
  radical.pilot        : 1.6.3
  radical.saga         : 1.6.1
  radical.utils        : 1.6.2

Preparing the Environment

Ensemble Toolkit uses RADICAL Pilot as the runtime system. RADICAL Pilot can access HPC clusters remotely via SSH but it requires: (1) a MongoDB server; and (2) a properly set-up passwordless SSH environment.

Setup passwordless SSH Access to HPC platforms

In order to create passwordless access to another machine, you need to create a key pair on your local machine and append the public key to the authorized_keys file on the remote machine.

This is a recommended tutorial to set up passwordless SSH access.

An easy way to set up SSH access to multiple remote machines is to create a file ~/.ssh/config. Suppose the address used to access a specific machine is foo@machine.example.com. You can create an entry in this config file as follows:

# contents of $HOME/.ssh/config
Host machine1
        HostName machine.example.com
        User foo

Now you can log in to the machine with ssh machine1.

Source: http://nerderati.com/2011/03/17/simplify-your-life-with-an-ssh-config-file/

Troubleshooting

Missing virtualenv

Running the radical-stack command shown above should return the version of the RCT installation, e.g., 1.0.0.

If virtualenv is not installed on your system, you can try the following.

wget --no-check-certificate https://pypi.python.org/packages/source/v/virtualenv/virtualenv-16.7.9.tar.gz
tar xzf virtualenv-16.7.9.tar.gz

python virtualenv-16.7.9/virtualenv.py $HOME/ve-entk -p python3.7
source $HOME/ve-entk/bin/activate

User Guide

The following list of examples will guide you, step by step, toward creating your own application. Please read both the descriptions on this page and the comments in the Python scripts.

Getting Started

In this section we will run through the Ensemble Toolkit API. We will develop an example application consisting of a simple bag of Tasks.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

Importing components from the Ensemble Toolkit Module

To create any application using Ensemble Toolkit, you need to import four components: Pipeline, Stage, Task, and AppManager. We have already discussed these components in the earlier sections.

from radical.entk import Pipeline, Stage, Task, AppManager

Creating the workflow

We first create a Pipeline, a Stage and a Task object, and assign the 'executable' and 'arguments' of the Task. For this example, we will create one Pipeline consisting of one Stage that contains one Task.

In the snippet below, we describe the Task, i.e., its name, executable and arguments, and add it to the Stage.

    t = Task()
    t.name = 'my.first.task'        # Assign a name to the task (optional, do not use ',' or '_')
    t.executable = '/bin/echo'   # Assign executable to the task
    t.arguments = ['Hello World']  # Assign arguments for the task executable

    # Add Task to the Stage
    s.add_tasks(t)

Next, we add the Stage to the Pipeline, which completes the workflow, and create the Application Manager.

    # Add Stage to the Pipeline
    p.add_stages(s)

    # Create Application Manager
    appman = AppManager()

Now that we have a fully described Task, Stage and Pipeline, we describe the resource request: a dictionary with the resource label, walltime and number of cpus.

    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',

Creating the AppManager

Now that our workflow has been created, we need to specify where it is to be executed. For this example, we will simply execute the workflow locally: we request 1 core for 10 minutes on localhost, i.e., your local machine. We assign the resource request description and the workflow to the AppManager and run our application.

        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()

Warning

If the default Python on your system is Anaconda Python, please change the value of the 'resource' key in the resource dictionary above to

'resource': 'local.localhost_anaconda',

To run the script, simply execute the following from the command line:

python get_started.py

Warning

The first run may fail for different reasons, most of which are related to setting up the execution environment or requesting the correct resources. Upon failure, Python may incorrectly raise the exception KeyboardInterrupt. This may be confusing because the exception is reported even when no keyboard interrupt has been issued. Currently, we have not found a way to avoid raising that exception.

And that’s it! Those are all the steps in this example. You can generate more verbose output by setting the environment variables `export RADICAL_LOG_TGT=radical.log; export RADICAL_LOG_LVL=DEBUG`.

After the execution of the example, you may want to check the output. Under your home folder, you will find a folder named radical.pilot.sandbox. In that folder, there will be a re.session.* folder and a ve.local.localhost folder. Inside re.session.*, there is a pilot.0000 folder, and in there a unit.000000 folder. In the unit folder, you will see several files, including unit.000000.out and unit.000000.err. unit.000000.out holds the messages written to standard output, and unit.000000.err holds the messages written to standard error. The unit.000000.out file should contain the Hello World message.
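
For convenience, the sketch below locates and prints that output file from the sandbox layout described above; the session, pilot and unit IDs are placeholders that differ for every run.

import glob
import os

# Standard output of the first task of any local EnTK session
pattern = os.path.expanduser(
    '~/radical.pilot.sandbox/re.session.*/pilot.0000/unit.000000/unit.000000.out')

for path in sorted(glob.glob(pattern)):
    with open(path) as fin:
        print('%s -> %s' % (path, fin.read().strip()))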

Let’s look at the complete code for this example:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s = Stage()

    # Create a Task object
    t = Task()
    t.name = 'my.first.task'        # Assign a name to the task (optional, do not use ',' or '_')
    t.executable = '/bin/echo'   # Assign executable to the task
    t.arguments = ['Hello World']  # Assign arguments for the task executable

    # Add Task to the Stage
    s.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s)

    # Create Application Manager
    appman = AppManager()

    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()

Adding Tasks

In this section, we will take a look at how we can add more tasks to our base script from the Getting Started section.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

Below, you can see the code snippet that shows how you can create more Task objects and add them to the Stage using the add_tasks() method.

    t.name = 'my.task'        # Assign a name to the task (optional, do not use ',' or '_')
    t.executable = '/bin/echo'   # Assign executable to the task
    t.arguments = ['I am task %s' % cnt]  # Assign arguments for the task executable

    # Add the Task to the Stage
    s.add_tasks(t)

# Add Stage to the Pipeline
p.add_stages(s)

# Create Application Manager
appman = AppManager()

To run the script, execute the following from the command line:

python add_tasks.py

Note

A new Task object must be created for each task so that all items inside a Stage are unique. For example, a for loop can be used to create multiple tasks, with the Task object created inside each iteration:

# Five tasks to be added
s = Stage()
task_dict = {
    't.cpu_reqs': {
    'processes'          : 10,
    'threads_per_process': 1,
    'process_type'       : "MPI",
    'thread_type'        : "OpenMP"
    }}

for i in range(5):
    task_dict['name']      = "task-{}".format(i)
    task_dict['arguments'] = ["file-{}".format(i)]
    # Creating new Task object and adding to Stage at every iteration
    s.add_tasks(Task(from_dict=task_dict))

print("Adding shared tasks. Total: {}".format(len(s.tasks)))

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s = Stage()

    for cnt in range(10):

        # Create a Task object
        t = Task()
        t.name = 'my.task'        # Assign a name to the task (optional, do not use ',' or '_')
        t.executable = '/bin/echo'   # Assign executable to the task
        t.arguments = ['I am task %s' % cnt]  # Assign arguments for the task executable

        # Add the Task to the Stage
        s.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s)

    # Create Application Manager
    appman = AppManager()

    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()

Adding Stages

In this section, we will take a look at how we can add more stages to our script from the Adding Tasks section.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

Below, you can see the code snippet that shows how you can add more Stages to a Pipeline. You simply create more Stage objects, populate them with Tasks and add them to the Pipeline using the add_stages() method.


    for cnt in range(10):

        # Create a Task object
        t = Task()
        t.name = 'my.task'        # Assign a name to the task (optional)
        t.executable = '/bin/echo'   # Assign executable to the task
        t.arguments = ['I am task %s in %s' % (cnt, s1.name)]  # Assign arguments for the task executable

        # Add the Task to the Stage
        s1.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s1)


    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage.2'

    for cnt in range(5):

        # Create a Task object
        t = Task()
        t.name = 'my.task'        # Assign a name to the task (optional, do not use ',' or '_')
        t.executable = '/bin/echo'   # Assign executable to the task
        t.arguments = ['I am task %s in %s' % (cnt, s2.name)]  # Assign arguments for the task executable

        # Add the Task to the Stage
        s2.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s2)


    # Create Application Manager
    appman = AppManager()

    # Create a dictionary to describe the mandatory keys:

To run the script, execute the following from the command line:

python add_stages.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()
    s1.name = 'Stage.1'

    for cnt in range(10):

        # Create a Task object
        t = Task()
        t.name = 'my.task'        # Assign a name to the task (optional)
        t.executable = '/bin/echo'   # Assign executable to the task
        t.arguments = ['I am task %s in %s' % (cnt, s1.name)]  # Assign arguments for the task executable

        # Add the Task to the Stage
        s1.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s1)


    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage.2'

    for cnt in range(5):

        # Create a Task object
        t = Task()
        t.name = 'my.task'        # Assign a name to the task (optional, do not use ',' or '_')
        t.executable = '/bin/echo'   # Assign executable to the task
        t.arguments = ['I am task %s in %s' % (cnt, s2.name)]  # Assign arguments for the task executable

        # Add the Task to the Stage
        s2.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s2)


    # Create Application Manager
    appman = AppManager()

    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()

Adding Pipelines

In this section, we will take a look at how we can add more pipelines to our script from the Adding Stages section.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

Below, you can see the code snippet that shows how you can create a workflow with two Pipelines. You simply create more Pipeline objects, populate them with Stages and Tasks, create the workflow as a set of the two Pipelines, and assign it to the Application Manager.

p1 = generate_pipeline(name='Pipeline 1', stages=1)
p2 = generate_pipeline(name='Pipeline 2', stages=2)

# Create Application Manager
appman = AppManager()

# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p1, p2])

To keep the script shorter, we created a function that creates, populates and returns a Pipeline. The code snippet of this function is as follows.

    for s_cnt in range(stages):

        # Create a Stage object
        s = Stage()
        s.name = 'Stage.%s' % s_cnt

        for t_cnt in range(5):

            # Create a Task object
            t = Task()
            t.name = 'my.task'        # Assign a name to the task (optional)
            t.executable = '/bin/echo'   # Assign executable to the task
            # Assign arguments for the task executable
            t.arguments = ['I am task %s in %s in %s' % (t_cnt, s_cnt, name)]

            # Add the Task to the Stage
            s.add_tasks(t)

        # Add Stage to the Pipeline
        p.add_stages(s)

    return p



if __name__ == '__main__':

To run the script, execute the following from the command line:

python add_pipelines.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


def generate_pipeline(name, stages):

    # Create a Pipeline object
    p = Pipeline()
    p.name = name


    for s_cnt in range(stages):

        # Create a Stage object
        s = Stage()
        s.name = 'Stage.%s' % s_cnt

        for t_cnt in range(5):

            # Create a Task object
            t = Task()
            t.name = 'my.task'        # Assign a name to the task (optional)
            t.executable = '/bin/echo'   # Assign executable to the task
            # Assign arguments for the task executable
            t.arguments = ['I am task %s in %s in %s' % (t_cnt, s_cnt, name)]

            # Add the Task to the Stage
            s.add_tasks(t)

        # Add Stage to the Pipeline
        p.add_stages(s)

    return p



if __name__ == '__main__':

    p1 = generate_pipeline(name='Pipeline 1', stages=1)
    p2 = generate_pipeline(name='Pipeline 2', stages=2)

    # Create Application Manager
    appman = AppManager()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p1, p2])

    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Run the Application Manager
    appman.run()

Adding Shared Data

Data management is one of the important elements of any application. In many applications, there exists an initial set of files, common to multiple tasks, that needs to be transferred to the remote machine only once. In this section, we will take a look at how we can stage data shared by multiple tasks to the remote machine with a one-time transfer.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

In the following example, we will create a Pipeline with one Stage and 10 tasks. The tasks concatenate two input files and write the standard output to a file. Since the two input files are common between all the tasks, it will be efficient to transfer those files only once to the remote machine and have the tasks copy the input files when being executed.

Users can specify such shared data using the shared_data attribute of the AppManager object.


appman.shared_data = ['%s/file1.txt' % cur_dir, '%s/file2.txt' % cur_dir]

The shared data can then be referenced for copying or linking using the $SHARED keyword in the file movement description of the task.

    t1.executable = 'cat'
    t1.arguments = ['file1.txt','file2.txt','>','output.txt']
    t1.copy_input_data = ['$SHARED/file1.txt', '$SHARED/file2.txt']
    t1.download_output_data = ['output.txt > %s/output_%s.txt'  % (cur_dir, i + 1)]

    # Add the Task to the Stage

In the example provided, the two files contain the words ‘Hello’ and ‘World’, respectively, and the output files are expected to contain ‘Hello World’.

To run the script, execute the following from the command line:

python add_shared_data.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

import os
from glob import glob

import radical.utils as ru

from radical.entk import Pipeline, Stage, Task, AppManager

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

cur_dir = os.path.dirname(os.path.abspath(__file__))


def generate_pipeline():

    # Create a Pipeline object
    p1 = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    for i in range(10):
        t1 = Task()
        t1.executable = 'cat'
        t1.arguments = ['file1.txt','file2.txt','>','output.txt']
        t1.copy_input_data = ['$SHARED/file1.txt', '$SHARED/file2.txt']
        t1.download_output_data = ['output.txt > %s/output_%s.txt'  % (cur_dir, i + 1)]

        # Add the Task to the Stage
        s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p1.add_stages(s1)

    return p1


if __name__ == '__main__':

    for f in glob('%s/file*.txt' % cur_dir):
        os.remove(f)

    os.system('echo "Hello" > %s/file1.txt' % cur_dir)
    os.system('echo "World" > %s/file2.txt' % cur_dir)


    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

            'resource': 'local.localhost',
            'walltime': 1,
            'cpus': 1
    }

    # Create Application Manager
    appman = AppManager()

    # Assign resource manager to the Application Manager
    appman.resource_desc = res_dict
    appman.shared_data = ['%s/file1.txt' % cur_dir, '%s/file2.txt' % cur_dir]

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()

    for x in range(10):
        with ru.ru_open('%s/output_%s.txt' % (cur_dir,x + 1), 'r') as fp:
            print('Output %s: ' % (x + 1), fp.readlines())
        os.remove('%s/output_%s.txt' % (cur_dir,x + 1))


    os.remove('%s/file1.txt' % cur_dir)
    os.remove('%s/file2.txt' % cur_dir)

Adding Data

Data management is one of the important elements of any application. In this section, we will take a look at how we can manage data between tasks.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

In the following example, we will create a Pipeline of two Stages, each with one Task. In the first stage, we will create a file of size 1MB. In the next stage, we will perform a character count on the same file and write to another file. Finally, we will bring that output to the current location.

Since we already know how to create this workflow, we simply present the code snippet concerned with the data movement. The task in the second stage needs to perform two data movements: a) copy the data created in the first stage to its current directory and b) bring the output back to the directory where the script is executed. Below we present the statements that perform these operations.

# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

# Download the output of the current task to the current location
t2.download_output_data = ['ccount.txt']

Intermediate data are referenced with a unique identifier of the form $Pipeline_%s_Stage_%s_Task_%s/{filename}, where each %s is replaced by the name of the pipeline, stage and task, respectively. If no name is assigned, the .uid attribute can be used instead to locate files across tasks. The Task API documentation has more information about the use of this API.
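
For example, the two forms of the reference look as follows. The objects p, s1, t1 and t2 mirror those used in the script below and are created here only to make the sketch self-contained.

from radical.entk import Pipeline, Stage, Task

p, s1, t1, t2 = Pipeline(), Stage(), Task(), Task()
p.name, s1.name, t1.name = 'p1', 's1', 't1'

# Reference by entity names
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt'
                      % (p.name, s1.name, t1.name)]

# Equivalent reference by uid, when no names were assigned
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt'
                      % (p.uid, s1.uid, t1.uid)]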

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_LOG_LVL=DEBUG.

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)


    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage.2'

    # Create a Task object
    t2 = Task()
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]
    # Download the output of the current task to the current location
    t2.download_output_data = ['ccount.txt']

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create Application Manager
    appman = AppManager()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = set([p])

    # Create a dictionary to describe the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Run the Application Manager
    appman.run()

Handling data from login node

The following example shows how to configure a task to fetch data at runtime, when compute nodes do not have internet access.

t = Task()
t.name = 'download-task'
t.executable = '/usr/bin/ssh'
t.arguments = ["<username>@<hostname>",
               "'bash -l /path/to/download_script.sh <input1> <input2>'"]
t.download_output_data = ['STDOUT', 'STDERR']
t.cpu_reqs = {'cpu_processes': 1,
              'cpu_process_type': None,
              'cpu_threads': 1,
              'cpu_thread_type': None}

Note

bash -l makes the shell act as if it had been directly invoked by logging in.

Note

Verify that the command ssh localhost hostname works on the login node without a password prompt. If you are asked for a password, please create an ssh keypair with the command ssh-keygen. That should create two keys in ~/.ssh, named id_rsa and id_rsa.pub. Now, execute the following commands:

cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 0600 $HOME/.ssh/authorized_keys

Changing Target Machine

All of our examples so far have been run locally. It's time to run something on an HPC machine! One of the features of Ensemble Toolkit is that you can submit tasks to another machine remotely from your local machine. This has some requirements: you need passwordless SSH access to the target machine. If you don't have such access, we discuss the setup here. You also need to confirm that RP and Ensemble Toolkit are supported on this machine. A list of supported machines and how to get support for new machines is discussed here.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

Once you have passwordless access to another machine, switching from one target machine to another is quite simple: we re-describe the resource dictionary that is assigned to the Application Manager. For example, in order to run on the XSEDE Comet cluster (resource label xsede.comet), we describe the resource dictionary as follows:

# Create a dictionary to describe our resource request for XSEDE Comet
res_dict = {

    'resource': 'xsede.comet',
    'walltime': 10,
    'cpus': 16,
    'project': 'unc100',
    'schema': 'gsissh'

}

# Assign resource request description to the Application Manager
appman.resource_desc = res_dict

# Run the Application Manager
appman.run()
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

To run the script, execute the following from the command line:

python change_target.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage 2'

    # Create a Task object
    t2 = Task()
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name,
        s1.name, t1.name)]
    # Download the output of the current task to the current location
    t2.download_output_data = ['ccount.txt']

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create Application Manager
    appman = AppManager()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = set([p])

    # Create a dictionary to describe our resource request for XSEDE Comet
    res_dict = {

        'resource': 'xsede.comet',
        'walltime': 10,
        'cpus': 16,
        'project': 'unc100',
        'schema': 'gsissh'

    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Run the Application Manager
    appman.run()

Profiling

EnTK can be configured to generate profiles by setting RADICAL_ENTK_PROFILE=True. Profiles are generated per component and sub-component. These profiles can be read and analyzed by using RADICAL Analytics (RA).
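
As a minimal sketch, assuming the variable only needs to be set before EnTK is initialized, profiling can also be enabled from within the application script itself:

import os

# Equivalent to 'export RADICAL_ENTK_PROFILE=True' in the shell
os.environ['RADICAL_ENTK_PROFILE'] = 'True'

from radical.entk import Pipeline, Stage, Task, AppManager
# ... build the workflow and run the AppManager as in the earlier examples;
# per-component profiles are then written under the re.session.* directory.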

We describe profiling capabilities using RADICAL Analytics for EnTK via two examples that extract durations and timestamps.

The scripts and the data can be found in your virtualenv under share/radical.entk/analytics/scripts or can be downloaded via the following links:

  • Data: Link

  • Durations: Link

  • Timestamps: Link

Untar the data and run either of the scripts. We recommend following the inline comments and output messages to get an understanding of RADICAL Analytics’ usage for EnTK.

More details on the capabilities of RADICAL Analytics can be found in its documentation.

Note

The current examples of RADICAL Analytics are configured for RADICAL Pilot but can be changed to EnTK by:

  • setting stype to ‘radical.entk’ when creating the RADICAL Analytics session;

  • following the state model, event model, and sequence diagram to determine the EnTK probes to use in profiling.

Extracting durations

#!/usr/bin/env python
__copyright__ = 'Copyright 2013-2018, http://radical.rutgers.edu'
__license__ = 'MIT'


import os
import sys
import glob
import pprint
import radical.utils as ru
import radical.entk as re
import radical.analytics as ra

"""This example illustrates how to obtain durations for arbitrary (non-state)
profile events. Modified from examples under RADICAL Analytics"""

# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    loc = './re.session.two.vivek.017759.0012'
    src = os.path.dirname(loc)
    sid = os.path.basename(loc)
    session = ra.Session(src=src, sid = sid, stype='radical.entk')

    # A formatting helper before starting...
    def ppheader(message):
        separator = '\n' + 78 * '-' + '\n'
        print(separator + message + separator)

    # First we look at the *event* model of our session.  The event model is
    # usually less stringent than the state model: not all events will always be
    # available, events may have certain fields missing, they may be recorded
    # multiple times, their meaning may slightly differ, depending on the taken
    # code path.  But in general, these are the events available, and their
    # relative ordering.
    ppheader("event models")
    pprint.pprint(session.describe('event_model'))
    pprint.pprint(session.describe('statistics'))

    # Let's say that we want to see how long EnTK took to schedule, execute, and
    # process completed tasks.

    # We first filter our session to obtain only the task objects
    tasks = session.filter(etype='task', inplace=False)
    print('#tasks   : %d' % len(tasks.get()))

    # We use the 're.states.SCHEDULING' and 're.states.SUBMITTING' probes to find
    # the time taken by EnTK to create and submit all tasks for execution
    ppheader("Time spent to create and submit the tasks")
    duration = tasks.duration(event=[{ru.EVENT: 'state',
                                    ru.STATE: re.states.SCHEDULING},
                                    {ru.EVENT: 'state',
                                    ru.STATE: re.states.SUBMITTING}])
    print('duration : %.2f' % duration)

    # We use the 're.states.SUBMITTING' and 're.states.COMPLETED' probes to find
    # the time taken by EnTK to execute all tasks
    ppheader("Time spent to execute the tasks")
    duration = tasks.duration(event=[{ru.EVENT: 'state',
                                    ru.STATE: re.states.SUBMITTING},
                                    {ru.EVENT: 'state',
                                    ru.STATE: re.states.COMPLETED}])
    print('duration : %.2f' % duration)

    # We use the 're.states.COMPLETED' and 're.states.DONE' probes to find
    # the time taken by EnTK to process all executed tasks
    ppheader("Time spent to process executed tasks")
    duration = tasks.duration(event=[{ru.EVENT: 'state',
                                    ru.STATE: re.states.COMPLETED},
                                    {ru.EVENT: 'state',
                                    ru.STATE: re.states.DONE}])
    print('duration : %.2f' % duration)

    # Finally, we produce a list of the number of concurrent tasks between
    # states 're.states.SUBMITTING' and 're.states.COMPLETED' over the course
    # of the entire execution sampled every 10 seconds
    ppheader("concurrent tasks in between SUBMITTING and EXECUTED states")
    concurrency = tasks.concurrency(event=[{ru.EVENT: 'state',
                                            ru.STATE: re.states.SUBMITTING},
                                            {ru.EVENT: 'state',
                                            ru.STATE: re.states.COMPLETED}],
                                    sampling=10)
    pprint.pprint(concurrency)


# ------------------------------------------------------------------------------

Extracting timestamps

#!/usr/bin/env python

import os
import pprint
import radical.utils as ru
import radical.entk as re
import radical.analytics as ra

__copyright__ = 'Copyright 2013-2018, http://radical.rutgers.edu'
__license__ = 'MIT'

"""
This example illustrates the use of the method ra.Session.get().
Modified from examples under RADICAL Analytics
"""

# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    loc = './re.session.two.vivek.017759.0012'
    src = os.path.dirname(loc)
    sid = os.path.basename(loc)
    session = ra.Session(src=src, sid=sid, stype='radical.entk')

    # A formatting helper before starting...
    def ppheader(message):
        separator = '\n' + 78 * '-' + '\n'
        print(separator + message + separator)

    # and here we go. As seen in example 01, we use ra.Session.list() to get the
    # name of all the types of entity of the session.
    etypes = session.list('etype')
    pprint.pprint(etypes)

    # We limit ourselves to the type 'task'. We use the method
    # ra.Session.get() to get all the objects in our session with etype 'task':
    ppheader("properties of the entities with etype 'task'")
    tasks = session.get(etype='task')
    pprint.pprint(tasks)


    # Mmmm, still a bit too many entities. We limit our analysis to a single
    # task. We use ra.Session.get() to select all the objects in the
    # session with etype 'task' and uid 'task.0000' and return them into a
    # list:
    ppheader("properties of the entities with etype 'task' and uid 'task.0000'")
    task = session.get(etype='task', uid='task.0000')
    pprint.pprint(task)


    # We may want also to look into the states of this task:
    ppheader("states of the entities with uid 'task.0000'")
    states = task[0].states
    pprint.pprint(states)

    # and extract the state we need. For example, the state 'SCHEDULED', that
    # indicates that the task has been scheduled. To refer to the state 'SCHEDULED',
    # and to all the other states of RADICAL-Pilot, we use the re.states.SCHEDULED property
    # that guarantees type checking.
    ppheader("Properties of the state re.SCHEDULED of the entities with uid 'task.0000'")
    state = task[0].states[re.states.SCHEDULED]
    pprint.pprint(state)

    # Finally, we extract a property we need from this state. For example, the
    # timestamp of when the task has been created, i.e., the property 'time' of
    # the state SCHEDULED:
    ppheader("Property 'time' of the state re.states.SCHEDULED of the entities with uid 'task.0000'")
    timestamp = task[0].states[re.states.SCHEDULED][ru.TIME]
    pprint.pprint(timestamp)

    # ra.Session.get() can also be used to get all the entities in our
    # session that have a specific state. For example, the following gets all
    # the types of entity that have the state 'SCHEDULED':
    ppheader("Entities with state re.states.SCHEDULED")
    entities = session.get(state=re.states.SCHEDULED)
    pprint.pprint(entities)

    # We can then print the timestamp of the state 'SCHEDULED' for all the entities
    # having that state by using something like:
    ppheader("Timestamp of all the entities with state re.states.SCHEDULED")
    timestamps = [entity.states[re.states.SCHEDULED][ru.TIME] for entity in entities]
    pprint.pprint(timestamps)

    # We can also create tailored data structures for our analysis. For
    # example, using tuples to name entities, state, and timestamp:
    ppheader("Named entities with state re.states.SCHEDULED and its timestamp")
    named_timestamps = [(entity.uid,
                        entity.states[re.states.SCHEDULED][ru.STATE],
                        entity.states[re.states.SCHEDULED][ru.TIME]) for entity in entities]
    pprint.pprint(named_timestamps)

Examples

The following is a list of common application structures. Please read both the descriptions on this page and the comments in the Python scripts.

Pipeline of Ensembles

One of the most common execution patterns consists of one Pipeline with multiple Stages where each Stage consists of several Tasks. We call this a Pipeline of Ensembles. A pictorial representation of this pattern is provided below.

Pipeline of Ensembles

Figure 1: Pipeline of Ensembles

We encourage you to take a look at the Ensemble of Pipelines example on the next page and compare it with the above pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

In the following example, we create one Pipeline with three Stages. The Task in the first Stage creates a file. The Tasks in the second Stage (30 in the example script) perform a character count on that file. The Tasks in the third Stage perform a checksum on the output of each Task from the second Stage.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.

To run the script, execute the following from the command line:

python poe.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os


# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


# ------------------------------------------------------------------------------
#
def generate_pipeline():

    # Create a Pipeline object
    p = Pipeline()
    p.name = 'p1'

    # Create a Stage object
    s1 = Stage()
    s1.name = 's1'

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.name = 't1'
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object to hold character count tasks
    s2 = Stage()
    s2.name = 's2'
    s2_task_uids = []

    for cnt in range(30):

        # Create a Task object
        t2 = Task()
        t2.name = 't%s' % (cnt + 1)
        t2.executable = '/bin/bash'
        t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
        # Copy data from the task in the first stage to the current task's location
        t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' %
                (p.uid, s1.uid, t1.uid)]

        # Add the Task to the Stage
        s2.add_tasks(t2)
        s2_task_uids.append(t2.uid)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create another Stage object to hold checksum tasks
    s3 = Stage()
    s3.name = 's3'

    for cnt in range(30):

        # Create a Task object
        t3 = Task()
        t3.name = 't%s' % (cnt + 1)
        t3.executable = '/bin/bash'
        t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
        # Copy data from the task in the first stage to the current task's location
        t3.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/ccount.txt' %
                (p.uid, s2.uid, s2_task_uids[cnt])]
        # Download the output of the current task to the current location
        t3.download_output_data = ['chksum.txt > chksum_%s.txt' % cnt]

        # Add the Task to the Stage
        s3.add_tasks(t3)

    # Add Stage to the Pipeline
    p.add_stages(s3)

    return p


# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    # Create Application Manager
    appman = AppManager()

    # Create a dictionary with the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
            'resource': 'local.localhost',
            'walltime': 10,
            'cpus':2
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([generate_pipeline()])

    # Run the Application Manager
    appman.run()


# ------------------------------------------------------------------------------

Ensemble of Pipelines

One of the most common execution patterns consists of multiple concurrent Pipelines with multiple Stages where each Stage consists of several Tasks. We call this an Ensemble of Pipelines. A pictorial representation of this pattern is provided below.

Ensemble of Pipelines

Figure 1: Ensemble of Pipelines

We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

In the following example, we create 10 Pipelines, each with three Stages. The Task in the first Stage creates a file. The Task in the second Stage performs a character count on that file. The Task in the third Stage performs a checksum on the output of the Task from the second stage.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.

python eop.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


def generate_pipeline():

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object to hold character count tasks
    s2 = Stage()

    # Create a Task object
    t2 = Task()
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.uid,
        s1.uid, t1.uid)]

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create another Stage object to hold checksum tasks
    s3 = Stage()

    # Create a Task object
    t3 = Task()
    t3.executable = '/bin/bash'
    t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
    # Copy data from the task in the first stage to the current task's location
    t3.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/ccount.txt' % (p.uid,
        s2.uid, t2.uid)]
    # Download the output of the current task to the current location
    t3.download_output_data = ['chksum.txt > chksum_%s.txt' % p.uid]

    # Add the Task to the Stage
    s3.add_tasks(t3)

    # Add Stage to the Pipeline
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    pipelines = []

    for cnt in range(10):
        pipelines.append(generate_pipeline())

    # Create Application Manager
    appman = AppManager()

    # Create a dictionary with the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set(pipelines)

    # Run the Application Manager
    appman.run()

Sequence of Workflows

Another common execution pattern consists of same-session sequential workflows with multiple concurrent Pipelines with multiple Stages where each Stage consists of several Tasks. We call this a Sequence of Workflows.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

In the following example, we create 2 sequential workflows, each with 2 Pipelines and 3 Stages per Pipeline. For demonstration purposes, each Task does nothing but “sleep” for 3 seconds. The example suggests starting AppManager with autoterminate=False and using appman.terminate() once all pipelines are finished. This allows you to use the same application manager for the second workflow.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.

python sow.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager


def generate_pipeline():

    p = Pipeline()
    s1 = Stage()

    t1 = Task()
    t1.executable = '/bin/sleep'
    t1.arguments = ['3']

    s1.add_tasks(t1)

    p.add_stages(s1)
    s2 = Stage()
    t2 = Task()
    t2.executable = '/bin/sleep'
    t2.arguments = ['3']

    s2.add_tasks(t2)
    p.add_stages(s2)
    s3 = Stage()

    t3 = Task()
    t3.executable = '/bin/sleep'
    t3.arguments = ['3']

    s3.add_tasks(t3)
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    appman   = AppManager(autoterminate=False)
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus'    :  8
    }
    appman.resource_desc = res_dict


    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('1 ===================================================')


    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('2 ===================================================')

    appman.terminate()

    print('t ===================================================')

Advanced Examples

The following is a list of common application structures. Please read both the description on the website and the comments made in the Python scripts.

Adaptive applications: Task-count

We encourage you to take a look at the Pipeline of Ensembles example and compare it with this adaptive pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.

CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

def func_post():
    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        ...

s = Stage()
s.post_exec = func_post

In the following example, we initially create 1 Pipeline with one Stage. There are 10 tasks in the first Stage that each runs ‘sleep 30’. After the Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the number of new stages added is less than 4. If yes, we add a new Stage with similar tasks as before to the Pipeline. If 4 stages have already been added, no more stages are added.

python adapt_tc.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os, sys

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:

            CUR_NEW_STAGE += 1
            s = Stage()
            for i in range(10):
                t = Task()
                t.executable = '/bin/sleep'
                t.arguments  = ['30']
                s.add_tasks(t)

            # Add post-exec to the new Stage so that it can keep extending
            # the Pipeline until MAX_NEW_STAGE is reached
            s.post_exec = func_post

            p.add_stages(s)

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    for i in range(10):

        t1 = Task()
        t1.executable = 'sleep'
        t1.arguments = [ '30']

        # Add the Task to the Stage
        s1.add_tasks(t1)

    # Add post-exec to the Stage
    s1.post_exec = func_post

    # Add Stage to the Pipeline
    p.add_stages(s1)

    return p

if __name__ == '__main__':

    # Create a dictionary with the three mandatory keys:
    # resource, walltime and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

            'resource': 'local.localhost',
            'walltime': 15,
            'cpus': 2,
    }


    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()

Adaptive applications: Task-order

We encourage you to take a look at the Pipeline of Ensembles example and compare it with this adaptive pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.

CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

def func_post():
    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        ...

s = Stage()
s.post_exec = func_post

In the following example, we create 1 Pipeline with five stages. There are 10 tasks in each Stage that each run ‘sleep 30’. After a Stage is DONE (i.e. all tasks of the Stage have completed execution), a condition is evaluated that checks whether the current stage is less than the max number of stages in the Pipeline. If yes, then the remaining stages are shuffled. If not, no operation is performed.

python adapt_to.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os, sys
from random import shuffle

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4
CUR_TASKS     = 10

def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            shuffle(p.stages[CUR_NEW_STAGE:])

        else:
            print('Done')

    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE+1):

        # Create a Stage object
        s1 = Stage()

        for i in range(CUR_TASKS):

            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments = [ '30']

            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p

if __name__ == '__main__':

    # Create a dictionary with the three mandatory keys:
    # resource, walltime and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 15,
        'cpus': 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()

Adaptive applications: Task-attribute

We encourage you to take a look at the Pipeline of Ensembles example and compare it with this adaptive pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.

CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

s1.post_exec = func_post

def func_post():

    global CUR_NEW_STAGE, MAX_NEW_STAGE

    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        CUR_NEW_STAGE += 1
        s = Stage()

        for i in range(10):
            t = Task()
            t.executable = '/bin/sleep'
            t.arguments = ['30']
            s.add_tasks(t)

        s.post_exec = func_post
        p.add_stages(s)

In the following example, we create 1 Pipeline with five stages. There are 10 tasks in each Stage, each running ‘sleep 30’. After a Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the current stage is less than the max number of stages in the Pipeline. If yes, then the arguments of the tasks of the next Stage are modified. If not, no operation is performed, and the pipeline will terminate.

python adapt_ta.py

Let’s take a look at the complete code in the example. Note: You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os, sys
from random import randint

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4
CUR_TASKS     = 10

def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            for t in p.stages[CUR_NEW_STAGE].tasks:
                dur = randint(10,30)
                t.arguments = [str(dur)]
        else:
            print('Done')


    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE+1):

        # Create a Stage object
        s1 = Stage()

        for _ in range(CUR_TASKS):

            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments = [ '30']

            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p

if __name__ == '__main__':

    # Create a dictionary with the three mandatory keys:
    # resource, walltime and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 15,
        'cpus': 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()

API Reference for Users

Application Creation components API

Pipeline API

class radical.entk.Pipeline[source]

A pipeline represents a collection of objects that have a linear temporal execution order. In this case, a pipeline consists of multiple ‘Stage’ objects. Each `Stage_i` can execute only after all stages up to `Stage_(i-1)` have completed execution.

add_stages(value)[source]

Appends stages to the current Pipeline

Argument

List of Stage objects

as_dict()[source]

Convert current Pipeline (i.e. its attributes) into a dictionary

Returns

python dictionary

property completed

Returns whether the Pipeline has completed

Returns

Boolean

property current_stage

Returns the current stage being executed

Returns

Integer

from_dict(d)[source]

Create a Pipeline from a dictionary. The change is in place.

Argument

python dictionary

Returns

None
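As an illustration (a minimal sketch), as_dict() and from_dict() can be used together to round-trip a Pipeline description; stages and tasks are omitted here for brevity:

from radical.entk import Pipeline

p1 = Pipeline()
p1.name = 'p1'

# serialize the pipeline attributes into a plain dictionary ...
d = p1.as_dict()

# ... and rebuild an equivalent Pipeline from it, in place
p2 = Pipeline()
p2.from_dict(d)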

property lock

Returns the lock over the current Pipeline

Returns

Lock object

property luid

Unique ID of the current pipeline (fully qualified). For the pipeline class, this is an alias to uid.

Example

>>> pipeline.luid
pipe.0001
Getter

Returns the fully qualified uid of the current pipeline

Type

String

property name

Name of the pipeline useful for bookkeeping and to refer to this pipeline while data staging. Do not use a ‘,’ or ‘_’ in an object’s name.

Getter

Returns the name of the pipeline

Setter

Assigns the name of the pipeline

Type

String

resume()[source]

Continue execution of paused stages and tasks.

  • The resume() method can only be called on a suspended pipeline, an exception will be raised if that condition is not met.

  • The state of a resumed pipeline will be set to the state the pipeline had before suspension.

property stages

Stages of the current Pipeline

Getter

Returns the stages in the current Pipeline

Setter

Assigns the stages to the current Pipeline

Type

List

property state

Current state of the pipeline

Getter

Returns the state of the current pipeline

Type

String

property state_history

Returns a list of the states obtained in temporal order

Returns

list

suspend()[source]

Pause execution of the pipeline: stages and tasks that are executing will continue to execute, but no new stages and tasks will be eligible for execution until resume() is called.

  • The suspend() method cannot be called on a suspended or completed pipeline; doing so will result in an exception.

  • The state of the pipeline will be set to SUSPENDED.
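A hedged sketch of how suspend() and resume() are commonly driven from the post_exec callbacks of another pipeline; p1 and p2 are assumed to be two pipelines of the same workflow, and their stages are omitted:

from radical.entk import Pipeline

p1 = Pipeline()
p2 = Pipeline()          # a second pipeline that p1 pauses and later resumes

def pause_p2():
    # suspend p2 while p1 performs some intermediate work
    p2.suspend()

def resume_p2():
    # a post_exec callback that resumes suspended pipelines must return
    # a list with their uids (see the note on post_exec below)
    p2.resume()
    return [p2.uid]

# the callbacks would be attached to two different stages of p1, e.g.:
# p1.stages[0].post_exec = pause_p2
# p1.stages[1].post_exec = resume_p2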

property uid

Unique ID of the current pipeline

Getter

Returns the unique id of the current pipeline

Type

String

Stage API

class radical.entk.Stage[source]

A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.

add_tasks(value)[source]

Adds task(s) to a Stage using a set union operation. Every Task element is unique; no duplicates are allowed. An existing Task won't be added again, but will be updated with changes.

Argument

iterable task object
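For example (a minimal sketch), a single Task or any iterable of Tasks can be passed to add_tasks():

from radical.entk import Stage, Task

s = Stage()

t0 = Task()
t0.executable = '/bin/date'

# add a single task ...
s.add_tasks(t0)

# ... or an iterable of tasks in one call
tasks = list()
for i in range(5):
    t = Task()
    t.executable = '/bin/sleep'
    t.arguments  = ['10']
    tasks.append(t)

s.add_tasks(tasks)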

as_dict()[source]

Convert current Stage into a dictionary

Returns

python dictionary

from_dict(d)[source]

Create a Stage from a dictionary. The change is in place.

Argument

python dictionary

Returns

None

property luid

Unique ID of the current stage (fully qualified).

Example

>>> stage.luid
pipe.0001.stage.0004
Getter

Returns the fully qualified uid of the current stage

Type

String

property name

Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.

Getter

Returns the name of the current stage

Setter

Assigns the name of the current stage

Type

String

property parent_pipeline

Returns the pipeline this stage belongs to

Setter

Assigns the pipeline uid this stage belongs to

Type

getter

property post_exec

The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished.

Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon.

Example:

s1.post_exec = func_post

def func_post():

    if condition_is_met:
        s = Stage()
        t = Task()
        t.executable = '/bin/sleep'
        t.arguments  = ['30']
        s.add_tasks(t)
        p.add_stages(s)

    else:
        # do nothing
        pass

property state

Current state of the stage

Getter

Returns the state of the current stage

Type

String

property state_history

Returns a list of the states obtained in temporal order

Returns

list

property tasks

Tasks of the stage

Getter

Returns all the tasks of the current stage

Setter

Assigns tasks to the current stage

Type

set of Tasks

property uid

Unique ID of the current stage

Getter

Returns the unique id of the current stage

Type

String

Task API

class radical.entk.Task(from_dict=None)[source]

A Task is an abstraction of a computational unit. In this case, a Task consists of its executable along with its required software environment, files to be staged as input and output.

At the user level a Task is to be populated by assigning attributes. Internally, an empty Task is created and populated using the from_dict function. This avoids creating Tasks with a new uid, since tasks with a new uid offset the uid count file in radical.utils and can potentially affect the profiling if not taken care of.
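As a quick illustration (a minimal sketch; executable and arguments are placeholder values), a Task can be populated either by assigning attributes or by passing a dictionary to the constructor:

from radical.entk import Task

# populate a Task by assigning attributes ...
t1 = Task()
t1.executable = '/bin/echo'
t1.arguments  = ['hello world']

# ... or, equivalently, via a dictionary passed to the constructor
t2 = Task(from_dict={'executable': '/bin/echo',
                     'arguments' : ['hello world']})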

uid

[type: str | default: “”] A unique ID for the task. This attribute is optional, a unique ID will be assigned by RE if the field is not set, and cannot be re-assigned (immutable attribute).

name

[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads. Such characters as ‘,’ or ‘_’ are not allowed in a name.

state

[type: str | default: “DESCRIBED”] Current state of the task, its initial value re.states.INITIAL is set during the task initialization, and this value is also appended into state_history attribute.

state_history

[type: list | default: [“DESCRIBED”]] List of the states obtained in temporal order. Every time new state is set, it is appended to state_history automatically.

executable

[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

arguments

[type: list | default: []] List of arguments to be supplied to the executable.

environment

[type: dict | default: {}] Environment variables to set in the environment before the execution process.

sandbox

[type: str | default: “”] This specifies the working directory of the task. By default, the task’s uid is used as a sandbox for the task.

pre_launch

[type: list | default: []] List of actions (shell commands) to perform before the task is launched.

Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of resources! Deviating from that rule will likely result in reduced overall throughput.

post_launch

[type: list | default: []] List of actions (shell commands) to perform after the task finishes.

The same remarks as on pre_launch apply

pre_exec

[type: list | default: []] List of actions (shell commands) to perform after task is launched, but before rank(s) starts execution.

The same remarks as on pre_launch apply

post_exec

[type: list | default: []] List of actions (shell commands) to perform right after rank(s) finishes.

The same remarks as on pre_launch apply

cpu_reqs

[type: CpuReqs | default: CpuReqs()] The CPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.cpu_reqs = {'cpu_processes'    : X,
                 'cpu_process_type' : None/'MPI',
                 'cpu_threads'      : Y,
                 'cpu_thread_type'  : None/'OpenMP'}

This description means that the Task is going to spawn X processes and Y threads per each of these processes to run on CPUs. Hence, the total number of cpus required by the Task is X * Y for all the processes and threads to execute concurrently.

By default, 1 CPU process and 1 CPU thread per process are requested.
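For instance, a hedged sketch of a task that runs 4 MPI processes with 2 OpenMP threads each (8 CPU cores in total; the executable name is a placeholder):

from radical.entk import Task

t = Task()
t.executable = './mpi_openmp_app'          # placeholder executable
t.cpu_reqs   = {'cpu_processes'    : 4,
                'cpu_process_type' : 'MPI',
                'cpu_threads'      : 2,
                'cpu_thread_type'  : 'OpenMP'}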

gpu_reqs

[type: GpuReqs | default: GpuReqs()] The GPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.gpu_reqs = {'gpu_processes'    : X,
                 'gpu_process_type' : None/'CUDA'/'ROCm',
                 'gpu_threads'      : Y,
                 'gpu_thread_type'  : None}

This description means that each rank of the task is going to use X GPUs with Y GPU-threads. By default, 0 GPUs are requested.
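Similarly, a hedged sketch of a task whose single rank uses one GPU via CUDA (the executable name is a placeholder):

from radical.entk import Task

t = Task()
t.executable = './cuda_app'                # placeholder executable
t.gpu_reqs   = {'gpu_processes'    : 1,
                'gpu_process_type' : 'CUDA',
                'gpu_threads'      : 1,
                'gpu_thread_type'  : None}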

lfs_per_process

[type: int | default: 0] Local File Storage per process - amount of space (MB) required on the local file system of the node by the task.

mem_per_process

[type: int | default: 0] Amount of memory required by the task.

upload_input_data

[type: list | default: []] List of file names required to be transferred from a local client storage to the location of the task or data staging area before task starts.

copy_input_data

[type: list | default: []] List of file names required to be copied from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

Example of a file location format:

$Pipeline_%s_Stage_%s_Task_%s, where %s is replaced by the entity name/uid

# output.txt is copied from task t1 to the current task's sandbox
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' %
                      (p.name, s1.name, t1.name)]

link_input_data

[type: list | default: []] List of file names required to be symlinked from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

move_input_data

[type: list | default: []] List of file names required to be moved from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

copy_output_data

[type: list | default: []] List of file names required to be copied from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

Example of defining data to be copied:

# results.txt will be copied to the data staging area $SHARED
t.copy_output_data = ['results.txt > $SHARED/results.txt']

link_output_data

[type: list | default: []] List of file names required to be symlinked from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

move_output_data

[type: list | default: []] List of file names required to be moved from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

download_output_data

[type: list | default: []] List of file names required to be downloaded from a current task to a local client storage space when a task is finished.

Example of defining data to be downloaded:

# results.txt is transferred to a local client storage space
t.download_output_data = ['results.txt']

stdout

[type: str | default: “”] The name of the file to store stdout. If not set then the name of the following format will be used: <uid>.out.

stderr

[type: str | default: “”] The name of the file to store stderr. If not set then the name of the following format will be used: <uid>.err.

stage_on_error

[type: bool | default: False] Flag to allow staging out data if the task failed (output staging is normally skipped on FAILED or CANCELED tasks).

exit_code

[type: int | default: None] Get the exit code for finished tasks: 0 - for successful tasks; 1 - for failed tasks.

exception

[type: str | default: None] Get the representation of the exception which caused the task to fail.

exception_detail

[type: str | default: None] Get additional details (traceback or error messages) to the exception which caused this task to fail.

path

[type: str | default: “”] Get the path of the task on the remote machine. Useful to reference files generated in the current task.

tags

[type: dict | default: None] The tags for the task that can be used while scheduling by the RTS (configuration specific tags, which influence task scheduling and execution, e.g., tasks co-location).

rts_uid

[type: str | default: None] Unique RTS ID of the current task.

parent_stage

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the stage, which contains the current task.

parent_pipeline

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the pipeline, which contains the current task.

annotations

[type: Annotations | default: None] Annotations to describe task’s input and output files, and sets dependencies between tasks.

metadata

[type: dict | default: {}] User/system defined metadata.

Read-only attributes
property luid

[type: str] Unique ID of the current task (fully qualified).

>>> task.luid
pipe.0001.stage.0004.task.0234

annotate(inputs: Optional[Union[Dict, List, str]] = None, outputs: Optional[Union[List, str]] = None) → None[source]

Adds dataflow annotations with provided input and output files, and defines dependencies between tasks.

inputs

List of input files. If a file is produced by the previously executed task, then the corresponding input element is provided as a dictionary with the task instance as a key. Example: inputs=[‘file1’, {task0: ‘file2’, task1: [‘file2’]}]

Type

list, optional

outputs

List of produced/generated files.

Type

list, optional
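A minimal sketch of annotating a two-task dataflow (file names are placeholders):

from radical.entk import Task

t1 = Task()
t1.executable = '/bin/echo'
t1.arguments  = ['hello']
t1.annotate(outputs=['output.txt'])

t2 = Task()
t2.executable = '/bin/cat'
t2.arguments  = ['output.txt']
# declare that t2 consumes the file produced by t1
t2.annotate(inputs=[{t1: ['output.txt']}], outputs=['result.txt'])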

from_dict(d)[source]

Re-initialization, resets all attributes with provided input data.

property luid

Unique ID of the current task (fully qualified).

Example

>>> task.luid
pipe.0001.stage.0004.task.123456

Luid

Returns the fully qualified uid of the current task

Type

str

Application Manager API

class radical.entk.AppManager(config_path=None, reattempts=None, resubmit_failed=None, autoterminate=None, write_workflow=None, rts=None, rts_config=None, name=None, base_path=None, **kwargs)[source]

An application manager takes the responsibility of setting up the communication infrastructure, instantiates the ResourceManager, TaskManager, WFProcessor objects and all their threads and processes. This is the Master object running in the main process and is designed to recover from errors from all other objects, threads and processes.

Arguments
config_path

Url to config path to be read for AppManager

reattempts

number of attempts to re-invoke any failed EnTK components

resubmit_failed

resubmit failed tasks (True/False)

autoterminate

terminate resource reservation upon execution of all tasks of first workflow (True/False)

write_workflow

write workflow and mapping to rts entities to a file (post-termination)

rts

Specify RTS to use. Current options: ‘mock’, ‘radical.pilot’ (default if unspecified)

rts_config

Configuration for the RTS, accepts {‘sandbox_cleanup’: True/False,’db_cleanup’: True/False} when RTS is RP

name

Name of the Application. It should be unique between executions. (default is randomly assigned)

base_path

base path of logger and profiler with a session id, None (default) will use a current working directory
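As a hedged sketch, an AppManager that keeps the resource allocation alive across several workflows (as in the Sequence of Workflows example above) might be created as:

from radical.entk import AppManager

appman = AppManager(autoterminate=False,
                    resubmit_failed=True,
                    rts='radical.pilot')

# ... assign appman.resource_desc and appman.workflow, then call appman.run()
# (possibly several times); finally release the resources explicitly:
# appman.terminate()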

property name

Name for the application manager. Allows the user to setup the name of the application manager, as well as its session ID. This name should be unique between different EnTK executions, otherwise it will produce an error.

Getter

Returns the name of the application manager

Setter

Assigns the name of the application manager

Type

String

property outputs
Return list of filenames that are to be staged out after execution

Setter

Assign a list of names of files that need to be staged from the remote machine

Type

getter

property resource_desc

The resource description is a dictionary that holds information about the resource(s) that will be used to execute the workflow.

The following keys are mandatory in all resource descriptions:
‘resource’ : Label of the platform with resources.
‘walltime’ : Amount of time the workflow is expected to run.
‘cpus’ : Number of CPU cores/threads.
Optional keys include:
‘project’ : The project that will be charged.
‘gpus’ : Number of GPUs to be used by the workflow.
‘access_schema’ : The key of an access mechanism to use.
‘queue’ : The name of the batch queue.
‘job_name’ : The specific name of a batch job.
Getter

Returns the resource description

Setter

Assigns a resource description
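For illustration, a resource description combining the mandatory keys with some of the optional ones (the platform label, project and queue names follow the example used elsewhere in this documentation and are placeholders):

appman.resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,              # minutes
    'cpus'          : 64,
    'gpus'          : 0,                # optional
    'project'       : 'TG-abcxyz',      # optional
    'queue'         : 'abc',            # optional
    'access_schema' : 'ssh'             # optional
}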

run()[source]

Purpose: Run the application manager. Once the workflow and resource manager have been assigned, invoking this method will set up the communication infrastructure, submit a resource request and then submit all the tasks.

property services

Returns the list of tasks used to start “global” services

Setter

Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution

Type

getter

property shared_data
Return list of filenames that are shared between multiple tasks of the application

Setter

Assign a list of names of files that need to be staged to the remote machine

Type

getter

property sid

Get the session ID of the current EnTK execution

Getter

Returns the session ID of the EnTK execution

Type

String

property workflow

Return the last workflow assigned for execution

Setter

Assign a new workflow to be executed

Type

getter

property workflows

Return a list of workflows assigned for execution

Type

getter

Exceptions

This section documents the exception types raised by Ensemble Toolkit. It should be treated as a growing list and not as complete.

exception radical.entk.exceptions.EnTKError[source]

EnTKError is the generic exception type used by EnTK – exception arg messages are usually

exception radical.entk.exceptions.EnTKMissingError(obj, missing_attribute)[source]

MissingError is raised when an attribute that is mandatory is left unassigned by the user

exception radical.entk.exceptions.EnTKTypeError(expected_type, actual_type, entity=None)[source]

TypeError is raised if value of a wrong type is passed to a function or assigned as an attribute of an object

exception radical.entk.exceptions.EnTKValueError(obj, attribute, expected_value, actual_value)[source]

ValueError is raised if a value that is unacceptable is passed to a function or assigned as an attribute of an object
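A minimal sketch of guarding a small run against these exception types (executed locally, as in the earlier examples):

from radical.entk import Pipeline, Stage, Task, AppManager
from radical.entk.exceptions import EnTKError, EnTKTypeError, EnTKValueError

try:
    t = Task()
    t.executable = '/bin/date'

    s = Stage()
    s.add_tasks(t)

    p = Pipeline()
    p.add_stages(s)

    appman = AppManager()
    appman.resource_desc = {'resource': 'local.localhost',
                            'walltime': 10,
                            'cpus'    : 1}
    appman.workflow = set([p])
    appman.run()

except (EnTKTypeError, EnTKValueError) as e:
    print('invalid attribute or argument: %s' % e)
except EnTKError as e:
    print('EnTK run failed: %s' % e)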

Developer Documentation

UML diagrams and other resources

This page presents the UML diagrams for the Ensemble Toolkit. Additional information to help understand the design of Ensemble Toolkit is also presented for interested developers.

Class Diagram

The following document describes the classes in EnTK, their data members and functional members.

_images/entk_class_diagram.jpg

Sequence Diagram

The interaction of these modules for one successful run of an application is described in the following figure:

_images/entk_sequence_diagram.jpg

State Diagram

The stateful objects Pipeline, Stage and Task undergo several state transitions during the execution of a workflow. We document them in the following figure:

_images/entk_state_diagram.png

Events Recorded

Following is a list of events recorded in each module (in temporal order). These events can be used to profile durations between two events or a state and an event.

Resource Manager

create rmgr
validating rdesc
rdesc validated
populating rmgr
rmgr populated
rmgr created
creating rreq
rreq created
rreq submitted
resource active
canceling resource allocation
resource allocation canceled

App Manager

create amgr
amgr created
assigning workflow
validating workflow
workflow validated
amgr run started
init mqs setup
mqs setup done
init rreq submission
starting synchronizer thread
creating wfp obj
creating tmgr obj
synchronizer thread started
start termination
terminating synchronizer
termination done

WFprocessor

create wfp obj
wfp obj created
creating wfp process
starting wfp process
wfp process started
creating dequeue-thread
starting dequeue-thread
creating enqueue-thread
starting enqueue-thread
dequeue-thread started
enqueue-thread started
terminating dequeue-thread
terminating enqueue-thread
termination done
terminating wfp process
wfp process terminated

TaskManager

create tmgr obj
tmgr obj created
creating heartbeat thread
starting heartbeat thread
heartbeat thread started
creating tmgr process
starting tmgr process
tmgr process started
tmgr infrastructure setup done
cud from task - create
cud from task - done
task from cu - create
task from cu - done
terminating tmgr process
tmgr process terminated

API Reference for Developers

Application Manager API

class radical.entk.AppManager(config_path=None, reattempts=None, resubmit_failed=None, autoterminate=None, write_workflow=None, rts=None, rts_config=None, name=None, base_path=None, **kwargs)[source]

An application manager takes the responsibility of setting up the communication infrastructure, instantiates the ResourceManager, TaskManager, WFProcessor objects and all their threads and processes. This is the Master object running in the main process and is designed to recover from errors from all other objects, threads and processes.

Arguments
config_path

Url to config path to be read for AppManager

reattempts

number of attempts to re-invoke any failed EnTK components

resubmit_failed

resubmit failed tasks (True/False)

autoterminate

terminate resource reservation upon execution of all tasks of first workflow (True/False)

write_workflow

write workflow and mapping to rts entities to a file (post-termination)

rts

Specify RTS to use. Current options: ‘mock’, ‘radical.pilot’ (default if unspecified)

rts_config

Configuration for the RTS, accepts {‘sandbox_cleanup’: True/False,’db_cleanup’: True/False} when RTS is RP

name

Name of the Application. It should be unique between executions. (default is randomly assigned)

base_path

base path of logger and profiler with a session id, None (default) will use a current working directory

_get_message_to_sync(qname)[source]

Reads a message from the queue, and exchange the message to where it was published by update_task

_read_config(config_path, reattempts, resubmit_failed, autoterminate, write_workflow, rts, rts_config, base_path)[source]
_run_workflow()[source]
_setup_zmq()[source]

Purpose: Setup ZMQ system on the client side. We instantiate queue(s) ‘pendingq-’ for communication between the enqueuer thread and the task manager process. We instantiate queue(s) ‘completedq-’ for communication between the task manager and dequeuer thread. We instantiate queue ‘sync-to-master’ for communication from enqueuer/dequeuer/task_manager to the synchronizer thread. We instantiate queue ‘sync-ack’ for communication from synchronizer thread to enqueuer/dequeuer/task_manager.

_start_all_comps()[source]
_submit_rts_tmgr(rts_info)[source]

Purpose: Update the runtime system information in the task manager

_synchronizer()[source]
_synchronizer_work()[source]
Purpose: Thread in the master process to keep the workflow data structure in appmanager up to date. We receive only task objects from the task manager.

Details: Important to note that acknowledgements of the type channel.basic_ack() are an acknowledgement to the server that the msg was received. This is not to be confused with the Ack sent to the task_manager through the sync-ack queue.

_update_task(tdict)[source]
property name

Name for the application manager. Allows the user to setup the name of the application manager, as well as its session ID. This name should be unique between different EnTK executions, otherwise it will produce an error.

Getter

Returns the name of the application manager

Setter

Assigns the name of the application manager

Type

String

property outputs
Return list of filenames that are to be staged out after execution

Setter

Assign a list of names of files that need to be staged from the remote machine

Type

getter

property resource_desc

The resource description is a dictionary that holds information about the resource(s) that will be used to execute the workflow.

The following keys are mandatory in all resource descriptions:
‘resource’ : Label of the platform with resources.
‘walltime’ : Amount of time the workflow is expected to run.
‘cpus’ : Number of CPU cores/threads.
Optional keys include:
‘project’ : The project that will be charged.
‘gpus’ : Number of GPUs to be used by the workflow.
‘access_schema’ : The key of an access mechanism to use.
‘queue’ : The name of the batch queue.
‘job_name’ : The specific name of a batch job.
Getter

Returns the resource description

Setter

Assigns a resource description

resource_terminate()[source]
run()[source]

Purpose: Run the application manager. Once the workflow and resource manager have been assigned, invoking this method will set up the communication infrastructure, submit a resource request and then submit all the tasks.

property services

Returns the list of tasks used to start “global” services

Setter

Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution

Type

getter

property shared_data
Return list of filenames that are shared between multiple tasks of the application

Setter

Assign a list of names of files that need to be staged to the remote machine

Type

getter

property sid

Get the session ID of the current EnTK execution

Getter

Returns the session ID of the EnTK execution

Type

String

terminate()[source]
property workflow

Return the last workflow assigned for execution

Setter

Assign a new workflow to be executed

Type

getter

property workflows

Return a list of workflows assigned for execution

Type

getter

Pipeline API

class radical.entk.Pipeline[source]

A pipeline represents a collection of objects that have a linear temporal execution order. In this case, a pipeline consists of multiple ‘Stage’ objects. Each `Stage_i` can execute only after all stages up to `Stage_(i-1)` have completed execution.

_decrement_stage()[source]

Purpose: Decrement stage pointer. Reset completed flag.

_increment_stage()[source]

Purpose: Increment stage pointer. Also check if Pipeline has completed.

_validate()[source]

Purpose: Validate that the state of the current Pipeline is ‘DESCRIBED’ (user has not meddled with it). Also validate that the current Pipeline contains Stages.

classmethod _validate_entities(stages)[source]

Purpose: Validate whether the argument ‘stages’ is a list of Stage objects

Argument

list of Stage objects

add_stages(value)[source]

Appends stages to the current Pipeline

Argument

List of Stage objects

as_dict()[source]

Convert current Pipeline (i.e. its attributes) into a dictionary

Returns

python dictionary

property completed

Returns whether the Pipeline has completed

Returns

Boolean

property current_stage

Returns the current stage being executed

Returns

Integer

from_dict(d)[source]

Create a Pipeline from a dictionary. The change is in place.

Argument

python dictionary

Returns

None

property lock

Returns the lock over the current Pipeline

Returns

Lock object

property luid

Unique ID of the current pipeline (fully qualified). For the pipeline class, this is an alias to uid.

Example

>>> pipeline.luid
pipe.0001
Getter

Returns the fully qualified uid of the current pipeline

Type

String

property name

Name of the pipeline useful for bookkeeping and to refer to this pipeline while data staging. Do not use a ‘,’ or ‘_’ in an object’s name.

Getter

Returns the name of the pipeline

Setter

Assigns the name of the pipeline

Type

String

resume()[source]

Continue execution of paused stages and tasks.

  • The resume() method can only be called on a suspended pipeline, an exception will be raised if that condition is not met.

  • The state of a resumed pipeline will be set to the state the pipeline had before suspension.

property stages

Stages of the current Pipeline

Getter

Returns the stages in the current Pipeline

Setter

Assigns the stages to the current Pipeline

Type

List

property state

Current state of the pipeline

Getter

Returns the state of the current pipeline

Type

String

property state_history

Returns a list of the states obtained in temporal order

Returns

list

suspend()[source]

Pause execution of the pipeline: stages and tasks that are executing will continue to execute, but no new stages and tasks will be eligible for execution until resume() is called.

  • The suspend() method cannot be called on a suspended or completed pipeline; doing so will result in an exception.

  • The state of the pipeline will be set to SUSPENDED.

property uid

Unique ID of the current pipeline

Getter

Returns the unique id of the current pipeline

Type

String

Base ResourceManager API

class radical.entk.execman.base.resource_manager.Base_ResourceManager(resource_desc, sid, rts, rts_config)[source]

A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs.

Arguments
resource_desc

dictionary with details of the resource request and access credentials of the user

example

resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'gpus'          : 0,            # optional
    'project'       : 'TG-abcxyz',
    'memory'        : '16000',      # optional
    'queue'         : 'abc',        # optional
    'access_schema' : 'ssh',        # optional
    'job_name'      : 'test_job'    # optional
}

_populate()[source]
Purpose: Populate the ResourceManager class with the validated resource description

_terminate_resource_request()[source]
Purpose: Cancel resource request by terminating any reservation on any acquired resources or resources pending acquisition

_validate_resource_desc()[source]

Purpose: Validate the provided resource description

property access_schema

Return user specified access schema – ‘ssh’ or ‘gsissh’ or None

Type

getter

property cpus

Return user specified number of cpus

Type

getter

get_completed_states()[source]

Purpose: Test if a resource allocation was submitted

get_resource_allocation_state()[source]

Purpose: Get the state of the resource allocation

get_rts_info()[source]

Purpose: Return the RTS information as a dict.

property gpus

Return user specified number of gpus

Type

getter

property job_name

Return user specified job_name

Type

getter

property memory

Return user specified amount of memory

Type

getter

property outputs

List of files to be staged from remote after execution

Setter

Assign a list of names of files that need to be staged from the remote machine

Type

getter

property project

Return user specified project ID

Type

getter

property queue

Return user specified resource queue to be used

Type

getter

property resource

Return user specified resource name

Type

getter

property services

Returns the list of tasks used to start “global” services

Setter

Assigns a list of service tasks, which are launched before any stage starts and run during the whole workflow execution

Type

getter

property shared_data
List of files to be staged to remote and that are common to multiple tasks

Setter

Assign a list of names of files that need to be accessible to tasks

Type

getter

submit_resource_request()[source]

Purpose: Submit resource request per provided description

property walltime

Return user specified walltime

Type

getter

RP ResourceManager API

class radical.entk.execman.rp.resource_manager.ResourceManager(resource_desc, sid, rts_config)[source]

A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs. This ResourceManager uses the RADICAL Pilot as the underlying runtime system.

Arguments
resource_desc

dictionary with details of the resource request and access credentials of the user

example

resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'project'       : 'TG-abcxyz',
    'queue'         : 'abc',        # optional
    'access_schema' : 'ssh'         # optional
}

_terminate_resource_request()[source]

Purpose: Cancel the RADICAL Pilot Job

get_completed_states()[source]

Purpose: return states which signal completed resource allocation

get_resource_allocation_state()[source]

Purpose: Get the state of the resource allocation

get_rts_info()[source]

Purpose: Return the RTS information as a dict.

property pilot

Return the submitted Pilot

Type

getter

property pmgr

Return the Radical Pilot manager currently being used

Type

getter

property session

Return the Radical Pilot session currently being used

Type

getter

submit_resource_request()[source]
Purpose: Create and submit a RADICAL Pilot Job as per the user provided resource description

Dummy ResourceManager API

class radical.entk.execman.mock.resource_manager.ResourceManager(resource_desc, sid, rts_config)[source]

A resource manager takes the responsibility of placing resource requests on different, possibly multiple, DCIs. This ResourceManager mocks an implementation by doing nothing; it is only usable for testing.

Arguments
resource_desc

dictionary with details of the resource request and access credentials of the user

example

resource_desc = {
    'resource'      : 'xsede.stampede',
    'walltime'      : 120,
    'cpus'          : 64,
    'project'       : 'TG-abcxyz',
    'queue'         : 'abc',        # optional
    'access_schema' : 'ssh'         # optional
}

_populate()[source]

Purpose: evaluate attributes provided in the resource description

_terminate_resource_request()[source]

Purpose: Cancel the resource

_validate_resource_desc()[source]

Purpose: validate the provided resource description

get_completed_states()[source]

Purpose: test if a resource allocation was submitted

get_resource_allocation_state()[source]

Purpose: get the state of the resource allocation

get_rts_info()[source]

Purpose: Return the RTS information as a dict.

submit_resource_request(*args)[source]

Purpose: Create a resource as per the provided resource description

Stage API

class radical.entk.Stage[source]

A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.

_check_stage_complete()[source]
Purpose: Check if all tasks of the current stage have completed, i.e., are in either DONE or FAILED state.

_set_tasks_state(value)[source]

Purpose: Set state of all tasks of the current stage.

Arguments

String

_validate()[source]
Purpose: Validate that the state of the current Stage is ‘DESCRIBED’ (user has not meddled with it). Also validate that the current Stage contains Tasks.

classmethod _validate_entities(tasks)[source]
Purpose: Validate whether ‘tasks’ is of type set. Validate the description of each Task.

add_tasks(value)[source]

Adds task(s) to a Stage using a set union operation. Every Task element is unique; no duplicates are allowed. An existing Task won't be added again, but will be updated with changes.

Argument

iterable task object

as_dict()[source]

Convert current Stage into a dictionary

Returns

python dictionary

from_dict(d)[source]

Create a Stage from a dictionary. The change is in place.

Argument

python dictionary

Returns

None

property luid

Unique ID of the current stage (fully qualified).

Example

>>> stage.luid
pipe.0001.stage.0004
Getter

Returns the fully qualified uid of the current stage

Type

String

property name

Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.

Getter

Returns the name of the current stage

Setter

Assigns the name of the current stage

Type

String

property parent_pipeline

Returns the pipeline this stage belongs to

Setter

Assigns the pipeline uid this stage belongs to

Type

getter

property post_exec

The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished.

Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon.

Example:

s1.post_exec = func_post

def func_post():

    if condition_is_met:
        s = Stage()
        t = Task()
        t.executable = '/bin/sleep'
        t.arguments  = ['30']
        s.add_tasks(t)
        p.add_stages(s)

    else:
        # do nothing
        pass

property state

Current state of the stage

Getter

Returns the state of the current stage

Type

String

property state_history

Returns a list of the states obtained in temporal order

Returns

list

property tasks

Tasks of the stage

Getter

Returns all the tasks of the current stage

Setter

Assigns tasks to the current stage

Type

set of Tasks

property uid

Unique ID of the current stage

Getter

Returns the unique id of the current stage

Type

String

Task API

class radical.entk.Task(from_dict=None)[source]

A Task is an abstraction of a computational unit. In this case, a Task consists of its executable along with its required software environment, files to be staged as input and output.

At the user level a Task is populated by assigning attributes. Internally, an empty Task is created and then populated using the from_dict function. This avoids creating Tasks with a new uid, since new uids advance the uid counter file in radical.utils and can affect profiling if not handled carefully.
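A minimal sketch of populating a Task at the user level (name, executable, arguments and output file are illustrative only):

from radical.entk import Task

t = Task()
t.name       = 'task-0001'     # no ',' or '_' allowed in names
t.executable = '/bin/date'
t.arguments  = []
t.stdout     = 'date.out'      # defaults to <uid>.out if not set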

uid

[type: str | default: “”] A unique ID for the task. This attribute is optional; a unique ID will be assigned by RE if the field is not set. It cannot be re-assigned (immutable attribute).

name

[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads. Such characters as ‘,’ or ‘_’ are not allowed in a name.

state

[type: str | default: “DESCRIBED”] Current state of the task. Its initial value, re.states.INITIAL, is set during task initialization and is also appended to the state_history attribute.

state_history

[type: list | default: [“DESCRIBED”]] List of the states obtained in temporal order. Every time a new state is set, it is appended to state_history automatically.

executable

[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

arguments

[type: list | default: []] List of arguments to be supplied to the executable.

environment

[type: dict | default: {}] Environment variables to set in the task’s environment before execution.
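For example (the variable and its value are illustrative only):

t.environment = {'OMP_NUM_THREADS': '4'}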

sandbox

[type: str | default: “”] This specifies the working directory of the task. By default, the task’s uid is used as a sandbox for the task.

pre_launch

[type: list | default: []] List of actions (shell commands) to perform before the task is launched.

Note that the shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of resources! Deviating from that rule will likely result in reduced overall throughput.

post_launch

[type: list | default: []] List of actions (shell commands) to perform after the task finishes.

The same remarks as on pre_launch apply

pre_exec

[type: list | default: []] List of actions (shell commands) to perform after the task is launched, but before the rank(s) start execution.

The same remarks as on pre_launch apply

post_exec

[type: list | default: []] List of actions (shell commands) to perform right after the rank(s) finish.

The same remarks as on pre_launch apply
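A hedged sketch of typical pre_exec and post_exec usage (paths and commands are illustrative only):

t.pre_exec  = ['source $HOME/venv/bin/activate',   # prepare the software environment
               'mkdir -p data']                    # check/create the work directory
t.post_exec = ['tar czf data.tgz data']            # lightweight packaging of results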

cpu_reqs

[type: CpuReqs | default: CpuReqs()] The CPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.cpu_reqs = {'cpu_processes'    : X,
                 'cpu_process_type' : None/'MPI',
                 'cpu_threads'      : Y,
                 'cpu_thread_type'  : None/'OpenMP'}

This description means that the Task will spawn X processes, with Y threads per process, to run on CPUs. Hence, the total number of CPUs required by the Task is X * Y, so that all processes and threads can execute concurrently.

By default, 1 CPU process and 1 CPU thread per process are requested.

gpu_reqs

[type: GpuReqs | default: GpuReqs()] The GPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.gpu_reqs = {'gpu_processes'    : X,
                 'gpu_process_type' : None/'CUDA'/'ROCm',
                 'gpu_threads'      : Y,
                 'gpu_thread_type'  : None}

This description means that each rank of the task is going to use X GPUs with Y GPU-threads. By default, 0 GPUs are requested.
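As a concrete sketch (the numbers are illustrative): a task with 4 MPI ranks, 8 OpenMP threads per rank and 1 GPU per rank requires 4 * 8 = 32 CPU cores and 4 GPUs in total.

t.cpu_reqs = {'cpu_processes'    : 4,
              'cpu_process_type' : 'MPI',
              'cpu_threads'      : 8,
              'cpu_thread_type'  : 'OpenMP'}

t.gpu_reqs = {'gpu_processes'    : 1,
              'gpu_process_type' : 'CUDA',
              'gpu_threads'      : 1,
              'gpu_thread_type'  : None}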

lfs_per_process

[type: int | default: 0] Local File Storage per process - amount of space (MB) required on the local file system of the node by the task.

mem_per_process

[type: int | default: 0] Amount of memory required by the task.

upload_input_data

[type: list | default: []] List of file names required to be transferred from a local client storage to the location of the task or data staging area before task starts.

copy_input_data

[type: list | default: []] List of file names required to be copied from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

Example of a file location format:

$Pipeline_%s_Stage_%s_Task_%s, where %s is replaced by the corresponding entity name/uid

# output.txt is copied from task t1 to the current task's sandbox
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' %
                      (p.name, s1.name, t1.name)]

link_input_data

[type: list | default: []] List of file names required to be symlinked from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

move_input_data

[type: list | default: []] List of file names required to be moved from another task-/pilot-sandbox to a current task (or data staging area) before task starts.

copy_output_data

[type: list | default: []] List of file names required to be copied from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

Example of defining data to be copied:

# results.txt will be copied to the data staging area $SHARED
t.copy_output_data = ['results.txt > $SHARED/results.txt']

link_output_data

[type: list | default: []] List of file names required to be symlinked from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

move_output_data

[type: list | default: []] List of file names required to be moved from a current task to another task-/pilot-sandbox (or data staging area) when a task is finished.

download_output_data

[type: list | default: []] List of file names required to be downloaded from a current task to a local client storage space when a task is finished.

Example of defining data to be downloaded:

# results.txt is transferred to a local client storage space
t.download_output_data = ['results.txt']

stdout

[type: str | default: “”] The name of the file to store stdout. If not set then the name of the following format will be used: <uid>.out.

stderr

[type: str | default: “”] The name of the file to store stderr. If not set then the name of the following format will be used: <uid>.err.

stage_on_error

[type: bool | default: False] Flag to allow staging out data if the task failed (output staging is normally skipped for FAILED or CANCELED tasks).

exit_code

[type: int | default: None] Get the exit code for finished tasks: 0 - for successful tasks; 1 - for failed tasks.

exception

[type: str | default: None] Get the representation of the exception which caused the task to fail.

exception_detail

[type: str | default: None] Get additional details (traceback or error messages) to the exception which caused this task to fail.

path

[type: str | default: “”] Get the path of the task on the remote machine. Useful to reference files generated in the current task.

tags

[type: dict | default: None] The tags for the task that can be used while scheduling by the RTS (configuration specific tags, which influence task scheduling and execution, e.g., tasks co-location).

rts_uid

[type: str | default: None] Unique RTS ID of the current task.

parent_stage

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the stage, which contains the current task.

parent_pipeline

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the pipeline, which contains the current task.

annotations

[type: Annotations | default: None] Annotations that describe the task’s input and output files and set dependencies between tasks.

metadata

[type: dict | default: {}] User/system defined metadata.

Read-only attributes
property luid

[type: str] Unique ID of the current task (fully qualified).

>>> task.luid
pipe.0001.stage.0004.task.0234

_cast = True
_check = True
_defaults = {'arguments': [], 'copy_input_data': [], 'copy_output_data': [], 'cpu_reqs': {'cpu_process_type': None, 'cpu_processes': 1, 'cpu_thread_type': None, 'cpu_threads': 1}, 'download_output_data': [], 'environment': {}, 'exception': None, 'exception_detail': None, 'executable': '', 'exit_code': None, 'gpu_reqs': {'gpu_process_type': None, 'gpu_processes': 0.0, 'gpu_thread_type': None, 'gpu_threads': 1}, 'lfs_per_process': 0, 'link_input_data': [], 'link_output_data': [], 'mem_per_process': 0, 'metadata': {}, 'move_input_data': [], 'move_output_data': [], 'name': '', 'parent_pipeline': {'name': None, 'uid': None}, 'parent_stage': {'name': None, 'uid': None}, 'path': '', 'post_exec': [], 'post_launch': [], 'pre_exec': [], 'pre_launch': [], 'rts_uid': None, 'sandbox': '', 'stage_on_error': False, 'state': '', 'state_history': [], 'stderr': '', 'stdout': '', 'tags': None, 'uid': '', 'upload_input_data': []}
_post_verifier(k, v)[source]
_schema = {'annotations': <class 'radical.entk.task.Annotations'>, 'arguments': [<class 'str'>], 'copy_input_data': [<class 'str'>], 'copy_output_data': [<class 'str'>], 'cpu_reqs': <class 'radical.entk.task.CpuReqs'>, 'download_output_data': [<class 'str'>], 'environment': {<class 'str'>: <class 'str'>}, 'exception': <class 'str'>, 'exception_detail': <class 'str'>, 'executable': <class 'str'>, 'exit_code': <class 'int'>, 'gpu_reqs': <class 'radical.entk.task.GpuReqs'>, 'lfs_per_process': <class 'int'>, 'link_input_data': [<class 'str'>], 'link_output_data': [<class 'str'>], 'mem_per_process': <class 'int'>, 'metadata': {<class 'str'>: None}, 'move_input_data': [<class 'str'>], 'move_output_data': [<class 'str'>], 'name': <class 'str'>, 'parent_pipeline': {<class 'str'>: None}, 'parent_stage': {<class 'str'>: None}, 'path': <class 'str'>, 'post_exec': [<class 'str'>], 'post_launch': [<class 'str'>], 'pre_exec': [<class 'str'>], 'pre_launch': [<class 'str'>], 'rts_uid': <class 'str'>, 'sandbox': <class 'str'>, 'stage_on_error': <class 'bool'>, 'state': <class 'str'>, 'state_history': [<class 'str'>], 'stderr': <class 'str'>, 'stdout': <class 'str'>, 'tags': {<class 'str'>: None}, 'uid': <class 'str'>, 'upload_input_data': [<class 'str'>]}
_self_default = False
_uids = []
_validate()[source]

Purpose: Validate that the state of the task is ‘DESCRIBED’ and that an executable has been specified for the task.

_verify()[source]

Can be overloaded

_verify_setter(k, v)[source]
annotate(inputs: Optional[Union[Dict, List, str]] = None, outputs: Optional[Union[List, str]] = None) None[source]

Adds dataflow annotations with provided input and output files, and defines dependencies between tasks.

inputs

List of input files. If a file is produced by the previously executed task, then the corresponding input element is provided as a dictionary with the task instance as a key. Example: inputs=[‘file1’, {task0: ‘file2’, task1: [‘file2’]}]

Type

list, optional

outputs

List of produced/generated files.

Type

list, optional
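A hedged sketch of annotating a two-task dataflow (task variables and file names are illustrative only):

t1 = Task()
t1.executable = '/bin/echo'
t1.arguments  = ['hello']
t1.annotate(outputs=['hello.txt'])          # t1 produces hello.txt

t2 = Task()
t2.executable = '/bin/cat'
t2.arguments  = ['hello.txt']
t2.annotate(inputs=[{t1: ['hello.txt']}])   # t2 depends on the file produced by t1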

from_dict(d)[source]

Re-initialization: resets all attributes with the provided input data.

property luid

Unique ID of the current task (fully qualified).

Example

>>> task.luid
pipe.0001.stage.0004.task.123456

Getter

Returns the fully qualified uid of the current task

Type

str

Base TaskManager API

class radical.entk.execman.base.task_manager.Base_TaskManager(sid, rmgr, rts, zmq_info)[source]

A Task Manager takes the responsibility of dispatching tasks it receives from a ‘pending’ queue for execution on to the available resources using a runtime system. Once the tasks have completed execution, they are pushed on to the completed queue for other components of EnTK to process.

Arguments
rmgr

(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted

Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues can be varied for different throughput requirements at the cost of additional Memory and CPU consumption.

_advance(obj, obj_type, new_state, qname)[source]
_setup_zmq(zmq_info)[source]
_sync_with_master(obj, obj_type, qname)[source]
_tmgr(uid, rmgr, zmq_info)[source]
Purpose: Method to be run by the tmgr thread. This method receives a Task from the ‘pending’ queue and submits it to the RTS. At all state transitions, they are synced (blocking) with the AppManager in the master thread.

Details: The AppManager can re-invoke the tmgr thread with this function if the execution of the workflow is still incomplete. There is also population of a dictionary, placeholder_dict, which stores the path of each of the tasks on the remote machine.

check_manager()[source]

Purpose: Check if the tmgr thread is alive and running

start_manager()[source]

Purpose: Method to start the tmgr thread. The tmgr function is not to be accessed directly. The function is started in a separate thread using this method.

terminate_manager()[source]

Purpose: Method to terminate the tmgr thread. This method is blocking as it waits for the tmgr thread to terminate (aka join).

RP TaskManager API

class radical.entk.execman.rp.task_manager.TaskManager(sid, rmgr, zmq_info)[source]

A Task Manager takes the responsibility of dispatching tasks it receives from a queue for execution on to the available resources using a runtime system. In this case, the runtime system being used is RADICAL Pilot. Once the tasks have completed execution, they are pushed on to another queue for other components of EnTK to access.

Arguments
rmgr

(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted

Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues can be varied for different throughput requirements at the cost of additional Memory and CPU consumption.

_process_tasks(task_queue, tmgr)[source]

Purpose: The new thread that gets spawned by the main tmgr thread invokes this function. This function receives tasks from ‘task_queue’ and submits them to the RADICAL Pilot RTS.

_tmgr(uid, rmgr, zmq_info)[source]

Purpose: This method has two purposes: it receives tasks from the ‘pending’ queue, and it starts a new thread that processes these tasks and submits them to the RTS.

The new thread is responsible for pushing completed tasks (returned by the RTS) to the dequeueing queue. It also converts Tasks into TDs and CUs into (partially described) Tasks. This conversion is necessary since the current RTS is RADICAL Pilot. Once Tasks are recovered from a CU, they are then pushed to the completed queue. At all state transitions, they are synced (blocking) with the AppManager in the master process.

Details: The AppManager can re-invoke the tmgr thread with this function if the execution of the workflow is still incomplete. There is also population of a dictionary, placeholders, which stores the path of each of the tasks on the remote machine.

_update_resource(pilot, tmgr)[source]

Update used pilot.

start_manager()[source]

Purpose: Method to start the tmgr thread. The tmgr function is not to be accessed directly. The function is started in a separate thread using this method.

Dummy TaskManager API

class radical.entk.execman.mock.task_manager.TaskManager(sid, rmgr, zmq_info)[source]

A Task Manager takes the responsibility of dispatching tasks it receives from a ‘pending’ queue for execution on to the available resources using a runtime system. Once the tasks have completed execution, they are pushed on to the completed queue for other components of EnTK to process.

Arguments
rmgr

(ResourceManager) Object to be used to access the Pilot where the tasks can be submitted

Currently, EnTK is configured to work with one pending queue and one completed queue. In the future, the number of queues can be varied for different throughput requirements at the cost of additional Memory and CPU consumption.

_process_tasks(task_queue, rmgr)[source]

Purpose: The new thread that gets spawned by the main tmgr process invokes this function. This function receives tasks from ‘task_queue’ and submits them to the RADICAL Pilot RTS.

_tmgr(uid, rmgr, zmq_info)[source]

Purpose: Method to be run by the tmgr process. This method receives a Task from the ‘pending’ queue and submits it to the RTS. Currently, it also converts Tasks into CUDs and CUs into (partially described) Tasks. This conversion is necessary since the current RTS is RADICAL Pilot. Once Tasks are recovered from a CU, they are then pushed to the completed queue. At all state transitions, they are synced (blocking) with the AppManager in the master process.

Details: The AppManager can re-invoke the tmgr process with this function if the execution of the workflow is still incomplete. There is also population of a dictionary, placeholders, which stores the path of each of the tasks on the remote machine.

start_manager()[source]

Purpose: Method to start the tmgr process. The tmgr function is not to be accessed directly. The function is started in a separate thread using this method.

WFProcessor API

class radical.entk.appman.wfprocessor.WFprocessor(sid, workflow, resubmit_failed, zmq_info)[source]

A WFprocessor (workflow processor) takes the responsibility of dispatching tasks from the various pipelines of the workflow, according to their relative order, to the TaskManager. All state updates are reflected in the AppManager, since we operate on a reference to the same workflow object. The WFprocessor also retrieves completed tasks from the TaskManager and updates the states of the PST (Pipeline, Stage, Task) objects accordingly.

Arguments
sid

(str) session id used by the profiler and loggers

workflow

(set) REFERENCE of the AppManager’s workflow

resubmit_failed

(bool) True if failed tasks should be resubmitted

zmq_info

(dict) zmq queue addresses

_advance(obj, obj_type, new_state)[source]

transition obj of type obj_type into state new_state

_create_workload()[source]
_dequeue()[source]

Purpose: This is the function that is run in the dequeue thread. This function extracts Tasks from the completed queues and updates the workflow.

_enqueue()[source]

Purpose: This is the function that is run in the enqueue thread. This function extracts Tasks from the workflow that exists in the WFprocessor object and pushes them to the queues in the pending_q list.

_execute_post_exec(pipe, stage)[source]

Purpose: This method executes the post_exec step of a stage for a pipeline.

_execute_workload(workload, scheduled_stages)[source]
_setup_zmq(zmq_info)[source]
_update_dequeued_task(deq_task)[source]
check_processor()[source]
reset_workflow()[source]

When a component is restarted, we reset all tasks that did not finish to their first state; they are then scheduled again for execution.

start_processor()[source]

Purpose: Method to start the wfp process. The wfp function is not to be accessed directly. The function is started in a separate process using this method.

terminate_processor()[source]

Purpose: Method to terminate the wfp process. This method is blocking as it waits for the wfp process to terminate (aka join).

property workflow
workflow_incomplete()[source]

Purpose: Method to check if the workflow execution is incomplete.

Unit tests & Integration tests

All tests are run on Travis CI. Tests are run using pytest, test coverage is measured with coverage, and results are reported via codecov.

Unit tests exercise the functionality of the basic methods and functions offered by EnTK. We strive to create and include a new test for every new feature offered by EnTK.

Integration tests verify the correct communication between the different EnTK components, and between EnTK and the packages and services it uses, such as RADICAL-Pilot.

Writing tests for EnTK requires following the RADICAL test coding guidelines. An example can be found here.