Skip to main content

Covalent API

This is the component reference for the Covalent Python API.

Index

Here is an alphabetical index.

Contents

Workflow Contents

Task Helper

Executors

Quantum Executor Plugins

Dispatch Infrastructure

Covalent CLI Tool

API

Workflow Components

Electron

@covalent.electron(_func=None, *, backend=None, executor=None, files=[], deps_bash=None, deps_pip=None, call_before=[], call_after=[])

Electron decorator to be applied to a function. Returns the wrapper function with the same functionality as _func.

Parameters

_func (Optional[Callable]) – function to be decorated

Keyword Arguments

  • backend – DEPRECATED: Same as executor.
  • executor – Alternative executor object to be used in the execution of each node. If not passed, the local executor is used by default..
  • deps_bash – An optional DepsBash object specifying a list of shell commands to run before _func.
  • deps_pip – An optional DepsPip object specifying a list of PyPI packages to install before running _func.
  • call_before – An optional list of DepsCall objects specifying python functions to invoke before the electron
  • call_after – An optional list of DepsCall objects specifying python functions to invoke after the electron
  • files – An optional list of FileTransfer objects which copy files to/from remote or local filesystems.

Returns

Electron object containing the decorated function.

Return Type

class covalent._workflow.electron.Electron(function, node_id=None, metadata=None)         [source]

An electron (or task) object that is a modular component of a workflow and is returned by electron

function

Function to be executed.

node_id

Node id of the electron.

metadata

Metadata to be used for the function execution.

kwargs

Keyword arguments if any.

Methods:

Adds the node to the lattice’s transport graph in the case where a collection of electrons is passed as an argument to another electron.
Adds a node along with connecting edges for all the arguments to the electron.
Get value of the metadata of given name.

get_op_function(operand_1, operand_2, op)

Function to handle binary operations with electrons as operands.

set_metadata(name, value)

Function to add/edit metadata of given name and value to electron’s metadata.

wait_for(electrons)

Waits for the given electrons to complete before executing this one.

Attributes:

Get transportable electron object and metadata.

add_collection_node_to_graph(graph, prefix)         [source]

Adds the node to the lattice’s transport graph in the case where a collection of electrons is passed as an argument to another electron.

Parameters

  • graph (_TransportGraph) – Transport graph of the lattice
  • prefix (str) – Prefix of the node

Returns

Node id of the added node

Return Type

node_id

property as_transportable_dict: Dict

Get transportable electron object and metadata.

Return Type

Dict

connect_node_with_others(node_id, param_name, param_value, param_type, arg_index, transport_graph)         [source]

Adds a node along with connecting edges for all the arguments to the electron.

Parameters

  • node_id (int) – Node number of the electron
  • param_name (str) – Name of the parameter
  • param_value (Union[Any, ForwardRef]) – Value of the parameter
  • param_type (str) – Type of parameter, positional or keyword
  • transport_graph (_TransportGraph) – Transport graph of the lattice

Returns

None

get_metadata(name)         [source]

Get value of the metadata of given name.

Parameters

name (str) – Name of the metadata whose value is needed.

Returns

Value of the metadata of given name.

Return Type

value

Raises

KeyError – If metadata of given name is not present.

get_op_function(operand_1, operand_2, op)         [source]

Function to handle binary operations with electrons as operands. This will not execute the operation but rather create another electron whose execution will be postponed according to the default electron configuration/metadata.

This also makes sure that if these operations are being performed outside of a lattice, then they are performed as is.

Parameters

operand_1 (Union[Any, Electron]) – First operand of the binary operation.

operand_2 (Union[Any, Electron]) – Second operand of the binary operation.

op (str) – Operator to be used in the binary operation.

Returns

Electron object corresponding to the operation execution. Behaves as a normal function call if outside a lattice.

Return Type

electron

set_metadata(name, value)         [source]

Function to add/edit metadata of given name and value to an electron’s metadata.

Parameters

  • name (str) – Name of the metadata to be added/edited.
  • value (Any) – Name of the metadata to be added/edited.

Return Type

None

Returns

None

wait_for(electrons)         [source]

Waits for the given electrons to complete before executing the electron object (implicit parameter) on which the method is being called. Adds the necessary edges between this and those electrons without explicitly connecting their inputs/outputs.

Useful when execution of this electron relies on a side-effect from the another one.

Parameters

electrons (UnionElectron]), IterableElectron])]) – Electron(s) which will be waited for to complete execution before starting execution for this one

Returns

Electron

Examples

Lattice

@covalent.lattice(_func=None, *, backend=None, executor=None, workflow_executor=None, deps_bash=None, deps_pip=None, call_before=[], call_after=[], triggers=None)

Lattice decorator to be called upon a function. Returns a new Lattice <covalent._workflow.lattice.Lattice> object.

Parameters

_func (Optional[Callable]) – function to be decorated

Keyword Arguments

  • backend – DEPRECATED: Same as executor.
  • executor – Alternative executor object to be used in the execution of each node. If not passed, the local executor is used by default..
  • workflow_executor – Executor for postprocessing the workflow. Defaults to the built-in dask executor or the local executor depending on whether Covalent is started with the –no-cluster.
  • deps_bash – An optional DepsBash object specifying a list of shell commands to run before _func.
  • deps_pip – An optional DepsPip object specifying a list of PyPI packages to install before running _func.
  • call_before – An optional list of DepsCall objects specifying python functions to invoke before the electron
  • call_after – An optional list of DepsCall objects specifying python functions to invoke after the electron
  • triggers – Any triggers that need to be attached to this lattice, default is None

Returns

Lattice object containing the decorated function.

Return Type

class covalent._workflow.lattice.Lattice(workflow_function, transport_graph=None)        [source]

A lattice workflow object that holds the workflow graph and is returned by lattice decorator.

workflow_function

The workflow function that is decorated by the lattice decorator.

transport_graph

The transport graph which will be the basis on how the workflow is executed.

metadata

Dictionary of metadata of the lattice.

post_processing

post_processing

kwargs

Keyword arguments passed to the workflow function.

electron_outputs

Dictionary of electron outputs received after workflow execution.

Methods:

build_graph(*args, **kwargs)

Builds the transport graph for the lattice by executing the workflow function which triggers the call of all underlying electrons; these get added to the transport graph for later execution.

dispatch(*args, **kwargs)

DEPRECATED: Function to dispatch workflows.

dispatch_sync(*args, **kwargs)

DEPRECATED: Function to dispatch workflows synchronously by waiting for the result too.

draw(*args, **kwargs)

Generate lattice graph and display in UI taking into account passed in arguments.

get_metadata(name) 

Get value of the metadata of given name.

set_metadata(name, value)

Function to add/edit metadata of given name and value to lattice’s metadata.

build_graph(*args, **kwargs)         [source]

Builds the transport graph for the lattice by executing the workflow function which will trigger the call of all underlying electrons and they will get added to the transport graph for later execution.

Also redirects any print statements inside the lattice function to null and ignores any exceptions caused while executing the function.

GRAPH WILL NOT BE BUILT AFTER AN EXCEPTION HAS OCCURRED.

Parameters

  • *args – Positional arguments to be passed to the workflow function.
  • **kwargs – Keyword arguments to be passed to the workflow function.

Return Type

None

Returns

None

dispatch(*args, **kwargs)         [source]

DEPRECATED: Function to dispatch workflows.

Parameters

  • *args – Positional arguments for the workflow
  • **kwargs – Keyword arguments for the workflow

Return Type

str

Returns

Dispatch id assigned to job

dispatch_sync(*args, **kwargs)         [source]

DEPRECATED: Function to dispatch workflows synchronously by waiting for the result too.

Parameters

  • *args – Positional arguments for the workflow
  • **kwargs – Keyword arguments for the workflow

Return Type

Returns

Result of workflow execution

draw(*args, **kwargs)         [source]

Generate lattice graph and display in UI taking into account passed in arguments.

Parameters

  • *args – Positional arguments to be passed to build the graph.
  • **kwargs – Keyword arguments to be passed to build the graph.

Return Type

None

Returns

None

get_metadata(name)        [source]

Get value of the metadata of given name.

Parameters

name (str) – Name of the metadata whose value is needed.

Returns

Value of the metadata of given name.

Return Type

value

Raises

KeyError – If metadata of given name is not present.

set_metadata(name, value)         [source]

Function to add/edit metadata of given name and value to lattice’s metadata.

Parameters

  • name (str) – Name of the metadata to be added/edited.
  • value (Any) – Value of the metadata to be added/edited.

Return Type

None

Returns

None

Examples

QElectron

@covalent.qelectron(qnode=None, *, executors=None, name=None, description=None, selector='cyclic')

QElectron decorator to be called upon a Pennylane QNode. Adds multi-backend execution functionality to the original QNode.

Parameters:

qnode(Optional[QNode]) - The Pennylane QNode to wrap.

Keyword Arguments:

  • executors- The quantum executor(s) to use for running the QNode. A single executor, list of executors, or a QCluster instance are accepted. If a list of multiple executors is passed, a quantum cluster is initialized from this list automatically and selector is used as the cluster's selector. Defaults to a thread-based Simulator.
  • name- An optional name for the QElectron. Defaults to the circuit function's name.
  • description- An optional description for the QElectron. Defaults to the circuit function's docstring.
  • selector- A callable that selects an executor, or one of the strings "cyclic" or "random". The "cyclic" selector (default) cycle through executors and returns the next executor for each circuit. The "random" selector chooses an executor from executors at random for each circuit. Any user-defined selector must be callable with two positional arguments, a circuit and a list of executors. A selector must also return exactly one executor.

Raises

ValueError: If any invalid executors are passed.

Returns:

A sub-type of QNode that integrates QElectrons.

Return Type:

QNodeQE

Quantum Clusters

pydantic model covalent.executor.QCluster [source]

A cluster of quantum executors.

Parameters:

  • executors- A sequence of quantum executors.
  • selector- A callable that selects an executor, or one of the strings "cyclic" or "random". The "cyclic" selector (default) cycles through executors and returns the next executor for each circuit. The "random" selector chooses an executor from executors at random for each circuit. Any user-defined selector must be callable with two positional arguments, a circuit and a list of executors. A selector must also return exactly one executor.
Show JSON Schema
{
"title": "QCluster",
"description": "A cluster of quantum executors.\n\nArgs:\n executors: A sequence of quantum executors.\n selector: A callable that selects an executor, or one of the strings \"cyclic\"\n or \"random\". The \"cyclic\" selector (default) cycles through `executors`\n and returns the next executor for each circuit. The \"random\" selector\n chooses an executor from `executors` at random for each circuit. Any\n user-defined selector must be callable with two positional arguments,\n a circuit and a list of executors. A selector must also return exactly\n one executor.",
"type": "object",
"properties": {
"persist_data": {
"title": "Persist Data",
"default": true,
"type": "boolean"
},
"qnode_device_import_path": {
"title": "Qnode Device Import Path",
"type": "array",
"minItems": 2,
"maxItems": 2,
"items": [
{
"type": "string"
},
{
"type": "string"
}
]
},
"qnode_device_shots": {
"title": "Qnode Device Shots",
"type": "integer"
},
"qnode_device_wires": {
"title": "Qnode Device Wires",
"type": "integer"
},
"pennylane_active_return": {
"title": "Pennylane Active Return",
"type": "boolean"
},
"device": {
"title": "Device",
"default": "default.qubit",
"type": "string"
},
"executors": {
"title": "Executors",
"type": "array",
"items": {
"$ref": "#/definitions/BaseQExecutor"
}
}
},
"required": [
"executors"
],
"definitions": {
"BaseQExecutor": {
"title": "BaseQExecutor",
"description": "Helper class that provides a standard way to create an ABC using\ninheritance.",
"type": "object",
"properties": {
"persist_data": {
"title": "Persist Data",
"default": true,
"type": "boolean"
},
"qnode_device_import_path": {
"title": "Qnode Device Import Path",
"type": "array",
"minItems": 2,
"maxItems": 2,
"items": [
{
"type": "string"
},
{
"type": "string"
}
]
},
"qnode_device_shots": {
"title": "Qnode Device Shots",
"type": "integer"
},
"qnode_device_wires": {
"title": "Qnode Device Wires",
"type": "integer"
},
"pennylane_active_return": {
"title": "Pennylane Active Return",
"type": "boolean"
}
}
}
}
}

CONFIG

extra: EXTRA = allow

field device: str = 'default.qubit'

Validated by

  • set_name

field executors: Sequence[BaseQExecutor]   [Required]

Validated by

  • set_name

field pennylane_active_return: bool = None

Validated by

  • set_name

field persist_data: bool = True

Validated by

  • set_name

field qnode_device_import_path: Tuple[str, str] = None

Validated by

  • set_name

field qnode_device_shots: Optional[int] = None

Validated by

  • set_name

field qnode_device_wires: int = None

Validated by

  • set_name

field selector: Union[str, Callable] = 'cyclic'

Validated by

  • set_name

batch_submit(qscripts_list) [source]

deserialize_selector() [source]

Deserializes the cluster’s selector function.

Return Type

Union[str, Callable]

dict(*args, **kwargs) [source]

Custom dict method to create a hashable executors attribute.

Return Type

get_selector() [source]

Wraps self.selector to return defaults corresponding to string values.

This method is called inside batch_submit.

Return Type

callable

serialize_selector() [source]

Serializes the cluster’s selector function.

Return Type

None

Lepton

Language Decorators

Decorator to use languages other than Python, including scripting languages

Lepton wrappers.

Functions:

bash([_func, display_name, executor, files, …])

Bash decorator which wraps a Python function as a Bash Lepton.

covalent.leptons.bash(_func=None, *, display_name='', executor='dask', files=[], deps_bash=[], deps_pip=None, call_before=[], call_after=[])         [source]

Bash decorator which wraps a Python function as a Bash Lepton.

Return Type

Callable

Language Classes

More robust definition of languages other than Python.

Examples

Task Helpers

Dependencies

Generic dependencies for an electron

class covalent._workflow.deps.Deps(apply_fn=None, apply_args=[], apply_kwargs={}, *, retval_keyword='') [source]

Generic dependency class used in specifying any kind of dependency for an electron.

apply_fn

function to be executed in the backend environment

apply_args

list of arguments to be applied in the backend environment

apply_kwargs

dictionary of keyword arguments to be applied in the backend environment

Methods:

apply()

Encapsulates the exact function and args/kwargs to be executed in the backend environment.

apply() [source]

Encapsulates the exact function and args/kwargs to be executed in the backend environment.

Parameters

None-

Return Type

Returns

A tuple of transportable objects containing the function and optional args/kwargs

Main Covalent public functionality.

Classes:

DepsBash([commands])

Shell commands to run before an electron

DepsCall([func, args, kwargs, …])

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

DepsPip([packages, reqs_path])

PyPI packages to be installed before executing an electron

class covalent.DepsBash(commands=[])

Shell commands to run before an electron

Deps class to encapsulate Bash dependencies for an electron.

The specified commands will be executed as subprocesses in the same environment as the electron.

commands

A list of bash commands to execute before the electron runs.

from_dict(object_dict)        [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict()        [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

class covalent.DepsCall(func=None, args=[], kwargs={}, *, retval_keyword='', override_reserved_retval_keys=False)

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

Deps class to encapsulate python functions to be called in the same execution environment as the electron.

func

A callable

args

args list

kwargs

kwargs list

retval keyword

An optional string referencing the return value of func.

If retval_keyword is specified, the return value of func will be passed during workflow execution as an argument to the electron corresponding to the parameter of the same name.

NOTES

Electron parameters to be injected during execution must have default parameter values.

It is the user’s responsibility to ensure that retval_keyword is actually a parameter of the electron. Unexpected behavior may occur otherwise.

from_dict(object_dict)       [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict()       [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

Parameters

object_dict – a dictionary representation returned by to_dict

class covalentDepsPip(packages=[], reqs_path=' ')

PyPI packages to be installed before executing an electron

A specification of Pip packages to be installed

packages

A list of PyPI packages to install

reqs_path

Path to requirements.txt (overrides packages)

These packages are installed in an electron’s execution environment just before the electron is run.

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

from_dict(object_dict) [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict() [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

Examples

  • Add a Bash dependency to an electron
  • Add a callable dependency to an electron
  • Add a Pip dependency to an electron

Bash Dependencies

Shell commands to run before an electron

class covalent._workflow.depsbash.DepsBash(commands=[])

Shell commands to run before an electron

Deps class to encapsulate Bash dependencies for an electron.

The specified commands will be executed as subprocesses in the same environment as the electron.

commands

A list of bash commands to execute before the electron runs.

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation

to_dict()

Return a JSON-serializable dictionary representation of self

from_dict(object_dict) [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict() [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

Examples

  • Add a Bash dependency to an electron

Call Dependencies

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

class covalent._workflow.depscall.DepsCall(func=None, args=[], kwargs={}, *, retval_keyword='', override_reserved_retval_keys=False)

Functions, shell commands, PyPI packages, and other types of dependencies to be called in an electron’s execution environment

Deps class to encapsulate python functions to be called in the same execution environment as the electron.

func

A callable

args

args list

kwargs

kwargs list

retval keyword

An optional string referencing the return value of func.

If retval_keyword is specified, the return value of func will be passed during workflow execution as an argument to the electron corresponding to the parameter of the same name.

NOTES

Electron parameters to be injected during execution must have default parameter values.

It is the user’s responsibility to ensure that retval_keyword is actually a parameter of the electron. Unexpected behavior may occur otherwise.

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation

to_dict()

Return a JSON-serializable dictionary representation of self

from_dict(object_dict)        [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict()        [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

Examples

  • Add a callable dependency to an electron

Pip Dependencies

PyPI packages to be installed before executing an electron

class covalent._workflow.depspip.DepsPip(packages=[], reqs_path='')

PyPI packages to be installed before executing an electron

A specification of Pip packages to be installed

packages

A list of PyPI packages to install

reqs_path

Path to requirements.txt (overrides packages)

These packages are installed in an electron’s execution environment just before the electron is run.

Methods:

from_dict(object_dict)

Rehydrate a dictionary representation
Return a JSON-serializable dictionary representation of self

from_dict(object_dict) [source]

Rehydrate a dictionary representation

Parameters

object_dict – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

to_dict() [source]

Return a JSON-serializable dictionary representation of self

Return Type

dict

Examples

  • Add a Pip dependency to an electron

File Transfer

File Transfer from (source) and to (destination) local or remote files prior/post electron execution. Instances are are provided to files keyword argument in an electron decorator.

class covalent._file_transfer.file.File(filepath=None, is_remote=False, is_dir=False, include_folder=False)        [source]

File class to store components of provided URI including scheme (s3://, file://, ect.) determine if the file is remote, and acts a facade to facilitate filesystem operations.

filepath

File path corresponding to the file.

is_remote

Flag determining if file is remote (override). Default is resolved automatically from file scheme.

is_dir

Flag determining if file is a directory (override). Default is determined if file uri contains trailing slash.

include_folder

Flag that determines if the folder should be included in the file transfer, if False only contents of folder are transfered.

class covalent._file_transfer.folder.Folder(filepath=None, is_remote=False, is_dir=True, include_folder=False)        [source]

Folder class to store components of provided URI including scheme (s3://, file://, ect.), determine if the file is remote, and act as facade to facilitate filesystem operations. Folder is a child of the File class which sets is_dir flag to True.

include_folder

Flag that determines if the folder should be included in the file transfer, if False only contents of folder are transfered.

class covalent._file_transfer.file_transfer.FileTransfer(from_file=None, to_file=None, order=<Order.BEFORE: 'before'>, strategy=None)        [source]

FileTransfer object class that takes two File objects or filepaths (from, to) and a File Transfer Strategy to perform remote or local file transfer operations.

from_file

Filepath or File object corresponding to the source file.

to_file

Filepath or File object corresponding to the destination file.

order

Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution.

strategy

Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.

covalent._file_transfer.file_transfer.TransferFromRemote(from_filepath, to_filepath=None, strategy=None, order=<Order.BEFORE: 'before'>)        [source]

Factory for creating a FileTransfer instance where from_filepath is implicitly created as a remote File Object, and the order (Order.BEFORE) is set so that this file transfer will occur prior to electron execution.

Parameters

  • from_filepath (str) – File path corresponding to remote file (source).
  • to_filepath (Optional[str]) – File path corresponding to local file (destination)
  • strategy (Optional[FileTransferStrategy]) – Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.
  • order (Order) – Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution - default is BEFORE

Return Type

Returns

FileTransfer instance with implicit Order.BEFORE enum set and from (source) file marked as remote

covalent._file_transfer.file_transfer.TransferToRemote(to_filepath, from_filepath=None, strategy=None, order=<Order.AFTER: 'after'>)        [source]

Factory for creating a FileTransfer instance where to_filepath is implicitly created as a remote File Object, and the order (Order.AFTER) is set so that this file transfer will occur post electron execution.

Parameters

  • to_filepath (Optional[str]) – File path corresponding to local file (destination)
  • from_filepath (str) – File path corresponding to remote file (source).
  • strategy (Optional[FileTransferStrategy]) – Optional File Transfer Strategy to perform file operations - default will be resolved from provided file schemes.
  • order (Order – Order (enum) to execute the file transfer before (Order.BEFORE) or after (Order.AFTER) electron execution - default is BEFORE

Return Type

Returns

FileTransfer instance with implicit Order.AFTER enum set and to (destination) file marked as remote

Examples

File Transfer Strategies

A set of classes with a shared interface to perform copy, download, and upload operations given two (source & destination) File objects that support various protocols.

Classes:

Blob([client_id, client_secret, tenant_id])

Implements FileTransferStrategy class to transfer files to/from Azure Blob Storage.

GCloud([credentials, project_id])

Implements FileTransferStrategy class to transfer files to/from Google Cloud Storage.

HTTP()

Implements Base FileTransferStrategy class to use HTTP to download files from public URLs.

Rsync([user, host, private_key_path])

Implements Base FileTransferStrategy class to use rsync to move files to and from remote or local filesystems.

S3([credentials, profile, region_name])

Implements Base FileTransferStrategy class to upload/download files from S3 Bucket.

class covalent.fs_strategies.Blob(client_id=None, client_secret=None, tenant_id=None)

Implements FileTransferStrategy class to transfer files to/from Azure Blob Storage.

PARAMETERS

  • client_id (Optional[str]) - ID of a service principal authorized to perform the transfer
  • client_secret (Optional[str]) - Corresponding secret key for the service principal credentials
  • tenant_id (Optional[str]) - The Azure Active Directory tenant ID which owns the cloud resources.
credentials

A tuple containing (client_id, client_secret, tenant_id)

Methods:

cp(from_file[, to_file])

RTYPE

File

download(from_file[, to_file])

Download files or the contents of folders from Azure Blob Storage.

upload(from_file[, to_file])

Upload files or the contents of folders to Azure Blob Storage.

cp(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

download(from_file, to_file=<covalent._file_transfer.file.File object>)

Download files or the contents of folders from Azure Blob Storage.

PARAMETERS

  • from_file (File) - File object referencing an object in Azure Blob storage
  • to_file (File) - File object referencing a path in the local filesystem

RETURNS

Download function that is injected into wrapper_fn

RETURN TYPE

callable

upload(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

Upload files or the contents of folders to Azure Blob Storage.

PARAMETERS

  • from_file (File) - File object referencing a path in the local filesystem
  • to_file (File) - File object referencing an object in Azure Blob storage

RETURNS

Upload function that is injected into wrapper_fn

RETURN TYPE

callable

class covalent.fs_strategies.GCloud(client_id=None, client_secret=None, tenant_id=None)

Implements FileTransferStrategy class to transfer files to/from Google Cloud Storage.

PARAMETERS

  • credentials (Optional[str]) - IPath to OAuth 2.0 credentials JSON file for a service account
  • project_id (Optional[str]) - ID of a project in GCP
credentials

String containing OAuth 2.0 credentials.

project_id

ID of a project in GCP

Methods:

cp(from_file[, to_file])

RTYPE

File

download(from_file[, to_file])

callable

upload(from_file[, to_file])

callable

cp(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

download(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

callable

upload(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

callable

class covalent.fs_strategies.HTTP

Bases: covalent._file_transfer.strategies.transfer_strategy_base.FileTransferStrategy

Implements Base FileTransferStrategy class to use HTTP to download files from public URLs.

cp(from_file[, to_file])

RTYPE File

download(from_file[, to_file])

RTYPE File

upload(from_file[, to_file])

RTYPE File

cp(from_file, to_file=<covalent._file_transfer.file.File object>)

Return Type

download(from_file, to_file=<covalent._file_transfer.file.File object>)

Return Type

upload(from_file, to_file=<covalent._file_transfer.file.File object>)

Return Type

class covalent.fs_strategies.Rsync(user='', host='', private_key_path=None)

Bases: covalent._file_transfer.strategies.transfer_strategy_base.FileTransferStrategy

Implements Base FileTransferStrategy class to use rsync to move files to and from remote or local filesystems. Rsync via ssh is used if one of the provided files is marked as remote.

user

(optional) Determine user to specify for remote host if using rsync with ssh

host

(optional) Determine what host to connect to if using rsync with ssh

private_key_path

(optional) Filepath for ssh private key to use if using rsync with ssh

Methods:

cp(from_file[, to_file])

RTYPE None

download(from_file[, to_file])

RTYPE File

get_rsync_cmd(from_file, to_file[, …])

RTYPE str

get_rsync_ssh_cmd(local_file, remote_file[, …])

RTYPE str

return_subprocess_callable(cmd)

RTYPE None

upload(from_file, to_file)

RTYPE None

cp(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

None

download(from_file, to_file=<covalent._file_transfer.file.File object>)

Return Type

get_rsync_cmd(from_file, to_file, transfer_from_remote=False)

Return Type

str

get_rsync_ssh_cmd(local_file, remote_file, transfer_from_remote=False)

Return Type

str

return_subprocess_callable(cmd)

Return Type

None

upload(from_file, to_file)

Return Type

None

class covalent.fs_strategies.S3(credentials=None, profile=None, region_name=None)

Bases: covalent._file_transfer.strategies.transfer_strategy_base.FileTransferStrategy

Implements Base FileTransferStrategy class to upload/download files from S3 Bucket.

Methods:

cp(from_file[, to_file])

RTYPE File

download(from_file[, to_file])

Download files or the contents of folders from S3 bucket.

upload(from_file[, to_file])

Upload files or folders to S3 bucket.

cp(from_file, to_file=<covalent._file_transfer.file.File object>)

RETURN TYPE

download(from_file, to_file=<covalent._file_transfer.file.File object>)

Download files or the contents of folders from S3 bucket.

Return Type

upload(from_file, to_file=<covalent._file_transfer.file.File object>)

Upload files or folders to S3 bucket.

Return Type

Examples

Executors

Synchronous Base Executor Class

class covalent.executor.base.BaseExecutor(*args, **kwargs)[source]

Base executor class to be used for defining any executor plugin. Subclassing this class will allow you to define your own executor plugin which can be used in covalent.

log_stdout

The path to the file to be used for redirecting stdout.

log_stderr

The path to the file to be used for redirecting stderr.

cache_dir

The location used for cached files in the executor.

time_limit

time limit for the task

retries

Number of times to retry execution upon failure

Methods:

cancel(task_metadata, job_handle)

Method to cancel the job identified uniquely by the job_handle (base class)

execute(function, args, kwargs, dispatch_id, …)

Execute the function with the given arguments.

from_dict (object_dict)

Rehydrate a dictionary representation

get_cancel_requested()

Check if the task was requested to be canceled

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Abstract method to run a function in the executor.

set_job_handle(handle)

Save the job_id/handle returned by the backend executing the task

setup(task_metadata)

Placeholder to run any executor specific tasks

teardown(task_metadata)

Placeholder to run nay executor specific cleanup/teardown actions

to_dict()

Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Executor specific teardown method

cancel(task_metadata, job_handle)       [source]

Method to cancel the job identified uniquely by the job_handle (base class)

Arg(s)

task_metadata: Metadata of the task to be canceled job_handle: Unique ID of the job assigned by the backend

Return(s)

False by default

Return Type

Literal[False]

execute(function, args, kwargs, dispatch_id, results_dir, node_id=- 1)       [source]

Execute the function with the given arguments.

This calls the executor-specific run() method.

Parameters

  • function (Callable) – The input python function which will be executed and whose result is ultimately returned by this function.
  • args (List) – List of positional arguments to be used by the function.
  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
  • dispatch_id (str) – The unique identifier of the external lattice process which is calling this function.
  • results_dir (str) – The location of the results directory.
  • node_id (int) – ID of the node in the transport graph which is using this executor

Return(s)

The result of the function execution.

Return Type

output

from_dict(object_dict)

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

get_cancel_requested()       [source]

Check if the task was requested to be canceled by the user

Arg(s)

None

Return(s)

True/False whether task cancelation was requested

Return Type

bool

get_dispatch_context(dispatch_info)       [source]

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return Type

AbstractContextManager[DispatchInfo]

Return(s)

A context manager object that handles the dispatch info.

abstract run(function, args, kwargs, task_metadata)       [source]

Abstract method to run a function in the executor.

Parameters

  • function (Callable) – The function to run in the executor
  • args (List) – List of positional arguments to be used by the function.
  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
  • task_metadata(Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id

Returns

A context manager object that handles the dispatch info

Return Type

output

set_job_handle(handle)       [source]

Save the job_id/handle returned by the backend executing the task

Arg(s)

handle: Any JSONable type to identifying the task being executed by the backend

Return(s)

Response from saving the job handle to database

Return Type

Any

setup(task_metadata)       [source]

Placeholder to run any executor specific tasks

Return Type

Any

teardown(task_metadata)       [source]

Placeholder to run any executor specific tasks

Return Type

Any

to_dict()

Return a JSON-serializable dictionary representation of self

Return Type

dict

write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)       [source]

Write the contents of stdout and stderr to respective files.

Parameters

  • stream_strings (Iterable[str]) – The stream_strings to be written to files.
  • filepaths (Iterable[str]) – The location of the results directory.
  • dispatch_id (str) – The ID of the dispatch which initiated the request.
  • results_dir (str) – The location of the results directory.

Return Type

None

Asynchronous Base Executor Class

class covalent.executor.base.AsyncBaseExecutor(*args, **kwargs)[source]

Async base executor class to be used for defining any executor plugin. Subclassing this class will allow you to define your own executor plugin which can be used in covalent.

This is analogous to BaseExecutor except the run() method, together with the optional setup() and teardown() methods, are coroutines.

log_stdout

The path to the file to be used for redirecting stdout.

log_stderr

The path to the file to be used for redirecting stderr.

cache_dir

The location used for cached files in the executor.

time_limit

time limit for the task

retries

Number of times to retry execution upon failure

Methods:

cancel(task_metadata, job_handle)

Executor specific task cancelation method

from_dict (object_dict)

Rehydrate a dictionary representation
Get if the task was requested to be canceled

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Abstract method to run a function in the executor.
Save the job handle to database

setup(task_metadata)

Executor specific setup method

teardown(task_metadata)

Executor specific teardown method
Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

async cancel(task_metadata, job_handle)       [source]

Executor specific task cancelation method

Arg(s)

task_metadata: Metadata of the task to be canceled job_handle: Unique ID of the job assigned by the backend

Return(s)

False by default

Return Type

Literal[False]

from_dict(object_dict)

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

async get_cancel_requested()       [source]

Get if the task was requested to be canceled

Arg(s)

None

Return(s)

Whether the task has been requested to be canceled

Return Type

Any

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return Type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

abstract run(function, args, kwargs, task_metadata)       [source]

Abstract method to run a function in the executor.

Parameters

  • function (Callable) – The function to run in the executor
  • args (List) – List of positional arguments to be used by the function.
  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
  • task_metadata(Dict) – Dictionary of metadata for the task. Current keys are dispatch_id and node_id

Returns

A context manager object that handles the dispatch info

Return Type

output

async set_job_handle(handle)       [source]

Save the job_id/handle returned by the backend executing the task

Arg(s)

handle: Any JSONable type to identifying the task being executed by the backend

Return(s)

Response from saving the job handle to database

Return Type

Any

async setup(task_metadata)       [source]

Executor specific setup method

async teardown(task_metadata)       [source]

Executor specific teardown method

to_dict()

Return a JSON-serializable dictionary representation of self

Return Type

dict

async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)       [source]

Write the contents of stdout and stderr to respective files.

Parameters

  • stream_strings (Iterable[str]) – The stream_strings to be written to files.
  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
  • dispatch_id (str) – The ID of the dispatch which initiated the request.
  • results_dir (str) – The location of the results directory.

Return Type

None

Dask Executor

Executing tasks (electrons) in a Dask cluster. This is the default executor when covalent is started without the --no-cluster flag.

from dask.distributed import LocalCluster

cluster = LocalCluster()
print(cluster.scheduler_address)

The address will look like tcp://127.0.0.1:55564 when running locally. Note that the Dask cluster does not persist when the process terminates.

This cluster can be used with Covalent by providing the scheduler address:

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
scheduler_address="tcp://127.0.0.1:55564"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
return x + y

...

class covalent.executor.executor_plugins.dask.DaskExecutor(scheduler_address='', log_stdout='stdout.log', log_stderr='stderr.log', conda_env='', cache_dir='', current_env_on_conda_fail=False)       [source]

Dask executor class that submits the input function to a running dask cluster.

Methods:

cancel(task_metadata, job_handle)

Cancel the task being executed by the dask executor currently

from_dict (object_dict)

Rehydrate a dictionary representation

get_cancel_requested()

Get if the task was requested to be canceled

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Submit the function and inputs to the dask cluster

set_job_handle(handle)

Save the job handle to database

setup(task_metadata)

Executor specific setup method

teardown(task_metadata)

Executor specific teardown method

to_dict()

Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

async cancel(task_metadata, job_handle)        [source]

Cancel the task being executed by the dask executor currently

Arg(s)

task_metadata: Metadata associated with the task job_handle: Key assigned to the job by Dask

Return(s)

True by default

Return Type

Literal[True]

from_dict(object_dict)

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

async get_cancel_requested()

Get if the task was requested to be canceled

Arg(s)

None

Return(s)

Whether the task has been requested to be canceled

Return Type

Any

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return Type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

async run(function, args, kwargs, task_metadata)        [source]

Submit the function and inputs to the dask cluster
async set_job_handle(handle)

Save the job handle to database

Arg(s)

handle: JSONable type identifying the job being executed by the backend

Return(s)

Response from the listener that handles inserting the job handle to database

Return Type

Any

async setup(task_metadata)

Executor specific setup method

async teardown(task_metadata)

Executor specific teardown method

to_dict()

Return a JSON-serializable dictionary representation of self

Return Type

dict

async write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)

Write the contents of stdout and stderr to respective files.

Parameters

  • stream_strings (Iterable[str]) – The stream_strings to be written to files.
  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
  • dispatch_id (str) – The ID of the dispatch which initiated the request.
  • results_dir (str) – The location of the results directory.

This uses aiofiles to avoid blocking the event loop.

Return Type

None

Local Executor

Executing tasks (electrons) directly on the local machine

class covalent.executor.executor_plugins.local.LocalExecutor(*args, **kwargs)       [source]

Local executor class that directly invokes the input function.

Methods:

cancel(task_metadata, job_handle)

Method to cancel the job identified uniquely by the job_handle (base class)

execute(function, args, kwargs, dispatch_id, …)

Execute the function with the given arguments.

from_dict (object_dict)

Rehydrate a dictionary representation
Check if the task was requested to be canceled by the user

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

run(function, args, kwargs, task_metadata)

Execute the function locally
Save the job_id/handle returned by the backend executing the task

setup(task_metadata)

Placeholder to run any executor specific tasks

teardown(task_metadata)

Placeholder to run nay executor specific cleanup/teardown actions
Return a JSON-serializable dictionary representation of self

write_streams_to_file(stream_strings, …)

Write the contents of stdout and stderr to respective files.

cancel(task_metadata, job_handle)

Method to cancel the job identified uniquely by the job_handle (base class)

Arg(s)

task_metadata: Metadata of the task to be canceled job_handle: Unique ID of the job assigned by the backend

Return(s)

False by default

Return Type

Literal[False]

execute(function, args, kwargs, dispatch_id, results_dir, node_id=- 1)

Execute the function with the given arguments.

This calls the executor-specific run() method.

Parameters

  • function (Callable) – The input python function which will be executed and whose result is ultimately returned by this function.
  • args (List) – List of positional arguments to be used by the function.
  • kwargs (Dict) – Dictionary of keyword arguments to be used by the function.
  • dispatch_id (str) – The unique identifier of the external lattice process which is calling this function.
  • results_dir (str) – The location of the results directory.
  • node_id (int) – ID of the node in the transport graph which is using this executor.

Returns

The result of the function execution.

Return Type

output

from_dict(object_dict)

Rehydrate a dictionary representation

Parameters

object_dict (dict) – a dictionary representation returned by to_dict

Return Type

Returns

self

Instance attributes will be overwritten.

get_cancel_requested()

Check if the task was requested to be canceled by the user

Arg(s)

None

Returns

True/False whether task cancelation was requested

Return Type

bool

get_dispatch_context(dispatch_info)

Start a context manager that will be used to access the dispatch info for the executor.

Parameters

dispatch_info (DispatchInfo) – The dispatch info to be used inside current context.

Return Type

AbstractContextManager[DispatchInfo]

Returns

A context manager object that handles the dispatch info.

run(function, args, kwargs, task_metadata)                          [source]

Execute the function locally

Arg(s)

function: Function to be executed args: Arguments passed to the function kwargs: Keyword arguments passed to the function task_metadata: Metadata of the task to be executed

Returns

Task output

Return Type

Any

set_job_handle(handle)

Save the job_id/handle returned by the backend executing the task

Arg(s)

handle: JSONable type identifying the job being executed by the backend

Returns

Response from saving the job handle to database

Return Type

Any

setup(task_metadata)

Placeholder to run any executor specific tasks

Return Type

Any

teardown(task_metadata)

Placeholder to run nay executor specific cleanup/teardown actions

Return Type

Any

to_dict()

Return a JSON-serializable dictionary representation of self

Return Type

dict

write_streams_to_file(stream_strings, filepaths, dispatch_id, results_dir)

Write the contents of stdout and stderr to respective files.

Parameters

  • stream_strings (Iterable[str]) – The stream_strings to be written to files.
  • filepaths (Iterable[str]) – The filepaths to be used for writing the streams.
  • dispatch_id (str) – The ID of the dispatch which initiated the request.
  • results_dir (str) – The location of the results directory.

Return Type

None

AWS Plugins

Covalent is a python based workflow orchestration tool used to execute HPC and quantum tasks in heterogenous environments.

By installing Covalent AWS Plugins users can leverage a broad plugin ecosystem to execute tasks using AWS resources best fit for each task.

Covalent AWS Plugins installs a set of executor plugins that allow tasks to be run in an EC2 instance, AWS Lambda, AWS ECS Cluster, AWS Batch Compute Environment, and as an AWS Braket Job for tasks requiring Quantum devices.

If you’re new to covalent visit our Getting Started Guide.

1. Installation

To use the AWS plugin ecosystem with Covalent, simply install it with pip:

pip install "covalent-aws-plugins[all]"

This will ensure that all the AWS executor plugins listed below are installed.

Note

Users will require Terraform to be installed in order to use the EC2 plugin.

2. Included Plugins

While each plugin can be seperately installed installing the above pip package installs all of the below plugins.

PluginsPlugin NameUse Case
AWS Batch Executor Useful for heavy compute workloads (high CPU/memory). Tasks are queued to execute in the user defined Batch compute environment.
AWS EC2 ExecutorGeneral purpose compute workloads where users can select compute resources. An EC2 instance is auto-provisioned using terraform with selected compute settings to execute tasks.
AWS Braket ExecutorSuitable for Quantum/Classical hybrid workflows. Tasks are executed using a combination of classical and quantum devices.
AWS ECS Executor Useful for moderate to heavy workloads (low memory requirements). Tasks are executed in an AWS ECS cluster as containers.
AWS Lambda Executor Suitable for short lived tasks that can be parallalized (low memory requirements). Tasks are executed in serverless AWS Lambda functions.

3. Usage Example

  • Firstly, import covalent
import covalent as ct
  • Secondly, define your executor
executor = ct.executor.AWSBatchExecutor(
s3_bucket_name = "covalent-batch-qa-job-resources",
batch_job_definition_name = "covalent-batch-qa-job-definition",
batch_queue = "covalent-batch-qa-queue",
batch_execution_role_name = "ecsTaskExecutionRole",
batch_job_role_name = "covalent-batch-qa-job-role",
batch_job_log_group_name = "covalent-batch-qa-log-group",
vcpu = 2, # Number of vCPUs to allocate
memory = 3.75, # Memory in GB to allocate
time_limit = 300, # Time limit of job in seconds
)
  • Lastly, define a workflow to execute a particular task using one of the above executors
@ct.electron(
executor=executor
)
def compute_pi(n):
# Leibniz formula for π
return 4 * sum(1.0/(2*i + 1)*(-1)**i for i in range(n))

@ct.lattice
def workflow(n):
return compute_pi(n)


dispatch_id = ct.dispatch(workflow)(100000000)
result = ct.get_result(dispatch_id=dispatch_id, wait=True)
print(result.result)

Which should output

3.141592643589326

AWS Batch Executor

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.

This executor plugin interfaces Covalent with AWS Batch which allows tasks in a covalent workflow to be executed as AWS batch jobs.

Furthermore, this plugin is well suited for compute/memory intensive tasks such as training machine learning models, hyperparameter optimization, deep learning etc. With this executor, the compute backend is the Amazon EC2 service, with instances optimized for compute and memory intensive operations.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-awsbatch-plugin

This is an example of how a workflow can be adapted to utilize the AWS Batch Executor. Here we train a simple Support Vector Machine (SVM) model and use an existing AWS Batch Compute environment to run the train_svm electron as a batch job. We also note we require DepsPip to install the dependencies when creating the batch job.

from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

deps_pip = ct.DepsPip(
packages=["numpy==1.23.2", "scikit-learn==1.1.2"]
)

executor = ct.executor.AWSBatchExecutor(
s3_bucket_name = "covalent-batch-qa-job-resources",
batch_job_definition_name = "covalent-batch-qa-job-definition",
batch_queue = "covalent-batch-qa-queue",
batch_execution_role_name = "ecsTaskExecutionRole",
batch_job_role_name = "covalent-batch-qa-job-role",
batch_job_log_group_name = "covalent-batch-qa-log-group",
vcpu = 2, # Number of vCPUs to allocate
memory = 3.75, # Memory in GB to allocate
time_limit = 300, # Time limit of job in seconds
)

# Use executor plugin to train our SVM model.
@ct.electron(
executor=executor,
deps_pip=deps_pip
)
def train_svm(data, C, gamma):
X, y = data
clf = svm.SVC(C=C, gamma=gamma)
clf.fit(X[90:], y[90:])
return clf

@ct.electron
def load_data():
iris = datasets.load_iris()
perm = permutation(iris.target.size)
iris.data = iris.data[perm]
iris.target = iris.target[perm]
return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
X_test, y_test = data
return clf.score(
X_test[:90],
y_test[:90]
)

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
data = load_data()
clf = train_svm(
data=data,
C=C,
gamma=gamma
)
score = score_svm(
data=data,
clf=clf
)
return score

# Dispatch the workflow
dispatch_id = ct.dispatch(run_experiment)(
C=1.0,
gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

print(result)

During the execution of the workflow one can navigate to the UI to see the status of the workflow, once completed however the above script should also output a value with the score of our model.

0.8666666666666667
Config KeyIs RequiredDefaultDescription
profileNodefaultNamed AWS profile used for authentication
regionYesus-east-1AWS Region to use to for client calls
credentialsNo~/.aws/credentialsThe path to the AWS credentials file
batch_queueYescovalent-batch-queueName of the Batch queue used for job management
s3_bucket_nameYescovalent-batch-job-resourcesName of an S3 bucket where covalent artifacts are stored.
batch_job_definition_nameYescovalent-batch-jobsName of the Batch job definition for a user, project, or experiment.
batch_execution_role_nameNoecsTaskExecutionRoleName of the IAM role used by the Batch ECS agent (the above role should already exist in AWS).
batch_job_role_nameYedCovalentBatchJobRoleName of the IAM role used within the container.
batch_job_log_group_nameYescovalent-batch-job-logsName of the CloudWatch log group where container logs are stored.
vcpuNo2The number of vCPUs available to a task
memoryNo3.75Memory (in GB) available to a task
num_gpusNo0Number of GPUs available to a task.
retry_attemptsNo3Number of times a job is retried if it fails.
time_limitNo300Time limit (in seconds) after which jobs are killed.
poll_freqNo10Frequency (in seconds) with which to poll a submitted task.
cache_dirNo/tmp/covalentCache directory used by this executor for temporary files.

This plugin can be configured in one of two ways:

  1. Configuration options can be passed in as constructor keys to the executor class ct.executor.AWSBatchExecutor
  2. By modifying the covalent configuration file under the section [executors.awsbatch]

The following shows an example of how a user might modify their covalent configuration file to support this plugin:

[executors.awsbatch]
s3_bucket_name = "covalent-batch-job-resources"
batch_queue = "covalent-batch-queue"
batch_job_definition_name = "covalent-batch-jobs"
batch_execution_role_name = "ecsTaskExecutionRole"
batch_job_role_name = "CovalentBatchJobRole"
batch_job_log_group_name = "covalent-batch-job-logs"
...

In order to run your workflows with covalent there are a few notable AWS resources that need to be provisioned first.

Config ValueIs RequiredDefaultDescription
AWS S3 BucketYescovalent-batch-job-resourcesS3 bucket must be created for covalent to store essential files that are needed during execution.
VPC & SubnetYesN/AA VPC must be associated with the AWS Batch Compute Environment along with a public or private subnet (there needs to be additional resources created for private subnets)
AWS Batch Compute EnvironmentYesN/AAn AWS Batch compute environment (EC2) that will provision EC2 instances as needed when jobs are submitted to the associated job queue.
AWS Batch QueueYesbatch_queueAn AWS Batch Job Queue that will queue tasks for execution in it’s associated compute environment.
AWS Batch Job DefinitionYesbatch_job_role_nameAn AWS Batch job definition that will be replaced by a new batch job definition when the workflow is executed.
AWS IAM Role (Job Role)Yesbatch_job_definition_nameThe IAM role used within the container.
AWS IAM Role (Execution Role)Nobatch_execution_role_nameThe IAM role used by the Batch ECS agent (default role ecsTaskExecutionRole should already exist).
Log GroupYesbatch_job_log_group_nameAn AWS CloudWatch log group where task logs are stored.
  1. To create an AWS S3 Bucket refer to the following AWS documentation.
  2. To create a VPC & Subnet refer to the following AWS documentation.
  3. To create an AWS Batch Queue refer to the following AWS documentation it must be a compute environment configured in EC2 mode.
  4. To create an AWS Batch Job Definition refer to the following AWS documentation the configuration for this can be trivial as covalent will update the Job Definition prior to execution.
  5. To create an AWS IAM Role for batch jobs (Job Role) one can provision a policy with the following permissions (below) then create a new role and attach with the created policy. Refer to the following AWS documentation for an example of creating a policy & role in IAM.
AWS Batch IAM Job Policy

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BatchJobMgmt",
"Effect": "Allow",
"Action": [
"batch:TerminateJob",
"batch:DescribeJobs",
"batch:SubmitJob",
"batch:RegisterJobDefinition"
],
"Resource": "*"
},
{
"Sid": "ECRAuth",
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken"
],
"Resource": "*"
},
{
"Sid": "ECRUpload",
"Effect": "Allow",
"Action": [
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"ecr:BatchCheckLayerAvailability",
"ecr:InitiateLayerUpload",
"ecr:UploadLayerPart",
"ecr:CompleteLayerUpload",
"ecr:PutImage"
],
"Resource": [
"arn:aws:ecr:<region>:<account>:repository/<ecr_repo_name>"
]
},
{
"Sid": "IAMRoles",
"Effect": "Allow",
"Action": [
"iam:GetRole",
"iam:PassRole"
],
"Resource": [
"arn:aws:iam::<account>:role/CovalentBatchJobRole",
"arn:aws:iam::<account>:role/ecsTaskExecutionRole"
]
},
{
"Sid": "ObjectStore",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:PutObject",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::<s3_resource_bucket>/*",
"arn:aws:s3:::<s3_resource_bucket>"
]
},
{
"Sid": "LogRead",
"Effect": "Allow",
"Action": [
"logs:GetLogEvents"
],
"Resource": [
"arn:aws:logs:<region>:<account>:log-group:<cloudwatch_log_group_name>:log-stream:*"
]
}
]
}

AWS Braket Executor

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.

This plugin allows executing quantum circuits and quantum-classical hybrid jobs in Amazon Braket when you use Covalent.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-braket-plugin

The following toy example executes a simple quantum circuit on one qubit that prepares a uniform superposition of the standard basis states and then measures the state. We use the Pennylane framework.

import covalent as ct
from covalent_braket_plugin.braket import BraketExecutor
import os

# AWS resources to pass to the executor
credentials = "~/.aws/credentials"
profile = "default"
region = "us-east-1"
s3_bucket_name = "braket_s3_bucket"
ecr_repo_name = "braket_ecr_repo"
iam_role_name = "covalent-braket-iam-role"

# Instantiate the executor
ex = BraketExecutor(
profile=profile,
credentials=credentials_file,
s3_bucket_name=s3_bucket_name,
ecr_image_uri=ecr_image_uri,
braket_job_execution_role_name=iam_role_name,
quantum_device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
classical_device="ml.m5.large",
storage=30,
time_limit=300,
)


# Execute the following circuit:
# |0> - H - Measure
@ct.electron(executor=ex)
def simple_quantum_task(num_qubits: int):
import pennylane as qml

# These are passed to the Hybrid Jobs container at runtime
device_arn = os.environ["AMZN_BRAKET_DEVICE_ARN"]
s3_bucket = os.environ["AMZN_BRAKET_OUT_S3_BUCKET"]
s3_task_dir = os.environ["AMZN_BRAKET_TASK_RESULTS_S3_URI"].split(s3_bucket)[1]

device = qml.device(
"braket.aws.qubit",
device_arn=device_arn,
s3_destination_folder=(s3_bucket, s3_task_dir),
wires=num_qubits,
)

@qml.qnode(device=device)
def simple_circuit():
qml.Hadamard(wires=[0])
return qml.expval(qml.PauliZ(wires=[0]))

res = simple_circuit().numpy()
return res


@ct.lattice
def simple_quantum_workflow(num_qubits: int):
return simple_quantum_task(num_qubits=num_qubits)


dispatch_id = ct.dispatch(simple_quantum_workflow)(1)
result_object = ct.get_result(dispatch_id, wait=True)

# We expect 0 as the result
print("Result:", result_object.result)

During the execution of the workflow one can navigate to the UI to see the status of the workflow, once completed however the above script should also output a value with the output of the quantum measurement.

>>> Result: 0
Config ValueIs RequiredDefaultDescription
credentialsNo“~/.aws/credentials”The path to the AWS credentials file
braket_job_execution_role_nameYes“CovalentBraketJobsExecutionRole”The name of the IAM role that Braket will assume during task execution.
profileNo“default”Named AWS profile used for authentication
regionYes:codeAWS_DEFAULT_REGION environment variableAWS Region to use to for client calls to AWS
s3_bucket_nameYesamazon-braket-covalent-job-resourcesThe S3 bucket where Covalent will store input and output files for the task.
ecr_image_uriYesAn ECR repository for storing container images to be run by Braket.
quantum_deviceNo“arn:aws:braket:::device/quantum-simulator/amazon/sv1”The ARN of the quantum device to use
classical_deviceNo“ml.m5.large”Instance type for the classical device to use
storageNo30Storage size in GB for the classical device
time_limitNo300Max running time in seconds for the Braket job
poll_freqNo30How often (in seconds) to poll Braket for the job status
cache_dirNo“/tmp/covalent”Location for storing temporary files generated by the Covalent server

This plugin can be configured in one of two ways:

  1. Configuration options can be passed in as constructor keys to the executor class ct.executor.BraketExecutor
  2. By modifying the covalent configuration file under the section [executors.braket]

The following shows an example of how a user might modify their covalent configuration file to support this plugin:

[executors.braket]
quantum_device = "arn:aws:braket:::device/qpu/ionq/ionQdevice"
time_limit = 3600

The Braket executor requires some resources to be provisioned on AWS. Precisely, users will need an S3 bucket, an ECR repo, and an IAM role with the appropriate permissions to be passed to Braket.

ResourceIs RequiredConfig KeyDescription
IAM roleYesbraket_job_execution_role_nameAn IAM role granting permissions to Braket, S3, ECR, and a few other resources.
ECR repositoryYesecr_image_uriAn ECR repository for storing container images to be run by Braket.
S3 BucketYess3_bucketAn S3 bucket for storing task-specific data, such as Braket outputs or function inputs.

One can either follow the below instructions to manually create the resources or use the provided terraform script to auto-provision the resources needed.

  1. The AWS documentation on S3 details how to configure an S3 bucket.
  2. The permissions required for the the IAM role are documented in the article “managing access to Amazon Braket”. The following policy is attached to the default role “CovalentBraketJobsExecutionRole”:
  3. In order to use the Braket executor plugin one must create a private ECR registry with a container image that will be used to execute the Braket jobs using covalent. One can either create an ECR repository manually or use the terraform script provided below. We host the image in our public repository at public.ecr.aws/covalent/covalent-braket-executor:stable

Note

The container image can be uploaded to a private ECR as follows

docker pull public.ecr.aws/covalent/covalent-braket-executor:stable

Once the image has been obtained, user’s can tag it with their registry information and upload to ECR as follows

aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com
docker tag public.ecr.aws/covalent/covalent-braket-executor:stable <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag

Sample IAM Policy for Braket's execution role

{ “Version”: “2012-10-17”, “Statement”: [

{ “Sid”: “VisualEditor0”, “Effect”: “Allow”, “Action”: “cloudwatch:PutMetricData”, “Resource”: “*”, “Condition”: {

“StringEquals”: { “cloudwatch:namespace”: “/aws/braket” }

}

}, {

“Sid”: “VisualEditor1”, “Effect”: “Allow”, “Action”: [

“logs:CreateLogStream”, “logs:DescribeLogStreams”, “ecr:GetDownloadUrlForLayer”, “ecr:BatchGetImage”, “logs:StartQuery”, “logs:GetLogEvents”, “logs:CreateLogGroup”, “logs:PutLogEvents”, “ecr:BatchCheckLayerAvailability” ], “Resource”: [

“arn:aws:ecr::348041629502:repository/”, “arn:aws:logs:::log-group:/aws/braket*

]

}, {

“Sid”: “VisualEditor2”, “Effect”: “Allow”, “Action”: “iam:PassRole”, “Resource”: “arn:aws:iam::348041629502:role/CovalentBraketJobsExecutionRole”, “Condition”: {

“StringLike”: { “iam:PassedToService”: “braket.amazonaws.com” }

}

}, {

“Sid”: “VisualEditor3”, “Effect”: “Allow”, “Action”: [

“braket:SearchDevices”, “s3:CreateBucket”, “ecr:BatchDeleteImage”, “ecr:BatchGetRepositoryScanningConfiguration”, “ecr:DeleteRepository”, “ecr:TagResource”, “ecr:BatchCheckLayerAvailability”, “ecr:GetLifecyclePolicy”, “braket:CreateJob”, “ecr:DescribeImageScanFindings”, “braket:GetJob”, “ecr:CreateRepository”, “ecr:PutImageScanningConfiguration”, “ecr:GetDownloadUrlForLayer”, “ecr:DescribePullThroughCacheRules”, “ecr:GetAuthorizationToken”, “ecr:DeleteLifecyclePolicy”, “braket:ListTagsForResource”, “ecr:PutImage”, “s3:PutObject”, “s3:GetObject”, “braket:GetDevice”, “ecr:UntagResource”, “ecr:BatchGetImage”, “ecr:DescribeImages”, “braket:CancelQuantumTask”, “ecr:StartLifecyclePolicyPreview”, “braket:CancelJob”, “ecr:InitiateLayerUpload”, “ecr:PutImageTagMutability”, “ecr:StartImageScan”, “ecr:DescribeImageReplicationStatus”, “ecr:ListTagsForResource”, “s3:ListBucket”, “ecr:UploadLayerPart”, “ecr:CreatePullThroughCacheRule”, “ecr:ListImages”, “ecr:GetRegistryScanningConfiguration”, “braket:TagResource”, “ecr:CompleteLayerUpload”, “ecr:DescribeRepositories”, “ecr:ReplicateImage”, “ecr:GetRegistryPolicy”, “ecr:PutLifecyclePolicy”, “s3:PutBucketPublicAccessBlock”, “ecr:GetLifecyclePolicyPreview”, “ecr:DescribeRegistry”, “braket:SearchJobs”, “braket:CreateQuantumTask”, “iam:ListRoles”, “ecr:PutRegistryScanningConfiguration”, “ecr:DeletePullThroughCacheRule”, “braket:UntagResource”, “ecr:BatchImportUpstreamImage”, “braket:GetQuantumTask”, “s3:PutBucketPolicy”, “braket:SearchQuantumTasks”, “ecr:GetRepositoryPolicy”, “ecr:PutReplicationConfiguration”

], “Resource”: “*

}, {

“Sid”: “VisualEditor4”, “Effect”: “Allow”, “Action”: “logs:GetQueryResults”, “Resource”: “arn:aws:logs:::log-group:*

}, {

“Sid”: “VisualEditor5”, “Effect”: “Allow”, “Action”: “logs:StopQuery”, “Resource”: “arn:aws:logs:::log-group:/aws/braket*

}

]

}

Users can use the following Terraform snippet as a starting point to spin up the required resources


provider "aws" {}

data "aws_caller_identity" "current" {}


resource "aws_s3_bucket" "braket_bucket" {
bucket = "my-s3-bucket-name"
force_destroy = true
}

resource "aws_ecr_repository" "braket_ecr_repo" {
name = "amazon-braket-base-executor-repo"
image_tag_mutability = "MUTABLE"

force_delete = true
image_scanning_configuration {
scan_on_push = false
}

provisioner "local-exec" {
command = "docker pull public.ecr.aws/covalent/covalent-braket-executor:stable && aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin ${data.aws_caller_identity.current.account_id}.dkr.ecr.${var.aws_region}.amazonaws.com && docker tag public.ecr.aws/covalent/covalent-braket-executor:stable ${aws_ecr_repository.braket_ecr_repo.repository_url}:stable && docker push ${aws_ecr_repository.braket_ecr_repo.repository_url}:stable"
}
}

resource "aws_iam_role" "braket_iam_role" {
name = "amazon-braket-execution-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "braket.amazonaws.com"
}
},
]
})
managed_policy_arns = ["arn:aws:iam::aws:policy/AmazonBraketFullAccess"]
}

AWS EC2 Executor

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.

This plugin allows tasks to be executed in an AWS EC2 instance (which is auto-created) when you execute your workflow with covalent.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-ec2-plugin

Note

Users will also need to have Terraform installed on their local machine in order to use this plugin.

This is a toy example of how a workflow can be adapted to utilize the EC2 Executor. Here we train a Support Vector Machine (SVM) and spin up an EC2 automatically to execute the train_svm electron. We also note we require DepsPip to install the dependencies on the EC2 instance.

from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

deps_pip = ct.DepsPip(
packages=["numpy==1.23.2", "scikit-learn==1.1.2"]
)

executor = ct.executor.EC2Executor(
instance_type="t2.micro",
volume_size=8, #GiB
ssh_key_file="~/.ssh/ec2_key" # default key_name will be "ec2_key"
)

# Use executor plugin to train our SVM model.
@ct.electron(
executor=executor,
deps_pip=deps_pip
)
def train_svm(data, C, gamma):
X, y = data
clf = svm.SVC(C=C, gamma=gamma)
clf.fit(X[90:], y[90:])
return clf

@ct.electron
def load_data():
iris = datasets.load_iris()
perm = permutation(iris.target.size)
iris.data = iris.data[perm]
iris.target = iris.target[perm]
return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
X_test, y_test = data
return clf.score(
X_test[:90],
y_test[:90]
)

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
data = load_data()
clf = train_svm(
data=data,
C=C,
gamma=gamma
)
score = score_svm(
data=data,
clf=clf
)
return score

# Dispatch the workflow
dispatch_id = ct.dispatch(run_experiment)(
C=1.0,
gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

print(result)

During the execution of the workflow one can navigate to the UI to see the status of the workflow, once completed however the above script should also output a value with the score of our model.

0.8666666666666667
Config KeyIs RequiredDefaultDescription
profileNodefaultNamed AWS profile used for authentication
regionNous-east-1AWS Region to use to for client calls to AWS
credentials_fileYes~/.aws/credentialsThe path to the AWS credentials file
ssh_key_fileYes~/.ssh/id_rsaThe path to the private key that corresponds to the EC2 Key Pair
instance_typeYest2.microThe EC2 instance type that will be spun up automatically.
key_nameYesName of key specified in ssh_key_file.The name of the AWS EC2 Key Pair that will be used to SSH into EC2 instance
volume_sizeNo8The size in GiB of the GP2 SSD disk to be provisioned with EC2 instance.
vpcNo(Auto created)The VPC ID that will be associated with the EC2 instance, if not specified a VPC will be created.
subnetNo(Auto created)The Subnet ID that will be associated with the EC2 instance, if not specified a public Subnet will be created.
remote_cacheNo~/.cache/covalentThe location on the EC2 instance where covalent artifacts will be created.

This plugin can be configured in one of two ways:

  1. Configuration options can be passed in as constructor keys to the executor class ct.executor.EC2Executor
  2. By modifying the covalent configuration file under the section [executors.ec2]

The following shows an example of how a user might modify their covalent configuration file to support this plugin:

[executors.ec2]
ssh_key_file = "/home/user/.ssh/ssh_key.pem"
key_name = "ssh_key"

This plugin requires users have an AWS account. New users can follow instructions here to create a new account. In order to run workflows with Covalent and the AWS EC2 plugin, there are a few notable resources that need to be provisioned first. Whenever interacting with AWS resources, users strongly recommended to follow best practices for managing cloud credentials. Users are recommended to follow the principle of least privilege. For this executor, users who wish to deploy required infrastructure may use the AWS Managed Policy AmazonEC2FullAccess although some administrators may wish to further restrict instance families, regions, or other options according to their organization’s cloud policies.

The required resources include an EC2 Key Pair, and optionally a VPC & Subnet that can be used instead of the EC2 executor automatically creating it.

ResourceIs RequiredDefaultDescription
AWS EC2 Key PairYeskey_nameAn EC2 Key Pair must be created and named corresponding to the key_name config value. This key pair is used by the executor to connect to the EC2 instance via SSH. This key must also be present in the user’s local machine that is dispatching the workflow and it’s filepath specified under the ssh_key_file config value.
VPCNovpcA VPC ID can be provided corresponding to the vpc config value. Otherwise a VPC will be auto-created for each electron.
SubnetNosubnetA Subnet ID can be provided corresponding to the subnet config value. Otherwise a public Subnet will be auto-created for each electron.
Security GroupNo(Auto created)A security group will be auto created and attached to the VPC in order to give the local machine (dispatching workflow) SSH access to the EC2 instance.
EC2 InstanceNo(Auto created)An EC2 Instance will be automatically provisioned for each electron in the workflow that utilizes this executor.
  1. To create an AWS EC2 Key pair refer to the following AWS documentation.
  2. To create a VPC & Subnet refer to the following AWS documentation.

When tasks are run using this executor, the following infrastructure is ephemerally deployed.

This includes the minimal infrastructure needed to deploy an EC2 instance in a public subnet connected to an internet gateway. Users can validate that resources are correctly provisioned by monitoring the EC2 dashboard in the AWS Management Console. The overhead added by using this executor is on the order of several minutes, depending on the complexity of any additional user-specified runtime dependencies. Users are advised not to use any sensitive data with this executor without careful consideration of security policies. By default, data in transit is cached on the EBS volume attached to the EC2 instance in an unencrypted format.

These resources are torn down upon task completion and not shared across tasks in a workflow. Deployment of these resources will incur charges for EC2 alone; refer to AWS EC2 pricing for details. Note that this can be deployed in any AWS region in which the user is otherwise able to deploy EC2 instances. Some users may encounter quota limits when using EC2; this can be addressed by opening a support ticket with AWS.

AWS ECS Executor

With this executor, users can execute tasks (electrons) or entire lattices using the AWS Elastic Container Service (ECS). This executor plugin is well suited for low to medium compute intensive electrons with modest memory requirements. Since AWS ECS offers very quick spin up times, this executor is a good fit for workflows with a large number of independent tasks that can be dispatched simultaneously.

1. Installation

To use this plugin with Covalent, simply install it using pip:

pip install covalent-ecs-plugin

2. Usage Example

This is an example of how a workflow can be constructed to use the AWS ECS executor. In the example, we join two words to form a phrase and return an excited phrase.

import covalent as ct

executor = ct.executor.ECSExecutor(
s3_bucket_name="covalent-fargate-task-resources",
ecr_repo_name="covalent-fargate-task-images",
ecs_cluster_name="covalent-fargate-cluster",
ecs_task_family_name="covalent-fargate-tasks",
ecs_task_execution_role_name="ecsTaskExecutionRole",
ecs_task_role_name="CovalentFargateTaskRole",
ecs_task_subnet_id="subnet-871545e1",
ecs_task_security_group_id="sg-0043541a",
ecs_task_log_group_name="covalent-fargate-task-logs",
vcpu=1,
memory=2,
poll_freq=10,
)


@ct.electron(executor=executor)
def join_words(a, b):
return ", ".join([a, b])


@ct.electron(executor=executor)
def excitement(a):
return f"{a}!"


@ct.lattice
def simple_workflow(a, b):
phrase = join_words(a, b)
return excitement(phrase)


dispatch_id = ct.dispatch(simple_workflow)("Hello", "World")
result = ct.get_result(dispatch_id, wait=True)

print(result)

During the execution of the workflow, one can navigate to the UI to see the status of the workflow. Once completed, the above script should also output the result:

Hello, World

In order for the above workflow to run successfully, one has to provision the required AWS resources as mentioned in 4. Required AWS Resources.

3. Overview of configuration

The following table shows a list of all input arguments including the required arguments to be supplied when instantiating the executor:

Config ValueIs RequiredDefaultDescription
credentialsNo ~/.aws/credentials The path to the AWS credentials file
profileNodefaultThe AWS profile used for authentication
regionYesus-east-1AWS region to use for client calls to AWS
s3_bucket_nameNocovalent-fargate-task-resourcesThe name of the S3 bucket where objects are stored
ecr_repo_nameNocovalent-fargate-task-imagesThe name of the ECR repository where task images are stored
ecs_cluster_nameNocovalent-fargate-clusterThe name of the ECS cluster on which tasks run
ecs_task_family_nameNococovalent-fargate-tasksThe name of the ECS task family for a user, project, or experiment.
ecs_task_execution_role_nameNoCovalentFargateTaskRoleThe IAM role used by the ECS agent
ecs_task_role_nameNoCovalentFargateTaskRoleThe IAM role used by the container during runtime
ecs_task_subnet_idYesValid subnet ID
ecs_task_security_group_idYesValid security group ID
ecs_task_log_group_nameNocovalent-fargate-task-logsThe name of the CloudWatch log group where container logs are stored
vcpuNo0.25The number of vCPUs available to a task
memoryNo0.5The memory (in GB) available to a task
poll_freqNo10The frequency (in seconds) with which to poll a submitted task
cache_dirNo /tmp/covalent The cache directory used by the executor for storing temporary files

The following snippet shows how users may modify their Covalent configuration to provide the necessary input arguments to the executor:

[executors.ecs]
credentials = "~/.aws/credentials"
profile = "default"
s3_bucket_name = "covalent-fargate-task-resources"
ecr_repo_name = "covalent-fargate-task-images"
ecs_cluster_name = "covalent-fargate-cluster"
ecs_task_family_name = "covalent-fargate-tasks"
ecs_task_execution_role_name = "ecsTaskExecutionRole"
ecs_task_role_name = "CovalentFargateTaskRole"
ecs_task_subnet_id = "<my-subnet-id>"
ecs_task_security_group_id = "<my-security-group-id>"
ecs_task_log_group_name = "covalent-fargate-task-logs"
vcpu = 0.25
memory = 0.5
cache_dir = "/tmp/covalent"
poll_freq = 10

Within a workflow, users can use this executor with the default values configured in the configuration file as follows:

import covalent as ct

@ct.electron(executor="ecs")
def task(x, y):
return x + y

Alternatively, users can customize this executor entirely by providing their own values to its constructor as follows:

import covalent as ct
from covalent.executor import ECSExecutor

ecs_executor = ECSExecutor(
credentials="my_custom_credentials",
profile="my_custom_profile",
s3_bucket_name="my_s3_bucket",
ecr_repo_name="my_ecr_repo",
ecs_cluster_name="my_ecs_cluster",
ecs_task_family_name="my_custom_task_family",
ecs_task_execution_role_name="myCustomTaskExecutionRole",
ecs_task_role_name="myCustomTaskRole",
ecs_task_subnet_id="my-subnet-id",
ecs_task_security_group_id="my-security-group-id",
ecs_task_log_group_name="my-task-log-group",
vcpu=1,
memory=2,
cache_dir="/home/<user>/covalent/cache",
poll_freq=10,
)

@ct.electron(executor=ecs_executor)
def task(x, y):
return x + y

4. Required AWS Resources

This executor uses different AWS services (S3, ECR, ECS, and Fargate) to successfully run a task. In order for the executor to work end-to-end, the following resources need to be configured either with Terraform or manually provisioned on the AWS Dashboard

ResourceConfig NameDescription
IAM Roleecs_task_execution_role_nameThe IAM role used by the ECS agent
IAM Roleecs_task_role_nameThe IAM role used by the container during runtime
S3 Buckets3_bucket_nameThe name of the S3 bucket where objects are stored
ECR repositoryecr_repo_nameThe name of the ECR repository where task images are stored
ECS Clusterecs_cluster_nameThe name of the ECS cluster on which tasks run
ECS Task Familyecs_task_family_nameThe name of the task family that specifies container information for a user, project, or experiment
VPC Subnetecs_task_subnet_idThe ID of the subnet where instances are created
Security groupecs_task_security_group_idThe ID of the security group for task instances
Cloudwatch log groupecs_task_log_group_nameThe name of the CloudWatch log group where container logs are stored
CPUvCPUThe number of vCPUs available to a task
MemorymemoryThe memory (in GB) available to a task

The following IAM roles and policies must be properly configured so that the executor has all the necessary permissions to interact with the different AWS services:

  1. ecs_task_execution_role_name is the IAM role used by the ECS agent
  2. ecs_task_role_name is the IAM role used by the container during runtime

If omitted, these IAM role names default to ecsTaskExecutionRole and CovalentFargateTaskRole, respectively. The IAM policy attached to the ecsTaskExecutionRole is the following:

ECS Task Execution IAM Policy

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}

These policies allow the service to download container images from ECR so that the tasks can be executed on an ECS cluster. The policy attached to the CovalentFargateTaskRole is as follows

AWS Fargate Task Role IAM Policy

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "braket:*",
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::covalent-fargate-task-resources/*",
"arn:aws:s3:::covalent-fargate-task-resources"
]
}
]

Users can provide their custom IAM roles/policies as long as they respect the permissions listed in the above documents. For more information on how to create IAM roles and attach policies in AWS, refer to IAM roles.

The executor also requires a proper networking setup so that the containers can be properly launched into their respective subnets. The executor requires that the user provide a subnet ID and a security group ID prior to using the executor in a workflow.

The executor uses Docker to build container images with the task function code baked into the image. The resulting image is pushed into the elastic container registry provided by the user. Following this, an ECS task definition using the user provided arguments is registered and the corresponding task container is launched. The output from the task is uploaded to the S3 bucket provided by the user and parsed to obtain the result object. In order for the executor to properly run and build images, users must have Docker installed and properly configured on their machines.

AWS Lambda Executor

With this executor, users can execute tasks (electrons) or entire lattices using the AWS Lambda serverless compute service. It is appropriate to use this plugin for electrons that are expected to be short lived, low in compute intensity. This plugin can also be used for workflows with a high number of electrons that are embarassingly parallel (fully independent of each other).

The following AWS resources are required by this executor

  • Container based AWS Lambda function
  • AWS S3 bucket for caching objects
  • IAM role for Lambda
  • ECR container registry for storing docker images

1. Installation

To use this plugin with Covalent, simply install it using pip :

    pip install covalent-awslambda-plugin

Note

Due to the isolated nature of AWS Lambda, the packages available on that environment are limited. This means that only the modules that come with python out-of-the-box are accessible to your function. Deps are also limited in a similar fashion. However, AWS does provide a workaround for pip package installations: https://aws.amazon.com/premiumsupport/knowledge-center/lambda-python-package-compatible/.

2. Usage Example

This is an example of how a workflow can be constructed to use the AWS Lambda executor. In the example, we join two words to form a phrase and return an excited phrase.

import covalent as ct
from covalent.executor import AWSLambdaExecutor

executor = AWSLambdaExecutor(
function_name = "my-lambda-function"
s3_bucket_name="covalent-lambda-job-resources"
)

@ct.electron(executor=executor)
def join_words(a, b):
return ",".join([a, b])

@ct.electron(executor=executor)
def excitement(a):
return f"{a}!"

@ct.lattice
def simple_workflow(a, b):
phrase = join_words(a, b)
return excitement(phrase)


dispatch_id = ct.dispatch(simple_workflow)("Hello", "World")
result = ct.get_result(dispatch_id, wait=True)

print(result)

During the execution of the workflow, one can navigate to the UI to see the status of the workflow. Once completed, the above script should also output the result:

    Hello, World!

In order for the above workflow to run successfully, one has to provision the required AWS resources as mentioned in 4. Required AWS Resources.

Note

Users may encounter failures with dispatching workflows on MacOS due to errors with importing the psutil module. This is a known issue and will be addressed in a future sprint.

3. Overview of configuration

The following table shows a list of all input arguments including the required arguments to be supplied when instantiating the executor:

Config ValueIs RequiredDefaultDescription
function_nameYes-Name of the AWS lambda function to be used at runtime
s3_bucket_nameYes-Name of an AWS S3 bucket that the executor must use to cache object files
credentials_fileNo~/.aws/credentialsThe path to your AWS credentials file
profileNodefaultAWS profile used for authentication
poll_freqNo5Time interval between successive polls to the lambda function
cache_dirNo~/.cache/covalentPath on the local file system to a cache
timeoutNo900Duration in seconds to keep polling the task for results/exceptions raised

The following snippet shows how users may modify their Covalent configuration to provide the necessary input arguments to the executor:

[executors.awslambda]
function_name = "my-lambda-function"
s3_bucket_name = "covalent-lambda-job-resources"
credentials_file = "/home/<user>/.aws/credentials"
profile = "default"
region = "us-east-1"
cache_dir = "/home/<user>/.cache/covalent"
poll_freq = 5
timeout = 60

Within a workflow, users can use this executor with the default values configured in the configuration file as follows:

import covalent as ct

@ct.electron(executor="awslambda")
def task(x, y):
return x + y

Alternatively, users can customize this executor entirely by providing their own values to its constructor as follows:

import covalent as ct
from covalent.executor import AWSLambdaExecutor

lambda_executor = AWSLambdaExecutor(
function_name = "my-lambda-function"
s3_bucket_name="my_s3_bucket",
credentials_file="my_custom_credentials",
profile="custom_profile",
region="us-east-1",
cache_dir="/home/<user>/covalent/cache",
poll_freq=5,
timeout=60
)

@ct.electron(executor=lambda_executor)
def task(x, y):
return x + y

4. Required AWS Resources

In order for the executor to work end-to-end, the following resources need to be provisioned apriori.

ResourceConfig NameDescription
IAM Rolelambda_role_nameThe IAM role this lambda will assume during execution of your tasks
S3 Buckets3_bucket_nameName of an AWS S3 bucket that the executor can use to store temporary files
AWS Lambda functionfunction_nameName of the AWS lambda function created in AWS
AWS Elastic Container Registry (ECR)-The container registry that contains the docker images used by the lambda function to execute tasks

The following JSON policy document shows the necessary IAM permissions required for the executor to properly run tasks using the AWS Lambda compute service:

IAM Policy

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"s3-object-lambda:*"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Effect": "Allow",
"Action": [
"cloudformation:DescribeStacks",
"cloudformation:ListStackResources",
"cloudwatch:ListMetrics",
"cloudwatch:GetMetricData",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"kms:ListAliases",
"iam:GetPolicy",
"iam:GetPolicyVersion",
"iam:GetRole",
"iam:GetRolePolicy",
"iam:ListAttachedRolePolicies",
"iam:ListRolePolicies",
"iam:ListRoles",
"lambda:*",
"logs:DescribeLogGroups",
"states:DescribeStateMachine",
"states:ListStateMachines",
"tag:GetResources",
"xray:GetTraceSummaries",
"xray:BatchGetTraces"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "*",
"Condition": {
"StringEquals": {
"iam:PassedToService": "lambda.amazonaws.com"
}
}
},
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogStreams",
"logs:GetLogEvents",
"logs:FilterLogEvents"
],
"Resource": "arn:aws:logs:*:*:log-group:/aws/lambda/*"
}
]
}

where <bucket-name> is the name of an S3 bucket to be used by the executor to store temporary files generated during task execution. The lambda function interacts with the S3 bucket as well as with the AWS Cloudwatch service to route any log messages. Due to this, the lambda function must have the necessary IAM permissions in order to do so. Users must provision an IAM role that has the AWSLambdaExecute policy attached to it. The policy document is summarized here for convenience:

Covalent Lamda Execution Role Policy

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::*"
}
]
}

Users can use the following Terraform snippet as a starting point to spin up the required resources


provider aws {}

resource aws_s3_bucket bucket {
bucket = "my-s3-bucket"
}

resource aws_iam_role lambda_iam {
name = var.aws_lambda_iam_role_name
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "lambda.amazonaws.com"
}
},
]
})
managed_policy_arns = [ "arn:aws:iam::aws:policy/AWSLambdaExecute" ]
}

resource aws_ecr_repository lambda_ecr {
name = "lambda_container_registry"
}

resource aws_lambda_function lambda {
function_name = "my-lambda-function"
role = aws_iam_role.lambda_iam.arn
packge_type = "Image"
timeout = <timeout value in seconds, max 900 (15 minutes), defaults to 3>
memory_size = <Max memory in MB that the Lambda is expected to use, defaults to 128>
image_uri = aws_ecr_repository.lambda_ecr.repository_url
}

For more information on how to create IAM roles and attach policies in AWS, refer to IAM roles. For more information on AWS S3, refer to AWS S3.

Note

The lambda function created requires a docker image to execute the any tasks required by it. We distribute ready to use AWS Lambda executor docker images that user’s can pull and push to their private ECR registries before dispatching workflows.

The base docker image can be obtained as follows

docker pull public.ecr.aws/covalent/covalent-lambda-executor:stable

Once the image has been obtained, user’s can tag it with their registry information and upload to ECR as follows

aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com
docker tag public.ecr.aws/covalent/covalent-lambda-executor:stable <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<my-repository>:tag

5. Custom Docker images

As mentioned earlier, the AWS Lambda executor uses a docker image to execute an electron from a workflow. We distribute AWS Lambda executor base docker images that contain just the essential dependencies such as covalent and covalent-aws-plugins. However if the electron to be executed using the Lambda executor depends on Python packages that are not present in the base image by default, users will have to a build custom images prior to running their Covalent workflows using the AWS Lambda executor. In this section we cover the necessary steps required to extend the base executor image by installing additional Python packages and pushing the derived image to a private elastic container registry (ECR)

Note

Using PipDeps as described in the Dependencies section with the AWS Lambda executor is currently not supported as it modifies the execution environment of the lambda function at runtime. As per AWS best practices for Lambda it is recommended to ship the lambda function as a self-contained object that has all of its dependencies in a deployment package/container image as described in detail here

All of our base AWS executor images are available in the AWS public registries and can be downloaded locally for consumption as described here. For instance the stable AWS Lambda executor image can be downloaded from public ECR as follows

aws ecr-public get-login-password --region <aws-region> | docker login --username AWS --password-stdin public.ecr.aws
docker pull public.ecr.aws/covalent/covalent-lambda-executor:stable

Note

Executor images with the latest tag are also routinely pushed to the same registry. However, we strongly recommended using the stable tag when running executing workflows usin the AWS Lambda executor. The <aws-region> is a placeholder for the actual AWS region to be used by the user

Once the lambda base executor image has been downloaded, users can build upon that image by installing all the Python packages required by their tasks. The base executor uses a build time argument named LAMBDA_TASK_ROOT to set the install path of all python packages to /var/task inside the image. When extending the base image by installing additional python packages, it is recommended to install them to the same location so that they get resolved properly during runtime. Following is a simple example of how users can extend the AWS lambda base image by creating their own Dockerfile and installting additional packages such as numpy, pandas and scipy.


#Dockerfile

FROM public.ecr.aws/covalent/covalent-lambda-executor:stable as base

RUN pip install --target ${LAMBDA_TASK_ROOT} numpy pandas scipy

Warning

Do not override the entrypoint of the base image in the derived image when installing new packages. The docker ENTRYPOINT of the base image is what that gets trigged when AWS invokes your lambda function to execute the workflow electron

Once the Dockerfile has been created the derived image can be built as follows

docker build -f Dockerfile -t my-custom-lambda-executor:latest

After a successful build of the derived image, it needs to be uploaded to ECR so that it can be consumed by a lambda function when triggered by Covalent. As as first step, it is required to create an elastic container registry to hold the dervied executor images. This can be easily done by using the AWS CLI tool as follows

aws ecr create-repository --region <aws-region> --repository-name covalent/my-custom-lambda-executor

To upload the derived image to this registry, we would need to tag our local image as per the AWS guide and push the image to the registry as described here. To push an image, first one needs to authenticate with AWS and login their docker client

aws ecr get-login-password --region <aws-region> | docker login --username AWS --password-stdin <aws-account-id>.dkr.ecr.region.amazonaws.com

Once the login is successful, the local image needs to be re-tagged with the ECR repository information. If the image tag is omitted, latest is applied by default. In the following code block we show how to tag the derived image my-custom-lambda-executor:latest with the ECR information so that it can be uploaded successfully

docker tag my-custom-lambda-executor:latest <aws-account-id>.dkr.ecr.<aws-region>.amazonaws.com/my-custom-lambda-executor:latest

Note

<aws-account-id> and <aws-region> are placeholders for the actual AWS account ID and region to be used by the users

Once the derived image has been built and pushed to ECR, users need to create a Lambda function or update an existing one to use the new derived image instead of the base image executor image at runtime. A new AWS Lambda function can be quite easily created using the AWS Lambda CLI create-function command as follows

    aws lambda create-function --function-name "my-covalent-lambda-function" --region <aws-region> \
--package-type Image \
--code ImageUri=<aws-account-id>.dkr.ecr.<aws-region>.amazonaws.com/my-custom-lambda-executor:latest \
--role <Lambda executor role ARN> \
--memory-size 512 \
--timeout 900

The above CLI command will register a new AWS lambda function that will use the user’s custom derived image my-custom-lambda-executor:latest with a memory size of 512 MB and a timeout values of 900 seconds. The role argument is used to specify the ARN of the IAM role the AWS Lambda can assume during execution. The necessary permissions for the IAM role have been provided in Required AWS resources section. More details about creating and updating AWS lambda functions can be found here.

Azure Batch executor

Covalent Azure Batch executor is an interface between Covalent and Microsoft Azure Batch. This executor allows execution of Covalent tasks on Azure’s Batch service.

The batch executor is well suited for compute/memory intensive tasks since the resource pool of compute virtual machines can be scaled accordingly. Furthermore, Azure Batch allows running tasks in parallel on multiple virtual machines and their scheduling engine manages execution of the tasks.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-azurebatch-plugin

In this example, we train a Support Vector Machine (SVM) using an instance of the Azure Batch executor. The train_svm electron is submitted as a batch job in an existing Azure Batch Compute environment. Note that we also require DepsPip in order to install the python package dependencies before executing the electron in the batch environment.

from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

from covalent.executor import AzureBatchExecutor

deps_pip = ct.DepsPip(
packages=["numpy==1.22.4", "scikit-learn==1.1.2"]
)

executor = AzureBatchExecutor(
tenant_id="tenant-id",
client_id="client-id",
client_secret="client-secret",
batch_account_url="https://covalent.eastus.batch.azure.com",
batch_account_domain="batch.core.windows.net",
storage_account_name="covalentbatch",
storage_account_domain="blob.core.windows.net",
pool_id="covalent-pool",
retries=3,
time_limit=300,
cache_dir="/tmp/covalent",
poll_freq=10
)

# Use executor plugin to train our SVM model
@ct.electron(
executor=executor,
deps_pip=deps_pip
)
def train_svm(data, C, gamma):
X, y = data
clf = svm.SVC(C=C, gamma=gamma)
clf.fit(X[90:], y[90:])
return clf

@ct.electron
def load_data():
iris = datasets.load_iris()
perm = permutation(iris.target.size)
iris.data = iris.data[perm]
iris.target = iris.target[perm]
return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
X_test, y_test = data
return clf.score(
X_test[:90],y_test[:90]
)

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
data = load_data()
clf = train_svm(
data=data,
C=C,
gamma=gamma
)
score = score_svm(
data=data,
clf=clf
)
return score

# Dispatch the workflow.
dispatch_id = ct.dispatch(run_experiment)(
C=1.0,
gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id, wait=True).result

print(result)

During the execution of the workflow, one can navigate to the UI to see the status of the workflow. Once completed, the above script should also output a value with the score of our model.

0.8666666666666667
Config KeyRequiredDefaultDescription
tenant_idYesNoneAzure tenant ID
client_idYesNoneAzure client IDcalls
client_secretYesNoneAzure client secret
batch_account_urlYesNoneAzure Batch account URL
batch_account_domainNobatch.core.windows.netAzure Batch account domain
storage_account_nameYesNoneAzure Storage account name
storage_account_domainNoblob.core.windows.netAzure Storage account domain
pool_idYesNoneAzure Batch pool ID
retriesNo3Number of retries for Azure Batch job
time_limitNo300Time limit for Azure Batch job
cache_dirNo/tmp/covalentDirectory to store cached files
poll_freqNo10Polling frequency for Azure Batch job
  1. Configuration options can be passed in as constructor keys to the executor class ct.executor.AzureBatchExecutor

  2. By modifying the covalent configuration file under the section [executors.azurebatch]

The following shows an example of how a user might modify their covalent configuration file to support this plugin:

[executors.azurebatch]
tenant_id="tenant-id",
client_id="client-id",
client_secret="client-secret",
batch_account_url="https://covalent.eastus.batch.azure.com",
batch_account_domain="batch.core.windows.net",
storage_account_name="covalentbatch",
storage_account_domain="blob.core.windows.net",
pool_id="covalent-pool",
retries=5,
time_limit=500,
...

In order to use this plugin, the following Azure resources need to be provisioned first. These resources can be created using the Azure Portal or the Azure CLI.

ResourceIs RequiredConfig KeyDescription
Batch AccountYesbatch_account_urlA batch account is required to submit jobs to Azure Batch. The URL can be found under the Account endpoint field in the Batch account. Furthermore, ensure that https:// is prepended to the value.
Storage AccountYesstorage_account_nameStorage account must be created with blob service enabled in order for covalent to store essential files that are needed during execution.
Resource GroupYesN/AThe resource group is a logical grouping of Azure resources that can be managed as one entity in terms of lifecycle and security.
Container RegistryYesN/AContainer registry is required to store the containers that are used to run Batch jobs.
Virtual NetworkNoN/AAzure Virtual Network is used by resources to securely communicate with each other.
Pool IDYespool_idA pool is a collection of compute nodes that are managed together. The pool ID is the name of the pool that will be used to execute the jobs.

More information on authentication with service principals and necessary permissions for this executor can be found here.

For more information on error handling and detection in Batch, refer to the Microsoft Azure documentation. Furthermore, information on best practices can be found here.

Google Batch Executor

Covalent Google Batch executor is an interface between Covalent and Google Cloud Platform’s Batch compute service. This executor allows execution of Covalent tasks on Google Batch compute service.

This batch executor is well suited for tasks with high compute/memory requirements. The compute resources required can be very easily configured/specified in the executor’s configuration. Google Batch scales really well thus allowing users to queue and execute multiple tasks concurrently on their resources efficiently. Google’s Batch job scheduler manages the complexity of allocating the resources needed by the task and de-allocating them once the job has finished.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-gpcbatch-plugin

Here we present an example on how a user can use the GCP Batch executor plugin in their Covalent workflows. In this example we train a simple SVM (support vector machine) model using the Google Batch executor. This executor is quite minimal in terms of the required cloud resoures that need to be provisioned prior to first use. The Google Batch executor needs the following cloud resources pre-configured

  • A Google storage bucket

  • Cloud artifact registry for Docker images

  • A service account with the following permissions

    • roles/batch.agentReporter
    • roles/batch.agentReporter
    • roles/logging.viewer
    • roles/artifactregistry.reader
    • roles/storage.objectCreator
    • roles/storage.objectViewer

Note

Details about Google services accounts and how to use them properly can be found here

from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

deps_pip = ct.DepsPip(
packages=["numpy==1.23.2", "scikit-learn==1.1.2"]
)

executor = ct.executor.GCPBatchExecutor(
bucket_name = "my-gcp-bucket",
project_id = "my-gcp-project-id",
container_image_uri = "my-executor-container-image-uri",
service_account_email = "my-service-account-email",
vcpu = 2, # Number of vCPUs to allocate
memory = 512, # Memory in MB to allocate
time_limit = 300, # Time limit of job in seconds
poll_freq = 3 # Number of seconds to pause before polling for the job's status
)

# Use executor plugin to train our SVM model.
@ct.electron(
executor=executor,
deps_pip=deps_pip
)
def train_svm(data, C, gamma):
X, y = data
clf = svm.SVC(C=C, gamma=gamma)
clf.fit(X[90:], y[90:])
return clf

@ct.electron
def load_data():
iris = datasets.load_iris()
perm = permutation(iris.target.size)
iris.data = iris.data[perm]
iris.target = iris.target[perm]
return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
X_test, y_test = data
return clf.score(
X_test[:90],
y_test[:90]
)

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
data = load_data()
clf = train_svm(
data=data,
C=C,
gamma=gamma
)
score = score_svm(
data=data,
clf=clf
)
return score

# Dispatch the workflow
dispatch_id = ct.dispatch(run_experiment)(
C=1.0,
gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

print(result)

During the execution of the workflow one can navigate to the UI to see the status of the workflow, once completed however the above script should also output a value with the score of our model.

0.8666666666666667
Config KeyIs RequiredDefaultDescription
project_idYesNoneGoogle cloud project ID
regionNous-east1Google cloud region to use to for submitting batch jobs
bucket_nameYesNoneName of the Google storage bucket to use for storing temporary objects
container_image_uriYesNoneGCP Batch executor base docker image uri
service_account_emailYesNoneGoogle service account email address that is to be used by the batch job when interacting with the resources
vcpusNo2Number of vCPUs needed for the task.
memoryNo256Memory (in MB) needed by the task.
retriesNo3Number of times a job is retried if it fails.
time_limitNo300Time limit (in seconds) after which jobs are killed.
poll_freqNo5Frequency (in seconds) with which to poll a submitted task.
cache_dirNo/tmp/covalentCache directory used by this executor for temporary files.

This plugin can be configured in one of two ways:

  1. Configuration options can be passed in as constructor keys to the executor class ct.executor.GCPBatchExecutor

  2. By modifying the covalent configuration file under the section [executors.gcpbatch]

[executors.gcpbatch]
bucket_name = <my-gcp-bucket-name>
project_id = <my-gcp-project-id>
container_image_uri = <my-base-executor-image-uri>
service_account_email = <my-service-account-email>
region = <google region for batch>
vcpus = 2 # number of vcpus needed by the job
memory = 256 # memory in MB required by the job
retries = 3 # number of times to retry the job if it fails
time_limit = 300 # time limit in seconds after which the job is to be considered failed
poll_freq = 3 # Frequency in seconds with which to poll the job for the result
cache_dir = "/tmp" # Path on file system to store temporary objects

4. Required Cloud Resources

In order to successfully execute tasks using the Google Batch executor, some cloud resources need to be provisioned apriori.

  • Google storage bucket

    The executor uses a storage bucket to store/cache exception/result objects that get generated during execution

  • Google Docker artifact registry

    The executor submits a container job whose image is pulled from the provided container_image_uri argument of the executor

  • Service account

    Keeping good security practices in mind, the jobs are executed using a service account that only has the necessary permissions attached to it that are required for the job to finish.

Users can free to provision these resources as they see fit or they can use Covalent to provision these for them. Covalent CLI can be used to deploy the required cloud resources. Covalent behind the scenes uses Terraform to provision the cloud resources. The terraform HCL scripts can be found in the plugin’s Github repository here.

To run the scripts manually, users must first authenticate with Google cloud via their CLI

  gcloud auth login
Once the user has authenticated, the infrastructure can be stood up by simply apply the Terraform scripts i.e.
```
terraform plan -out tf.plan
terrafrom apply tf.plan

Note

For first time deployment, the terraform provides must be initialized properly via terraform init

The HCL scripts also build the base executor docker image and upload it to the artficat registry after it gets created. This way the user need not build and push an image separately as the process is fully automated via Covalent.

Slurm Executor

This executor plugin interfaces Covalent with HPC systems managed by Slurm. For workflows to be deployable, users must have SSH access to the Slurm login node, writable storage space on the remote filesystem, and permissions to submit jobs to Slurm.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-slurm-plugin

The following shows an example of a Covalent configuration that is modified to support Slurm:

[executors.slurm]
username = "user"
address = "login.cluster.org"
ssh_key_file = "/home/user/.ssh/id_rsa"
remote_workdir = "/scratch/user"
cache_dir = "/tmp/covalent"

[executors.slurm.options]
nodes = 1
ntasks = 4
cpus-per-task = 8
constraint = "gpu"
gpus = 4
qos = "regular"

[executors.slurm.srun_options]
cpu_bind = "cores"
gpus = 4
gpu-bind = "single:1"

The first stanza describes default connection parameters for a user who can connect to the Slurm login node using, for example:

ssh -i /home/user/.ssh/id_rsa user@login.cluster.org

The second and third stanzas describe default parameters for #SBATCH directives and default parameters passed directly to srun, respectively.

This example generates a script containing the following preamble:

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=8
#SBATCH --constraint=gpu
#SBATCH --gpus=4
#SBATCH --qos=regular

and subsequent workflow submission with:

srun --cpu_bind=cores --gpus=4 --gpu-bind=single:1

To use the configuration settings, an electron’s executor must be specified with a string argument, in this case:

import covalent as ct

@ct.electron(executor="slurm")
def my_task(x, y):
return x + y

Alternatively, passing a SlurmExecutor instance enables custom behavior scoped to specific tasks. Here, the executor’s prerun_commands and postrun_commands parameters can be used to list shell commands to be executed before and after submitting the workflow. These may include any additional srun commands apart from workflow submission. Commands can also be nested inside the submission call to srun by using the srun_append parameter.

More complex jobs can be crafted by using these optional parameters. For example, the instance below runs a job that accesses CPU and GPU resources on a single node, while profiling GPU usage via nsys and issuing complementary commands that pause/resume the central hardware counter.

executor = ct.executor.SlurmExecutor(
remote_workdir="/scratch/user/experiment1",
options={
"qos": "regular",
"time": "01:30:00",
"nodes": 1,
"constraint": "gpu",
},
prerun_commands=[
"module load package/1.2.3",
"srun --ntasks-per-node 1 dcgmi profile --pause"
],
srun_options={
"n": 4,
"c": 8,
"cpu-bind": "cores",
"G": 4,
"gpu-bind": "single:1"
}
srun_append="nsys profile --stats=true -t cuda --gpu-metrics-device=all",
postrun_commands=[
"srun --ntasks-per-node 1 dcgmi profile --resume",
]
)

@ct.electron(executor=executor)
def my_custom_task(x, y):
return x + y

Here the corresponding submit script contains the following commands:

module load package/1.2.3
srun --ntasks-per-node 1 dcgmi profile --pause

srun -n 4 -c 8 --cpu-bind=cores -G 4 --gpu-bind=single:1 \
nsys profile --stats=true -t cuda --gpu-metrics-device=all \
python /scratch/user/experiment1/workflow_script.py

srun --ntasks-per-node 1 dcgmi profile --resume

SSH Executor

Executing tasks (electrons) via SSH in remote machine. This executor plugin interfaces Covalent with other machines accessible to the user over SSH. It is appropriate to use this plugin to distribute tasks to one or more compute backends which are not controlled by a cluster management system, such as computers on a LAN, or even a collection of small-form-factor Linux-based devices such as Raspberry Pis, NVIDIA Jetsons, or Xeon Phi co-processors.

To use this plugin with Covalent, simply install it using pip:

pip install covalent-ssh-plugin

The following shows an example of how a user might modify their Covalent configuration to support this plugin:

[executors.ssh]
username = "user"
hostname = "host.hostname.org"
remote_dir = "/home/user/.cache/covalent"
ssh_key_file = "/home/user/.ssh/id_rsa"

This setup assumes the user has the ability to connect to the remote machine using ssh -i /home/user/.ssh/id_rsa user@host.hostname.org and has write-permissions on the remote directory /home/user/.cache/covalent (if it exists) or the closest parent directory (if it does not).

Within a workflow, users can decorate electrons using the default settings:

import covalent as ct

@ct.electron(executor="ssh")
def my_task():
import socket
return socket.gethostname()

or use a class object to customize behavior within particular tasks:

executor = ct.executor.SSHExecutor(
username="user",
hostname="host2.hostname.org",
remote_dir="/tmp/covalent",
ssh_key_file="/home/user/.ssh/host2/id_rsa",
)

@ct.electron(executor=executor)
def my_custom_task(x, y):
return x + y

Quantum Executors

Qiskit Runtime Executor

This quantum executor provides efficient access to IBM Quantum backends by using runtime sessions for submitting jobs. QiskitExecutor uses asyncio for scalable parallelization.

1. Installation

The Qiskit Runtime executor is included with Covalent. No additional installation is required.

2. Usage Example

Typical usage involves specifying a runtime primitive via the device argument and specifying an IBM backend via the backend argument. An access token from IBM Quantum can be provided explicitly as ibmqx_token or in the Covalent configuration file.

The following example shows several QiskitExecutor instances being utilized as a Quantum Cluster.

import covalent as ct
import pennylane as qml

# Default local qiskit executor.
qiskit_local = ct.executor.QiskitExecutor()

# Runtime qiskit executor that uses the "ibmq_qasm_simulator" backend.
qiskit_qasm = ct.executor.QiskitExecutor(
device="sampler",
backend="ibmq_qasm_simulator",
ibmqx_token="<token>", # required if not in config file
)

# Runtime qiskit executor that uses the "ibmq_lima" QPU.
qiskit_lima = ct.executor.QiskitExecutor(
device="sampler",
backend="ibmq_lima",
ibmqx_token="<token>",
instance="my-hub/my-group/my-project",

# Backend settings (optional)
options={
"optimization_level": 2,
"resilience_level": 1,
# ...
}
)

# Create quantum electron that uses a cluster of 3 qiskit executors.
@ct.qelectron(executors=[qiskit_local, qiskit_qasm, qiskit_lima])
@qml.qnode(qml.device("default.qubit", wires=2, shots=1024), interface="tf")
def circuit(x):
qml.IQPEmbedding(features=x, wires=[0, 1])
qml.Hadamard(wires=1)
return qml.probs(wires=range(2))

Once converted to a QElectron, the circuit can be called normally or asynchronously via circuit.run_later(). Since the example uses a quantum cluster with the default "cyclic" selector, circuit calls will repeatedly cycle through executors in order.

A synchronous example is shown below.

>>> circuit([0.6, -1.57])  # local

tf.Tensor([0.0546875 0.42773438 0.46777344 0.04980469], shape=(4,), dtype=float64)

>>> circuit([0.6, -1.57]) # ibmq_qasm_simulator

tf.Tensor([0.04589844 0.45507812 0.45898438 0.04003906], shape=(4,), dtype=float64)

>>> circuit([0.6, -1.57]) # ibmq_lima

tf.Tensor([0.04199219 0.44628906 0.46679688 0.04492188], shape=(4,), dtype=float64)

>>> circuit([0.6, -1.57]) # local (again)

tf.Tensor([0.04394531 0.4609375 0.43945312 0.05566406], shape=(4,), dtype=float64)

If instead doing this asynchronously:

>>> x = [0.6, -1.57]

>>> # Queue jobs for all four circuit calls simultaneously on IBM Quantum.
>>> # Uses same executor order as above (local, qasm, lima, local, ...).
>>> futs = [circuit.run_later(x) for _ in range(4)]

>>> # Wait for all circuits to finish.
>>> [fut.result() for fut in futs]

[tf.Tensor([0.0546875 0.42773438 0.46777344 0.04980469], shape=(4,), dtype=float64),
tf.Tensor([0.04589844 0.45507812 0.45898438 0.04003906], shape=(4,), dtype=float64),
tf.Tensor([0.04199219 0.44628906 0.46679688 0.04492188], shape=(4,), dtype=float64),
tf.Tensor([0.04394531 0.4609375 0.43945312 0.05566406], shape=(4,), dtype=float64)]

3. Overview of Configuration

The QiskitExecutor configuration is found under [qelectron.QiskitExecutor] in the Covalent configuration file.

ConfigIs RequiredDefaultDescription
deviceYeslocal_samplerThe qiskit (e.g. "local_sampler") or qiskit runtime (e.g. "sampler") primitive used for running circuits on an IBM backend.
backendYesibm_qasm_simulatorThe name of an IBM Quantum system or simulator.
ibmqx_tokenYes/NoAn access token obtained from IBM Quantum. Required for non-local execution.
hubNoibm-qHub name for IBM Quantum.
groupNoopenGroup name for IBM Quantum.
projectNomainProject name for IBM Quantum.

The following backend settings are also set by default under [qelectron.QiskitExecutor.options]. These represent maximum optimization/resilience levels for the Sampler primitive. Users can append additional settings to this configuration or specify them directly when instantiating a QiskitExecutor. See the Qiskit Runtime Options page for a complete list of valid fields.

ConfigIs RequiredDefaultDescription
optimization_levelNo3How much optimization to perform on the circuits.
resilience_levelNo1How much resilience to build against errors.

4. Required Cloud Resources

In order to access IBM backends, users must acquire an access token from IBM Quantum. This can be done by creating a free account on the IBM Quantum Experience.

pydantic model covalent.executor.QiskitExecutor [source]

A quantum executor that lets the user run circuits on IBM Quantum backends, using runtime sessions and Qiskit primitives. The attributes device, backend, ibmqx_token, hub, group, and project are taken from the Covalent configuration file by default, if available.

Keyword Arguments:

  • device - The Qiskit primitive used to execute circuits. Valid values are "sampler" and "local_sampler". The value "sampler" corresponds to the Qiskit Runtime Sampler primitive. The value "local_sampler" corresponds to the Qiskit Sampler primitive, which is entirely local.
  • backend - The name of the IBM Quantum backend device. Defaults to "ibmq_qasm_simulator".
  • ibmqx_token- The IBM Quantum API token.
  • hub - An IBM Quantum hub name. Defaults to "ibm-q"
  • group - An IBM Quantum group name. Defaults to "open"
  • project - An IBM Quantum project name. Defaults to "main"
  • shots - The number of shots to run per circuit. Defaults to 1024.
  • single_job - Indicates whether or not all circuits are submitted to a single job or as separate jobs. Defaults to True
  • max_time - An optional time limit for circuit execution on the IBM Quantum backend. Defaults to None, i.e. no time limit.
  • local_transpile - Indicates whether or not to transpile circuits before submitting to IBM Quantum. Defaults to False
  • ibmqx_url - An optional URL for the Qiskit Runtime API.
  • channel -An optional channel for the Qiskit Runtime API. Defaults to "ibm_quantum"
  • instance - An alternate means to specify hub, group, and project, formatted as "my-hub/my-group/my-project"
  • cloud_instance- Same as instance but for the case channel="ibm_cloud"
  • options- A dictionary of options to pass to Qiskit Runtime. See qiskit_ibm_runtime.options.Options for valid fields.
Show JSON Schema
{
"title": "QiskitExecutor",
"description": "A quantum executor that lets the user run circuits on IBM Quantum backends,\nusing runtime sessions and Qiskit primitives. The attributes :code:`device`,\n:code:`backend`, :code:`ibmqx_token`, :code:`hub`, :code:`group`, and\n:code:`project` are taken from the Covalent configuration file by default, if\navailable.\n\nKeyword Args:\n device: The Qiskit primitive used to execute circuits. Valid values are\n :code:`\"sampler\"` and :code:`\"local_sampler\"`. The value :code:`\"sampler\"`\n corresponds to the Qiskit Runtime :code:`Sampler` primitive. The value\n :code:`\"local_sampler\"` corresponds to the Qiskit :code:`Sampler` primitive,\n which is entirely local.\n backend: The name of the IBM Quantum backend device. Defaults to\n :code:`\"ibmq_qasm_simulator\"`.\n ibmqx_token: The IBM Quantum API token.\n hub: An IBM Quantum hub name. Defaults to :code:`\"ibm-q\"`.\n group: An IBM Quantum group name. Defaults to :code:`\"open\"`.\n project: An IBM Quantum project name. Defaults to :code:`\"main\"`.\n shots: The number of shots to run per circuit. Defaults to 1024.\n single_job: Indicates whether or not all circuits are submitted\n to a single job or as separate jobs. Defaults to :code:`True`.\n max_time: An optional time limit for circuit execution on the IBM Quantum\n backend. Defaults to :code:`None`, i.e. no time limit.\n local_transpile: Indicates whether or not to transpile circuits before\n submitting to IBM Quantum. Defaults to :code:`False`.\n ibmqx_url: An optional URL for the Qiskit Runtime API.\n channel: An optional channel for the Qiskit Runtime API. Defaults to\n :code:`\"ibm_quantum\"`.\n instance: An alternate means to specify :code:`hub`, :code:`group`, and\n :code:`project`, formatted as :code:`\"my-hub/my-group/my-project\"`.\n cloud_instance: Same as :code:`instance` but for the case :code:`channel=\"ibm_cloud\"`.\n options: A dictionary of options to pass to Qiskit Runtime. See\n :code:`qiskit_ibm_runtime.options.Options` for valid fields.",
"type": "object",
"properties": {
"persist_data": {
"title": "Persist Data",
"default": true,
"type": "boolean"
},
"qnode_device_import_path": {
"title": "Qnode Device Import Path",
"type": "array",
"minItems": 2,
"maxItems": 2,
"items": [
{
"type": "string"
},
{
"type": "string"
}
]
},
"qnode_device_shots": {
"title": "Qnode Device Shots",
"type": "integer"
},
"qnode_device_wires": {
"title": "Qnode Device Wires",
"type": "integer"
},
"pennylane_active_return": {
"title": "Pennylane Active Return",
"type": "boolean"
},
"device": {
"title": "Device",
"type": "string"
},
"backend": {
"title": "Backend",
"type": "string"
},
"ibmqx_token": {
"title": "Ibmqx Token",
"type": "string"
},
"hub": {
"title": "Hub",
"type": "string"
},
"group": {
"title": "Group",
"type": "string"
},
"project": {
"title": "Project",
"type": "string"
},
"shots": {
"title": "Shots",
"default": 1024,
"type": "integer"
},
"single_job": {
"title": "Single Job",
"default": false,
"type": "boolean"
},
"local_transpile": {
"title": "Local Transpile",
"default": false,
"type": "boolean"
},
"max_time": {
"title": "Max Time",
"anyOf": [
{
"type": "integer"
},
{
"type": "string"
}
]
},
"ibmqx_url": {
"title": "Ibmqx Url",
"type": "string"
},
"channel": {
"title": "Channel",
"default": "ibm_quantum",
"type": "string"
},
"instance": {
"title": "Instance",
"default": "",
"type": "string"
},
"cloud_instance": {
"title": "Cloud Instance",
"default": "",
"type": "string"
},
"options": {
"title": "Options"
}
}
}

CONFIG

extra: EXTRA = allow

field backend: str [Optional]

Validated by

  • set_name

field channel: str = 'ibm_quantum'

Validated by

  • set_name

field cloud_instance: str = ''

Validated by

  • set_name

field device: str [Optional]

Validated by

  • set_name

field group: str [Optional]

Validated by

  • set_name

field hub: str [Optional]

Validated by

  • set_name

field ibmqx_token: str [Optional]

Validated by

  • set_name

field ibmqx_url: str = None

Validated by

  • set_name

field instance: str = ''

Validated by

  • set_name

field local_transpile: bool = False

Validated by

  • set_name

field max_time: Union[int, str] = None

Validated by

  • set_name

field options:

covalent.experimental.covalent_qelectron.executors.plugins.qiskit_plugin.utils.RuntimeOptions [Optional]

Validated by

  • set_name

field project: str [Optional]

Validated by

  • set_name

field shots: Optional[int] = 1024

Validated by

  • set_name

field single_job: bool = False

Validated by

  • set_name

batch_submit(qscripts_list) [source]

execution_device() [source]

Create a subclasses execution device that ensure correct output typing.

Return Type

  • QubitDevice

async run_all_circuits(tapes, device, result_obj) [source]

Allows multiple circuits to be submitted asynchronously into a single IBM Qiskit Runtime Job.

async run_circuit(tapes, device, result_obj) [source]

Allows a circuit to be submitted asynchronously.

property device_init_kwargs(tapes, device, result_obj)

Keyword arguments to pass to the device constructor.

IBMQ Executor

This quantum executor accesses IBM Quantum backends through Pennylane's "qiskit.ibmq" device. IBMQExecutor introduces thread-based parallelism for circuit execution on the "qiskit.ibmq" device. Note that the more efficient QiskitExecutor is recommended over IBMQExecutor for production use.

1. Installation

The IBMQ executor is included with Covalent. No additional installation is required.

2. Usage Example

Using IBMQExecutor requires specifying an IBM Quantum backend through the backend argument. The ibmqx_token is required if not specified in the configuration (see next section).

import covalent as ct
import pennylane as qml

# IBMQ executor that uses "ibmq_qasm_simulator" (default).
ibmq_qasm = ct.executor.IBMQExecutor()

# IBMQ executor that uses the "ibmq_lima" QPU.
ibmq_lima = ct.executor.IBMQExecutor(
backend="ibmq_lima",
ibmqx_token="<token>",
)

@ct.qelectron(executors=[ibmq_qasm, ibmq_lima])
@qml.qnode(qml.device("default.qubit", wires=2, shots=1024), interface="jax")
def circuit(x):
qml.IQPEmbedding(features=x, wires=[0, 1])
qml.Hadamard(wires=1)
return qml.probs(wires=range(2))

As a QElectron, the circuit can be called either normally or asynchronously using circuit.run_later(). With the default "cyclic" selector, circuit calls will alternate between the executors, [ibmq_qasm, ibmq_lima].

A synchronous example is shown below.

>>> print(circuit([0.5, 0.1]))  # ibmq_qasm_simulator

DeviceArray([0.51660156, 0.00097656, 0.4814453 , 0.00097656], dtype=float32)

>>> print(circuit([0.5, 0.1])) # ibmq_lima

DeviceArray([0.5048828 , 0.00195312, 0.49316406, 0. ], dtype=float32)

>>> print(circuit([0.5, 0.1])) # ibmq_qasm_simulator (again)

DeviceArray([0.5097656 , 0.00292969, 0.4873047 , 0. ], dtype=float32)

Doing this asynchronously:

>>> x = [0.6, -1.57]

>>> # Queue jobs for all three circuit calls simultaneously on IBM Quantum.
>>> # Uses same executor order as above (qasm, lima, qasm, ...).
>>> futs = [circuit.run_later(x) for _ in range(3)]

>>> # Wait for all circuits to finish.
>>> [fut.result() for fut in futs]

[DeviceArray([0.51660156, 0.00097656, 0.4814453 , 0.00097656], dtype=float32),
DeviceArray([0.5048828 , 0.00195312, 0.49316406, 0. ], dtype=float32),
DeviceArray([0.5097656 , 0.00292969, 0.4873047 , 0. ], dtype=float32)]

3. Overview of Configuration

The IBMQExecutor configuration is found under [qelectron.IBMQExecutor] in the Covalent configuration file.

ConfigIs RequiredDefaultDescription
backendYesibm_qasm_simulatorThe name of an IBM Quantum system or simulator.
ibmqx_tokenYes/NoAn access token obtained from IBM Quantum. Required for non-local execution.
hubNoibm-qHub name for IBM Quantum.
groupNoopenGroup name for IBM Quantum.
projectNomainProject name for IBM Quantum.

4. Required Cloud Resources

In order to access IBM backends, users must acquire an access token from IBM Quantum. This can be done by creating a free account on the IBM Quantum Experience.

pydantic model covalent.executor.IBMQExecutor [source]

A quantum executor that uses the Pennylane native "qiskit.ibmq" device to run circuits on IBM Quantum backends. The attributes backend, ibmqx_token, hub, group, and project are taken from the Covalent configuration file by default, if available.

Keyword Arguments:

  • max_jobs - The maximum number of jobs that can be submitted to the backend concurrently. This number corresponds to the number of threads utilized by this executor. Defaults to 20.
  • shots - The number of shots to use for the execution device. Overrides the shots value from the original device if set to None or a positive int. The shots setting from the original device is used by default, when this argument is 0.
  • backend - The name of the IBM Quantum backend device. Defaults to "ibmq_qasm_simulator".
  • ibmqx_token - The IBM Quantum API token.
  • hub: An IBM Quantum hub name. Defaults to "ibm-q"
  • group: An IBM Quantum group name. Defaults to "open".
  • project: An IBM Quantum project name. Defaults to "main".
Show JSON Schema
{
"title": "IBMQExecutor",
"description": "A quantum executor that uses the Pennylane native :code:`\"qiskit.ibmq\"` device to run\ncircuits on IBM Quantum backends. The attributes :code:`backend`, :code:`ibmqx_token`,\n:code:`hub`, :code:`group`, and :code:`project` are taken from the Covalent\nconfiguration file by default, if available.\n\nKeyword Args:\n max_jobs: The maximum number of jobs that can be submitted to the backend\n concurrently. This number corresponds to the number of threads utilized\n by this executor. Defaults to 20.\n shots: The number of shots to use for the execution device. Overrides the\n :code:`shots` value from the original device if set to :code:`None` or\n a positive :code:`int`. The shots setting from the original device is\n is used by default, when this argument is 0.\n backend: The name of the IBM Quantum backend device. Defaults to\n :code:`\"ibmq_qasm_simulator\"`.\n ibmqx_token: The IBM Quantum API token.\n hub: An IBM Quantum hub name. Defaults to :code:`\"ibm-q\"`.\n group: An IBM Quantum group name. Defaults to :code:`\"open\"`.\n project: An IBM Quantum project name. Defaults to :code:`\"main\"`.",
"type": "object",
"properties": {
"persist_data": {
"title": "Persist Data",
"default": true,
"type": "boolean"
},
"qnode_device_import_path": {
"title": "Qnode Device Import Path",
"type": "array",
"minItems": 2,
"maxItems": 2,
"items": [
{
"type": "string"
},
{
"type": "string"
}
]
},
"qnode_device_shots": {
"title": "Qnode Device Shots",
"type": "integer"
},
"qnode_device_wires": {
"title": "Qnode Device Wires",
"type": "integer"
},
"pennylane_active_return": {
"title": "Pennylane Active Return",
"type": "boolean"
},
"device": {
"title": "Device",
"default": "default.qubit",
"type": "string"
},
"num_threads": {
"title": "Num Threads",
"default": 10,
"type": "integer"
},
"max_jobs": {
"title": "Max Jobs",
"default": 20,
"type": "integer"
},
"shots": {
"title": "Shots",
"default": 0,
"type": "integer"
},
"backend": {
"title": "Backend",
"type": "string"
},
"ibmqx_token": {
"title": "Ibmqx Token",
"type": "string"
},
"hub": {
"title": "Hub",
"type": "string"
},
"group": {
"title": "Group",
"type": "string"
},
"project": {
"title": "Project",
"type": "string"
}
}
}

CONFIG

extra: EXTRA = allow

field backend: str [Optional]

Validated by

  • set_name

field group: str [Optional]

Validated by

  • set_name

field hub: str [Optional]

Validated by

  • set_name

field ibmqx_token: str [Optional]

Validated by

  • set_name

field max_jobs: int = 20

Validated by

  • set_name

field project: str [Optional]

Validated by

  • set_name

field shots: int = 20

Validated by

  • set_name

batch_submit(qscripts_list) [source]

AWS Braket Qubit Executor

This quantum executor accesses quantum resources operating under the qubit model as made available through AWS ("braket.aws.qubit").

It utilizes the Pennylane plugin found here. BraketQubitExecutor introduces thread-based parallelism for circuit execution on the "braket.aws.qubit" device.

1. Installation

BraketQubitExecutor is included in Covalent. To use it, however, you will need to install the amazon-braket-pennylane-plugin:

pip install amazon-braket-pennylane-plugin

and have valid AWS credentials as specified here.

2. Usage Example

Using BraketQubitExecutor requires specifying an AWS Quantum backend through the device_arn argument.

# Statevector simulator
sv1 = ct.executor.BraketQubitExecutor(
device_arn="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
shots=1024,
s3_destination_folder=(),
)
# Tensor network simulator
tn1 = ct.executor.BraketQubitExecutor(
device_arn="arn:aws:braket:::device/quantum-simulator/amazon/tn1",
shots=1024,
s3_destination_folder=(),
)

@ct.qelectron(executors=[sv1, tn1])
@qml.qnode(qml.device("default.qubit", wires=2, shots=1000))
def circuit(x):
qml.IQPEmbedding(features=x, wires=[0, 1])
qml.Hadamard(wires=1)
return [qml.expval(qml.PauliZ(0)), qml.expval(qml.PauliZ(1))]

As a QElectron, the circuit can be called either normally or asynchronously using circuit.run_later(). With the default "cyclic" selector, circuit calls will alternate between the executors, [sv1, tn1].

Synchronous example output is below

>>> print(circuit([0.5, 0.1]))  # alternate between sv1 and tn1

[array(0.008), array(0.996)]

and asynchronously:

>>> x = [0.6, -1.57]

>>> # Queue jobs for all three circuit calls simultaneously on AWS Braket.
>>> # Uses same executor order as above (sv1, tn1, ...).
>>> futs = [circuit.run_later(x) for _ in range(3)]

>>> # Wait for all circuits to finish.
>>> [fut.result() for fut in futs]

[[array(-0.02), array(0.01)],
[array(0.014), array(-0.022)],
[array(-0.074), array(0.05)]]

3. Overview of Configuration

The BraketQubitExecutor configuration is found under [qelectron.BraketQubitExecutor] in the Covalent configuration file.

ConfigIs RequiredDefaultDescription
s3_destination_folderNo() an empty tupleThe location of the s3 bucket that simulation data will be stored in. I.e, you can set s3 = ("my-bucket", "my-prefix").

4. Required Cloud Resources

Users must acquire AWS credentials and make them discoverable following the instructions here.


pydantic model covalent.executor.BraketQubitExecutor [source]

The remote Braket executor based on the existing Pennylane Braket qubit device. Usage of this device requires valid AWS credentials as set up following the instructions at https://github.com/aws/amazon-braket-sdk-python#prerequisites.

max_jobs

maximum number of parallel jobs sent by threads on batch_submit.

shots

number of shots used to estimate quantum observables.

device_arn

an alpha-numeric code (arn=Amazon Resource Name) specifying a quantum device.

poll_timeout_seconds

number of seconds before a poll to remote device is considered timed-out.

poll_interval_seconds

number of seconds before a poll to remote device is considered timed-out.

aws_session

An AwsSession object created to manage interactions with AWS services, to be supplied if extra control is desired.

parallel

turn parallel execution on or off.

max_parallel

the maximum number of circuits to be executed in parallel.

max_connections

the maximum number of connections in the Boto3 connection pool.

max_retries

the maximum number of time a job will be re-sent if it failed.

s3_destination_folder

Name of the S3 bucket and folder, specified as a tuple.

run_kwargs

Variable length keyword arguments for braket.devices.Device.run()

Show JSON Schema
{
"title": "BraketQubitExecutor",
"description": "The remote Braket executor based on the existing Pennylane Braket\nqubit device. Usage of this device requires valid AWS credentials as\nset up following the instructions at\nhttps://github.com/aws/amazon-braket-sdk-python#prerequisites.\n\nAttributes:\n max_jobs:\n maximum number of parallel jobs sent by threads on :code:`batch_submit`.\n shots: number of shots used to estimate quantum observables.\n device_arn:\n an alpha-numeric code (arn=Amazon Resource Name) specifying a quantum device.\n poll_timeout_seconds:\n number of seconds before a poll to remote device is considered timed-out.\n poll_interval_seconds:\n number of seconds between polling of a remote device's status.\n aws_session:\n An :code:`AwsSession` object created to manage interactions with AWS services,\n to be supplied if extra control is desired.\n parallel: turn parallel execution on or off.\n max_parallel: the maximum number of circuits to be executed in parallel.\n max_connections: the maximum number of connections in the :code:`Boto3` connection pool.\n max_retries: the maximum number of time a job will be re-sent if it failed\n s3_destination_folder: Name of the S3 bucket and folder, specified as a tuple.\n run_kwargs: Variable length keyword arguments for :code:`braket.devices.Device.run()`",
"type": "object",
"properties": {
"persist_data": {
"title": "Persist Data",
"default": true,
"type": "boolean"
},
"qnode_device_import_path": {
"title": "Qnode Device Import Path",
"type": "array",
"minItems": 2,
"maxItems": 2,
"items": [
{
"type": "string"
},
{
"type": "string"
}
]
},
"qnode_device_shots": {
"title": "Qnode Device Shots",
"type": "integer"
},
"qnode_device_wires": {
"title": "Qnode Device Wires",
"type": "integer"
},
"pennylane_active_return": {
"title": "Pennylane Active Return",
"type": "boolean"
},
"device": {
"title": "Device",
"default": "default.qubit",
"type": "string"
},
"num_threads": {
"title": "Num Threads",
"default": 10,
"type": "integer"
},
"max_jobs": {
"title": "Max Jobs",
"default": 20,
"type": "integer"
},
"shots": {
"title": "Shots",
"default": [
null
],
"type": "integer"
},
"device_arn": {
"title": "Device Arn",
"type": "string"
},
"poll_timeout_seconds": {
"title": "Poll Timeout Seconds",
"default": 432000,
"type": "number"
},
"poll_interval_seconds": {
"title": "Poll Interval Seconds",
"default": 1,
"type": "number"
},
"aws_session": {
"title": "Aws Session",
"type": "string"
},
"parallel": {
"title": "Parallel",
"default": false,
"type": "boolean"
},
"max_parallel": {
"title": "Max Parallel",
"type": "integer"
},
"max_connections": {
"title": "Max Connections",
"default": 100,
"type": "integer"
},
"max_retries": {
"title": "Max Retries",
"default": 3,
"type": "integer"
},
"s3_destination_folder": {
"title": "S3 Destination Folder",
"type": "array",
"items": {}
},
"run_kwargs": {
"title": "Run Kwargs",
"default": {},
"type": "object"
}
}
}

CONFIG

extra: EXTRA = allow

field aws_session: Optional[str] = None

Validated by

  • set_name

field device_arn: str = None

Validated by

  • set_name

field max_connections: int = 100

Validated by

  • set_name

field max_jobs: int = 20

Validated by

  • set_name

field max_parallel: Optional[int] = None

Validated by

  • set_name

field max_retries: int = 3

Validated by

  • set_name

field parallel: bool = False

Validated by

  • set_name

field poll_interval_seconds: float = 1

Validated by

  • set_name

field poll_timeout_seconds: float = 432000

Validated by

  • set_name

field run_kwargs: dict = {}

Validated by

  • set_name

field s3_destination_folder: tuple [Optional]

Validated by

  • set_name

field shots: int = (None,)

Validated by

  • set_name

batch_submit(qscripts_list) [source]

Submit qscripts for execution using max_jobs-many threads.

PARAMETERS

qscripts_list – a list of Pennylane style QuantumScripts

RETURNS

a list of tasks subitted by threads.

RETURN TYPE

jobs

dict(*args, **kwargs) [source]

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Local Braket Qubit Executor

This quantum executor accesses the local Braket quantum circuit simulator ("braket.local.qubit").

It utilizes the Pennylane plugin found here. LocalBraketQubitExecutor introduces thread-based parallelism for circuit execution on the "braket.local.qubit" device.

1.Installation

LocalBraketQubitExecutor is included in Covalent. To use it, however, you will need to install the amazon-braket-pennylane-plugin:

pip install amazon-braket-pennylane-plugin

2. Usage Example

Using LocalBraketQubitExecutor is simple:

# Local simulator
executor = ct.executor.LocalBraketQubitExecutor(
device="default",
shots=1024,
num_threads=2
)

@ct.qelectron(executors=executor)
@qml.qnode(qml.device("default.qubit", wires=2, shots=1024))
def circuit(x):
qml.IQPEmbedding(features=x, wires=[0, 1])
qml.Hadamard(wires=1)
return [qml.expval(qml.PauliZ(0)), qml.expval(qml.PauliZ(1))]

As a QElectron, the circuit can be called either normally or asynchronously using circuit.run_later().

Synchronous example output is below

>>> print(circuit([0.5