9.2.6. Stage API

class radical.entk.Stage[source]

A stage represents a collection of objects that have no relative order of execution. In this case, a stage consists of a set of ‘Task’ objects. All tasks of the same stage may execute concurrently.

_check_stage_complete()[source]

Purpose: Check if all tasks of the current stage have completed, i.e., are in either DONE or FAILED state.
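
The completion rule above can be sketched in plain Python. This is a minimal illustration rather than EnTK's implementation: the `stage_complete` helper and the use of bare state strings are assumptions made for the example.

```python
# Hypothetical helper modelling the rule "every task is DONE or FAILED".
FINAL_STATES = {'DONE', 'FAILED'}

def stage_complete(task_states):
    """Return True if every given task state is a final state."""
    return all(state in FINAL_STATES for state in task_states)

all_final = stage_complete(['DONE', 'FAILED', 'DONE'])
still_running = stage_complete(['DONE', 'EXECUTING'])
print(all_final, still_running)
```

Note that a FAILED task counts toward completion here, just as in the description above: the check asks whether tasks have finished, not whether they succeeded.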

_set_tasks_state(value)[source]

Purpose: Set state of all tasks of the current stage.

Arguments: String

_validate()[source]

Purpose: Validate that the state of the current Stage is ‘DESCRIBED’ (i.e., the user has not modified it) and that the current Stage contains Tasks.

classmethod _validate_entities(tasks)[source]

Purpose: Validate that ‘tasks’ is of type set, and validate the description of each Task.

add_tasks(value)[source]

Adds task(s) to a Stage using a set union operation. Each Task in a Stage is unique; duplicates are not allowed. A Task that already exists is not added again but is updated with any changes.

Argument: iterable of Task objects

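
The set union semantics described for add_tasks can be illustrated with Python's built-in set. This is only a sketch of the idea: `FakeTask` is a hypothetical stand-in, not the EnTK Task class, and the real method also validates its input.

```python
# Hypothetical stand-in for a task; real EnTK Task objects carry far more state.
class FakeTask:
    def __init__(self, name):
        self.name = name

t1, t2 = FakeTask('t1'), FakeTask('t2')

tasks = set()
tasks |= {t1, t2}   # first addition: two new tasks
tasks |= {t2}       # adding t2 again leaves the set unchanged

print(len(tasks))
```

Because the set holds references to the task objects, later changes to `t2` are visible through the set without adding it a second time.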
from_dict(d)[source]

Create a Stage from a dictionary. The change is made in place.

Argument: Python dictionary
Returns: None

luid

Unique ID of the current stage (fully qualified).

Example:

>>> stage.luid
pipe.0001.stage.0004

Getter: Returns the fully qualified uid of the current stage
Type: String

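
As a rough sketch of how such a fully qualified ID could be composed, assuming it is simply the pipeline identifier and the stage identifier joined by a dot (as the example output suggests). The `make_luid` helper is hypothetical and not part of EnTK.

```python
def make_luid(pipeline_uid, stage_uid):
    """Hypothetical helper: join pipeline and stage identifiers with a dot."""
    return f'{pipeline_uid}.{stage_uid}'

luid = make_luid('pipe.0001', 'stage.0004')
print(luid)
```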
name

Name of the stage. Do not use a ‘,’ or ‘_’ in an object’s name.

Getter: Returns the name of the current stage
Setter: Assigns the name of the current stage
Type: String

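
The naming restriction above can be checked before assigning a name. The `name_is_allowed` helper below is a hypothetical convenience for illustration; it is not an EnTK function, and EnTK may apply further rules of its own.

```python
def name_is_allowed(name):
    """Return True if the name contains neither ',' nor '_'."""
    return ',' not in name and '_' not in name

ok = name_is_allowed('simulation-stage')
bad = name_is_allowed('simulation_stage')
print(ok, bad)
```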
parent_pipeline

Getter: Returns the pipeline this stage belongs to
Setter: Assigns the pipeline uid this stage belongs to

post_exec

The post_exec property enables adaptivity in EnTK. post_exec takes a Python callable (i.e., a function), which is invoked when the stage finishes.

Note: if a post_exec callback resumes any suspended pipelines, it MUST return a list with the IDs of those pipelines; otherwise the resume will not be acted upon.

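Example:

    # p is the Pipeline and s1 the Stage, both defined elsewhere
    def func_post():
        if condition_is_met:    # placeholder for the user's own check
            s = Stage()
            t = Task()
            t.executable = '/bin/sleep'
            t.arguments = ['30']
            s.add_tasks(t)
            p.add_stages(s)
        else:
            # do nothing
            pass

    s1.post_exec = func_post
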
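
The return-value rule in the note above can be sketched without EnTK. In this illustration the `run_post_exec` runner, the callback names, and the pipeline IDs are all hypothetical; the sketch only models how a caller might distinguish a callback that reports resumed pipelines from one that reports nothing.

```python
def run_post_exec(callback):
    """Hypothetical runner: call the callback and collect the pipeline IDs it reports."""
    result = callback()
    # A callback that resumes nothing may simply return None.
    return list(result) if result else []

def resume_two():
    # Pretend two suspended pipelines were resumed here, then report their IDs.
    return ['pipe.0001', 'pipe.0002']

def do_nothing():
    pass

resumed = run_post_exec(resume_two)
none_resumed = run_post_exec(do_nothing)
print(resumed)
print(none_resumed)
```

A callback that resumes pipelines but returns nothing looks, from the caller's side, exactly like `do_nothing`, which is why the list of IDs is required.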
state

Current state of the stage

Getter: Returns the state of the current stage
Type: String

state_history

Returns a list of the states the stage has been in, in chronological order.

Returns: list

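
The relationship between state and state_history can be modelled as an append-only list whose last element is the current state. The `StateLog` class below is a hypothetical sketch, not EnTK code, and the state names are used for illustration only.

```python
class StateLog:
    """Hypothetical model: the current state is the last entry of the history."""

    def __init__(self, initial):
        self._history = [initial]

    @property
    def state(self):
        return self._history[-1]

    @property
    def state_history(self):
        # Return a copy so callers cannot alter the recorded history.
        return list(self._history)

    def advance(self, new_state):
        self._history.append(new_state)

log = StateLog('DESCRIBED')
log.advance('SCHEDULED')
log.advance('DONE')
print(log.state)
print(log.state_history)
```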
tasks

Tasks of the stage

Getter: Returns all the tasks of the current stage
Setter: Assigns tasks to the current stage
Type: set of Tasks

to_dict()[source]

Convert the current Stage into a dictionary.

Returns: Python dictionary

uid

Unique ID of the current stage

Getter: Returns the unique id of the current stage
Type: String