Dask Executor

Executes tasks (electrons) in a Dask cluster. This is the default executor when Covalent is started without the --no-cluster flag.

A Dask cluster can also be started manually, for example:

from dask.distributed import LocalCluster

cluster = LocalCluster()
print(cluster.scheduler_address)

The address will look like tcp://127.0.0.1:55564 when running locally. Note that the Dask cluster does not persist when the process terminates.
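Because the cluster disappears with the process that created it, a stored scheduler address can go stale. The following is a minimal sketch, using only the Python standard library, of a check that something is still listening at the address. The helper name `scheduler_reachable` is hypothetical and not part of Covalent or Dask, and it assumes a `tcp://host:port` address like the one shown above.

```python
import socket
from urllib.parse import urlparse


def scheduler_reachable(address: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the scheduler address succeeds.

    Hypothetical helper: only handles addresses of the form tcp://host:port.
    """
    parsed = urlparse(address)
    if parsed.scheme != "tcp" or parsed.hostname is None or parsed.port is None:
        raise ValueError(f"unexpected scheduler address: {address!r}")
    try:
        # Open and immediately close a connection; nothing is sent.
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False
```

A successful connection only shows that some process is listening on that port; it does not prove that the process is a Dask scheduler.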

This cluster can be used with Covalent by providing the scheduler address:

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:55564"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y

...

class covalent.executor.executor_plugins.dask.DaskExecutor

Dask executor class that submits the input function to a running Dask cluster.

Methods:

cancel(task_metadata, job_handle)

Cancel the task currently being executed by the Dask executor

from_dict(object_dict)

Rehydrate a dictionary representation

get_cancel_requested()

Check whether cancellation of the task has been requested

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Submit the function and inputs to the Dask cluster

set_job_handle(handle)

Save the job handle to the database

setup(task_metadata)

Executor specific setup method

teardown(task_metadata)

Executor specific teardown method

to_dict()

Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

async cancel

Cancel the task currently being executed by the Dask executor

Arg(s)

task_metadata: Metadata associated with the task

job_handle: Key assigned to the job by Dask

Result

True by default

Return Type

Literal[True]

from_dict

Rehydrate a dictionary representation

Parameters

object_dict (dict) – A dictionary representation returned by to_dict.

Return Type

Returns

self

Instance attributes will be overwritten.

async get_cancel_requested

Check whether cancellation of the task has been requested

Arg(s)

None

Return(s)

Whether the task has been requested to be canceled

Return Type

Any

get_dispatch_context

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside the current context.

Return Type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.
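As an illustration of the pattern described here, the following standard-library sketch shows one way a context manager can expose dispatch info to code running inside it. It is not Covalent's implementation: the `DispatchInfo` dataclass, `dispatch_context`, and `current_dispatch_id` are hypothetical stand-ins, and the use of `contextvars` is an assumption made for the example.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class DispatchInfo:
    """Hypothetical stand-in for Covalent's DispatchInfo."""
    dispatch_id: str


# Holds the dispatch info visible to code running inside the context.
_current_dispatch: ContextVar[Optional[DispatchInfo]] = ContextVar(
    "current_dispatch", default=None
)


@contextmanager
def dispatch_context(info: DispatchInfo) -> Iterator[DispatchInfo]:
    """Make `info` available for the duration of the with-block."""
    token = _current_dispatch.set(info)
    try:
        yield info
    finally:
        # Restore whatever value was set before entering the context.
        _current_dispatch.reset(token)


def current_dispatch_id() -> Optional[str]:
    """Return the active dispatch ID, or None outside any context."""
    info = _current_dispatch.get()
    return info.dispatch_id if info is not None else None


with dispatch_context(DispatchInfo(dispatch_id="example-id")):
    inside = current_dispatch_id()   # "example-id"
outside = current_dispatch_id()      # None
```

Resetting the variable in a `finally` clause keeps the previous value intact even if the body of the with-block raises.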

async run

Submit the function and inputs to the Dask cluster

async set_job_handle

Save the job handle to the database

Arg(s)

handle: JSONable type identifying the job being executed by the backend

Returns

Response from the listener that handles inserting the job handle into the database

Return Type

Any

async setup

Executor specific setup method

async teardown

Executor specific teardown method

to_dict

Return a JSON-serializable dictionary representation of self

Return Type

dict
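The relationship between to_dict and from_dict can be illustrated with a small round trip through JSON. The sketch below uses a hypothetical `SketchExecutor` class rather than `DaskExecutor` itself, and the dictionary layout (`type` and `attributes` keys) is an assumption for the example, not a statement about Covalent's actual format.

```python
import json


class SketchExecutor:
    """Hypothetical executor-like class; not Covalent's implementation."""

    def __init__(self, scheduler_address: str = "", log_stdout: str = "stdout.log"):
        self.scheduler_address = scheduler_address
        self.log_stdout = log_stdout

    def to_dict(self) -> dict:
        # Copy the attributes so later changes to self do not leak
        # into the returned dictionary.
        return {"type": type(self).__name__, "attributes": dict(self.__dict__)}

    def from_dict(self, object_dict: dict) -> "SketchExecutor":
        # Overwrite the instance attributes in place and return self.
        if object_dict:
            self.__dict__ = dict(object_dict["attributes"])
        return self


original = SketchExecutor(scheduler_address="tcp://127.0.0.1:55564")
payload = json.dumps(original.to_dict())                    # serialize
restored = SketchExecutor().from_dict(json.loads(payload))  # rehydrate
```

Because from_dict returns self after overwriting its attributes, the call can be chained directly onto a freshly constructed instance, as in the last line.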

async write_streams_to_file

Write the contents of stdout and stderr to respective files.

Parameters

stream_strings (Iterable[str]) – The stream_strings to be written to files.

filepaths (Iterable[str]) – The filepaths to be used for writing the streams.

dispatch_id (str) – The ID of the dispatch which initiated the request.

results_dir (str) – The location of the results directory.

This uses aiofiles to avoid blocking the event loop.

Return Type

None
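To make the non-blocking idea concrete, here is a minimal standard-library sketch of writing stream contents to files from a coroutine. It swaps aiofiles for `asyncio.to_thread`, so it is a different technique from the one the method uses. The function name `write_streams`, the helper `_append_text`, and the rule that relative paths are placed under `results_dir/dispatch_id` are assumptions made for the example.

```python
import asyncio
import os
import tempfile
from typing import Iterable


def _append_text(path: str, text: str) -> None:
    """Blocking helper: create the parent directory and append text."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "a") as f:
        f.write(text)


async def write_streams(
    stream_strings: Iterable[str],
    filepaths: Iterable[str],
    dispatch_id: str,
    results_dir: str,
) -> None:
    """Write each stream to its file without blocking the event loop."""
    for text, filepath in zip(stream_strings, filepaths):
        if not filepath:
            continue  # no destination configured for this stream
        if not os.path.isabs(filepath):
            # Assumption: relative paths live under results_dir/dispatch_id.
            filepath = os.path.join(results_dir, dispatch_id, filepath)
        # Run the blocking file I/O in a worker thread.
        await asyncio.to_thread(_append_text, filepath, text)


with tempfile.TemporaryDirectory() as results_dir:
    asyncio.run(
        write_streams(
            ["out\n", "err\n"], ["stdout.log", "stderr.log"], "example-id", results_dir
        )
    )
    with open(os.path.join(results_dir, "example-id", "stdout.log")) as f:
        written_stdout = f.read()
    with open(os.path.join(results_dir, "example-id", "stderr.log")) as f:
        written_stderr = f.read()
```

Offloading to a thread keeps the event loop free to schedule other tasks while the write is in progress, which is the same goal aiofiles serves.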