5.2. Ensemble of Pipelines

One of the most common execution patterns consists of multiple concurrent Pipelines, each with multiple Stages, where each Stage consists of several Tasks. We call this an Ensemble of Pipelines. A pictorial representation of this pattern is provided below.


Figure 1: Ensemble of Pipelines
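
The pattern shown in the figure can also be sketched with the Python standard library alone. The snippet below is a toy model, not how Ensemble Toolkit schedules work: `run_pipeline`, `run_ensemble`, and `toy_pipelines` are hypothetical names introduced for illustration, and each "task" is just a callable. It only demonstrates the ordering rules: Pipelines run concurrently, Stages within a Pipeline run in order, and Tasks within a Stage run concurrently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(pipeline_id, stages):
    """Run the stages of one pipeline in order.

    Tasks inside a stage are submitted together and may run concurrently;
    the next stage starts only after every task of the current stage is done.
    """
    log = []
    for stage_id, tasks in enumerate(stages):
        with ThreadPoolExecutor() as pool:
            # pool.map returns results in the order the tasks were given
            results = list(pool.map(lambda task: task(), tasks))
        log.append((pipeline_id, stage_id, results))
    return log


def run_ensemble(pipelines):
    """Run all pipelines concurrently and collect their per-stage logs."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(run_pipeline, idx, stages)
                   for idx, stages in enumerate(pipelines)]
        return [future.result() for future in futures]


# Two toy pipelines, each with two stages of two tasks (hypothetical data)
toy_pipelines = [
    [[lambda: 'a', lambda: 'b'], [lambda: 'c', lambda: 'd']]
    for _ in range(2)
]

ensemble_log = run_ensemble(toy_pipelines)
```

Each entry of `ensemble_log` lists the stages of one pipeline in the order in which they were executed.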

We encourage you to take a look at the Pipeline of Ensembles example on the next page and compare it with the above pattern.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit. If not, see Installation.

In the following example, we create 10 Pipelines, each with three Stages. The Task in the first Stage creates a file. The Task in the second Stage performs a character count on that file. The Task in the third Stage computes a checksum of the output of the Task from the second Stage.
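
To make the data flow concrete, here is a rough local equivalent of what the three Tasks compute, written with the standard library only. It is a sketch, not part of the example script: `make_data`, `char_count`, and `checksum` are hypothetical helper names, and the text formatting only approximates the shell tools used by the real Tasks (for instance, the `base64` command inserts line breaks, and the ordering produced by `sort` depends on the locale).

```python
import base64
import hashlib
import os
from collections import Counter


def make_data(n_chars):
    """Stage 1 analogue: base64 text from random bytes, cut to n_chars."""
    raw = os.urandom(n_chars)  # encodes to more than n_chars characters
    return base64.b64encode(raw).decode('ascii')[:n_chars]


def char_count(text):
    """Stage 2 analogue: one 'count character' line per distinct character."""
    counts = Counter(text.replace('\n', ''))
    return ''.join('%7d %s\n' % (counts[ch], ch) for ch in sorted(counts))


def checksum(text):
    """Stage 3 analogue: SHA-1 hex digest of the character-count text."""
    return hashlib.sha1(text.encode('ascii')).hexdigest()


data = make_data(1000)       # the real Task writes 1000000 bytes
counts_text = char_count(data)
digest = checksum(counts_text)
```

In the example itself, each of these steps runs as a separate Task, and the intermediate files are passed between Stages through the staging directives shown in the script.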

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/simple/scripts. To run the script, execute the following from the command line:

python eop.py

You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

The complete code for this example is shown below:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


# Description of how the RabbitMQ process is accessible
# No need to change/set any variables if you installed RabbitMQ as a system
# process. If you are running RabbitMQ in a Docker container or another
# VM, set "RMQ_HOSTNAME" and "RMQ_PORT" in the session where you are running
# this script.
hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
# Environment variables are strings, so convert the port to an integer
port = int(os.environ.get('RMQ_PORT', 5672))


def generate_pipeline():

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object to hold character count tasks
    s2 = Stage()

    # Create a Task object
    t2 = Task()
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create another Stage object to hold checksum tasks
    s3 = Stage()

    # Create a Task object
    t3 = Task()
    t3.executable = '/bin/bash'
    t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt']
    # Copy data from the task in the second stage to the current task's location
    t3.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/ccount.txt' % (p.name, s2.name, t2.name)]
    # Download the output of the current task to the current location and rename
    # it per pipeline; 'cnt' is the module-level loop index set in the main block
    t3.download_output_data = ['chksum.txt > chksum_%s.txt' % cnt]

    # Add the Task to the Stage
    s3.add_tasks(t3)

    # Add Stage to the Pipeline
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    pipelines = []

    for cnt in range(10):
        pipelines.append(generate_pipeline())

    # Create Application Manager
    appman = AppManager(hostname=hostname, port=port)

    # Create a dictionary describing the mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set(pipelines)

    # Run the Application Manager
    appman.run()
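
After a run, one way to inspect the results is to read back the downloaded checksum files. The helper below is a hypothetical addition, not part of the example script. It assumes the run completed, that the files chksum_0.txt through chksum_9.txt were downloaded to the given directory, and that each file holds the usual `sha1sum` line of the form "digest, two spaces, file name".

```python
import os


def collect_checksums(directory='.', count=10):
    """Return {pipeline index: digest} for every chksum_<index>.txt found.

    Missing or empty files are skipped rather than treated as errors.
    """
    found = {}
    for idx in range(count):
        path = os.path.join(directory, 'chksum_%s.txt' % idx)
        if not os.path.isfile(path):
            continue
        with open(path) as handle:
            fields = handle.read().split()
        if fields:
            # The first whitespace-separated field is the hex digest
            found[idx] = fields[0]
    return found
```

Calling `collect_checksums()` in the directory where eop.py was launched should then return one entry per Pipeline whose output was downloaded. Because every Pipeline generates its own random input, the digests are expected to differ from one another.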