Sequence of Workflows
Another common execution pattern consists of sequential workflows run in the same session, each with multiple concurrent Pipelines, where each Pipeline has multiple Stages and each Stage consists of several Tasks. We call this a Sequence of Workflows.
Note
The reader is assumed to be familiar with the PST Model and to have read through the Introduction of Ensemble Toolkit.
Note
This chapter assumes that you have successfully installed Ensemble Toolkit; if not, see Installation.
In the following example, we create 2 sequential workflows, each with 2 Pipelines and 3 Stages per Pipeline. For demonstration purposes, each Task does nothing but “sleep” for 3 seconds. To run more than one workflow, start AppManager with autoterminate=False and call appman.terminate() once all pipelines are finished; this lets you reuse the same application manager for the second workflow.
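As a rough sanity check on what to expect from this example, the sketch below counts the Tasks and estimates a lower bound on the run time. It assumes the usual PST semantics (Stages of a Pipeline run one after another, Pipelines run concurrently), enough cores for all concurrent Tasks, and no toolkit or scheduling overhead. The function names are hypothetical and not part of Ensemble Toolkit.

```python
def total_tasks(n_workflows, pipelines, stages, tasks_per_stage=1):
    """Number of Tasks executed across the whole sequence of workflows."""
    return n_workflows * pipelines * stages * tasks_per_stage


def estimated_runtime(n_workflows, stages, task_seconds):
    """Lower bound on wall-clock seconds, ignoring all overhead.

    Assumption: Pipelines run concurrently, so their count does not matter;
    Stages within a Pipeline and the workflows themselves run sequentially.
    """
    per_workflow = stages * task_seconds
    return n_workflows * per_workflow


# Values from the example: 2 workflows, 2 Pipelines, 3 Stages, 3-second sleeps.
print(total_tasks(2, 2, 3))        # 12
print(estimated_runtime(2, 3, 3))  # 18
```

In practice the run takes noticeably longer than this bound, since resource acquisition and task scheduling add overhead that the estimate leaves out.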
You can download the complete code discussed in this section here, or find it in your virtualenv under share/radical.entk/simple/scripts.
python sow.py
You can generate more verbose output by setting the environment variable RADICAL_ENTK_VERBOSE=DEBUG.
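If you prefer to launch the script from Python rather than from a shell, one way to set the variable is to pass a modified environment to a subprocess. This is a minimal sketch: the helper names are hypothetical, and it assumes sow.py is in the current directory.

```python
import os
import subprocess
import sys


def verbose_env():
    """Copy of the current environment with EnTK debug output enabled."""
    env = dict(os.environ)
    env['RADICAL_ENTK_VERBOSE'] = 'DEBUG'
    return env


def run_script(script='sow.py'):
    """Run the script with the same interpreter and return its exit code."""
    return subprocess.run([sys.executable, script], env=verbose_env()).returncode
```

Calling run_script() is then equivalent to running python sow.py with the variable exported; the parent process's own environment is left unchanged.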
The complete code for this section:
#!/usr/bin/env python

from radical.entk import Pipeline, Stage, Task, AppManager


def generate_pipeline():
    # Build one Pipeline with three Stages; each Stage holds a single
    # Task that sleeps for 3 seconds.
    p = Pipeline()

    s1 = Stage()
    t1 = Task()
    t1.executable = '/bin/sleep'
    t1.arguments = ['3']
    s1.add_tasks(t1)
    p.add_stages(s1)

    s2 = Stage()
    t2 = Task()
    t2.executable = '/bin/sleep'
    t2.arguments = ['3']
    s2.add_tasks(t2)
    p.add_stages(s2)

    s3 = Stage()
    t3 = Task()
    t3.executable = '/bin/sleep'
    t3.arguments = ['3']
    s3.add_tasks(t3)
    p.add_stages(s3)

    return p


if __name__ == '__main__':

    # autoterminate=False keeps the application manager usable after
    # the first run(), so it can execute the second workflow as well.
    appman = AppManager(autoterminate=False)

    res_dict = {
        'resource': 'local.localhost',
        'walltime': 10,
        'cpus'    : 8
    }
    appman.resource_desc = res_dict

    # First workflow: two concurrent Pipelines.
    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('1 ===================================================')

    # Second workflow: two new Pipelines on the same application manager.
    pipelines = list()
    for cnt in range(2):
        pipelines.append(generate_pipeline())

    appman.workflow = set(pipelines)
    appman.run()

    print('2 ===================================================')

    # Terminate explicitly, since autoterminate is disabled.
    appman.terminate()

    print('t ===================================================')
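The assign, run, repeat, terminate steps above generalize to any number of workflows. The sketch below wraps them in a hypothetical helper, run_sequence, which is not part of Ensemble Toolkit. It relies only on the workflow attribute and the run() and terminate() methods used in the example, and it uses try/finally so that terminate() is called even if a run fails. RecordingManager is a stand-in that only records calls, so the sketch runs without the toolkit installed.

```python
def run_sequence(appman, workflows):
    """Run each workflow in order on one manager, then always terminate it.

    `workflows` is an iterable of Pipeline collections. Returns the number
    of workflows that completed.
    """
    completed = 0
    try:
        for pipelines in workflows:
            appman.workflow = set(pipelines)
            appman.run()
            completed += 1
    finally:
        # Runs on success and on failure, so resources are released.
        appman.terminate()
    return completed


class RecordingManager:
    """Stand-in for AppManager that only records calls (illustration only)."""

    def __init__(self):
        self.calls = []
        self.workflow = None

    def run(self):
        self.calls.append('run')

    def terminate(self):
        self.calls.append('terminate')


fake = RecordingManager()
run_sequence(fake, [['p1', 'p2'], ['p3', 'p4']])
print(fake.calls)  # ['run', 'run', 'terminate']
```

With the real toolkit, you would pass an AppManager created with autoterminate=False and a list of Pipeline lists, for example two lists built with generate_pipeline().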