Re-executing a Workflow
Use the Covalent redispatch command to re-execute a workflow with:
- New input parameters
- Updated or replacement task definitions
Optionally, you can reuse previous results as well.
Prerequisites
Run the Covalent server.
Procedure
- Construct a workflow.
import covalent as ct
import covalent._dispatcher_plugins

@ct.electron
def task_0(a):
    return a

@ct.electron
def task_1(a, b):
    return a + b

@ct.electron
def task_2(a, b):
    return a / b

# For demonstrating redispatch
@ct.electron
def task_2_redefined(a, b):
    return a * b

@ct.lattice
def workflow(a, b):
    res_0 = task_0(a)
    res_1 = task_1(res_0, b)
    res_2 = task_2(res_1, b)
    return res_2
- Dispatch the workflow.
dispatch_id = ct.dispatch(workflow)(1, 2) # Input parameters are a=1, b=2
print(f"Dispatch id: {dispatch_id}")
result = ct.get_result(dispatch_id, wait=True)
print(f"Workflow execution status: {result.status}")
print(f"Workflow output: {result.result}")
Dispatch id: f6553e74-86dc-4ed7-9f38-6c3c71b6ebb7
Workflow execution status: COMPLETED
Workflow output: 1.5
- Use the Covalent redispatch function to redispatch the workflow, replacing one or more of the tasks. (One task is replaced in these demos. In practice you can replace any number of tasks.)
Three cases are illustrated below:
- Using the previous input parameters.
- Reusing (where possible) previous task results.
- Using new input parameters.
1. Using the previous input parameters
Leave the input parameters empty to dispatch the workflow with the previous input parameters.
redispatch_id = ct.redispatch(
    dispatch_id,
    replace_electrons={"task_2": task_2_redefined}
)()
print(f"Redispatch id: {redispatch_id}")
result = ct.get_result(redispatch_id, wait=True)
print(f"Redispatched workflow execution status: {result.status}")
print(f"Redispatched workflow output: {result.result}")
Redispatch id: 3c3be52a-4ee7-4b17-9732-c12334530839
Redispatched workflow execution status: COMPLETED
Redispatched workflow output: 6
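The two outputs seen so far (1.5 for the original dispatch, 6 after replacing task_2) can be checked without a Covalent server. The following is a minimal plain-Python sketch of the same arithmetic. It does not use Covalent at all, and `run_workflow` and its `final_task` parameter are hypothetical names introduced only for this illustration.

```python
# Plain-Python stand-ins for the electrons above (no Covalent server needed).
def task_0(a):
    return a

def task_1(a, b):
    return a + b

def task_2(a, b):
    return a / b

def task_2_redefined(a, b):
    return a * b

def run_workflow(a, b, final_task=task_2):
    """Mirror the lattice body, with the last task swappable (hypothetical helper)."""
    res_0 = task_0(a)
    res_1 = task_1(res_0, b)
    return final_task(res_1, b)

original_output = run_workflow(1, 2)                                   # (1 + 2) / 2
redispatched_output = run_workflow(1, 2, final_task=task_2_redefined)  # (1 + 2) * 2
print(original_output)      # 1.5
print(redispatched_output)  # 6
```

Swapping only the final function changes the result from 1.5 to 6, which matches the redispatched output shown above.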
2. Reusing the previous task results
Set the keyword argument reuse_previous_results to True to reuse computed results wherever possible. The reuse_previous_results flag defaults to False, so you must explicitly set it to True to reuse results.
Warning: When the workflow involves stochastic results (values that are randomly generated or computed from randomly generated values), do not set reuse_previous_results to True.
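To see why reuse and randomness do not mix, consider the toy model below. It is a sketch under stated assumptions, not Covalent code: `cache`, `noisy_task`, and `run` are hypothetical names, and the dictionary lookup only imitates the idea of returning a stored result instead of re-running a task.

```python
import random

cache = {}  # hypothetical stand-in for stored task results

def noisy_task(scale):
    """A stochastic task: the output differs from run to run."""
    return scale * random.random()

def run(scale, reuse_previous_results=False):
    """Toy runner: return the stored value when reuse is requested and available."""
    key = ("noisy_task", scale)
    if reuse_previous_results and key in cache:
        return cache[key]  # stale sample from an earlier run
    cache[key] = noisy_task(scale)
    return cache[key]

first = run(10)
reused = run(10, reuse_previous_results=True)
fresh = run(10)  # draws a new random number

print(reused == first)  # True: no new random number was drawn
```

With reuse enabled, the "new" run silently returns the earlier sample, so any statistics computed over repeated runs would be based on a single draw.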
redispatch_id = ct.redispatch(
    dispatch_id,
    replace_electrons={"task_2": task_2_redefined},
    reuse_previous_results=True
)()
print(f"Redispatch id: {redispatch_id}")
result = ct.get_result(redispatch_id, wait=True)
print(f"Redispatched workflow execution status: {result.status}")
print(f"Redispatched workflow output: {result.result}")
Redispatch id: 52230138-e415-44c8-bd4c-f4986beb0601
Redispatched workflow execution status: COMPLETED
Redispatched workflow output: 6
The following code compares the start and end times of the reused results to demonstrate that the tasks were not re-run.
# If previous results were reused to compute res_0 and res_1, execution time of the tasks should be 0
# and the following two statements should be `True`.
print(result.get_node_result(0)["start_time"] == result.get_node_result(0)["end_time"])
print(result.get_node_result(2)["start_time"] == result.get_node_result(2)["end_time"])
True
True
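The timestamp comparison above can be wrapped in a small helper when several nodes need checking. This is a minimal sketch: `node_was_reused` and `FakeResult` are hypothetical names, and the helper assumes only what the snippet above already relies on, namely that get_node_result returns a mapping with "start_time" and "end_time" keys. The fake object lets the helper be tried without a running server.

```python
def node_was_reused(result, node_id):
    """Return True if the node's start and end timestamps are identical.

    `result` is any object exposing get_node_result(node_id) that returns a
    mapping with "start_time" and "end_time" keys.
    """
    node = result.get_node_result(node_id)
    return node["start_time"] == node["end_time"]


class FakeResult:
    """Hypothetical stand-in for a workflow result, for local testing only."""

    def __init__(self, nodes):
        self._nodes = nodes

    def get_node_result(self, node_id):
        return self._nodes[node_id]


fake = FakeResult({
    0: {"start_time": 5.0, "end_time": 5.0},  # zero duration: treated as reused
    2: {"start_time": 5.0, "end_time": 7.5},  # nonzero duration: re-executed
})
print(node_was_reused(fake, 0), node_was_reused(fake, 2))  # True False
```

In a real session, the same helper could be called with the object returned by ct.get_result in place of `fake`.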
3. Using new input parameters
To evaluate the workflow with new input parameters, pass the new parameters when calling the function returned by ct.redispatch.
redispatch_id = ct.redispatch(
    dispatch_id,
    replace_electrons={"task_2": task_2_redefined},
    reuse_previous_results=True
)(1, 4) # Input parameters are a=1 (same as before), b=4 (different)
print(f"Redispatch id: {redispatch_id}")
result = ct.get_result(redispatch_id, wait=True)
print(f"Redispatched workflow execution status: {result.status}")
print(f"Redispatched workflow output: {result.result}")
Redispatch id: 57ab8b01-e6ee-454e-bd45-9122d750df33
Redispatched workflow execution status: COMPLETED
Redispatched workflow output: 20
With reuse_previous_results set to True, only results that depend on the new input parameters are recomputed. In this case res_0 is reused; res_1 is recomputed since one of its arguments, b, is changed.
Note that res_0 is reused based on a comparison of its arguments: Covalent doesn't just recompute the results because the arguments were resubmitted. Instead, it sees that the resubmitted argument a is identical and thus correctly concludes that there is no need to recompute res_0.
# res_0 was reused; res_1 was recomputed since the value of b changed.
print(result.get_node_result(0)["start_time"] == result.get_node_result(0)["end_time"])
print(result.get_node_result(2)["start_time"] == result.get_node_result(2)["end_time"])
True
False
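The argument-based reuse described in this case can be modeled in a few lines of plain Python. The sketch below is a conceptual illustration only and makes no claim about how Covalent implements reuse internally: `run`, `call`, `previous`, and `executed` are hypothetical names, and results are keyed simply by task name and argument values.

```python
def task_0(a):
    return a

def task_1(a, b):
    return a + b

def task_2_redefined(a, b):
    return a * b

def run(a, b, previous=None):
    """Run the three-step workflow, reusing entries from `previous` when a
    task has the same name and the same argument values (toy model)."""
    previous = previous or {}
    results, executed = {}, []

    def call(fn, *args):
        key = (fn.__name__, args)
        if key in previous:
            results[key] = previous[key]   # same task, same arguments: reuse
        else:
            results[key] = fn(*args)       # arguments changed: recompute
            executed.append(fn.__name__)
        return results[key]

    res_0 = call(task_0, a)
    res_1 = call(task_1, res_0, b)
    output = call(task_2_redefined, res_1, b)
    return output, results, executed

_, first_results, _ = run(1, 2)                         # first run, nothing to reuse
output, _, executed = run(1, 4, previous=first_results)  # b changes from 2 to 4
print(output)    # 20
print(executed)  # ['task_1', 'task_2_redefined']
```

In this model task_0 is skipped because its argument a is unchanged, while task_1 and the final task run again because they depend on the new value of b. This mirrors the True/False pattern printed above.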