Triggers
Execute a workflow triggered by a specific type of event
Classes:
BaseTrigger([lattice_dispatch_id, …])
Base class to be subclassed by any custom defined trigger.
DirTrigger(dir_path, event_names[, …])
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
TimeTrigger(time_gap[, lattice_dispatch_id, …])
Performs a trigger action every time_gap seconds.
Bases: object
Base class to be subclassed by any custom defined trigger. Implements all the necessary methods used for interacting with dispatches, including getting their statuses and performing a redispatch of them whenever the trigger gets triggered.
Parameters
lattice_dispatch_id (Optional[str]) – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
dispatcher_addr (Optional[str]) – Address of dispatcher server used to retrieve info about or redispatch any dispatches
triggers_server_addr (Optional[str]) – Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
self.lattice_dispatch_id
Dispatch ID of the worfklow which has to be redispatched in case this trigger gets triggered
self.dispatcher_addr
Address of dispatcher server used to retrieve info about or redispatch any dispatches
self.triggers_server_addr
Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
self.new_dispatch_ids
List of all the newly created dispatch ids from performing redispatch
self.observe_blocks
Boolean to indicate whether the self.observe method is a blocking call
self.event_loop
Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs
self.use_internal_funcs
Boolean indicating whether to use dispatcher’s functions directly instead of through API calls
self.stop_flag
To handle stopping mechanism in a thread safe manner in case self.observe() is a blocking call (e.g. see TimeTrigger)
Methods:
observe()
Start observing for any change which can be used to trigger this trigger.
register()
Register this trigger to the Triggers server and start observing.
stop()
Stop observing for changes.
to_dict()
Return a dictionary representation of this trigger which can later be used to regenerate it.
trigger()
Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow.
Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.
Return a dictionary representation of this trigger which can later be used to regenerate it.
Returns
Dictionary representation of this trigger
Return Type
tr_dict
Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow. Should be called within self.observe() whenever a trigger action is desired.
Raises
RuntimeError – In case no dispatch id is connected to this trigger
Return Type
None
Bases: covalent.triggers.base.BaseTrigger
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
Parameters
dir_path – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
batch_size (int) – The number of changes to wait for before performing the trigger action, default is 1.
recursive (bool) – Whether to recursively watch the directory, default is False.
self.dir_path
Path to the file/dir which is to be observed for events
self.event_names
'List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
self.batch_size
The number of events to wait for before performing the trigger action, default is 1.
self.recursive
Whether to recursively watch the directory, default is False.
self.n_changes
Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
Methods:
Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.
observe()
Start observing the file/dir for any possible events among the ones mentioned in self.event_names.
stop()
Stop observing the file or directory for changes.
Dynamically attaches and overrides the “on**” methods to the handler depending on which ones are requested by the user.
Parameters
event_names – List of event names upon which to perform a trigger action
Return Type
None
Start observing the file/dir for any possible events among the ones mentioned in self.event_names. Currently only supports running within the Covalent/Triggers server.
Return Type
None
Bases: covalent.triggers.base.BaseTrigger
Performs a trigger action every time_gap seconds.
Parameters
time_gap(int) – Amount of seconds to wait before doing a trigger action
self.time_gap
Amount of seconds to wait before doing a trigger action
self.stop_flag
NThread safe flag used to check whether the stop condition has been met
Methods:
observe()
Keep performing the trigger action every self.time_gap seconds until stop condition has been met.
stop()
Stop the running self.observe() method by setting the self.stop_flag flag.
Keep performing the trigger action every self.time_gap seconds until stop condition has been met.
Return Type
None
Stop the running self.observe() method by setting the self.stop_flag flag.
Return Type
None