Transferring Files To and From Google Cloud Storage
Retrieve files from a Google Cloud Storage bucket before executing a task, then upload files to a bucket after the task's execution.
The following example workflow downloads an image file from a bucket, processes the file's contents, then uploads the processed result back to the bucket.
Prerequisites
Upload a color image file to a bucket. Make note of the gsutil URI to use in the workflow, in the format gs://<bucket_name>/<object_name>.
In this example, the gsutil URI is gs://covalenthowto/remote_{unprocessed_filename}, where {unprocessed_filename} is a variable containing the name of the file.
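To make the URI format concrete, here is a minimal sketch that splits a gsutil URI into its bucket and object parts using only the Python standard library. The helper name `split_gs_uri` is hypothetical and is not part of Covalent or gsutil.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_gs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs://<bucket_name>/<object_name> URI into (bucket, object).

    Hypothetical helper for illustration; not a Covalent or gsutil API.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a gsutil URI: {uri!r}")
    # The parsed path keeps its leading slash, so strip it to get the object name.
    return parsed.netloc, parsed.path.lstrip("/")


unprocessed_filename = "unprocessed_file.png"
bucket, obj = split_gs_uri(f"gs://covalenthowto/remote_{unprocessed_filename}")
print(bucket, obj)
```

Splitting the URI this way shows which part names the bucket and which part names the object inside it.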
Additionally, create a service account whose credentials will be used to authenticate to Google Cloud Storage.
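Before wiring the credentials into a workflow, it can help to confirm that the file is a service account key and to see which project it belongs to. The sketch below assumes the standard JSON key format that Google Cloud produces for service accounts, which includes `type` and `project_id` fields. The helper name `read_service_account_project` is hypothetical.

```python
import json
from pathlib import Path


def read_service_account_project(credentials_path: str) -> str:
    """Return the project_id stored in a service account key file.

    Hypothetical helper for illustration; assumes the standard JSON key
    format, in which "type" is "service_account".
    """
    key = json.loads(Path(credentials_path).read_text())
    if key.get("type") != "service_account":
        raise ValueError("Expected a service account key file")
    return key["project_id"]
```

For example, `read_service_account_project("/path/to/credentials.json")` would return the project ID recorded in that key file, which can be compared against the project ID used in the workflow.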
Procedure
- Define two Covalent FileTransfer objects and a Covalent GCloud strategy object. This example uses the factory classes TransferFromRemote and TransferToRemote, which generate FileTransfer objects.
import covalent as ct
from typing import List, Tuple
from pathlib import Path
from skimage import io, color
strategy = ct.fs_strategies.GCloud(
    credentials="/path/to/credentials.json",
    project_id="my-project-id",
)
unprocessed_filename = "unprocessed_file.png"
processed_filename = "processed_file.png"
unprocessed_filepath = str(Path(unprocessed_filename).resolve())
processed_filepath = str(Path(processed_filename).resolve())
storage_bucket = "covalenthowto"
object_source_path = f"gs://{storage_bucket}/remote_{unprocessed_filename}"
object_dest_path = f"gs://{storage_bucket}/remote_{processed_filename}"
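# Download before the electron runs: remote object -> local file
ft_1 = ct.fs.TransferFromRemote(object_source_path, unprocessed_filepath, strategy=strategy)
# Upload after the electron runs: local file -> remote object
ft_2 = ct.fs.TransferToRemote(object_dest_path, processed_filepath, strategy=strategy)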
- Define an electron to:
- Download the unprocessed file from cloud storage
- Perform some processing on the contents
- Upload the processed file to cloud storage
Access the file paths inside the electron using the files keyword argument, as shown below. Covalent injects the source and destination file paths of the TransferFromRemote and TransferToRemote objects into the files argument. In this case, the files variable is a list of tuples of the form (<source-path>, <destination-path>). The list looks something like this:
[('/remote_unprocessed_file.png', '/path/to/current/dir/unprocessed_file.png'), ('/path/to/current/dir/processed_file.png', '/remote_processed_file.png')]
The Google Cloud Storage bucket name is omitted from the remote paths in the list; it is applied automatically by the FileTransfer objects.
@ct.electron(files=[ft_1, ft_2]) # ft_1 is done before the electron is executed; ft_2 is done after.
def to_grayscale(files: List[Tuple[str, str]] = None):
    # Get the downloaded file's path
    image_path = files[0][1] # destination file path of first file transfer, downloaded before executing this electron
    # Convert the image to grayscale
    img = io.imread(image_path)[:, :, :3] # limiting image to 3 channels
    gray_img = color.rgb2gray(img)
    # Save the grayscale image to the upload file path
    gray_image_path = files[1][0] # source file path of second file transfer, to be uploaded
    io.imsave(gray_image_path, gray_img)
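The indexing used in the electron can be made explicit with tuple unpacking. The following is a minimal sketch that uses the example list from this step as sample data; the helper name `local_paths` is hypothetical, and the sketch runs without Covalent.

```python
from typing import List, Tuple

# Sample value with the same shape as the injected "files" argument:
# one (source, destination) tuple per file transfer, in declaration order.
files: List[Tuple[str, str]] = [
    ("/remote_unprocessed_file.png", "/path/to/current/dir/unprocessed_file.png"),
    ("/path/to/current/dir/processed_file.png", "/remote_processed_file.png"),
]


def local_paths(files: List[Tuple[str, str]]) -> Tuple[str, str]:
    """Return (downloaded file path, path of the file to upload).

    Hypothetical helper for illustration; assumes exactly one download
    followed by one upload, as in this example.
    """
    (_, download_dest), (upload_src, _) = files
    return download_dest, upload_src


image_path, gray_image_path = local_paths(files)
print(image_path)
print(gray_image_path)
```

For a download, the local path is the destination of the tuple; for an upload, it is the source. That is why the electron reads `files[0][1]` and writes to `files[1][0]`.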
- Create and dispatch a lattice to run the electron.
@ct.lattice
def process_blob_data():
    return to_grayscale()
dispatch_id = ct.dispatch(process_blob_data)()
status = ct.get_result(dispatch_id, wait=True).status
print(status)
See Also
Transferring Local Files During Workflows
Transferring Files To and From a Remote Host