Adding Data
Data management is one of the most important elements of any application. In this section, we take a look at how data can be managed between tasks.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit, if not see Installation.
You can download the complete code discussed in this section here, or find it in your virtualenv under share/radical.entk/user_guide/scripts.
In the following example, we will create a Pipeline of two Stages, each with one Task. In the first stage, we will create a file of size 1MB. In the next stage, we will perform a character count on the same file and write to another file. Finally, we will bring that output to the current location.
Since we already know how to create this workflow, we simply present the code snippet concerned with the data movement. The task in the second stage needs to perform two data movements: a) copy the data created in the first stage to its current directory and b) bring the output back to the directory where the script is executed. Below we present the statements that perform these operations.
# Copy data from the task in the first stage to the current task's location
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

# Download the output of the current task to the location where the script is executed
t2.download_output_data = ['ccount.txt']
Intermediate data are accessible through a unique identifier of the form $Pipeline_%s_Stage_%s_Task_%s/<filename>, where each %s is replaced by the name of the corresponding pipeline, stage, and task. If no name was assigned, the entity's .uid attribute can be used instead to locate files across tasks. The Task API documentation has more information about the use of these attributes.
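As a sketch of how such an identifier is formed, assuming a pipeline named 'p1', a stage 's1' and a task 't1' (illustrative names; EnTK performs this substitution internally at runtime):

```python
# Illustrative entity names; in a real workflow these are the names
# (or uids) of the pipeline, stage and task that produced the file.
p_name, s_name, t_name = 'p1', 's1', 't1'

# Each %s is replaced by the corresponding entity name, yielding the
# reference that can be used, e.g., in copy_input_data.
ref = '$Pipeline_%s_Stage_%s_Task_%s/%s' % (p_name, s_name, t_name, 'output.txt')
print(ref)  # $Pipeline_p1_Stage_s1_Task_t1/output.txt
```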
Let’s take a look at the complete code in the example. You can generate more verbose output by setting the environment variable RADICAL_LOG_LVL=DEBUG.
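For example, in a bash shell the variable can be exported before launching the script (the script name adding_data.py is illustrative):

```shell
# Enable debug-level logging for all RADICAL components
export RADICAL_LOG_LVL=DEBUG

# then run the example as usual:
# python adding_data.py
```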
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'


if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()
    p.name = 'p1'

    # Create a Stage object
    s1 = Stage()
    s1.name = 's1'

    # Create a Task object which creates a file named 'output.txt' of size 1 MB
    t1 = Task()
    t1.name = 't1'
    t1.executable = '/bin/bash'
    t1.arguments = ['-l', '-c', 'base64 /dev/urandom | head -c 1000000 > output.txt']

    # Add the Task to the Stage
    s1.add_tasks(t1)

    # Add the Stage to the Pipeline
    p.add_stages(s1)

    # Create another Stage object
    s2 = Stage()
    s2.name = 's2'

    # Create a Task object
    t2 = Task()
    t2.name = 't2'
    t2.executable = '/bin/bash'
    t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt']

    # Copy data from the task in the first stage to the current task's location
    t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)]

    # Download the output of the current task to the current location
    t2.download_output_data = ['ccount.txt']

    # Add the Task to the Stage
    s2.add_tasks(t2)

    # Add the Stage to the Pipeline
    p.add_stages(s2)

    # Create an Application Manager
    appman = AppManager()

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    appman.workflow = set([p])

    # Create a dictionary with the mandatory resource request keys:
    # 'resource', 'walltime' and 'cpus' ('project' is also required on
    # machines that charge against an allocation); 'local.localhost'
    # executes locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus':     1
    }

    # Assign the resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Run the Application Manager
    appman.run()
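To see what the second stage produces, here is a pure-Python sketch of the `grep -o . output.txt | sort | uniq -c` pipeline used above (not part of the example; `char_count` is an illustrative helper showing the per-character tally written to ccount.txt):

```python
from collections import Counter

def char_count(text):
    # 'grep -o .' emits one character per line, so newlines are excluded
    # from the count; 'sort | uniq -c' then tallies each character.
    return Counter(c for c in text if c != '\n')

counts = char_count('aab\nba')
print(counts['a'], counts['b'])  # 3 2
```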
Handling data from a login node
The following example shows how to configure a task to fetch data at runtime, when compute nodes do not have internet access.
t = Task()
t.name = 'download-task'
t.executable = '/usr/bin/ssh'
t.arguments = ["<username>@<hostname>",
               "'bash -l /path/to/download_script.sh <input1> <input2>'"]
t.download_output_data = ['STDOUT', 'STDERR']
t.cpu_reqs = {'cpu_processes':    1,
              'cpu_process_type': None,
              'cpu_threads':      1,
              'cpu_thread_type':  None}
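The content of download_script.sh is site-specific and not part of EnTK. Assuming the inputs are plain URLs, a minimal sketch might look like this (fetch_inputs and the example URLs are hypothetical):

```shell
# Hypothetical helper executed on the login node, where internet access
# is available; it fetches each URL given as an argument into the
# current working directory.
fetch_inputs() {
    for url in "$@"; do
        # fail on HTTP errors, follow redirects, keep the remote file name
        curl -sSfLO "$url"
    done
}

# Example invocation (on the login node):
# fetch_inputs https://example.com/input1.dat https://example.com/input2.dat
```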
Note
bash -l makes the shell act as if it had been directly invoked by logging in.
Note
Verify that the command ssh localhost hostname works on the login node without a password prompt. If you are asked for a password, create an ssh keypair with the command ssh-keygen. That should create two keys in ~/.ssh, named id_rsa and id_rsa.pub. Then execute the following commands:
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 0600 $HOME/.ssh/authorized_keys