
Persistent Volumes

Cloud Storage

While each task has access to temporary storage at runtime, no persistent storage is available to workflow tasks by default. Covalent Cloud provides a simple interface for persistent storage through cc.volume(). This function creates a new volume (or refers to an existing one) that can be attached to any workflow at dispatch.

# Creates the volume if it does not exist.
volume = cc.volume("/mydata")
runid = cc.dispatch(my_workflow, volume=volume)(*args, **kwargs)

This lets any task inside the workflow freely read or write files in /volumes/mydata.

Why use volumes?

Task inputs and outputs are generally not suitable for transferring very large amounts of data; in Covalent Cloud they are limited to 2.5 GB. A much better alternative is to attach a storage volume via cc.volume().
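As a rough illustration of that trade-off, the sketch below returns small results directly but writes anything at or above the cap to a file and returns only its path, which a downstream task could then read. The helper name output_or_path, the use of pickle to estimate size, and reading "2.5 GB" as 2.5 × 1024³ bytes are all assumptions for illustration, not Covalent Cloud behavior. Inside a real task, path would point somewhere on an attached volume such as /volumes/mydata.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any

# Assumption: the 2.5 GB cap is read as 2.5 * 1024**3 bytes of serialized data.
# The exact accounting Covalent Cloud uses may differ.
IO_LIMIT_BYTES = int(2.5 * 1024**3)


def output_or_path(obj: Any, path: Path, limit: int = IO_LIMIT_BYTES) -> Any:
    """Return obj itself if its pickled size is under `limit`;
    otherwise write the pickle to `path` and return the path as a string."""
    payload = pickle.dumps(obj)
    if len(payload) < limit:
        return obj
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(payload)
    return str(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "big.pkl"  # stands in for a file on the volume
        print(output_or_path([1, 2, 3], target))            # small: returned as-is
        print(output_or_path(b"x" * 100, target, limit=50))  # over the limit: stored on disk
```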

Volumes in Covalent Cloud belong to a user rather than to a particular workflow, so they store data indefinitely, until explicitly deleted. A volume therefore functions as a modular filesystem that can be attached to any workflow at dispatch. An attached volume is accessible workflow-wide and serves as shared persistent storage for all of the workflow's tasks.

Volumes are especially helpful for avoiding time-consuming data transfers, as well as repeated downloads across parallel tasks. For example, loading and saving model state files on a volume is much more efficient than downloading those files over and over. This can be crucial for workflows that iterate on large language models (LLMs) or other pre-trained models, as well as for workflows that train models on massive amounts of data.
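The "download once, reuse everywhere" pattern can be sketched as a small caching helper. The name cached_fetch and its fetch callback are hypothetical; on a volume, path would be something like Path("/volumes/mydata") / "weights.bin". Writing to a temporary file and then renaming it is meant to keep readers from seeing a half-written file, assuming the volume's filesystem supports atomic renames (not confirmed here). Two parallel tasks that both find the file missing may still each download it once.

```python
import tempfile
import uuid
from pathlib import Path
from typing import Callable


def cached_fetch(path: Path, fetch: Callable[[], bytes]) -> bytes:
    """Return the bytes stored at `path`, calling `fetch()` only if the file is missing."""
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a uniquely named temporary file first, then rename it into place.
        tmp = path.with_name(f"{path.name}.{uuid.uuid4().hex}.partial")
        tmp.write_bytes(fetch())
        tmp.replace(path)
    return path.read_bytes()


def fake_download() -> bytes:
    """Hypothetical stand-in for an expensive download."""
    return b"pretend these are model weights"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        target = Path(d) / "models" / "weights.bin"  # stands in for a path on the volume
        cached_fetch(target, fake_download)  # first call "downloads" and stores
        cached_fetch(target, fake_download)  # second call reads the stored copy
        print(target.read_bytes())
```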

Using storage volumes inside workflow tasks

Let’s take a look at a simple example.

from pathlib import Path
from datetime import datetime

import covalent as ct
import covalent_cloud as cc

cc.save_api_key("API_KEY")

# Default cloud executor, used by every task and workflow below.
ex = cc.CloudExecutor()

Consider the task defined below, which writes to specific files whenever it is executed. Assuming the "/mydata" volume is attached at dispatch, we can write perfectly ordinary code to manipulate its contents.

@ct.electron(executor=ex)
def init_labelled_model(label, **params):
    """Creates files on the volume."""

    # Create the model directory if it does not exist.
    model_folder = Path("/volumes/mydata") / label
    model_folder.mkdir(exist_ok=True)

    # Copy a model to the named directory.
    #
    # e.g.
    # _copy_existing_model(label, model_folder, **params)
    #
    # or
    # _download_model(label, model_folder, **params)
    #

    # Write to log file on volume.
    log = model_folder / "log.txt"
    timestamp = datetime.now().strftime("%H:%M:%S")
    with open(log, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} - init new model\n")

    return label
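Because the task body is ordinary file I/O, its logic can be exercised locally before dispatch. One way, sketched below, is to pass the volume root in as a parameter and point it at a temporary directory. The names init_model_files and root are hypothetical; the decorated init_labelled_model() above hard-codes /volumes/mydata instead.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def init_model_files(label: str, root: Path) -> str:
    """Same file logic as init_labelled_model, with the volume root as a parameter."""
    model_folder = root / label
    model_folder.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%H:%M:%S")
    # Append mode: each call adds one line rather than overwriting the log.
    with open(model_folder / "log.txt", "a", encoding="utf-8") as f:
        f.write(f"{timestamp} - init new model\n")
    return label


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)  # stands in for /volumes/mydata
        init_model_files("model-1a-small", root)
        print((root / "model-1a-small" / "log.txt").read_text(encoding="utf-8"))
```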

Tip

We haven’t done so here, but storing the volume path (i.e., "/volumes/mydata" or f"/volumes/{volume.name}") in a variable is often a good idea.
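Following that tip, a minimal sketch might keep the mount path in one module-level constant and build per-model paths from it. VOLUME_ROOT, model_dir, and the MYDATA_ROOT environment variable are hypothetical names chosen for illustration, not part of Covalent Cloud; when the variable is unset, the code falls back to /volumes/mydata.

```python
import os
from pathlib import Path

# Hypothetical convention: an environment variable can override the volume root
# (e.g. to point at a local directory while testing); otherwise use the mount
# path from this page.
VOLUME_ROOT = Path(os.environ.get("MYDATA_ROOT", "/volumes/mydata"))


def model_dir(label: str) -> Path:
    """Path of one model's folder under the volume root."""
    return VOLUME_ROOT / label


if __name__ == "__main__":
    print(model_dir("model-1a-small"))
```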

Assuming the model state files exist, we can then read, load, and update them as necessary.

@ct.electron(executor=ex)
def update_models(labels, **new_params):
    """Reads files on the volume."""

    model_folder = Path("/volumes/mydata")

    # Load each model and do whatever...
    # for label in labels:
    #     model = _load_pytorch(f"model_{label}.pt")
    #     ...

    # Read text from log files on volume.
    logs_events = {}
    for label in labels:
        log = model_folder / label / "log.txt"
        logs_events[label] = log.read_text().splitlines()

    return logs_events
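Note that update_models() raises FileNotFoundError if any label's log.txt is missing, for example when a folder was created outside this workflow. If that should not be fatal, a tolerant variant might look like the sketch below. The names read_logs and root are hypothetical, and returning an empty list for a missing log is a design choice, not something the original task does.

```python
import tempfile
from pathlib import Path


def read_logs(labels, root: Path) -> dict:
    """Map each label to the lines of root/<label>/log.txt, or [] if the log is missing."""
    events = {}
    for label in labels:
        log = root / label / "log.txt"
        events[label] = log.read_text(encoding="utf-8").splitlines() if log.exists() else []
    return events


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)  # stands in for /volumes/mydata
        (root / "model-1a-small").mkdir()
        (root / "model-1a-small" / "log.txt").write_text("init new model\n", encoding="utf-8")
        # The second label has no log yet, so it maps to an empty list.
        print(read_logs(["model-1a-small", "model-3c-large"], root))
```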

Now let’s create a simple workflow to do the following, using the two tasks above:

  • First, we'll create new “models” inside one or more named folders on the volume, using parallel init_labelled_model() tasks.
  • Then, we'll update these "models" by accessing the named folders with an update_models() task.

The workflow below also returns some file contents, to make it easy to see what was written to the volume.

@ct.lattice(executor=ex, workflow_executor=ex)
def init_models(*labels, **params):
    """Workflow that manipulates model files; performs updates; reads logs."""
    names = []
    for label in labels:
        names.append(init_labelled_model(label))  # parallel tasks

    logs_update = update_models(names, **params)
    return logs_update
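To see the shape of this workflow without dispatching it, the sketch below mimics it locally: the per-label init steps run concurrently in a thread pool (each writes only to its own folder), and the read step starts only after all of them have returned, mirroring how update_models() depends on every init_labelled_model() output. This is a local analogy, not how Covalent schedules electrons; a temporary directory stands in for /volumes/mydata, and init_one, read_all, and run_workflow_locally are hypothetical names. Running it twice against the same directory also reproduces the accumulation of log entries across dispatches.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path


def init_one(label: str, root: Path) -> str:
    """Local stand-in for init_labelled_model: append one line to root/<label>/log.txt."""
    folder = root / label
    folder.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%H:%M:%S")
    with open(folder / "log.txt", "a", encoding="utf-8") as f:
        f.write(f"{stamp} - init new model\n")
    return label


def read_all(labels, root: Path) -> dict:
    """Local stand-in for update_models: return each label's log lines."""
    return {
        label: (root / label / "log.txt").read_text(encoding="utf-8").splitlines()
        for label in labels
    }


def run_workflow_locally(labels, root: Path) -> dict:
    # Fan out: one init per label, executed concurrently.
    with ThreadPoolExecutor() as pool:
        # list() collects every result, so all inits finish before the reads below.
        names = list(pool.map(lambda label: init_one(label, root), labels))
    # Fan in: a single read step over all the labels.
    return read_all(names, root)


if __name__ == "__main__":
    labels = ["model-1a-small", "model-3c-small", "model-3c-large"]
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)  # stands in for /volumes/mydata
        run_workflow_locally(labels, root)         # first "dispatch"
        logs = run_workflow_locally(labels, root)  # second "dispatch", same root
        for label, lines in logs.items():
            print(label, len(lines))  # two entries per label
```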

Dispatching and inspecting workflow outputs

The outputs of successive init_models() dispatches will be exactly what we’d expect. Let’s quickly confirm that by running the script below twice.

volume = cc.volume("/mydata")
model_types = ["model-1a-small", "model-3c-small", "model-3c-large"]

if __name__ == "__main__":

    # Dispatch workflow and attach the volume.
    runid = cc.dispatch(init_models, volume=volume)(*model_types)

    result = cc.get_result(runid, wait=True).result
    result.load()

    from pprint import pprint
    pprint(result.value)

Here’s what we get the very first time that we run this workflow. These are the contents of the log files, /volumes/mydata/<model-id-name>/log.txt:

{'model-1a-small': ['18:08:13 - init new model'],
 'model-3c-large': ['18:08:04 - init new model'],
 'model-3c-small': ['18:08:13 - init new model']}

As expected, each model's log file contains a single entry.

Dispatching the workflow a second time, with the same volume attached, produces the output:

{'model-1a-small': ['18:08:13 - init new model', '18:18:52 - init new model'],
 'model-3c-large': ['18:08:04 - init new model', '18:18:52 - init new model'],
 'model-3c-small': ['18:08:13 - init new model', '18:18:51 - init new model']}

This time, each log contains the entry from the first run as well as a new entry from the second, confirming that the files persisted on the volume between dispatches.

Notes and Caveats

Volumes are not currently compatible with every available GPU type, as specified by the gpu_type parameter of CloudExecutor. See here for more details.