Adaptive applications: Task-order

We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.

For any adaptive capability within a Pipeline, we need to use the post execution property of a Stage object. Decisions can only be performed once all tasks of a Stage reached a final state (running tasks cannot be interrupted by design). The post execution property of a Stage requires a callable function that can influence the next stages of a workflow.

CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

def func_post():
    if CUR_NEW_STAGE <= MAX_NEW_STAGE:
        ...

s = Stage()
s.post_exec = func_post

In the following example, we create 1 Pipeline with five stages. There are 10 tasks in each Stage that each run ‘sleep 30’. After a Stage is DONE (i.e. all tasks of the Stage have completed execution), a condition is evaluated that checks whether the current stage is less than the max number of stages in the Pipeline. If yes, then the remaining stages are shuffled. If not, no operation is performed.

python adapt_to.py

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

A look at the complete code in this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os, sys
from random import shuffle

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') == None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


CUR_NEW_STAGE=0
MAX_NEW_STAGE=4

def generate_pipeline():

    def func_post():

        global CUR_NEW_STAGE, MAX_NEW_STAGE

        if CUR_NEW_STAGE <= MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            shuffle(p.stages[CUR_NEW_STAGE:])

        else:
            print('Done')

    # Create a Pipeline object
    p = Pipeline()

    for s in range(MAX_NEW_STAGE+1):

        # Create a Stage object
        s1 = Stage()

        for i in range(CUR_TASKS):

            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments = [ '30']

            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p

if __name__ == '__main__':

    # Create a dictionary describe four mandatory keys:
    # resource, walltime, cores and project
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 15,
        'cpus': 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()