5.3. Sequence of Workflows

Another common execution pattern runs several workflows one after another within the same session. Each workflow consists of multiple concurrent Pipelines, each Pipeline consists of multiple Stages, and each Stage consists of several Tasks. We call this pattern a Sequence of Workflows.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.

In the following example, we create 2 sequential workflows, each with 2 Pipelines and 3 Stages per Pipeline. For demonstration purposes, each Task does nothing but “sleep” for 3 seconds. The example starts the AppManager with autoterminate=False and calls appman.terminate() only after the pipelines of both workflows have finished. This allows you to reuse the same application manager for the second workflow.
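As a rough sanity check on the numbers above, the following sketch counts the Tasks and estimates a lower bound on the runtime. It assumes that the Pipelines of a workflow run concurrently, that the Stages of a Pipeline run one after another, and that the resource has enough cores for both Pipelines at once. It ignores all Ensemble Toolkit startup and scheduling overhead, so a real run will take longer. The variable names are illustrative only.

```python
# Values taken from the example: 2 workflows, 2 Pipelines per workflow,
# 3 Stages per Pipeline, 1 Task per Stage, each Task sleeping 3 seconds.
WORKFLOWS = 2
PIPELINES_PER_WORKFLOW = 2
STAGES_PER_PIPELINE = 3
TASKS_PER_STAGE = 1
SLEEP_SECONDS = 3

total_tasks = (WORKFLOWS * PIPELINES_PER_WORKFLOW
               * STAGES_PER_PIPELINE * TASKS_PER_STAGE)

# Assumption: Pipelines overlap in time, Stages within a Pipeline do not,
# so one workflow needs at least (Stages x sleep time) seconds.
seconds_per_workflow = STAGES_PER_PIPELINE * SLEEP_SECONDS

# The workflows themselves run strictly one after the other.
min_total_seconds = WORKFLOWS * seconds_per_workflow

print(total_tasks)        # 12
print(min_total_seconds)  # 18
```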

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.

To run the script, execute the following from the command line:

python sow.py

You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

Let’s take a look at the complete code of the example:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

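# RabbitMQ connection details. Environment variables are read as strings,
# so the port is cast to an integer.
host = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', 5672))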


def generate_pipeline():

    # A Pipeline with three Stages; each Stage holds a single Task
    p = Pipeline()

    # First Stage: one Task that sleeps for 3 seconds
    s1 = Stage()

    t1 = Task()
    t1.executable = '/bin/sleep'
    t1.arguments = ['3']

    s1.add_tasks(t1)
    p.add_stages(s1)

    # Second Stage: same Task definition
    s2 = Stage()

    t2 = Task()
    t2.executable = '/bin/sleep'
    t2.arguments = ['3']

    s2.add_tasks(t2)
    p.add_stages(s2)

    # Third Stage: same Task definition
    s3 = Stage()

    t3 = Task()
    t3.executable = '/bin/sleep'
    t3.arguments = ['3']

    s3.add_tasks(t3)
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    # autoterminate=False lets the same AppManager run a second workflow
    appman   = AppManager(hostname=host, port=port, autoterminate=False)
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus'    :  8
    }
    appman.resource_desc = res_dict


    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('1 ===================================================')


    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('2 ===================================================')

    # Shut the AppManager down explicitly, since autoterminate is disabled
    appman.terminate()

    print('t ===================================================')
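
The two near-identical blocks in the main section can also be folded into a loop. The sketch below is one possible way to do so and is not part of the shipped example: run_sequence and RecordingManager are hypothetical names introduced here. The helper relies only on the three AppManager members used in the code above (the workflow attribute, run() and terminate()). RecordingManager is a stand-in that merely records calls, so the call order can be checked without RabbitMQ or Ensemble Toolkit.

```python
def run_sequence(appman, workflows):
    """Run each workflow in turn on one application manager, then shut it down.

    `appman` is assumed to behave like the AppManager in the example,
    created with autoterminate=False. `workflows` is an iterable in which
    every item is a collection of Pipelines.
    """
    try:
        for pipelines in workflows:
            appman.workflow = set(pipelines)
            appman.run()
    finally:
        # Executed even if a workflow raises, so terminate() is never skipped
        appman.terminate()


class RecordingManager:
    """Hypothetical stand-in that only records calls (dry run)."""

    def __init__(self):
        self.workflow = None
        self.calls = []

    def run(self):
        # Record how many Pipelines the current workflow holds
        self.calls.append(('run', len(self.workflow)))

    def terminate(self):
        self.calls.append(('terminate',))


# Dry run: strings stand in for Pipeline objects
dry = RecordingManager()
run_sequence(dry, [['p1', 'p2'], ['p3', 'p4']])
print(dry.calls)  # [('run', 2), ('run', 2), ('terminate',)]
```

With the real AppManager from the example, the call might look like run_sequence(appman, [[generate_pipeline() for _ in range(2)] for _ in range(2)]). Wrapping the loop in try/finally is a design choice of this sketch, made so that a failing workflow does not leave the manager running.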