6.1. Adaptive applications: Task-count

We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.


The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.


This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be performed once all tasks of a Stage reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.


def func_post():

s = Stage()
s.post_exec = func_post

In the following example, we initially create 1 Pipeline with one Stage. There are 10 tasks in the first Stage that each runs ‘sleep 30’. After the Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the number of new stages added is less than 4. If yes, we add a new Stage with similar tasks as before to the Pipeline. If 4 stages have already been added, no more stages are added.

python adapt_tc.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os, sys

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') == None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


def generate_pipeline():

    def func_post():



            CUR_NEW_STAGE += 1
            s = Stage()
            for i in range(10):
                t = Task()
                t.executable = '/bin/sleep'
                t.arguments  = [ '30']

        # Add post-exec to the Stage
        s.post_exec = func_post


    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    for i in range(10):

        t1 = Task()
        t1.executable = 'sleep'
        t1.arguments = [ '30']

        # Add the Task to the Stage

    # Add post-exec to the Stage
    s1.post_exec = func_post

    # Add Stage to the Pipeline

    return p

if __name__ == '__main__':

    # Create a dictionary describe four mandatory keys:
    # resource, walltime, cores and project
    # resource is 'local.localhost' to execute locally
    res_dict = {

            'resource': 'local.localhost',
            'walltime': 15,
            'cpus': 2,

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager