Task API

class radical.entk.Task(from_dict=None)

A Task is an abstraction of a computational unit. In this case, a Task consists of its executable, its required software environment, and the files to be staged as input and output.

At the user level, a Task is populated by assigning attributes. Internally, an empty Task is created and populated using the from_dict function. This avoids creating Tasks with a new uid, because tasks with a new uid offset the uid count file in radical.utils and can affect profiling if not taken care of.

uid

[type: str | default: “”] A unique ID for the task. This attribute is optional: if the field is not set, RE assigns a unique ID. Once set, the uid cannot be re-assigned (immutable attribute).

name

[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application-level workloads. Characters such as ‘,’ or ‘_’ are not allowed in a name.

state

[type: str | default: “DESCRIBED”] Current state of the task. Its initial value, re.states.INITIAL, is set during task initialization and is also appended to the state_history attribute.

state_history

[type: list | default: [“DESCRIBED”]] List of the states the task has gone through, in temporal order. Every time a new state is set, it is appended to state_history automatically.
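The append-on-assignment behaviour described above can be illustrated with a small stand-alone class. This is only a sketch of the idea, not EnTK's implementation: `StateTracker` is a hypothetical name, and the state strings other than “DESCRIBED” are used purely for illustration.

```python
class StateTracker:
    """Toy object whose state setter records every assigned state."""

    def __init__(self, initial='DESCRIBED'):
        self._state = initial
        # The initial state is recorded as the first history entry.
        self.state_history = [initial]

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Each assignment updates the current state and extends the history.
        self._state = value
        self.state_history.append(value)


tracker = StateTracker()
tracker.state = 'SCHEDULING'   # illustrative state name
tracker.state = 'DONE'         # illustrative state name
print(tracker.state_history)
```

Using a property setter keeps the history consistent without requiring callers to update two attributes.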

executable

[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

arguments

[type: list | default: []] List of arguments to be supplied to the executable.

environment

[type: dict | default: {}] Environment variables to set in the environment before the execution process.

sandbox

[type: str | default: “”] This specifies the working directory of the task. By default, the task’s uid is used as a sandbox for the task.

pre_launch

[type: list | default: []] List of actions (shell commands) to perform before the task is launched.

Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of resources! Deviating from that rule will likely result in reduced overall throughput.

post_launch

[type: list | default: []] List of actions (shell commands) to perform after the task finishes.

The same remarks as for pre_launch apply.

pre_exec

[type: list | default: []] List of actions (shell commands) to perform after the task is launched, but before the rank(s) start execution.

The same remarks as for pre_launch apply.

post_exec

[type: list | default: []] List of actions (shell commands) to perform right after the rank(s) finish.

The same remarks as for pre_launch apply.

cpu_reqs

[type: CpuReqs | default: CpuReqs()] The CPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.cpu_reqs = {'cpu_processes'    : X,
                 'cpu_process_type' : None/'MPI',
                 'cpu_threads'      : Y,
                 'cpu_thread_type'  : None/'OpenMP'}

This description means that the Task is going to spawn X processes and Y threads per process to run on CPUs. Hence, the total number of CPUs required by the Task is X * Y for all the processes and threads to execute concurrently.

By default, 1 CPU process and 1 CPU thread per process are requested.
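The X * Y arithmetic can be checked with a few lines of plain Python. The following is a minimal sketch that works on an ordinary dict with the same keys; `total_cpus` is a hypothetical helper and not part of EnTK.

```python
def total_cpus(cpu_reqs):
    """Return processes * threads-per-process for a cpu_reqs-style dict."""
    # Missing keys fall back to the documented defaults (1 process, 1 thread).
    processes = cpu_reqs.get('cpu_processes', 1)
    threads = cpu_reqs.get('cpu_threads', 1)
    return processes * threads


reqs = {'cpu_processes'    : 4,
        'cpu_process_type' : 'MPI',
        'cpu_threads'      : 2,
        'cpu_thread_type'  : 'OpenMP'}

print(total_cpus(reqs))
```

With 4 processes of 2 threads each, the task needs 8 CPUs to run everything concurrently.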

gpu_reqs

[type: GpuReqs | default: GpuReqs()] The GPU requirements for the current Task. The requirements are described in terms of the number of processes and threads to be run in this Task.

The expected format is dict-like:

task.gpu_reqs = {'gpu_processes'    : X,
                 'gpu_process_type' : None/'CUDA'/'ROCm',
                 'gpu_threads'      : Y,
                 'gpu_thread_type'  : None}

This description means that each rank of the task is going to use X GPUs with Y GPU-threads. By default, 0 GPUs are requested.

lfs_per_process

[type: int | default: 0] Local File Storage per process - amount of space (MB) required on the local file system of the node by the task.

mem_per_process

[type: int | default: 0] Amount of memory required by the task.

upload_input_data

[type: list | default: []] List of file names required to be transferred from local client storage to the location of the task (or data staging area) before the task starts.

copy_input_data

[type: list | default: []] List of file names required to be copied from another task-/pilot-sandbox to the current task (or data staging area) before the task starts.

Example of a file location format:

$Pipeline_%s_Stage_%s_Task_%s, where each %s is replaced by the entity name/uid

# output.txt is copied from task t1 to the sandbox of the current task
t2.copy_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/output.txt' %
                      (p.name, s1.name, t1.name)]
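To make the placeholder substitution concrete, here is a small sketch that builds such a reference string from plain names. `task_file_ref` and the names `p1`, `s1`, `t1` are hypothetical and used only for illustration.

```python
def task_file_ref(pipeline_name, stage_name, task_name, filename):
    """Build a '$Pipeline_..._Stage_..._Task_.../file' reference string."""
    return '$Pipeline_%s_Stage_%s_Task_%s/%s' % (
        pipeline_name, stage_name, task_name, filename)


# Reference output.txt produced by task 't1' in stage 's1' of pipeline 'p1'.
ref = task_file_ref('p1', 's1', 't1', 'output.txt')
copy_input_data = [ref]
print(copy_input_data)
```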

link_input_data

[type: list | default: []] List of file names required to be symlinked from another task-/pilot-sandbox to the current task (or data staging area) before the task starts.

move_input_data

[type: list | default: []] List of file names required to be moved from another task-/pilot-sandbox to the current task (or data staging area) before the task starts.

copy_output_data

[type: list | default: []] List of file names required to be copied from the current task to another task-/pilot-sandbox (or data staging area) when the task is finished.

Example of defining data to be copied:

# results.txt will be copied to a data staging area $SHARED
t.copy_output_data = ['results.txt > $SHARED/results.txt']
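The 'source > target' form of an entry can be pictured as two parts separated by '>'. The sketch below splits such a string in plain Python; `split_staging_directive` is a hypothetical helper, and the way EnTK itself parses these entries may differ.

```python
def split_staging_directive(entry):
    """Split 'source > target' into (source, target).

    Returns (source, None) when no explicit target is given.
    """
    if '>' in entry:
        # Split on the first '>' only and trim surrounding whitespace.
        source, target = (part.strip() for part in entry.split('>', 1))
        return source, target
    return entry.strip(), None


with_target = split_staging_directive('results.txt > $SHARED/results.txt')
without_target = split_staging_directive('results.txt')
print(with_target)
print(without_target)
```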

link_output_data

[type: list | default: []] List of file names required to be symlinked from the current task to another task-/pilot-sandbox (or data staging area) when the task is finished.

move_output_data

[type: list | default: []] List of file names required to be moved from the current task to another task-/pilot-sandbox (or data staging area) when the task is finished.

download_output_data

[type: list | default: []] List of file names required to be downloaded from the current task to local client storage when the task is finished.

Example of defining data to be downloaded:

# results.txt is transferred to a local client storage space
t.download_output_data = ['results.txt']

stdout

[type: str | default: “”] The name of the file to store stdout. If not set, the name defaults to <uid>.out.

stderr

[type: str | default: “”] The name of the file to store stderr. If not set, the name defaults to <uid>.err.
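The fallback naming for stdout and stderr can be sketched as follows. `default_stream_names` is a hypothetical helper written for illustration, and the uid `task.0000` is an assumed example value.

```python
def default_stream_names(uid, stdout='', stderr=''):
    """Return (stdout, stderr) file names, falling back to <uid>.out / <uid>.err."""
    # An empty string means "not set", so the uid-based name is used instead.
    return (stdout or '%s.out' % uid,
            stderr or '%s.err' % uid)


defaults = default_stream_names('task.0000')
custom = default_stream_names('task.0000', stdout='run.log')
print(defaults)
print(custom)
```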

stage_on_error

[type: bool | default: False] Flag to allow staging out data if the task failed (output staging is normally skipped for FAILED or CANCELED tasks).

exit_code

[type: int | default: None] Get the exit code for finished tasks: 0 for successful tasks, 1 for failed tasks.

exception

[type: str | default: None] Get the representation of the exception which caused the task to fail.

exception_detail

[type: str | default: None] Get additional details (traceback or error messages) about the exception which caused this task to fail.

path

[type: str | default: “”] Get the path of the task on the remote machine. Useful to reference files generated in the current task.

tags

[type: dict | default: None] The tags for the task that can be used while scheduling by the RTS (configuration specific tags, which influence task scheduling and execution, e.g., tasks co-location).

rts_uid

[type: str | default: None] Unique RTS ID of the current task.

parent_stage

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the stage, which contains the current task.

parent_pipeline

[type: dict | default: {‘uid’: None, ‘name’: None}] Identification of the pipeline, which contains the current task.

annotations

[type: Annotations | default: None] Annotations that describe the task’s input and output files and set dependencies between tasks.

Read-only attributes

property luid

[type: str] Unique ID of the current task (fully qualified).

> task.luid
pipe.0001.stage.0004.task.0234

_cast = True
_check = True
_defaults = {'arguments': [], 'copy_input_data': [], 'copy_output_data': [], 'cpu_reqs': {'cpu_process_type': None, 'cpu_processes': 1, 'cpu_thread_type': None, 'cpu_threads': 1}, 'download_output_data': [], 'environment': {}, 'exception': None, 'exception_detail': None, 'executable': '', 'exit_code': None, 'gpu_reqs': {'gpu_process_type': None, 'gpu_processes': 0.0, 'gpu_thread_type': None, 'gpu_threads': 1}, 'lfs_per_process': 0, 'link_input_data': [], 'link_output_data': [], 'mem_per_process': 0, 'move_input_data': [], 'move_output_data': [], 'name': '', 'parent_pipeline': {'name': None, 'uid': None}, 'parent_stage': {'name': None, 'uid': None}, 'path': '', 'post_exec': [], 'post_launch': [], 'pre_exec': [], 'pre_launch': [], 'rts_uid': None, 'sandbox': '', 'stage_on_error': False, 'state': '', 'state_history': [], 'stderr': '', 'stdout': '', 'tags': None, 'uid': '', 'upload_input_data': []}
_post_verifier(k, v)
_schema = {'annotations': <class 'radical.entk.task.Annotations'>, 'arguments': [<class 'str'>], 'copy_input_data': [<class 'str'>], 'copy_output_data': [<class 'str'>], 'cpu_reqs': <class 'radical.entk.task.CpuReqs'>, 'download_output_data': [<class 'str'>], 'environment': {<class 'str'>: <class 'str'>}, 'exception': <class 'str'>, 'exception_detail': <class 'str'>, 'executable': <class 'str'>, 'exit_code': <class 'int'>, 'gpu_reqs': <class 'radical.entk.task.GpuReqs'>, 'lfs_per_process': <class 'int'>, 'link_input_data': [<class 'str'>], 'link_output_data': [<class 'str'>], 'mem_per_process': <class 'int'>, 'move_input_data': [<class 'str'>], 'move_output_data': [<class 'str'>], 'name': <class 'str'>, 'parent_pipeline': {<class 'str'>: None}, 'parent_stage': {<class 'str'>: None}, 'path': <class 'str'>, 'post_exec': [<class 'str'>], 'post_launch': [<class 'str'>], 'pre_exec': [<class 'str'>], 'pre_launch': [<class 'str'>], 'rts_uid': <class 'str'>, 'sandbox': <class 'str'>, 'stage_on_error': <class 'bool'>, 'state': <class 'str'>, 'state_history': [<class 'str'>], 'stderr': <class 'str'>, 'stdout': <class 'str'>, 'tags': {<class 'str'>: None}, 'uid': <class 'str'>, 'upload_input_data': [<class 'str'>]}
_self_default = False
_uids = []
_validate()

Purpose: Validate that the state of the task is ‘DESCRIBED’ and that an executable has been specified for the task.

_verify()

Can be overloaded

_verify_setter(k, v)

annotate(inputs: Optional[Union[Dict, List, str]] = None, outputs: Optional[Union[List, str]] = None) -> None

Adds dataflow annotations with provided input and output files, and defines dependencies between tasks.

inputs

List of input files. If a file is produced by a previously executed task, then the corresponding input element is provided as a dictionary with the task instance as a key. Example: inputs=['file1', {task0: 'file2', task1: ['file2']}]

Type

list, optional

outputs

List of produced/generated files.

Type

list, optional
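The mixed list/dict shape of inputs can be easier to follow once it is flattened into (producer, file) pairs. The sketch below does this in plain Python; `flatten_inputs` is a hypothetical helper, the strings `'task0'` and `'task1'` stand in for Task instances, and none of this reflects how annotate() processes its arguments internally.

```python
def flatten_inputs(inputs):
    """Return (producer, filename) pairs; producer is None for plain file names."""
    pairs = []
    for item in inputs:
        if isinstance(item, dict):
            # Keys are producing tasks, values are one file name or a list of them.
            for producer, files in item.items():
                if isinstance(files, str):
                    files = [files]
                pairs.extend((producer, name) for name in files)
        else:
            pairs.append((None, item))
    return pairs


task0, task1 = 'task0', 'task1'   # stand-ins for Task instances
pairs = flatten_inputs(['file1', {task0: 'file2', task1: ['file2']}])
print(pairs)
```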

from_dict(d)

Re-initialization, resets all attributes with provided input data.

property luid

Unique ID of the current task (fully qualified).

Example

> task.luid
pipe.0001.stage.0004.task.123456

Luid

Returns the fully qualified uid of the current task

Type

str