4.6. Adding Shared Data

Data movement is one of the important elements of any application. Many applications start from an initial set of files, common to multiple tasks, that needs to be transferred to the remote machine only once. In this section, we will take a look at how we can move data shared between multiple tasks to the remote machine with a one-time transfer.

Note

The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.

Note

This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.

You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/user_guide/scripts.

In the following example, we will create a Pipeline with one Stage containing 10 tasks. Each task concatenates two input files and writes the result to an output file. Since the two input files are common to all tasks, it is more efficient to transfer them to the remote machine only once and have each task copy them when it executes.

Users can specify such shared data using the shared_data attribute of the AppManager object.

res_dict = {

        'resource': 'local.localhost',
        'walltime': 1,
        'cpus': 1
}

# Create Application Manager
appman = AppManager(hostname=hostname, port=port, username=username,
        password=password)

# Specify the data, common to all tasks, to be staged only once
appman.shared_data = ['%s/file1.txt' % cur_dir, '%s/file2.txt' % cur_dir]

The shared data can then be referenced, for copying or linking, using the $SHARED keyword in the file-movement description of a task.


# Create a Stage object
s1 = Stage()

# Create 10 Task objects, each concatenating the two shared input files
for x in range(10):
    t1 = Task()
    t1.executable = 'cat'
    t1.arguments = ['file1.txt', 'file2.txt', '>', 'output.txt']

    # Copy the shared files into the task's working directory before execution
    t1.copy_input_data = ['$SHARED/file1.txt', '$SHARED/file2.txt']

In the example provided, the two input files contain the words ‘Hello’ and ‘World’, respectively, so each output file is expected to contain ‘Hello World’. Run the script with:
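The effect of each task's `cat file1.txt file2.txt > output.txt` can be emulated in plain Python (a stand-alone sketch of what happens in the task's working directory, not part of the EnTK script):

```python
import os
import tempfile

# Create the two shared input files, as the example script does
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, 'file1.txt'), 'w') as f:
    f.write('Hello\n')
with open(os.path.join(workdir, 'file2.txt'), 'w') as f:
    f.write('World\n')

# Emulate 'cat file1.txt file2.txt > output.txt'
with open(os.path.join(workdir, 'output.txt'), 'w') as out:
    for name in ('file1.txt', 'file2.txt'):
        with open(os.path.join(workdir, name)) as f:
            out.write(f.read())

with open(os.path.join(workdir, 'output.txt')) as f:
    print(f.read())   # 'Hello' and 'World' on separate lines
```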

python add_shared_data.py

You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.

Let’s take a look at the complete code of this section:

#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os
from glob import glob
import shutil

# ------------------------------------------------------------------------------
# Set default verbosity

if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

hostname = os.environ.get('RMQ_HOSTNAME','localhost')
port = int(os.environ.get('RMQ_PORT',5672))
username = os.environ.get('RMQ_USERNAME')
password = os.environ.get('RMQ_PASSWORD')

cur_dir = os.path.dirname(os.path.abspath(__file__))

def generate_pipeline():

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s1 = Stage()

    # Create 10 Task objects, each concatenating the two shared input files
    for x in range(10):
        t1 = Task()
        t1.executable = 'cat'
        t1.arguments = ['file1.txt','file2.txt','>','output.txt']
        t1.copy_input_data = ['$SHARED/file1.txt', '$SHARED/file2.txt']
        t1.download_output_data = ['output.txt > %s/output_%s.txt' %(cur_dir,x+1)]

        # Add the Task to the Stage
        s1.add_tasks(t1)

    # Add Stage to the Pipeline
    p.add_stages(s1)

    return p

if __name__ == '__main__':

    for f in glob('%s/file*.txt' %cur_dir):
        os.remove(f)

    os.system('echo "Hello" > %s/file1.txt' %cur_dir)
    os.system('echo "World" > %s/file2.txt' %cur_dir)


    # Create a dictionary to describe the three mandatory keys:
    # resource, walltime and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {

            'resource': 'local.localhost',
            'walltime': 1,
            'cpus': 1
    }

    # Create Application Manager
    appman = AppManager(hostname=hostname, port=port, username=username,
            password=password)

    # Assign resource manager to the Application Manager
    appman.resource_desc = res_dict
    appman.shared_data = ['%s/file1.txt' %cur_dir, '%s/file2.txt' %cur_dir]

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()

    for x in range(10):
        with open('%s/output_%s.txt' %(cur_dir,x+1), 'r') as fp:
            print('Output %s: '%(x+1), fp.readlines())
        os.remove('%s/output_%s.txt' %(cur_dir,x+1))


    os.remove('%s/file1.txt' %cur_dir)
    os.remove('%s/file2.txt' %cur_dir)