
Getting Started

Hello and welcome! This guide will help you get started with workflows and function serving in Covalent Cloud.

The first step is to install the Covalent Cloud SDK.

pip install -U covalent-cloud

Create an account and log in to grab your API key from the dashboard. Now we're ready to go!

Setup

Developing with Covalent usually starts with something like this:

import covalent as ct
import covalent_cloud as cc

cc.save_api_key("paste-your-api-key-here") # save your API key

cc.create_env(name="my-env", pip=["faker"], wait=True) # create an environment

general_ex = cc.CloudExecutor(env="my-env", num_cpus=12, memory="16GB") # define an executor
default_ex = cc.CloudExecutor(env="my-env") # define another executor

Save your API key

Your API key grants you access to Covalent Cloud. You only need to save it once with cc.save_api_key(); repeat this step only if you generate a new key.

Create environments

Software environments in Covalent Cloud are similar to conda environments. Once they're created with cc.create_env(), they're stored in your account to be edited and/or reused. You can create as many environments as you like.

Define executors

A cloud executor represents a modular set of compute resources. In Python, each one is defined as a CloudExecutor instance.

Tip

An executor with two H100 GPUs might look like the one below.

cc.CloudExecutor(env="my-llm-env", gpu_type="h100", num_gpus=2, num_cpus=32, memory="128GB")

See here for all GPU types currently available in Covalent Cloud.

Workflows

Workflows in Covalent usually contain a few electrons (tasks) and a single lattice (a function that composes tasks).

The workflow below just does some simple math, but it's useful to illustrate the concept. Here we use the general_ex and default_ex executors to assign different resources to each electron.

@ct.electron(executor=default_ex)  # assign default 1 CPU, 1 GB RAM
def add(a, b):
    return a + b

@ct.electron(executor=general_ex) # assign 12 CPUs, 16 GB RAM
def multiply(a, b):
    return a * b

@ct.lattice(executor=default_ex, workflow_executor=default_ex)
def my_workflow(a, b, c):
    x = add(a, b)
    y = multiply(a, c)
    z = add(x, y)
    return z

# - Dispatch to Covalent Cloud - #

dispatch_id = cc.dispatch(my_workflow)(1, 2, 3)
res = cc.get_result(dispatch_id, wait=True) # wait for the result
res.result.load()

print("The number is:", res.result.value) # "The number is: 6"

Inside a lattice, tasks are executed concurrently wherever possible. This means x and y here are computed at the same time, and z starts as soon as both are available.
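To make the execution order concrete, here is a minimal local sketch of the same dependency pattern using Python's standard concurrent.futures module. It is only an analogy for how independent tasks can overlap, not how Covalent Cloud schedules electrons; the run_workflow function is a hypothetical name introduced for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    return a + b


def multiply(a, b):
    return a * b


def run_workflow(a, b, c):
    # x and y do not depend on each other, so both are submitted at once.
    with ThreadPoolExecutor(max_workers=2) as pool:
        x_future = pool.submit(add, a, b)
        y_future = pool.submit(multiply, a, c)
        # z needs both results, so it waits for x and y before running.
        z = add(x_future.result(), y_future.result())
    return z


result = run_workflow(1, 2, 3)
print("The number is:", result)  # (1 + 2) + (1 * 3) = 6
```

The final add cannot start until both futures resolve, which mirrors the way z depends on x and y in the lattice.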

Tip

Covalent can run almost any code on various cloud GPUs, CPUs, and more! See our tutorials for more examples!

Services

Function services are another powerful feature in Covalent Cloud. They consist of a service initializer and one or more API endpoints.

The simple service below uses faker to provide an API for creating realistic (but fake) data.

@cc.service(executor=general_ex)  # define service initializer
def my_service(default_count=1):
    import faker
    fake = faker.Faker()
    return {"fake": fake, "count": default_count}

@my_service.endpoint("/create-fakes") # add API endpoints
def create_fakes(fake, count, kind="name"):
    fake_func = getattr(fake, kind)
    return [fake_func() for _ in range(count)]

# - Deploy to Covalent Cloud - #

client = cc.deploy(my_service)()
client = cc.get_deployment(client, wait=True) # wait for active state

Notice how @cc.service accepts an executor (just like @ct.electron). An executor here assigns compute resources to the function service.

Once it's deployed, we can use the client to get two fake addresses and print the second.

print(client.create_fakes(count=2, kind="address")[1])

# 82331 Payne Plaza Apt. 115
# Dannyton, NV 59645

Unlike workflows, function services are long-lived so they can be used repeatedly, without starting over.

client.create_fakes()

# ['Edgar Walker']
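Judging from the calls above, the dictionary returned by the initializer supplies the endpoint's arguments by name, and values passed in a request override them. The sketch below imitates that behaviour locally in plain Python. LocalService and StubFaker are hypothetical stand-ins introduced for illustration (they are not part of the Covalent Cloud SDK or faker), and the merging rule is an assumption inferred from this guide's example.

```python
class LocalService:
    """Hypothetical local stand-in for a deployed function service."""

    def __init__(self, initializer, **init_kwargs):
        # The initializer runs once; its returned dict is kept as state.
        self.state = initializer(**init_kwargs)

    def call(self, endpoint, **request_kwargs):
        # Stored state supplies default arguments; request values override them.
        return endpoint(**{**self.state, **request_kwargs})


class StubFaker:
    """Tiny stand-in for faker.Faker so the sketch has no dependencies."""

    def name(self):
        return "Jane Doe"

    def address(self):
        return "1 Example Street"


def my_service(default_count=1):
    return {"fake": StubFaker(), "count": default_count}


def create_fakes(fake, count, kind="name"):
    fake_func = getattr(fake, kind)
    return [fake_func() for _ in range(count)]


service = LocalService(my_service)
names = service.call(create_fakes)  # uses the stored count of 1
addresses = service.call(create_fakes, count=2, kind="address")
print(names)
print(addresses)
```

Because the state is created once and reused across calls, repeated requests do not rerun the initializer, which is the local counterpart of a long-lived service.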

To remove a service before its time limit expires, we initiate a teardown:

client.teardown()  # remove the service

Tip

For a more substantial example using GPUs, see this tutorial on serving a Llama 3 model for real-time inference.

Combining Workflows and Services

We can also deploy function services from inside a workflow, using task results to initialize the service. To do this, just call the initializer function inside the lattice.

After a small change, the workflow from above now concludes by using the result z to deploy the "faker" service with default_count=6.

@ct.electron(executor=default_ex)  # assign default 1 CPU, 1 GB RAM
def add(a, b):
    return a + b

@ct.electron(executor=general_ex) # assign 12 CPUs, 16 GB RAM
def multiply(a, b):
    return a * b

@ct.lattice(executor=default_ex, workflow_executor=default_ex)
def my_workflow(a, b, c):
    x = add(a, b)
    y = multiply(a, c)
    z = add(x, y)
    return my_service(default_count=z) # !! NEW !!

# - Dispatch (and deploy) to Covalent Cloud - #

dispatch_id = cc.dispatch(my_workflow)(1, 2, 3)
res = cc.get_result(dispatch_id, wait=True) # wait for the result
res.result.load()

Notice how the /create-fakes endpoint now responds with six fake names by default:

client = res.result.value
client.create_fakes()

# ['Oscar Herring', 'Mary Tucker', 'Christopher Davis', 'Elizabeth Valdez', 'Kristen Baldwin', 'Stacey Delacruz']
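The default of six follows directly from the arithmetic in the workflow. As a rough local check, the plain-Python sketch below traces the value from the tasks into the initializer. The names make_service_state and create_items are hypothetical stand-ins for the service initializer and endpoint, and nothing here touches Covalent Cloud.

```python
def add(a, b):
    return a + b


def multiply(a, b):
    return a * b


def make_service_state(default_count=1):
    # Hypothetical stand-in for the service initializer: it only records the count.
    return {"count": default_count}


def create_items(count):
    # Stand-in endpoint that returns placeholder strings instead of fake names.
    return [f"item-{i}" for i in range(count)]


def my_workflow(a, b, c):
    x = add(a, b)       # 1 + 2 = 3
    y = multiply(a, c)  # 1 * 3 = 3
    z = add(x, y)       # 3 + 3 = 6
    return make_service_state(default_count=z)


state = my_workflow(1, 2, 3)
items = create_items(**state)
print(len(items))
```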

As before, we can initiate a teardown to remove this service before its time limit expires:

client.teardown()  # remove the service

Summary

Covalent Cloud is a platform for executing high-compute workflows and serving high-compute functions on powerful hardware. This guide covered the bare basics of the Python SDK. Covalent Cloud is capable of much more than we encountered here, so be sure to visit our tutorials for a variety of more practical examples!

Full code
import covalent as ct
import covalent_cloud as cc

# Setup.

cc.save_api_key("paste-your-api-key-here") # save your API key

cc.create_env(name="my-env", pip=["faker"], wait=True) # create an environment
general_ex = cc.CloudExecutor(env="my-env", num_cpus=12, memory="16GB") # define an executor
default_ex = cc.CloudExecutor()

# Service.

@cc.service(executor=general_ex) # define service initializer
def my_service(default_count=1):
    import faker
    fake = faker.Faker()
    return {"fake": fake, "count": default_count}

@my_service.endpoint("/create-fakes") # add API endpoints
def create_fakes(fake, count, kind="name"):
    fake_func = getattr(fake, kind)
    return [fake_func() for _ in range(count)]

# Workflow.

@ct.electron(executor=default_ex) # assign default 1 CPU, 1 GB RAM
def add(a, b):
    return a + b

@ct.electron(executor=general_ex) # assign 12 CPUs, 16 GB RAM
def multiply(a, b):
    return a * b

@ct.lattice(executor=default_ex, workflow_executor=default_ex)
def my_workflow(a, b, c):
    x = add(a, b)
    y = multiply(a, c)
    z = add(x, y)
    # return z
    return my_service(default_count=z) # !! DEPLOY FAKER SERVICE !!

# Dispatch and deploy to Covalent Cloud.

if __name__ == "__main__":

    import time

    dispatch_id = cc.dispatch(my_workflow)(1, 2, 3)
    res = cc.get_result(dispatch_id, wait=True) # wait for the result
    res.result.load()

    client = res.result.value

    print(client)
    print(client.create_fakes())

    print("Service will be removed in 10 seconds...")
    time.sleep(10)
    client.teardown()