Getting Started
Welcome! This guide contains a handful of simple examples to help you get started with running batch jobs, deploying services, and orchestrating complex workflows in Covalent Cloud. You'll need only basic Python knowledge to follow along — no prior cloud, Kubernetes, or Docker experience is required!
Installation and Initial Setup
Start by installing the Covalent Cloud SDK. We recommend using a local Python environment with Python version 3.9 to 3.11.
pip install -U covalent-cloud
Create an account then retrieve your API key from the dashboard and save it.
import covalent_cloud as cc
cc.save_api_key("<your-API-key-here>")
Environment Setup
Skip the hassle of managing container images. With Covalent Cloud, you can directly create, delete, and modify reusable cloud environments.
cc.create_env(
name="sklearn-env",
pip=["scikit-learn==1.5.2", "yfinance==0.2.48", "numpy==1.23.5"],
wait=True,
)
See our Environment Management Guide for more on cloud environments.
Define Compute Resources
Next, let's create an executor to use in this guide. In Covalent Cloud, executors specify a modular set of compute resources for tasks and services.
cpu_ex = cc.CloudExecutor(
env="sklearn-env",
num_cpus=2,
memory="8GB",
time_limit="2 hours"
)
This guide does not require them, but to use GPUs we'd simply specify the gpu_type
and num_gpus
:
gpu_ex = cc.CloudExecutor(
env="sklearn-env",
num_cpus=24,
num_gpus=4,
gpu_type="h100",
time_limit="30 minutes"
)
Making a CloudExecutor
instance does not provision or reserve anything in the cloud. You can think of executors as specifications for the resources you want to use. Any executor can be assigned to any task or service. Covalent Cloud only creates resources when a task is executed or service is deployed.
See our Compute Resources Guide for more information.
- Compute Jobs
- Inference API endpoints
- Orchestrating Compute Workflows
Running batch jobs or tasks in Covalent Cloud is straightforward. Creating tasks is as simple as adding a single decorator to almost any Python function. In the example below, we train a stock predictor using the environment and compute resources defined above.
As written, this example requires yfinance
and scikit-learn
to be installed in your local environment.
pip install yfinance==0.2.48 scikit-learn==1.5.2
Local requirements can generally be avoided by simply moving relevant imports inside the decorated function.
import covalent as ct
import sklearn
import sklearn.svm
import yfinance as yf
from sklearn.model_selection import train_test_split
# One-task workflow when stacking decorators:
@ct.lattice(executor=cpu_ex, workflow_executor=cpu_ex)
@ct.electron(executor=cpu_ex)
def fit_svr_model_and_evaluate(ticker, n_chunks, C=1):
ticker_data = yf.download(ticker, start='2022-01-01', end='2023-01-01')
data = ticker_data.Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, shuffle=False)
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X_train, y_train)
# Predict and calculate MSE
predictions = model.predict(X_test)
mse = sklearn.metrics.root_mean_squared_error(y_test, predictions)
return model, mse
# Run the one-task workflow
runid = cc.dispatch(fit_svr_model_and_evaluate)('AAPL', n_chunks=6, C=10)
model, mse = cc.get_result(runid, wait=True).result.load()
print(model, mse)
Full Code
import covalent as ct
import covalent_cloud as cc
import sklearn
import sklearn.svm
import yfinance as yf
from sklearn.model_selection import train_test_split
cc.save_api_key("<your-API-key-here>")
cc.create_env(
name="sklearn-env",
pip=["scikit-learn==1.5.2", "yfinance==0.2.48", "numpy==1.23.5"],
wait=True,
)
cpu_ex = cc.CloudExecutor(
env="sklearn-env",
num_cpus=2,
memory="8GB",
time_limit="2 hours"
)
# One-task workflow when stacking decorators:
@ct.lattice(executor=cpu_ex, workflow_executor=cpu_ex)
@ct.electron(executor=cpu_ex)
def fit_svr_model_and_evaluate(ticker, n_chunks, C=1):
ticker_data = yf.download(ticker, start='2022-01-01', end='2023-01-01')
data = ticker_data.Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, shuffle=False)
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X_train, y_train)
# Predict and calculate MSE
predictions = model.predict(X_test)
mse = sklearn.metrics.root_mean_squared_error(y_test, predictions)
return model, mse
# Run the one-task workflow
runid = cc.dispatch(fit_svr_model_and_evaluate)('AAPL', n_chunks=6, C=10)
model, mse = cc.get_result(runid, wait=True).result.load()
print(model, mse)
In Covalent Cloud, you can define services directly in Python to host, for example, AI models or other applications. Everything from distributed REST endpoints to authentication is handled automatically. This means there is no need to write proxy services or to create and manage any additional infrastructure.
See here for more information on services in Covalent Cloud.
Example: Serving a Predictive Model
Here's how to deploy a scikit-learn
model as a function service:
import sklearn
import sklearn.svm
import yfinance as yf
@cc.service(executor=cpu_ex, name='Demo Stock Predictor', auth=False)
def stock_prediction(ticker, n_chunks, C=1):
"""Hosts a simple stock value prediction model."""
data = yf.download(ticker, start='2022-01-01',
end='2023-01-01').Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X, y)
return {'model': model, 'n_chunks': n_chunks}
@stock_prediction.endpoint('/predict')
def predict(model=None, n_chunks=None, *, stock_price=None):
"""Predict a future stock price given preceding stock prices."""
if not stock_price:
return "Missing preceding stock prices."
if len(stock_price) != n_chunks:
return {'error': 'Invalid input size'}
return model.predict([stock_price]).tolist()
# Deploy predictor service.
predictor = cc.deploy(stock_prediction)(ticker='AAPL', n_chunks=6, C=10)
predictor = cc.get_deployment(predictor, wait=True) # wait for active state
The cc.deploy
function returns a Python client for making requests to custom endpoints.
print(predictor)
╭──────────────────────────────── Deployment Information ────────────────────────────────╮
│ Name Demo Stock Predictor │
│ Description Add a docstring to your service function to populate this section. │
│ Function ID 6724072c9d6b44ec9082d193 │
│ Address https://fn-a.prod.covalent.xyz/6724072c9d6b44ec9082d193 │
│ Status ACTIVE │
│ Auth Enabled No │
╰────────────────────────────────────────────────────────────────────────────────────────╯
╭────────────────────────────────────── Endpoints ───────────────────────────────────────╮
│ Route POST /predict │
│ Streaming No │
│ Description Predict the stock price given the preceding stock prices. │
╰────────────────────────────────────────────────────────────────────────────────────────╯
stock_price = [135.21000671, 135.27000427, 137.86999512, 141.11000061, 142.52999878, 141.86000061]
predictor.predict(stock_price=stock_price)
We can also submit requests directly, without using the client:
address="<paste-service-address-here>"
curl -s -XPOST $address/predict -d '{"stock_price": [135.21000671, 135.27000427, 137.86999512, 141.11000061,142.52999878, 141.86000061]}'
Finally, to tear down a deployment before its executor's time_limit
, we can run the following:
predictor.teardown()
For more information on deploying and managing API services, see our Service Deployment Guide.
Full Code
import covalent_cloud as cc
import sklearn
import sklearn.svm
import yfinance as yf
cc.save_api_key("<your-API-key-here>")
cc.create_env(
name="sklearn-env",
pip=["scikit-learn==1.5.2", "yfinance==0.2.48", "numpy==1.23.5"],
wait=True,
)
cpu_ex = cc.CloudExecutor(
env="sklearn-env",
num_cpus=2,
memory="8GB",
time_limit="2 hours"
)
@cc.service(executor=cpu_ex, name='Demo Stock Predictor', auth=False)
def stock_prediction(ticker, n_chunks, C=1):
"""Hosts a simple stock value prediction model."""
data = yf.download(ticker, start='2022-01-01',
end='2023-01-01').Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X, y)
return {'model': model, 'n_chunks': n_chunks}
@stock_prediction.endpoint('/predict')
def predict(model=None, n_chunks=None, *, stock_price=None):
"""Predict a future stock price given preceding stock prices."""
if not stock_price:
return "Missing preceding stock prices."
if len(stock_price) != n_chunks:
return {'error': 'Invalid input size'}
return model.predict([stock_price]).tolist()
# Deploy predictor service.
predictor = cc.deploy(stock_prediction)(ticker='AAPL', n_chunks=6, C=10)
predictor = cc.get_deployment(predictor, wait=True) # wait for active state
stock_price = [135.21000671, 135.27000427, 137.86999512,
141.11000061, 142.52999878, 141.86000061]
predictor.predict(stock_price=stock_price)
predictor.teardown()
Covalent Cloud also makes it easy to combine batch jobs and/or services into seamless workflows. See here for more information on workflows in Covalent Cloud.
Let's create a workflow that trains multiple SVR models with different parameters, selects the one with the best mse, and deploys it as an API service.
1. Define tasks
import covalent as ct
import covalent_cloud as cc
import sklearn
import sklearn.svm
import yfinance as yf
from sklearn.model_selection import train_test_split
@ct.electron(executor=cpu_ex)
def fit_svr_model_and_evaluate(ticker, n_chunks, C=1):
# Fetch and prepare data
ticker_data = yf.download(ticker, start='2022-01-01', end='2023-01-01')
data = ticker_data.Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, shuffle=False)
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X_train, y_train)
# Predict and calculate mse
predictions = model.predict(X_test)
mse = sklearn.metrics.root_mean_squared_error(y_test, predictions)
return model, n_chunks, mse
@ct.electron(executor=cpu_ex)
def choose_best(model_accuracies):
best_model = None
best_mse = float('inf')
for model, n_chunks, mse in model_accuracies:
if mse < best_mse:
best_model = model
best_mse = mse
best_n_chunks = n_chunks
return best_model, best_n_chunks, best_mse
2. Define deployment service
@cc.service(executor=cpu_ex, name='Demo Stock Predictor')
def deploy_model(model, n_chunks):
"""Hosts a simple stock value prediction model."""
return {'model': model, 'n_chunks': n_chunks}
@deploy_model.endpoint('/predict')
def predict(model=None, n_chunks=None, *, stock_price=None):
"""Predict the stock price given the preceding stock prices."""
if not stock_price:
return "Missing preceding stock prices."
if len(stock_price) != n_chunks:
return {'error': 'Invalid input size'}
return model.predict([stock_price]).tolist()
3. Define the workflow
@ct.lattice(executor=cpu_ex, workflow_executor=cpu_ex)
def find_best_svr(ticker, n_chunks_list, Cs):
model_accuracies = []
for n_chunks in n_chunks_list:
for C in Cs:
model_accuracies.append(
fit_svr_model_and_evaluate(ticker, n_chunks, C))
best_model, best_n_chunks, best_mse = choose_best(model_accuracies)
return deploy_model(best_model, best_n_chunks)
4. Run the workflow
runid = cc.dispatch(find_best_svr)(ticker='AAPL', n_chunks_list=[5, 10, 15], Cs=[0.1, 1, 10])
The workflow result is a Python client for the best model deployment. We can use this client to make authenticated requests to the deployed service.
predictor = cc.get_result(runid, wait=True).result.load()
# assuming n_chunks=5 model was selected
stock_price = [135.21000671, 135.27000427, 137.86999512, 141.11000061, 142.52999878]
predictor.predict(stock_price=stock_price)
predictor.teardown() # comment out to leave running
Full Code
import covalent as ct
import covalent_cloud as cc
import sklearn
import sklearn.svm
import yfinance as yf
from sklearn.model_selection import train_test_split
cc.save_api_key("<your-API-key-here>")
cc.create_env(
name="sklearn-env",
pip=["scikit-learn==1.5.2", "yfinance==0.2.48", "numpy==1.23.5"],
wait=True,
)
cpu_ex = cc.CloudExecutor(
env="sklearn-env",
num_cpus=2,
memory="8GB",
time_limit="2 hours"
)
@ct.electron(executor=cpu_ex)
def fit_svr_model_and_evaluate(ticker, n_chunks, C=1):
# Fetch and prepare data
ticker_data = yf.download(ticker, start='2022-01-01', end='2023-01-01')
data = ticker_data.Close.to_numpy()
X = [data[i:i+n_chunks].squeeze() for i in range(len(data) - n_chunks)]
y = data[n_chunks:].squeeze()
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, shuffle=False)
# Fit SVR model
model = sklearn.svm.SVR(C=C).fit(X_train, y_train)
# Predict and calculate mse
predictions = model.predict(X_test)
mse = sklearn.metrics.root_mean_squared_error(y_test, predictions)
return model, n_chunks, mse
@ct.electron(executor=cpu_ex)
def choose_best(model_accuracies):
best_model = None
best_mse = float('inf')
for model, n_chunks, mse in model_accuracies:
if mse < best_mse:
best_model = model
best_mse = mse
best_n_chunks = n_chunks
return best_model, best_n_chunks, best_mse
@cc.service(executor=cpu_ex, name='Demo Stock Predictor', auth=False)
def deploy_model(model, n_chunks):
"""Hosts a simple stock value prediction model."""
return {'model': model, 'n_chunks': n_chunks}
@deploy_model.endpoint('/predict')
def predict(model=None, n_chunks=None, *, stock_price=None):
"""Predict the stock price given the preceding stock prices."""
if not stock_price:
return "Missing preceding stock prices."
if len(stock_price) != n_chunks:
return {'error': 'Invalid input size'}
return model.predict([stock_price]).tolist()
@ct.lattice(executor=cpu_ex, workflow_executor=cpu_ex)
def find_best_svr(ticker, n_chunks_list, Cs):
model_accuracies = []
for n_chunks in n_chunks_list:
for C in Cs:
model_accuracies.append(
fit_svr_model_and_evaluate(ticker, n_chunks, C))
best_model, best_n_chunks, best_mse = choose_best(model_accuracies)
return deploy_model(best_model, best_n_chunks)
runid = cc.dispatch(find_best_svr)(
ticker='AAPL',
n_chunks_list=[5, 10, 15],
Cs=[0.1, 1, 10]
)
predictor = cc.get_result(runid, wait=True).result.load()
# assuming n_chunks=5 model was selected
stock_price = [135.21000671, 135.27000427, 137.86999512, 141.11000061, 142.52999878]
predictor.predict(stock_price=stock_price)
predictor.teardown() # comment out to leave running