Cloud Executor
1. Introduction
The CloudExecutor plugin for Covalent Cloud is a testament to the power of abstracted cloud computing. Designed for those who wish to dive straight into their work without the intricacies of setup, this plugin offers:
- Abstracted Cloud Experience: Dive into computational tasks without the overhead of understanding the underlying cloud infrastructure.
- Zero Infrastructure Setup: Unlike other open-source plugins, CloudExecutor requires no initial setup, letting you start immediately.
- On-Demand Resources: Access high-performance CPU and GPU cores whenever you need them.
- Prebuilt Environments: Say goodbye to dependency installations. CloudExecutor operates within pre-configured environments, ensuring compatibility and efficiency.
- Fully Managed: From resource allocation to Covalent server management, everything is taken care of. Focus solely on your tasks and let the CloudExecutor handle the rest.
2. Installation
To use this plugin with Covalent Cloud, simply install the Covalent Cloud SDK using pip:

pip install covalent-cloud --upgrade
Note that this plugin requires a Covalent Cloud account.
3. Usage & API
CloudExecutor represents a configuration tailored for executing a Covalent workflow on the Covalent Cloud. This class empowers users to specify resources and the software environment for their workflows.
Attributes
- num_cpus (int, optional): Number of CPUs for the workflow. Range: 1 to 600 (based on the provided configurations). Defaults to 1.
- memory (Union[int, str], optional): Amount of memory to be used for the workflow. Can be provided as an integer (in MB) or as a string with units (e.g. "1024 MB" or "1 GB"). If provided as a string, valid units include MB, GB, and GiB. The value is internally converted to MB for processing. Defaults to 1024 MB.
- num_gpus (int, optional): Number of GPUs for the workflow. Range: 0 to 8 (based on the provided configurations). Defaults to 0.
- gpu_type (str, optional): GPU type for the workflow, e.g. "h100", "a100", "v100", "a10", and others. See the Covalent Cloud documentation for a complete list. Each GPU type has a specific core count. Defaults to an empty string.
- env (str, optional): Software environment for the workflow. Defaults to "default".
- time_limit (Union[int, timedelta, str], optional): Workflow execution time limit, either in seconds, as a timedelta, or as a human-readable string such as 'in 30 minutes'. Defaults to 60.
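To make the unit handling above concrete, the sketch below normalizes a memory spec (int or string) to MB and a time limit (int or timedelta) to seconds. This is a hypothetical illustration, not the SDK's actual internals: the helper names and the unit factors (GB taken as 1000 MB, GiB as 1024 MB) are assumptions for the sketch, and the string form of time_limit is not covered here.

```python
from datetime import timedelta
from typing import Union

# Assumed unit factors for the sketch; the real SDK's
# conversion factors may differ.
_MB_PER_UNIT = {"MB": 1, "GB": 1000, "GiB": 1024}

def to_mb(memory: Union[int, str]) -> int:
    """Normalize a memory spec like the `memory` attribute to MB."""
    if isinstance(memory, int):
        return memory  # integers are already in MB
    value, unit = memory.strip().split()
    return int(float(value) * _MB_PER_UNIT[unit])

def to_seconds(time_limit: Union[int, timedelta]) -> int:
    """Normalize a time limit (seconds or timedelta) to whole seconds."""
    if isinstance(time_limit, timedelta):
        return int(time_limit.total_seconds())
    return time_limit
```

Under these assumed factors, to_mb("1 GB") yields 1000 and to_seconds(timedelta(minutes=30)) yields 1800; CloudExecutor presumably performs an equivalent normalization when it accepts these attribute types.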
Example
Unlike the open-source plugins, CloudExecutor does not require a local Covalent server to be started with covalent start. You can simply use the Covalent primitives to construct the workflow and call the cloud dispatch function, which ships the workflow to the Covalent Cloud platform.
import covalent as ct
import covalent_cloud as cc
from numpy.random import permutation
from sklearn import svm, datasets

# Replace this with your Covalent Cloud API key
cc.save_api_key("<MY_API_KEY>")

# Create a custom runtime environment
cc.create_env(
    "sklearn",
    conda=["python=3.8", "pip"],
    pip=["numpy", "scikit-learn"],
)

# Define some executors
high_compute = cc.CloudExecutor(
    env="sklearn",
    num_cpus=64,
    memory=16000,  # in MB
    time_limit=3600,  # in seconds
)
low_compute = cc.CloudExecutor(
    env="sklearn",
    num_cpus=1,
    memory=1000,
    time_limit=30,
)

# Describe the workflow tasks (electrons)
@ct.electron(executor=high_compute)
def train_svm(data, C, gamma):
    X, y = data
    clf = svm.SVC(C=C, gamma=gamma)
    clf.fit(X[90:], y[90:])
    return clf

@ct.electron
def load_data():
    iris = datasets.load_iris()
    perm = permutation(iris.target.size)
    iris.data = iris.data[perm]
    iris.target = iris.target[perm]
    return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
    X_test, y_test = data
    return clf.score(X_test[:90], y_test[:90])

# Construct the workflow (lattice)
@ct.lattice(
    executor=low_compute,
    workflow_executor=low_compute,
)
def run_experiment(C=1.0, gamma=0.7):
    data = load_data()
    clf = train_svm(data=data, C=C, gamma=gamma)
    score = score_svm(data=data, clf=clf)
    return score

# Dispatch the workflow
dispatch_id = cc.dispatch(run_experiment)(C=1.0, gamma=0.7)
print(dispatch_id)
# Synchronously query the result manifest
manifest = cc.get_result(dispatch_id, wait=True)

# Download and print the workflow result
manifest.result.load()
print(manifest.result)