Adaptive applications: Task-attribute

We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit. If not, see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post-execution property (post_exec) of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post_exec property of a Stage takes a callable that can influence the subsequent stages of the workflow.

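from random import randint

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4

# p is the Pipeline and s1 one of its Stages (both created elsewhere)
def func_post():

    global CUR_NEW_STAGE

    # Only adapt while there is a next Stage in the Pipeline
    if CUR_NEW_STAGE < MAX_NEW_STAGE:
        CUR_NEW_STAGE += 1

        # Change the sleep duration of every task in the next Stage
        for t in p.stages[CUR_NEW_STAGE].tasks:
            t.arguments = [str(randint(10, 30))]

# Register the callable; it runs once all tasks of s1 are in a final state
s1.post_exec = func_post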
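The ordering described above (all tasks of a Stage finish first, then the callable runs and may change a later Stage) can be tried out without Ensemble Toolkit installed. The sketch below is only an illustration: MiniTask, MiniStage and run_pipeline are hypothetical stand-ins written for this example and are not part of radical.entk.

```python
# Stand-in classes for illustration only; these are NOT radical.entk classes.

class MiniTask:
    def __init__(self, arguments):
        self.arguments = arguments
        self.state = 'SCHEDULED'


class MiniStage:
    def __init__(self, tasks):
        self.tasks = tasks
        self.post_exec = None   # optional callable that takes no arguments


def run_pipeline(stages):
    """Run stages in order; call each hook only after all its tasks are final."""
    for stage in stages:
        for task in stage.tasks:
            task.state = 'DONE'          # pretend the task executed
        if stage.post_exec and all(t.state == 'DONE' for t in stage.tasks):
            stage.post_exec()


# Two stages with two tasks each, all starting with the argument '30'
stages = [MiniStage([MiniTask(['30']) for _ in range(2)]) for _ in range(2)]


def shorten_next_stage():
    # The hook can still change tasks of a stage that has not started yet.
    for task in stages[1].tasks:
        task.arguments = ['10']


stages[0].post_exec = shorten_next_stage
run_pipeline(stages)
print([t.arguments for t in stages[1].tasks])
```

Because the hook fires between the first and second stage, the second stage runs with the modified arguments, while the tasks of the first stage keep the values they started with.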

In the following example, we create one Pipeline with five Stages. Each Stage has 10 tasks, each initially set to run ‘sleep 30’. After a Stage is DONE (i.e. all tasks in the Stage have completed execution), a condition is evaluated that checks whether the index of the current Stage is less than the index of the last Stage in the Pipeline. If so, the arguments of the tasks of the next Stage are modified (here, the sleep duration is replaced by a random value between 10 and 30 seconds). If not, no operation is performed and the Pipeline terminates.
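The condition described above can be dry-run in plain Python to see which Stage is touched after each completion. This is a sketch under the assumption that the callable is invoked exactly once per finished Stage; trace_post_exec is a hypothetical helper written for this illustration, not an Ensemble Toolkit function.

```python
MAX_NEW_STAGE = 4          # index of the last Stage (five Stages: 0..4)


def trace_post_exec(num_stages, max_new_stage):
    """Return the Stage index modified after each Stage completes (None = no-op)."""
    cur = 0
    modified = []
    for _ in range(num_stages):        # one post_exec call per finished Stage
        if cur < max_new_stage:
            cur += 1
            modified.append(cur)       # tasks of Stage `cur` get new arguments
        else:
            modified.append(None)      # last Stage: nothing left to adapt
    return modified


print(trace_post_exec(5, MAX_NEW_STAGE))
```

The strict comparison matters: with a non-strict one, the call after the last Stage would increment the counter to 5 and look up a sixth Stage that does not exist.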

To run the script, execute the following command:

python adapt_ta.py

Note: You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

Let’s take a look at the complete code in the example:

#!/usr/bin/env python

import os
from random import randint

from radical.entk import Pipeline, Stage, Task, AppManager

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

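CUR_NEW_STAGE = 0   # index of the Stage whose tasks were modified last
MAX_NEW_STAGE = 4   # index of the last Stage in the Pipeline
CUR_TASKS = 10      # number of tasks per Stage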

def generate_pipeline():

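    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        # Adapt only while a next Stage exists; a non-strict comparison
        # would index past the last Stage of the Pipeline
        if CUR_NEW_STAGE < MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            # Give every task of the next Stage a new sleep duration
            for t in p.stages[CUR_NEW_STAGE].tasks:
                dur = randint(10, 30)
                t.arguments = [str(dur)]
        else:
            print('Done')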


    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE+1):

        # Create a Stage object
        s1 = Stage()

        for _ in range(CUR_TASKS):

            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments = ['30']

            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p

if __name__ == '__main__':

    # Create a dictionary describing the resource request.
    # This example sets three keys: resource, walltime and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 15,
        'cpus': 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()