Stage API
- class radical.entk.Stage[source]
A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.
- _check_stage_complete()[source]
- Purpose: Check if all tasks of the current stage have completed, i.e.,
are in either DONE or FAILED state.
- _set_tasks_state(value)[source]
Purpose: Set state of all tasks of the current stage.
- Arguments
String
- _validate()[source]
- Purpose: Validate that the state of the current Stage is ‘DESCRIBED’
(user has not meddled with it). Also validate that the current Stage contains Tasks
- classmethod _validate_entities(tasks)[source]
- Purpose: Validate whether the ‘tasks’ is of type set. Validate the
description of each Task.
- add_tasks(value)[source]
Adds task(s) to a Stage by using a set union operation. Every Task element is unique, no duplicate is allowed. Existing Task won’t be added but updated with changes.
- Argument
iterable task object
- from_dict(d)[source]
Create a Stage from a dictionary. The change is in inplace.
- Argument
python dictionary
- Returns
None
- property luid
Unique ID of the current stage (fully qualified).
Example
>>> stage.luid pipe.0001.stage.0004
- Getter
Returns the fully qualified uid of the current stage
- Type
String
- property name
Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.
- Getter
Returns the name of the current stage
- Setter
Assigns the name of the current stage
- Type
String
- property parent_pipeline
Returns the pipeline this stage belongs to :setter: Assigns the pipeline uid this stage belongs to
- Type
getter
- property post_exec
The post_exec property enables adaptivity in EnTK. post_exec receives a Python callable object i.e. function, which will be evaluated when a stage is finished.
Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines - otherwise the resume will not be acted upon.
Example:
s1.post_exec = func_post
def func_post():
- if condition is met:
s = Stage() t = Task() t.executable = ‘/bin/sleep’ t.arguments = [‘30’] s.add_tasks(t) p.add_stages(s)
- else:
# do nothing pass
- property state
Current state of the stage
- Getter
Returns the state of the current stage
- Type
String
- property state_history
Returns a list of the states obtained in temporal order
- Returns
list
- property tasks
Tasks of the stage
- Getter
Returns all the tasks of the current stage
- Setter
Assigns tasks to the current stage
- Type
set of Tasks
- property uid
Unique ID of the current stage
- Getter
Returns the unique id of the current stage
- Type
String