Running Asynchronous Workflows
Kick off your high-performance computing on Covalent Cloud with a simple install:
pip install --upgrade covalent-cloud
Next, define your workflow using @ct.electron for tasks and @ct.lattice for workflows, then attach a cloud executor like so:
resource = cc.CloudExecutor(env="<your environment>", num_cpus=1, memory="1GB")
Refer to our environment setup guide for more details. If your electron depends on specific packages, use that guide to create a reusable environment containing those requirements. For ease of use, all inputs and outputs within your workflow are automatically cached and securely stored on Covalent Cloud. They are accessible via the run ID or through a direct link in the Covalent UI, where you can manage and retrieve your data. Note that the current limit on input and output size is 2.5 GB. If your workflow needs to handle larger datasets, use the volume feature, which uploads and downloads data to persistent storage so it can be reused in subsequent tasks.
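For illustration only (the exact keyword arguments may differ by SDK version; see the environment setup guide), a reusable environment might be created once and then referenced by name from the executor. The environment name and package list below are placeholders.

import covalent_cloud as cc

# Placeholder name and packages; build (or reuse) a named environment once.
cc.create_env(name="my-sklearn-env", pip=["scikit-learn", "pandas"], wait=True)

# Reference the environment by name when defining the executor.
resource = cc.CloudExecutor(env="my-sklearn-env", num_cpus=1, memory="1GB")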
1. Single Electron Workflow:
Utilize single-electron workflows when you need to offload a computationally intensive task to the cloud. For example, if you have a function that processes large datasets or performs complex calculations, such as rendering a high-resolution image or analyzing big data, this method is ideal; it lets the task leverage the cloud's CPU resources effectively. Note that multiprocessing and other parallel methods can be used inside this function, since it runs on a machine with access to num_cpus worth of CPU resources (see the sketch at the end of this section).
import covalent as ct
import covalent_cloud as cc

resource = cc.CloudExecutor(num_cpus=1, memory="1GB")

# A single electron wrapped in a lattice so it can be dispatched on its own.
@ct.lattice(executor=resource, workflow_executor=resource)
@ct.electron
def high_compute_function(string):
    return f"high compute {string}"

# Dispatch the workflow and wait for the result.
runid = cc.dispatch(high_compute_function)("function")
async_result = cc.get_result(runid, wait=True)
async_result.result.load()
print(async_result.result.value)
Once the lattice is dispatched, it will show up in https://app.covalent.xyz/dispatches, with live status updates and other details.
Note:
Since we have not defined the environment, the default environment will be used here.
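To illustrate the parallelism point mentioned above, here is a minimal sketch, assuming the executor's num_cpus is sized to match the process pool; the function and values are illustrative.

import math
from multiprocessing import Pool

import covalent as ct
import covalent_cloud as cc

# Illustrative: request enough CPUs for the process pool used inside the electron.
resource = cc.CloudExecutor(num_cpus=4, memory="2GB")

@ct.lattice(executor=resource, workflow_executor=resource)
@ct.electron
def parallel_sqrt(numbers):
    # The electron runs on a machine with num_cpus cores,
    # so a local process pool can fan work out across them.
    with Pool(processes=4) as pool:
        return pool.map(math.sqrt, numbers)

runid = cc.dispatch(parallel_sqrt)(list(range(16)))
async_result = cc.get_result(runid, wait=True)
async_result.result.load()
print(async_result.result.value)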
2. Multi-Electron Workflow:
Choose this approach when your workflow consists of multiple interdependent tasks, such as a data pipeline that involves fetching data, processing it through several steps, and then performing an aggregation or analysis. If your business logic includes conditional operations, iterative processing, or requires a sequence of operations where the output of one step is the input to the next, a multi-electron workflow will manage these dependencies efficiently, running each task with the resources it requires in parallel where possible. This method streamlines complex workflows by managing resource allocation and task orchestration for you. For more intricate workflow patterns and resource management, delve into our documentation on lattice structures.
import covalent as ct
import covalent_cloud as cc

resource = cc.CloudExecutor(num_cpus=1, memory="1GB")

@ct.electron(executor=resource)
def add(x, y):
    return x + y

@ct.electron  # the lattice's default executor will be propagated to this electron
def subtract(x, y):
    return x - y

@ct.lattice(executor=resource, workflow_executor=resource)
def workflow(x, y, n):
    k = add(x, y)
    for i in range(n):
        z = add(i, k)
        l = subtract(z, y)
    return l

runid = cc.dispatch(workflow)(x=1, y=2, n=3)
async_result = cc.get_result(runid, wait=True)
async_result.result.load()
print(async_result.result.value)
Once the lattice is dispatched, it will show up in https://app.covalent.xyz/dispatches, with live status updates and other details.
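As a small follow-up to the point that each task runs with the resources it requires, the sketch below attaches differently sized executors to different electrons; the resource values and task bodies are illustrative only.

import covalent as ct
import covalent_cloud as cc

# Illustrative sizes: a small executor for lightweight steps,
# a larger one for the heavier aggregation step.
cpu_small = cc.CloudExecutor(num_cpus=1, memory="1GB")
cpu_large = cc.CloudExecutor(num_cpus=4, memory="8GB")

@ct.electron(executor=cpu_small)
def fetch(n):
    return list(range(n))

@ct.electron(executor=cpu_large)
def aggregate(data):
    return sum(data)

@ct.lattice(executor=cpu_small, workflow_executor=cpu_small)
def pipeline(n):
    return aggregate(fetch(n))

runid = cc.dispatch(pipeline)(n=10)
async_result = cc.get_result(runid, wait=True)
async_result.result.load()
print(async_result.result.value)  # 45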
Info
Results in Covalent's asynchronous workflows are retrieved on demand. Use .load() to fetch a result and .value to access it, ensuring efficient, just-in-time data handling.