5.1. Pipeline of Ensembles

One of the most common execution patterns consists of one Pipeline with multiple Stages where each Stage consists of several Tasks. We call this a Pipeline of Ensembles. A pictorial representation of this pattern is provided below.

Figure 1: Pipeline of Ensembles

We encourage you to take a look at the Ensemble of Pipelines example on the next page and compare it with the above pattern.


The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.


This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.

In the following example, we create one Pipeline with three Stages. The single Task in the first Stage creates a file. The 30 Tasks in the second Stage each perform a character count on that file. The 30 Tasks in the third Stage each compute a checksum of the output of the corresponding Task from the second Stage.
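To make the data flow between the three Stages concrete, the following sketch emulates the work of each Stage in plain Python, without Ensemble Toolkit. It is only an illustration: the function names (character_count, checksum, run_locally) are hypothetical, the input is a fixed string rather than random data, and the output format only approximates what the shell commands in the example produce.

```python
import hashlib
from collections import Counter


def character_count(text):
    """Count each character, roughly like `grep -o . | sort | uniq -c`.

    Newlines are skipped because `grep -o .` does not match them.
    The exact ordering and padding of the shell output may differ.
    """
    counts = Counter(text.replace('\n', ''))
    return '\n'.join('%d %s' % (counts[ch], ch) for ch in sorted(counts))


def checksum(text):
    """Return the SHA-1 hex digest of the text."""
    return hashlib.sha1(text.encode('utf-8')).hexdigest()


def run_locally(n_tasks=3):
    # Stage 1: one task produces the input data (a fixed string here).
    data = 'hello ensemble'
    # Stage 2: every task counts characters in the same input.
    counts = [character_count(data) for _ in range(n_tasks)]
    # Stage 3: task i checksums the output of task i from stage 2.
    return [checksum(c) for c in counts]


if __name__ == '__main__':
    for i, digest in enumerate(run_locally()):
        print('task %d: %s' % (i, digest))
```

Because every Task in the second Stage reads the same file, all checksums in the third Stage are expected to be identical; the pattern is about the execution structure rather than about producing different results.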

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts.

To run the script, execute the following from the command line:

python poe.py

You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
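If you prefer to launch the example from another Python program, the variable can be set for a child process only. The sketch below assumes the script is named poe.py and is in the current directory; the helper names verbose_env and run_verbose are hypothetical and not part of Ensemble Toolkit.

```python
import os
import subprocess
import sys


def verbose_env(base_env=None):
    """Return a copy of the environment with RADICAL_ENTK_VERBOSE=DEBUG."""
    env = dict(os.environ if base_env is None else base_env)
    env['RADICAL_ENTK_VERBOSE'] = 'DEBUG'
    return env


def run_verbose(script='poe.py'):
    """Run the example script in a child process with debug verbosity.

    Returns the exit code of the child process.
    """
    return subprocess.call([sys.executable, script], env=verbose_env())
```

Calling run_verbose() has the same effect as prefixing the command with RADICAL_ENTK_VERBOSE=DEBUG in a shell, and it leaves the environment of the calling process unchanged.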

The complete code for this section is as follows:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

# Description of how the RabbitMQ process is accessible
# No need to change/set any variables if you installed RabbitMQ as a system
# process. If you are running RabbitMQ under a docker container or another
# VM, set "RMQ_HOSTNAME" and "RMQ_PORT" in the session where you are running
# this script.
hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', 5672))

def generate_pipeline():

    # Create a Pipeline object
    p = Pipeline()
    p.name = 'p1'

    # Create a Stage object
    s1 = Stage()
    s1.name = 's1'

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.name = 't1'
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object to hold character count tasks
    s2 = Stage()
    s2.name = 's2'
    s2_task_uids = []

    for cnt in range(30):

        # Create a Task object
        t2 = Task()
        t2.name = 't%s' % (cnt + 1)
        t2.executable = '/bin/bash'
        t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
        # Copy data from the task in the first stage to the current task's location
        t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

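        # Add the Task to the Stage and remember its name so that the
        # third Stage can reference its output
        s2.add_tasks(t2)
        s2_task_uids.append(t2.name)

    # Add Stage to the Pipeline
    p.add_stages(s2)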

    # Create another Stage object to hold checksum tasks
    s3 = Stage()
    s3.name = 's3'

    for cnt in range(30):

        # Create a Task object
        t3 = Task()
        t3.name = 't%s' % (cnt + 1)
        t3.executable = '/bin/bash'
        t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
        # Copy data from the matching task in the second stage to the current task's location
        t3.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/ccount.txt' % (p.name, s2.name, s2_task_uids[cnt])]
        # Download the output of the current task to the current location
        t3.download_output_data = ['chksum.txt > chksum_%s.txt' % cnt]

        # Add the Task to the Stage
        s3.add_tasks(t3)

    # Add Stage to the Pipeline
    p.add_stages(s3)

    return p

if __name__ == '__main__':

    # Create Application Manager
    appman = AppManager(hostname=hostname, port=port)

    # Create a dictionary describing the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    #
    # Example settings for a remote resource (adapt before use):
    #    'resource': 'ncsa.bw_aprun',
    #    'walltime': 10,
    #    'cpus': 32,
    #    'project': 'bamm',
    #    'queue': 'high'
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1  # assumed value for a local run; adjust as needed
    }
    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([generate_pipeline()])

    # Run the Application Manager
    appman.run()
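
The copy_input_data entries in the example refer to files of earlier Tasks through a placeholder string of the form $Pipeline_<name>_Stage_<name>_Task_<name>/<file>. As a minimal sketch, the helper below (task_data_ref, a hypothetical name that is not part of Ensemble Toolkit) builds such strings with the same format string used in the example, which can help avoid typos when many Tasks reference each other.

```python
def task_data_ref(pipeline_name, stage_name, task_name, filename):
    """Build a reference to a file in another task's location.

    Uses the same placeholder format as the example script.
    """
    return '$Pipeline_%s_Stage_%s_Task_%s/%s' % (
        pipeline_name, stage_name, task_name, filename)


def stage_data_refs(pipeline_name, stage_name, task_names, filename):
    """Build one reference per task of a stage, preserving task order."""
    return [task_data_ref(pipeline_name, stage_name, name, filename)
            for name in task_names]


if __name__ == '__main__':
    # Reference to the file created by the single task of the first stage.
    print(task_data_ref('p1', 's1', 't1', 'output.txt'))
    # References to the outputs of three tasks of the second stage.
    for ref in stage_data_refs('p1', 's2', ['t1', 't2', 't3'], 'ccount.txt'):
        print(ref)
```

In the example script, each Task in the third Stage would use the reference at its own index, which is how Task i of the third Stage is paired with Task i of the second Stage.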