Transferring Local Files During Workflows 
Transfer files locally before or after executing an electron.
Prerequisites
- For convenience in running this example, define the read (source) and write (destination) file paths.
from pathlib import Path
# Define source and destination filepaths
source_filepath = Path('./my_source_file').resolve()
dest_filepath = Path('./my_dest_file').resolve()
- Create a source file to transfer.
# Create an example file
file_content = """Mares eat oats and does eat oats
And little lambs eat ivy ...
"""
with open(source_filepath, "w") as f:
f.write(file_content)
Procedure
- Define a Covalent
FileTransferobject, assigning the source and destination file paths respectively as its arguments.
import covalent as ct
xfer = ct.fs.FileTransfer(str(source_filepath), str(dest_filepath))
- Define a list of Covalent
FileTransferobjects to assign to a task. (In this example, the list contains only the singleFileTransfernamedxfer.)
ft_list = [xfer]
- Define an electron that uses a Covalent
FileTransfertask to read the source file and writes to the destination file, assigning the list containing theFileTransferobjects to thefilesargument of the electron decorator. (Note that thefilesargument takes a list of CovalentFileTransferobjects, not files or path names.)
@ct.electron(
files = ft_list
)
def my_file_transfer_task(files):
from_file, to_file = files[0]
with open(to_file,'w') as f:
for line in open(from_file, 'r'):
f.write(line)
return to_file
Here is the task definition again, with the three steps combined in the electron decorator. The FileTransfer defaults to the local Rsync strategy:
import covalent as ct
@ct.electron(
files=[ct.fs.FileTransfer(str(source_filepath), str(dest_filepath))] # defaults to Rsync
)
def my_file_transfer_task(files=[]):
from_file, to_file = files[0]
with open(to_file,'w') as f_to, open(from_file, 'r') as f_from:
for line in f_from:
f_to.write(line)
return to_file
- Run the electron thus created in a lattice:
# Create and dispatch a workflow to transfer data from source to destination, and write to destination file
@ct.lattice()
def my_workflow():
return my_file_transfer_task()
dispatch_id = ct.dispatch(my_workflow)()
- Confirm the transfer by reading the contents of the destination file:
After executing the workflow a copy of the file (
source_filepath) has been written tomy_dest_file. This file transfer occured before electron execution.
result = ct.get_result(dispatch_id, wait=True)
print(result)
result_filepath = result.result
# Read from the destination file
print("Reading from ", result_filepath, "\n")
with open(result_filepath,'r') as f:
print(f.read())
# Clean up files
source_filepath.unlink()
dest_filepath.unlink()
Lattice Result
==============
status: COMPLETED
result: /Users/mini-me/agnostiq/covalent/doc/source/how_to/coding/my_dest_file
input args: []
input kwargs: {}
error: None
start_time: 2023-01-29 22:18:07.789282
end_time: 2023-01-29 22:18:07.914328
results_dir: /Users/mini-me/agnostiq/covalent/doc/source/how_to/coding/results
dispatch_id: 024088b9-6f74-4e5f-9757-9f088bd16b29
Node Outputs
------------
my_file_transfer_task(0): /Users/mini-me/agnostiq/covalent/doc/source/how_to/coding/my_dest_file
Reading from /Users/mini-me/agnostiq/covalent/doc/source/how_to/coding/my_dest_file
Mares eat oats and does eat oats
And little lambs eat ivy ...