Skip to main content

How to add a time trigger to a lattice GoToimage

In this guide we'll illustrate how to use a TimeTrigger to trigger workflow runs automatically every x seconds.

Let's first import the required parts:

import covalent as ct
from covalent.triggers import TimeTrigger

Now, let's create our TimeTrigger object which performs a trigger action every 5 seconds.

time_trigger = TimeTrigger(time_gap=5)

Let's create a simple workflow now:

@ct.lattice
@ct.electron
def my_workflow():
return 42

Once we've made sure that the covalent server is running, we can perform the dispatch for my_workflow as such, but this time we will be disabling the execution of this lattice for the first time using disable_run parameter in ct.dispatch:

dispatch_id = ct.dispatch(my_workflow, disable_run=True)()
print(dispatch_id)
372930b7-f81a-4e91-9302-236de8a0a9c0

Now let's attach that trigger to this dispatch_id and register it with the triggers server.

time_trigger.lattice_dispatch_id = dispatch_id
time_trigger.register()

And that's it, as you can see this was another way of attaching triggers to a workflow, which is slightly different than how we did it in the DirTrigger How To Guide.

Also, now if you check the UI you'll see that a new my_workflow gets dispatched every 5 seconds. If we want to stop that after a while, we can use the ct.stop_triggers function:

ct.stop_triggers(dispatch_id)
Triggers for following dispatch_ids have stopped observing:
372930b7-f81a-4e91-9302-236de8a0a9c0

The above will prevent any new dispatches from happening due to the trigger action on my_workflow lattice.

See Also