
Dask Executor

The Dask executor runs tasks (electrons) on a Dask cluster. It is the default executor when Covalent is started without the --no-cluster flag.

To start a local Dask cluster and print its scheduler address:

from dask.distributed import LocalCluster

cluster = LocalCluster()
print(cluster.scheduler_address)

When running locally, the printed address will look like tcp://127.0.0.1:55564. Note that this Dask cluster does not persist: it is shut down when the process that created it terminates.
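
To verify that the cluster is reachable before handing its address to Covalent, you can connect a Dask Client and run a trivial computation. This is only an optional sanity check; the snippet recreates the cluster so that it runs on its own.

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster.scheduler_address)

# A trivial round trip confirms that the scheduler accepts work.
assert client.submit(sum, [1, 2, 3]).result() == 6

client.close()
cluster.close()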

This cluster can be used with Covalent by providing the scheduler address:

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y
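
For context, here is a minimal end-to-end sketch that uses the electron above inside a lattice and dispatches it. It assumes the Covalent server is running, and the scheduler address is a placeholder for whatever your cluster reports.

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"  # placeholder; use your cluster's address
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y

@ct.lattice
def workflow(x, y):
    return my_custom_task(x, y)

# Dispatch the workflow and block until the result is available.
dispatch_id = ct.dispatch(workflow)(1, 2)
result = ct.get_result(dispatch_id, wait=True)
print(result.result)  # 3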

...

class covalent.executor.executor_plugins.dask.DaskExecutor

Dask executor class that submits the input function to a running Dask cluster.
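
When Covalent manages its own Dask cluster (the default when started without the --no-cluster flag), it should also be possible to select this executor by its registered short name instead of constructing it explicitly. A minimal sketch, assuming the plugin is registered as "dask":

import covalent as ct

# Runs on the Dask cluster that the Covalent server started;
# no scheduler address needs to be supplied.
@ct.electron(executor="dask")
def add(x, y):
    return x + y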

Methods:

cancel(task_metadata, job_handle)

Cancel the task currently being executed by the Dask executor.

from_dict(object_dict)

Rehydrate a dictionary representation.

get_cancel_requested()

Get whether the task was requested to be canceled.

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Submit the function and inputs to the Dask cluster.

set_job_handle(handle)

Save the job handle to the database.

setup(task_metadata)

Executor-specific setup method.

teardown(task_metadata)

Executor-specific teardown method.

to_dict()

Return a JSON-serializable dictionary representation of self.

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

async cancel

Cancel the task currently being executed by the Dask executor.

Arg(s)

task_metadata

Metadata associated with the task.

job_handle

Key assigned to the job by Dask.

Return(s)

True by default

Return Type

Literal[True]

from_dict

Rehydrate a dictionary representation.

Parameters

object_dict (dict) – A dictionary representation returned by to_dict.

Return Type

BaseExecutor

Returns

self

Instance attributes will be overwritten.
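
A short sketch of the round trip with to_dict; it assumes DaskExecutor can be constructed with no arguments (falling back to configured defaults), which may differ between Covalent versions.

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"  # placeholder address
)

# Serialize to a JSON-able dictionary, then rehydrate a fresh instance.
object_dict = dask_executor.to_dict()
restored = ct.executor.DaskExecutor().from_dict(object_dict)

assert restored.scheduler_address == dask_executor.scheduler_address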

async get_cancel_requested

Get whether the task was requested to be canceled.

Arg(s)

None

Return(s)

Whether the task has been requested to be canceled.

Return Type

Any

get_dispatch_context

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.

Return Type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.
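
A hedged sketch of using the context manager; the DispatchInfo import path and constructor below are assumptions based on the type named in the signature and may vary between Covalent versions.

import covalent as ct

# Assumed import path for DispatchInfo; adjust to your Covalent version.
from covalent._shared_files.util_classes import DispatchInfo

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"  # placeholder address
)

dispatch_info = DispatchInfo(dispatch_id="my-dispatch-id")  # hypothetical dispatch ID

# Inside the block, the executor has access to the dispatch info.
with dask_executor.get_dispatch_context(dispatch_info):
    ...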

async run

Submit the function and inputs to the Dask cluster.

async set_job_handle

Save the job handle to the database.

Arg(s)

handle

JSONable type identifying the job being executed by the backend.

Return(s)

Response from the listener that handles inserting the job handle to the database.

Return Type

Any

async setup

Executor-specific setup method.

async teardown

Executor-specific teardown method.

to_dict

Return a JSON-serializable dictionary representation of self.

Return Type

dict

async write_streams_to_file

Write the contents of stdout and stderr to respective files.

This uses aiofiles to avoid blocking the event loop.

Parameters

stream_strings (Iterable[str]) – The stream_strings to be written to files.

filepaths (Iterable[str]) – The filepaths to be used for writing the streams.

dispatch_id (str) – The ID of the dispatch which initiated the request.

results_dir (str) – The location of the results directory.

Return Type

None
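
A minimal sketch of calling this coroutine directly; in normal operation the dispatcher supplies these arguments, so the IDs and paths below are placeholders.

import asyncio
import tempfile
from pathlib import Path

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"  # placeholder address
)

results_dir = tempfile.mkdtemp()

# Append captured stdout/stderr strings to files in an existing directory.
asyncio.run(
    dask_executor.write_streams_to_file(
        stream_strings=["task stdout\n", "task stderr\n"],
        filepaths=[
            str(Path(results_dir) / "stdout.log"),
            str(Path(results_dir) / "stderr.log"),
        ],
        dispatch_id="my-dispatch-id",  # placeholder ID
        results_dir=results_dir,
    )
)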
