Adding Data

Data management is an essential part of any application. In this section, we take a look at how data can be moved between tasks.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

In the following example, we will create a Pipeline of two Stages, each with one Task. In the first stage, we create a file of size 1 MB. In the second stage, we perform a character count on that file and write the result to a second file. Finally, we download that output file to the location where the script is executed.

Since we already know how to create this workflow, we simply present the code snippet concerned with the data movement. The task in the second stage needs to perform two data movements: a) copy the data created in the first stage to its current directory and b) bring the output back to the directory where the script is executed. Below we present the statements that perform these operations.

# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

# Download the output of the current task to the current location
t2.download_output_data = ['ccount.txt']
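Besides copy_input_data and download_output_data, the Task API exposes related staging attributes such as link_input_data, upload_input_data, copy_output_data and move_output_data; see the Task API reference for the full list.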

Intermediate data are referenced through a unique identifier of the form $Pipeline_%s_Stage_%s_Task_%s/{filename}, where each %s is replaced by the name of the corresponding pipeline, stage and task. If no name was assigned, the auto-generated uid attribute can be used instead to locate files across tasks. The Task API documentation has more information.
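For example, if no names had been assigned, the same reference could be built from the auto-generated uids (a minimal sketch, assuming p, s1 and t1 are the objects from the full example below):

# Equivalent reference built from uids instead of names
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.uid, s1.uid, t1.uid)]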

Let’s take a look at the complete code in the example. You can generate a more verbose output by setting the environment variable RADICAL_LOG_LVL=DEBUG.

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity: enable the compact reporter output unless an
# explicit verbosity level was requested via RADICAL_ENTK_VERBOSE
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()
    p.name = 'Pipeline.1'

    # Create a Stage object
    s1 = Stage()
    s1.name = 'Stage.1'

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.name = 'Task.1'
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)


    # Create another Stage object
    s2 = Stage()
    s2.name = 'Stage.2'

    # Create a Task object
    t2 = Task()
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']
    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]
    # Download the output of the current task to the current location
    t2.download_output_data = ['ccount.txt']

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add Stage to the Pipeline
    p.add_stages(s2)

    # Create Application Manager
    appman = AppManager()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = set([p])

    # Create a dictionary to describe the resource request. It has three
    # mandatory keys: resource, walltime and cpus ('project' is additionally
    # required on most HPC platforms).
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Run the Application Manager
    appman.run()
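After the run completes, ccount.txt appears in the directory from which the script was launched, while the intermediate output.txt remains in the first task's sandbox on the target resource.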

Handling data from a login node

The following example shows how to configure a task to fetch data at runtime, when compute nodes do not have internet access.

t = Task()
t.name = 'download-task'

# ssh from the compute node to the login node, which has outbound network
# access, and run the download script there
t.executable = '/usr/bin/ssh'
t.arguments = ["<username>@<hostname>",
               "'bash -l /path/to/download_script.sh <input1> <input2>'"]

# Retrieve the task's stdout and stderr for inspection
t.download_output_data = ['STDOUT', 'STDERR']

# A single process with a single thread is sufficient for this task
t.cpu_reqs = {'cpu_processes': 1,
              'cpu_process_type': None,
              'cpu_threads': 1,
              'cpu_thread_type': None}
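The task itself still runs on a compute node; the ssh invocation merely routes the download through the login node, which does have outbound network access. Downloading STDOUT and STDERR makes it easy to verify afterwards that the download script ran as expected.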

Note

bash -l makes the shell act as if it had been directly invoked by logging in.

Note

Verify that the command ssh localhost hostname works on the login node without a password prompt. If you are asked for a password, please create an ssh keypair with the command ssh-keygen. That should create two keys in ~/.ssh, named id_rsa and id_rsa.pub. Now, execute the following commands:

cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 0600 $HOME/.ssh/authorized_keys
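Afterwards, confirm that the login works without a password prompt:

ssh localhost hostname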