Triggers
Execute a workflow triggered by a specific type of event
Classes:
BaseTrigger
([lattice_dispatch_id, …])
Base class to be subclassed by any custom defined trigger.
DirTrigger
(dir_path, event_names[, …])
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
TimeTrigger
(time_gap[, lattice_dispatch_id, …])
Performs a trigger action every time_gap seconds.
Bases: object
Base class to be subclassed by any custom defined trigger. Implements all the necessary methods used for interacting with dispatches, including getting their statuses and performing a redispatch of them whenever the trigger gets triggered.
Parameters
lattice_dispatch_id (Optional
[str
]) – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
dispatcher_addr (Optional
[str
]) – Address of dispatcher server used to retrieve info about or redispatch any dispatches
triggers_server_addr (Optional
[str
]) – Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
self.lattice_dispatch_id
Dispatch ID of the worfklow which has to be redispatched in case this trigger gets triggered
self.dispatcher_addr
Address of dispatcher server used to retrieve info about or redispatch any dispatches
self.triggers_server_addr
Address of the Triggers server (if there is any) to register this trigger to, uses the dispatcher’s address by default
self.new_dispatch_ids
List of all the newly created dispatch ids from performing redispatch
self.observe_blocks
Boolean to indicate whether the self.observe method is a blocking call
self.event_loop
Event loop to be used if directly calling dispatcher’s functions instead of the REST APIs
self.use_internal_funcs
Boolean indicating whether to use dispatcher’s functions directly instead of through API calls
self.stop_flag
To handle stopping mechanism in a thread safe manner in case self.observe() is a blocking call (e.g. see TimeTrigger)
Methods:
observe
()
Start observing for any change which can be used to trigger this trigger.
register
()
Register this trigger to the Triggers server and start observing.
stop
()
Stop observing for changes.
to_dict
()
Return a dictionary representation of this trigger which can later be used to regenerate it.
trigger
()
Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow.
Start observing for any change which can be used to trigger this trigger. To be implemented by the subclass.
Return a dictionary representation of this trigger which can later be used to regenerate it.
Returns
Dictionary representation of this trigger
Return Type
tr_dict
Trigger this trigger and perform a redispatch of the connected dispatch id’s workflow. Should be called within self.observe() whenever a trigger action is desired.
Raises
RuntimeError – In case no dispatch id is connected to this trigger
Return Type
None
Bases: covalent.triggers.base.BaseTrigger
Directory or File based trigger which watches for events in said file/dir and performs a trigger action whenever they happen.
Parameters
dir_path – Dispatch ID of the workflow which has to be redispatched in case this trigger gets triggered
event_names – List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
batch_size (int
) – The number of changes to wait for before performing the trigger action, default is 1.
recursive (bool
) – Whether to recursively watch the directory, default is False.
self.dir_path
Path to the file/dir which is to be observed for events
self.event_names
'List of event names on which to perform the trigger action. Possible options can be a subset of: [“created”, “deleted”, “modified”, “moved”, “closed”].
self.batch_size
The number of events to wait for before performing the trigger action, default is 1.
self.recursive
Whether to recursively watch the directory, default is False.
self.n_changes
Number of events since last trigger action. Whenever self.n_changes == self.batch_size a trigger action happens.
Methods:
Dynamically attaches and overrides the “on_*” methods to the handler depending on which ones are requested by the user.
observe
()
Start observing the file/dir for any possible events among the ones mentioned in self.event_names.
stop
()
Stop observing the file or directory for changes.
Dynamically attaches and overrides the “on**” methods to the handler depending on which ones are requested by the user.
Parameters
event_names – List of event names upon which to perform a trigger action
Return Type
None
Start observing the file/dir for any possible events among the ones mentioned in self.event_names. Currently only supports running within the Covalent/Triggers server.
Return Type
None
Bases: covalent.triggers.base.BaseTrigger
Performs a trigger action every time_gap seconds.
Parameters
time_gap(int
) – Amount of seconds to wait before doing a trigger action
self.time_gap
Amount of seconds to wait before doing a trigger action
self.stop_flag
NThread safe flag used to check whether the stop condition has been met
Methods:
observe
()
Keep performing the trigger action every self.time_gap seconds until stop condition has been met.
stop
()
Stop the running self.observe() method by setting the self.stop_flag flag.
Keep performing the trigger action every self.time_gap seconds until stop condition has been met.
Return Type
None
Stop the running self.observe() method by setting the self.stop_flag flag.
Return Type
None