Skip to main content

Automate Repetitive Tasks with Triggers

Triggers are a powerful feature in Covalent that allow you to automate repetitive tasks and streamline your workflow. With these, you can define a pre-defined set of steps that will run automatically every time a specific event occurs.

To use Triggers, you simply need to attach a Trigger object to a lattice. Then, every time the event described in the trigger occurs, the connected lattice will perform a trigger action and dispatch the connected workflow. This makes it easy to automate processes, reducing the risk of human error and ensuring that your pipeline runs smoothly and efficiently.

For example, if you want to plot a graph of a CSV file every time it gets modified, you can use these Triggers to automate this process. The trigger will be watching the CSV file for changes, and every time the file is modified, it will run the workflow to plot a graph of the data.

Triggers are especially useful if you’re using Covalent as part of a larger pipeline, rather than as a user-facing tool. By automating these tasks, you can save time, reduce the risk of error, and ensure that your pipeline runs smoothly and efficiently.

Using Triggers

Covalent provides several options for starting the server in relation to triggers. By default, the Covalent server starts with the triggers server endpoints included.

Note

It is also possible to start the Covalent server without the triggers endpoints and manage the observe() method of the triggers manually, or start the standalone triggers server without Covalent.

The following code block showcases the three different start options:

# Starting the default way which starts with the triggers server endpoints as part of Covalent server

covalent start

# Starting the Covalent server without the trigger endpoints, thus in order to use triggers you will have
# either have to start the triggers server independently or manage the observe() method of triggers manually

covalent start --no-triggers

# Starting the standalone triggers server without Covalent, this is useful if your Covalent server
# is running on a different machine than the triggers server

covalent start --triggers-only

For the purpose of this example, let’s assume you started Covalent the default way.

You can attach a Trigger object to a lattice quite simply as shown below:

...
tr_object = TimeTrigger(5)
@ct.lattice(triggers=tr_object):
def my_workflow():
...

Under the hood, once this is done and when you dispatch the lattice using ct.dispatch, the following events occur:

  • The first run of the lattice is disabled, and Covalent only saves the lattice and generates a dispatch_id for reference later.

  • The Trigger object is registered on the triggers server, which is the same as the Covalent server by default.

  • Upon registration, the observe() method of the trigger is called, which starts observing for the desired condition to be met in an unblocking manner. In the example above, the TimeTrigger with a time gap of 5 seconds will call the trigger() method every 5 seconds.

  • At this point, ct.dispatch now returns with the earlier generate dispatch_id.

  • The trigger() method, whenever it’s called, performs an automatic dispatch of the connected lattice using the dispatch_id obtained earlier, and stores the newly obtained dispatch_ids for connections between the “parent” and subsequent “child” dispatch_ids.

Once a trigger is started, to stop the automatic dispatching when an event happens, you can call ct.stop_triggers(dispatch_id) with the parent dispatch id dispatch_id.

Attaching a Trigger to a Dispatched Workflow

Another case which might be useful here is let’s say you want to attach a trigger to a workflow which has already been dispatched, and you only have access to its dispatch_id, then in that case you can do the following:

tr_object = TimeTrigger(10)
tr_object.lattice_dispatch_id = dispatch_id
tr_object.register()

This way of attaching a trigger is equivalent to the one mentioned before, but gives more degrees of freedom. For example, you can register the same trigger to multiple workflows by just repeating the last two lines for each of them. This method also eliminates the need to design workflows with the trigger in mind, disentangling the trigger creation code from the actual workflow code. And in fact, since a trigger can be set post the workflow creation, this method can be used to attach a trigger from an entirely different Python process than the one where the workflow was created.

Note

In case you already know that you are going to be attaching a trigger to a workflow post-dispatch and don’t wish to run it the first time or until a trigger event takes place, then while dispatching it you can do ct.dispatch(my_workflow, disable_run=True)() and it won’t start running but will still generate a dispatch_id which you can later use.

Attaching Triggers to Workflows on Remote Servers

Another way to attach triggers to workflows that have already been dispatched is by utilizing the dispatch_id and the address of both the Covalent server and the triggers server. This is useful in scenarios where the trigger should be managed from a separate machine.

For example, let’s consider a scenario where there are 3 machines: 2 remote servers and 1 client machine. ServerA is the one where Covalent is running without triggers support, ServerB where only the triggers server is running, and Client is the one where you are working from.

Let’s say our workflow my_workflow has been dispatched to ServerA without any triggers. To attach triggers to that workflow and register it with the triggers server, you can follow the steps given below:

Note

When using triggers remotely, make sure to set trigger.use_internal_funcs = False this will ensure that the trigger interacts with the Covalent server through the API endpoints instead of directly accessing the required internal functions.

trigger = TimeTrigger(30)
# Interacting with dispatcher server through API endpoints
trigger.use_internal_funcs = False
# Attaching dispatch id of `my_workflow` to the trigger
trigger.lattice_dispatch_id = dispatch_id
# Specifying the address of the dispatcher server
trigger.dispatcher_addr = "<ServerA_addr>"
# Specifying the address of the triggers server
trigger.triggers_server_addr = "<ServerB_addr>"
# Registering it to the triggers server
trigger.register()

And this will be sufficient for your workflow to get dispatched every 30 seconds due to this trigger.

Adding a Trigger without Registering it to the Triggers Server

You can also run the observation component of a trigger as part of your own server, without registering it with the triggers server. For example, if you have a long-running process on a server, you can call the trigger.observe() function to start observing, as follows:

trigger = TimeTrigger(2)
trigger.use_internal_funcs = False
trigger.lattice_dispatch_id = dispatch_id
trigger.dispatcher_addr = `<ServerA_addr>`
# And now start observing
trigger.observe()

Keep in mind that it’s important to handle the blocking/non-blocking nature of the trigger.observe() function correctly. If it’s a blocking call, it’s recommended to offload trigger.observe() to a separate thread so it doesn’t block the execution of other components of your server. You can check if trigger.observe() is blocking by accessing the trigger.observe_blocks attribute of any trigger.

This becomes extremely useful when writing custom triggers, for example to trigger workflows off of email/slack messages. The ability to run trigger.observe() as part of your own server or process opens up a world of possibilities to integrate triggers into your workflow in a way that best suits your use case.

Types of Triggers in Covalent

Covalent offers an array of triggers designed to cater to diverse use cases, simplifying the automation of tasks based on a range of conditions. It’s important to note that this list represents the currently available triggers, with more to be added in the future. If you find these triggers valuable and have suggestions for new ones, we encourage you to contribute to Covalent’s GitHub repository.

Here are the currently available triggers in Covalent:

  1. DirTrigger: This trigger observes a specified directory or file for events such as creation, deletion, modification, or movement. It performs the trigger action when these events occur. For example:
from covalent.triggers import DirTrigger
import covalent as ct

dir_trigger = DirTrigger(dir_path='path/to/your/directory', event_names=['modified'])

@ct.lattice(triggers=dir_trigger)
def my_workflow():
...
  1. TimeTrigger: This trigger performs the trigger action after a specified time interval. It is useful for recurring tasks or periodic data processing. For example:
from covalent.triggers import TimeTrigger
import covalent as ct

time_trigger = TimeTrigger(time_gap=5) # Trigger action every 5 seconds

@ct.lattice(triggers=time_trigger)
def my_workflow():
...
  1. SQLiteTrigger: This trigger monitors an SQLite database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example:
from covalent.triggers import SQLiteTrigger
import covalent as ct

sqlite_trigger = SQLiteTrigger(db_path='path/to/your/database.sqlite',table_name='your_table)

@ct.lattice(triggers=sqlite_trigger)
def my_workflow():
...
  1. DatabaseTrigger: This trigger monitors the database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example:
from covalent.triggers import DatabaseTrigger
import covalent as ct
database_trigger = DatabaseTrigger(db_path="db path",table_name='table name')
@ct.lattice(triggers=database_trigger)
def my_workflow():

These triggers can be easily integrated into your Covalent workflows to automate various tasks based on the desired conditions.

Trigger How-to Guides

For further examples on how to use triggers, check out the Trigger how to guides: