Triggers

Execute a workflow triggered by a specific type of event

Classes:

BaseTrigger([lattice_dispatch_id, …])

Base class to be subclassed by any custom defined trigger.

DirTrigger(dir_path, event_names[, …])

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

TimeTrigger(time_gap[, lattice_dispatch_id, …])

Performs a trigger action every time_gap seconds.

class covalent.triggers.BaseTrigger

Bases: object

Base class to be subclassed by any custom defined trigger. Implements all the necessary methods used for interacting with dispatches, including getting their statuses and performing a redispatch of them whenever the trigger gets triggered.

Parameters

lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered

dispatcher_addr (Optional[str]) – Address of dispatcher server used to retrieve info about or redispatch any dispatches

triggers_server_addr (Optional[str]) – Address of the Triggers server (if there is any) to register this trigger with; defaults to the dispatcher’s address

self.lattice_dispatch_id

Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered

self.dispatcher_addr

Address of dispatcher server used to retrieve info about or redispatch any dispatches

self.triggers_server_addr

Address of the Triggers server (if there is any) to register this trigger with; defaults to the dispatcher’s address

self.new_dispatch_ids

List of all the newly created dispatch ids from performing redispatch

self.observe_blocks

Boolean to indicate whether the self.observe method is a blocking call

self.event_loop

Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs

self.use_internal_funcs

Boolean indicating whether to use dispatcher’s functions directly instead of through API calls

self.stop_flag

Flag used to stop the trigger in a thread-safe manner when self.observe() is a blocking call (see, e.g., TimeTrigger)

Methods:

observe() – Start observing for any change which can be used to trigger this trigger.

register() – Register this trigger to the Triggers server and start observing.

stop() – Stop observing for changes.

to_dict() – Return a dictionary representation of this trigger which can later be used to regenerate it.

trigger() – Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow.

abstract observe

Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.

register

Register this trigger to the Triggers server and start observing.

Return Type

None

abstract stop

Stop observing for changes. To be implemented by the subclass.

to_dict

Return a dictionary representation of this trigger which can later be used to regenerate it.

Returns

tr_dict – Dictionary representation of this trigger

Return Type

dict
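The idea of a dictionary that "can later be used to regenerate" a trigger can be sketched as a round trip. This is an illustration only, not Covalent's implementation: SketchTimeTrigger, REGISTRY, from_dict, and the "name" key are hypothetical names assumed for this sketch.

```python
from typing import Any, Dict, Optional


class SketchTimeTrigger:
    """Hypothetical stand-in; not Covalent's TimeTrigger."""

    def __init__(self, time_gap: int, lattice_dispatch_id: Optional[str] = None):
        self.time_gap = time_gap
        self.lattice_dispatch_id = lattice_dispatch_id

    def to_dict(self) -> Dict[str, Any]:
        # Copy the instance attributes and record the class name so the
        # right class can be chosen when regenerating (assumed layout).
        tr_dict = dict(vars(self))
        tr_dict["name"] = type(self).__name__
        return tr_dict


# Hypothetical registry mapping stored names back to classes.
REGISTRY = {"SketchTimeTrigger": SketchTimeTrigger}


def from_dict(tr_dict: Dict[str, Any]) -> SketchTimeTrigger:
    """Rebuild a trigger from the dictionary produced by to_dict()."""
    params = dict(tr_dict)
    cls = REGISTRY[params.pop("name")]
    return cls(**params)


original = SketchTimeTrigger(time_gap=5, lattice_dispatch_id="abc")
restored = from_dict(original.to_dict())
print(restored.to_dict())
```

Storing the class name next to the constructor arguments is one simple way to make the dictionary self-describing; other layouts would work equally well.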

trigger

Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow. Should be called within self.observe() whenever a trigger action is desired.

Raises

RuntimeError – In case no dispatch id is connected to this trigger

Return Type

None
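The subclassing contract described for BaseTrigger (a subclass supplies observe() and stop(), and calls trigger() from inside observe()) can be sketched without Covalent installed. The following is a minimal stand-in under stated assumptions, not the library's code: SketchBaseTrigger, CountdownTrigger, and the made-up redispatch ids are hypothetical, and the real class contacts the dispatcher server instead of appending to a list.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class SketchBaseTrigger(ABC):
    """Hypothetical stand-in mirroring the documented BaseTrigger contract."""

    def __init__(self, lattice_dispatch_id: Optional[str] = None):
        self.lattice_dispatch_id = lattice_dispatch_id
        self.new_dispatch_ids: List[str] = []

    @abstractmethod
    def observe(self) -> None:
        """Watch for changes and call self.trigger() when one occurs."""

    @abstractmethod
    def stop(self) -> None:
        """Stop watching for changes."""

    def trigger(self) -> None:
        # Documented behavior: fail if no dispatch id is connected.
        if self.lattice_dispatch_id is None:
            raise RuntimeError("No dispatch id is connected to this trigger")
        # Assumption: a real trigger would request a redispatch here;
        # this sketch only records a made-up id.
        count = len(self.new_dispatch_ids) + 1
        self.new_dispatch_ids.append(f"{self.lattice_dispatch_id}-redispatch-{count}")


class CountdownTrigger(SketchBaseTrigger):
    """Hypothetical subclass that fires a fixed number of times."""

    def __init__(self, times: int, lattice_dispatch_id: Optional[str] = None):
        super().__init__(lattice_dispatch_id)
        self.times = times
        self._stopped = False

    def observe(self) -> None:
        for _ in range(self.times):
            if self._stopped:
                break
            self.trigger()

    def stop(self) -> None:
        self._stopped = True


demo = CountdownTrigger(times=3, lattice_dispatch_id="abc")
demo.observe()
print(demo.new_dispatch_ids)

error_message = ""
try:
    CountdownTrigger(times=1).observe()  # no dispatch id connected
except RuntimeError as err:
    error_message = str(err)
print(error_message)
```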

class covalent.triggers.DirTrigger

Bases: covalent.triggers.base.BaseTrigger

Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.

Parameters

dir_path – Path to the file/dir which is to be observed for events

event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].

batch_size (int) – The number of changes to wait for before performing the trigger action, default is 1.

recursive (bool) – Whether to recursively watch the directory, default is False.

self.dir_path

Path to the file/dir which is to be observed for events

self.event_names

List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].

self.batch_size

The number of events to wait for before performing the trigger action, default is 1.

self.recursive

Whether to recursively watch the directory, default is False.

self.n_changes

Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
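The interplay of batch_size and n_changes can be sketched as a small counter. This is an illustration, not DirTrigger's code: BatchCounter and on_event are hypothetical names, and resetting the count after each action is an assumption drawn from the phrase "since last trigger action".

```python
from typing import Callable


class BatchCounter:
    """Hypothetical helper: run `action` once every `batch_size` events."""

    def __init__(self, action: Callable[[], None], batch_size: int = 1):
        self.action = action
        self.batch_size = batch_size
        self.n_changes = 0

    def on_event(self) -> None:
        self.n_changes += 1
        if self.n_changes == self.batch_size:
            self.action()
            self.n_changes = 0  # start counting toward the next batch


fired = []
counter = BatchCounter(action=lambda: fired.append("trigger"), batch_size=3)
for _ in range(7):
    counter.on_event()

# Seven events with batch_size=3: two actions, one event left over.
print(len(fired), counter.n_changes)
```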

Methods:

attach_methods_to_handler(event_names) – Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

observe() – Start observing the file/dir for any possible events among the ones mentioned in self.event_names.

stop() – Stop observing the file or directory for changes.

attach_methods_to_handler

Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.

Parameters

event_names – List of event names upon which to perform a trigger action

Return Type

None
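Attaching “on_*” methods dynamically can be sketched with setattr on a handler instance. This is a stand-in under stated assumptions, not the library's handler: SketchHandler, its dispatch method, and attach_methods are hypothetical names used only for this illustration.

```python
from typing import Callable, List


class SketchHandler:
    """Hypothetical event handler whose on_* hooks do nothing by default."""

    def on_created(self, event: str) -> None:
        pass

    def on_deleted(self, event: str) -> None:
        pass

    def on_modified(self, event: str) -> None:
        pass

    def dispatch(self, event_name: str, event: str) -> None:
        # Route an event to the matching on_<event_name> hook.
        getattr(self, f"on_{event_name}")(event)


def attach_methods(handler: SketchHandler, event_names: List[str],
                   action: Callable[[str], None]) -> None:
    """Override only the requested on_* hooks on this handler instance."""
    for name in event_names:
        setattr(handler, f"on_{name}", action)


seen = []
handler = SketchHandler()
attach_methods(handler, ["created", "modified"], seen.append)

handler.dispatch("created", "a.txt")
handler.dispatch("deleted", "a.txt")   # not requested, so the default no-op runs
handler.dispatch("modified", "b.txt")
print(seen)
```

Setting the attribute on the instance rather than the class keeps the override local to one handler, so other handlers keep their default hooks.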

observe

Start observing the file/dir for any possible events among the ones mentioned in self.event_names. Currently only supports running within the Covalent/Triggers server.

Return Type

None

stop

Stop observing the file or directory for changes.

Return Type

None

class covalent.triggers.TimeTrigger

Bases: covalent.triggers.base.BaseTrigger

Performs a trigger action every time_gap seconds.

Parameters

time_gap (int) – Number of seconds to wait before doing a trigger action

self.time_gap

Number of seconds to wait before doing a trigger action

self.stop_flag

Thread-safe flag used to check whether the stop condition has been met

Methods:

observe() – Keep performing the trigger action every self.time_gap seconds until stop condition has been met.

stop() – Stop the running self.observe() method by setting the self.stop_flag flag.

observe

Keep performing the trigger action every self.time_gap seconds until stop condition has been met.

Return Type

None

stop

Stop the running self.observe() method by setting the self.stop_flag flag.

Return Type

None
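The blocking observe() loop and the flag-based stop() can be sketched with the standard library. This is a minimal illustration, not TimeTrigger's code: SketchTimeLoop is a hypothetical name, using threading.Event for stop_flag is an assumption, and a fractional time_gap is used only so the demo finishes quickly.

```python
import threading
import time
from typing import Callable


class SketchTimeLoop:
    """Hypothetical stand-in for a blocking, stoppable periodic loop."""

    def __init__(self, time_gap: float, action: Callable[[], None]):
        self.time_gap = time_gap
        self.action = action
        self.stop_flag = threading.Event()

    def observe(self) -> None:
        # Event.wait returns True as soon as the flag is set, so stop()
        # interrupts the wait instead of sleeping out the full gap.
        while not self.stop_flag.wait(timeout=self.time_gap):
            self.action()

    def stop(self) -> None:
        self.stop_flag.set()


ticks = []
loop = SketchTimeLoop(time_gap=0.01, action=lambda: ticks.append(1))
worker = threading.Thread(target=loop.observe)
worker.start()

# Let the loop fire at least three times (with a safety deadline), then stop it.
deadline = time.monotonic() + 2.0
while len(ticks) < 3 and time.monotonic() < deadline:
    time.sleep(0.005)
loop.stop()
worker.join(timeout=1)
print(len(ticks) >= 3, worker.is_alive())
```

Because observe() blocks, the sketch runs it in a separate thread and calls stop() from the main thread, which is the situation a thread-safe flag is meant to handle.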

Examples