4.1. Getting Started¶
In this section we will run through the Ensemble Toolkit API. We will develop an example application consisting of a simple bag of Tasks.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.
You can download the complete code discussed in this section here, or find it in your virtualenv under share/radical.entk/user_guide/scripts.
4.1.1. Importing components from the Ensemble Toolkit Module¶
To create any application using Ensemble Toolkit, you need to import four components: Pipeline, Stage, Task, and AppManager. We have already discussed these components in the earlier sections.
from radical.entk import Pipeline, Stage, Task, AppManager
4.1.2. Creating the workflow¶
We first create Pipeline, Stage, and Task objects, then assign the ‘executable’ and ‘arguments’ for the Task. For this example, we create one Pipeline consisting of one Stage that contains one Task.
In the snippet below, we first create a Pipeline and then a Stage.
if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s = Stage()
Next, we create a Task and assign its name, its executable, and the arguments of the executable.
    # Create a Task object
    t = Task()
    t.name = 'my-first-task'        # Assign a name to the task (optional, do not use ',' or '_')
    t.executable = '/bin/echo'      # Assign executable to the task
    t.arguments = ['Hello World']   # Assign arguments for the task executable
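To see what this Task describes, it can help to reproduce the equivalent command line directly with Python's subprocess module. This is only a local sketch of the command the Task will run; EnTK itself launches the executable through its own execution backend, not through subprocess.

```python
import subprocess

# Run the same command the Task describes: /bin/echo 'Hello World'
result = subprocess.run(['/bin/echo', 'Hello World'],
                        capture_output=True, text=True)

# This is the text that will end up in the task's stdout file
print(result.stdout.strip())   # Hello World
```

Each entry of t.arguments becomes one argument of the executable, so no shell quoting is needed: 'Hello World' is passed to /bin/echo as a single argument.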
Now we have a fully described Task, a Stage, and a Pipeline. We create our workflow by adding the Task to the Stage and adding the Stage to the Pipeline.
    # Add Task to the Stage
    s.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s)
4.1.3. Creating the AppManager¶
Now that our workflow has been created, we need to specify where it is to be executed. For this example, we will simply execute the workflow locally. We create an AppManager object and describe a resource request for 1 core for 10 minutes on localhost, i.e., your local machine. We then assign the resource request description and the workflow to the AppManager and run our application.
    # Create Application Manager
    appman = AppManager(hostname=hostname, port=port, username=username,
                        password=password)

    # Create a dictionary to describe the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()
Warning
If your system's default Python is Anaconda Python, please change the 'resource' value in the above code block to
'resource': 'local.localhost_anaconda',
To run the script, simply execute the following from the command line:
python get_started.py
Warning
The first run may fail for different reasons, most of them
related to setting up the execution environment or requesting the correct
resources. Upon failure, Python may incorrectly raise a
KeyboardInterrupt
exception. This may be confusing because it is reported even when
no keyboard interrupt has been issued. Currently, we have not found a way to
avoid raising that exception.
And that’s it! Those are all the steps in this example. You can generate more verbose output by setting the environment variables: `export RADICAL_LOG_TGT=radical.log; export RADICAL_LOG_LVL=DEBUG`.
After the execution of the example, you may want to check the output. Under your home folder, you will find a folder named radical.pilot.sandbox. In that folder, there will be a re.session.* folder and a ve.local.localhost folder. Inside re.session.*, there is a pilot.0000 folder, and in there a unit.000000 folder. In the unit folder, you will see several files, including unit.000000.out and unit.000000.err. unit.000000.out holds the messages written to standard output, and unit.000000.err holds the messages written to standard error. The unit.000000.out file should contain the Hello World message.
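The session, pilot, and unit directory names under the sandbox differ from run to run, so globbing for them is a convenient way to locate the output files. A small sketch, assuming the path layout described above:

```python
from pathlib import Path

# Look for task stdout files in the RADICAL-Pilot sandbox; the session,
# pilot, and unit directory names vary per run, so we glob for them.
sandbox = Path.home() / 'radical.pilot.sandbox'
out_files = sorted(sandbox.glob('re.session.*/pilot.*/unit.*/unit.*.out'))

if out_files:
    for f in out_files:
        print(f, '->', f.read_text().strip())
else:
    print('no session output found under', sandbox)
```

After a successful run of the example, one of the printed lines should end with the Hello World message.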
Let’s look at the complete code for this example:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

# Description of how the RabbitMQ process is accessible
# No need to change/set any variables if you installed RabbitMQ as a system
# process. If you are running RabbitMQ under a docker container or another
# VM, set "RMQ_HOSTNAME" and "RMQ_PORT" in the session where you are running
# this script.
hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', 5672))
username = os.environ.get('RMQ_USERNAME')
password = os.environ.get('RMQ_PASSWORD')

if __name__ == '__main__':

    # Create a Pipeline object
    p = Pipeline()

    # Create a Stage object
    s = Stage()

    # Create a Task object
    t = Task()
    t.name = 'my-first-task'        # Assign a name to the task (optional, do not use ',' or '_')
    t.executable = '/bin/echo'      # Assign executable to the task
    t.arguments = ['Hello World']   # Assign arguments for the task executable

    # Add Task to the Stage
    s.add_tasks(t)

    # Add Stage to the Pipeline
    p.add_stages(s)

    # Create Application Manager
    appman = AppManager(hostname=hostname, port=port, username=username,
                        password=password)

    # Create a dictionary to describe the three mandatory keys:
    # resource, walltime, and cpus
    # resource is 'local.localhost' to execute locally
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus': 1
    }

    # Assign resource request description to the Application Manager
    appman.resource_desc = res_dict

    # Assign the workflow as a set or list of Pipelines to the Application Manager
    # Note: The list order is not guaranteed to be preserved
    appman.workflow = set([p])

    # Run the Application Manager
    appman.run()