Adaptive applications: Task-attribute
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.
You can download the complete code discussed in this section here or find it in your virtualenv under share/radical.entk/advanced/scripts.
For any adaptive capability within a Pipeline, we need to use the post-execution property (post_exec) of a Stage object. Decisions can only be made once all tasks of a Stage have reached a final state (running tasks cannot be interrupted by design). The post-execution property of a Stage takes a callable that can influence the next Stages of the workflow, for example by changing the attributes of their tasks.
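from random import randint

CUR_NEW_STAGE = 0
MAX_NEW_STAGE = 4

def func_post():
    # Called after every task of the Stage has reached a final state
    global CUR_NEW_STAGE, MAX_NEW_STAGE
    if CUR_NEW_STAGE < MAX_NEW_STAGE:
        CUR_NEW_STAGE += 1
        # Adapt the next Stage: give each of its tasks a new sleep duration
        for t in p.stages[CUR_NEW_STAGE].tasks:
            t.arguments = [str(randint(10, 30))]
    else:
        print('Done')

# p is the Pipeline and s1 one of its Stages (see the complete script)
s1.post_exec = func_post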
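To make the ordering rule concrete, here is a self-contained sketch that does not use EnTK at all. The classes `MiniTask`, `MiniStage` and `MiniPipeline` and the `run_pipeline` loop are hypothetical stand-ins written only for illustration; they mimic the behavior described above: a Stage's `post_exec` callable runs once, after every task in that Stage has finished, and it may change the tasks of a later Stage before that Stage starts.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class MiniTask:
    # Hypothetical stand-in for an EnTK Task (only the fields used here)
    executable: str
    arguments: List[str]
    state: str = "NEW"


@dataclass
class MiniStage:
    # Hypothetical stand-in for an EnTK Stage with an optional post_exec callable
    tasks: List[MiniTask] = field(default_factory=list)
    post_exec: Optional[Callable[[], None]] = None


@dataclass
class MiniPipeline:
    # Hypothetical stand-in for an EnTK Pipeline: an ordered list of stages
    stages: List[MiniStage] = field(default_factory=list)


def run_pipeline(p: MiniPipeline) -> List[List[str]]:
    """Run stages in order and return the arguments each stage ran with."""
    log = []
    for stage in p.stages:
        # "Execute" every task of the stage (a real engine runs them concurrently)
        log.append([t.arguments[0] for t in stage.tasks])
        for t in stage.tasks:
            t.state = "DONE"
        # Only now, with every task in a final state, is post_exec invoked
        if stage.post_exec is not None:
            stage.post_exec()
    return log


def demo() -> List[List[str]]:
    # Two stages of two 'sleep 30' tasks each
    p = MiniPipeline([
        MiniStage([MiniTask("/bin/sleep", ["30"]) for _ in range(2)])
        for _ in range(2)
    ])

    def shorten_next() -> None:
        # Runs after stage 0 finishes and edits stage 1 before it starts
        for t in p.stages[1].tasks:
            t.arguments = ["5"]

    p.stages[0].post_exec = shorten_next
    return run_pipeline(p)


print(demo())  # [['30', '30'], ['5', '5']]
```

The first Stage still runs with its original arguments, because the callable only fires after it completes; the change becomes visible in the second Stage.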
In the following example, we create one Pipeline with five Stages. Each Stage holds 10 tasks, each running ‘sleep 30’. After a Stage is DONE (i.e., all tasks in the Stage have completed execution), a condition is evaluated that checks whether the index of the current Stage is less than the index of the last Stage in the Pipeline. If so, the arguments of the tasks in the next Stage are modified to a random sleep duration between 10 and 30 seconds. If not, no operation is performed and the Pipeline terminates.
To run the example, execute:
python adapt_ta.py
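The counter bookkeeping in `func_post` is easy to get off by one, because `post_exec` fires once per Stage: a five-Stage Pipeline triggers five calls. The sketch below replays that counter in plain Python, without EnTK. The helper `trace_post_exec` is hypothetical; it records which Stage index each call would adapt and flags the call that would index past the last Stage.

```python
from typing import List


def trace_post_exec(n_stages: int, max_new_stage: int, strict: bool) -> List[str]:
    """Hypothetical helper: replay the func_post counter for n_stages stages.

    strict=True uses `cur < max_new_stage`, strict=False uses `cur <= max_new_stage`.
    Returns one entry per post_exec call describing what that call would do.
    """
    cur = 0
    events = []
    for _ in range(n_stages):  # post_exec fires once after each stage
        cond = cur < max_new_stage if strict else cur <= max_new_stage
        if cond:
            cur += 1
            if cur >= n_stages:
                # p.stages[cur] would raise IndexError here
                events.append(f"IndexError: no stage {cur}")
            else:
                events.append(f"modify stage {cur}")
        else:
            events.append("done")
    return events


print(trace_post_exec(5, 4, strict=True))
# ['modify stage 1', 'modify stage 2', 'modify stage 3', 'modify stage 4', 'done']
print(trace_post_exec(5, 4, strict=False))
# ['modify stage 1', 'modify stage 2', 'modify stage 3', 'modify stage 4', 'IndexError: no stage 5']
```

With `<`, the first four calls adapt Stages 1 to 4 and the call after the last Stage simply prints 'Done'; with `<=`, that final call would try to adapt a Stage that does not exist.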
You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
Let’s take a look at the complete code in the example:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager
import os
from random import randint

# ------------------------------------------------------------------------------
# Set default verbosity
if os.environ.get('RADICAL_ENTK_VERBOSE') is None:
    os.environ['RADICAL_ENTK_REPORT'] = 'True'

CUR_NEW_STAGE = 0   # index of the last Stage whose tasks were adapted
MAX_NEW_STAGE = 4   # index of the last Stage in the Pipeline (5 Stages total)
CUR_TASKS = 10      # number of tasks per Stage


def generate_pipeline():

    def func_post():
        # Runs after all tasks of the current Stage have reached a final state
        global CUR_NEW_STAGE, MAX_NEW_STAGE
        if CUR_NEW_STAGE < MAX_NEW_STAGE:
            CUR_NEW_STAGE += 1
            # Give every task of the next Stage a random sleep duration
            for t in p.stages[CUR_NEW_STAGE].tasks:
                dur = randint(10, 30)
                t.arguments = [str(dur)]
        else:
            print('Done')

    # Create a Pipeline object
    p = Pipeline()

    for _ in range(MAX_NEW_STAGE + 1):

        # Create a Stage object
        s1 = Stage()

        for _ in range(CUR_TASKS):

            t1 = Task()
            t1.executable = '/bin/sleep'
            t1.arguments = ['30']

            # Add the Task to the Stage
            s1.add_tasks(t1)

        # Add post-exec to the Stage
        s1.post_exec = func_post

        # Add Stage to the Pipeline
        p.add_stages(s1)

    return p


if __name__ == '__main__':

    # Create a dictionary describing the resource request:
    # 'resource' is 'local.localhost' to execute locally,
    # 'walltime' is in minutes and 'cpus' is the number of cores
    res_dict = {
        'resource': 'local.localhost',
        'walltime': 15,
        'cpus': 2,
    }

    # Create Application Manager
    appman = AppManager()
    appman.resource_desc = res_dict

    p = generate_pipeline()

    # Assign the workflow as a set of Pipelines to the Application Manager
    appman.workflow = [p]

    # Run the Application Manager
    appman.run()
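Because `func_post` draws durations with `randint`, every run sleeps for different times. If you want to check the adaptive logic before submitting anything, a dry run in plain Python can help. The function `dry_run` below is a hypothetical helper, not part of EnTK: it rebuilds the same layout as `generate_pipeline` (five Stages of `CUR_TASKS` sleep tasks) as nested lists, replays the post-execution step after each Stage with a seeded `random.Random`, and returns the arguments every Stage would run with.

```python
import random
from typing import List

MAX_NEW_STAGE = 4  # same values as in the example script
CUR_TASKS = 10


def dry_run(seed: int = 42) -> List[List[str]]:
    """Hypothetical helper: simulate which sleep arguments each Stage runs with."""
    rng = random.Random(seed)  # seeded so the plan is reproducible
    # One inner list per Stage, one argument string per task ('sleep 30')
    stages = [["30"] * CUR_TASKS for _ in range(MAX_NEW_STAGE + 1)]
    executed = []
    cur = 0
    for idx in range(len(stages)):
        # Stage idx "runs" with whatever arguments it currently holds
        executed.append(list(stages[idx]))
        # Post-execution step: adapt the next Stage, if there is one
        if cur < MAX_NEW_STAGE:
            cur += 1
            stages[cur] = [str(rng.randint(10, 30)) for _ in range(CUR_TASKS)]
    return executed


plan = dry_run()
for i, args in enumerate(plan):
    print(f"stage {i}: sleep durations {args}")
```

In the real script the same reproducibility could be obtained by seeding Python's `random` module before calling `generate_pipeline`, at the cost of identical durations on every run.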