Adding a Database Trigger to a Lattice

This example shows how to use covalent.triggers.DatabaseTrigger to dispatch a workflow automatically once a table read, filtered by the given conditions, has executed successfully N times.

Prerequisites

  1. Install the recommended SQL drivers that support SQLAlchemy.
  2. Create an environment variable named COVALENT_DATABASE_URL and set it to the desired database file or URL. For a PostgreSQL instance, the connection URL looks similar to this:
export COVALENT_DATABASE_URL=postgresql+pg8000://<user name>:<pwd>@<host>:<port>/<DB>
  3. Run covalent db migrate to create the required tables in that database.
  4. Start Covalent with covalent start. The Covalent server now points to the new database.
  5. Import Covalent and the trigger:
import covalent as ct
from covalent.triggers import DatabaseTrigger
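If the user name or password contains characters such as @ or /, they generally need to be percent-encoded before they go into the connection URL. The following is a minimal sketch of assembling the URL in Python; build_database_url is a hypothetical helper, not part of Covalent or SQLAlchemy, and the credentials shown are placeholders.

```python
from urllib.parse import quote_plus


def build_database_url(user, password, host, port, database,
                       driver="postgresql+pg8000"):
    """Assemble a SQLAlchemy-style URL (hypothetical helper).

    The user name and password are percent-encoded so that characters
    such as '@' or '/' do not break URL parsing.
    """
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Placeholder credentials, for illustration only.
url = build_database_url("postgres", "p@ss/word", "localhost", 5432, "aqdb")
print(url)
```

The resulting string can then be exported as COVALENT_DATABASE_URL or passed as db_path.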

Procedure

  1. Create a new table test_db_trigger.

db_path = "postgresql+pg8000://postgres:sam@localhost:5432/aqdb"
table_name = 'test_db_trigger'

# Create the table.
from sqlalchemy import Table, Column, MetaData, DateTime, create_engine

meta = MetaData()
engine = create_engine(db_path, echo=True)
test_db_trigger = Table(
    table_name, meta,
    Column('trigger_at', DateTime, primary_key=True),
)
meta.create_all(engine)

2023-10-04 08:34:30,374 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-10-04 08:34:30,377 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,385 INFO sqlalchemy.engine.Engine select current_schema()
2023-10-04 08:34:30,389 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,391 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-10-04 08:34:30,392 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,397 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-04 08:34:30,405 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2023-10-04 08:34:30,406 INFO sqlalchemy.engine.Engine [generated in 0.00120s] ('test_db_trigger',)
2023-10-04 08:34:30,410 INFO sqlalchemy.engine.Engine
CREATE TABLE test_db_trigger (
trigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
PRIMARY KEY (trigger_at)
)


2023-10-04 08:34:30,411 INFO sqlalchemy.engine.Engine [no key 0.00107s] ()
2023-10-04 08:34:30,448 INFO sqlalchemy.engine.Engine COMMIT
  2. Load sample data into the newly created table.
# Load sample data.
from sqlalchemy import insert
from datetime import datetime

with engine.connect() as conn:
    # Ten rows, each holding the current timestamp.
    values = [{"trigger_at": datetime.now()} for _ in range(10)]
    result = conn.execute(insert(test_db_trigger), [*values])
2023-10-04 08:34:33,407 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)
2023-10-04 08:34:33,409 INFO sqlalchemy.engine.Engine [generated in 0.00208s] ((datetime.datetime(2023, 10, 4, 14, 4, 33, 406624),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406636),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406638),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406639),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406641),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406643),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406645),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406646),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406648),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406650),))
2023-10-04 08:34:33,423 INFO sqlalchemy.engine.Engine COMMIT
/tmp/ipykernel_146652/3174457459.py:6: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
result = conn.execute(insert(test_db_trigger),[*values])#{"trigger_at": trigger_at}
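Because trigger_at is the table's primary key, the ten sample values must be distinct. Consecutive datetime.now() calls differ by a few microseconds in the log above, but on a clock with coarser resolution they may collide and the insert would then fail. The sketch below generates the sample rows with a fixed offset instead; distinct_timestamps is a hypothetical helper, and the one-microsecond step is an assumption chosen for illustration.

```python
from datetime import datetime, timedelta


def distinct_timestamps(count, start=None, step=timedelta(microseconds=1)):
    """Return `count` row dicts whose trigger_at values are guaranteed to differ.

    Hypothetical helper: each value is `start` plus a multiple of `step`,
    so uniqueness does not depend on the clock resolution.
    """
    start = start or datetime.now()
    return [{"trigger_at": start + i * step} for i in range(count)]


# A fixed start time keeps the example reproducible.
values = distinct_timestamps(10, start=datetime(2023, 10, 4, 14, 4, 33))
```

The resulting values list has the same shape as the one built above, so it can be passed to the same insert statement.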
  3. Create a DatabaseTrigger object. You can pass parameters such as db_path, table_name, trigger_after_n, and poll_interval. This illustration uses a PostgreSQL database.
database_trigger = DatabaseTrigger(
    db_path=db_path,
    table_name=table_name,
    trigger_after_n=2,
    poll_interval=3,
)
  4. Create a workflow:
@ct.lattice
@ct.electron
def my_workflow():
    return 42
  5. Dispatch my_workflow, disabling its first execution with the disable_run parameter of ct.dispatch.
dispatch_id = ct.dispatch(my_workflow, disable_run=True)()
print(dispatch_id)
de35492d-4f51-473e-b814-ad203939f85a
  6. Attach the trigger to the dispatch_id and register it with the trigger server. The where clause restricts the table reads to rows whose trigger_at equals the last inserted value.
database_trigger.lattice_dispatch_id = dispatch_id
triggered_at = values[-1]["trigger_at"]
database_trigger.where_clauses = [f"trigger_at = '{str(triggered_at)}'"]
database_trigger.register()
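The behaviour configured above (read the table every poll_interval seconds and dispatch after trigger_after_n successful reads) can be modelled with a small polling loop. This is only an illustrative sketch, not Covalent's implementation: it uses an in-memory SQLite table, and build_poll_query, poll_until_triggered, and max_polls are hypothetical names. It also assumes that multiple where clauses are joined with AND, that a read counts as successful when it executes without error, and that the loop stops after the first trigger.

```python
import sqlite3
import time


def build_poll_query(table_name, where_clauses=None):
    """Compose the SELECT used for polling (clauses joined with AND: an assumption)."""
    query = f"SELECT * FROM {table_name}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    return query


def poll_until_triggered(conn, query, trigger_after_n, poll_interval,
                         on_trigger, max_polls=100):
    """Run `query` every `poll_interval` seconds and call `on_trigger` once
    after `trigger_after_n` reads have executed without error."""
    successful_reads = 0
    for _ in range(max_polls):
        try:
            conn.execute(query).fetchall()
            successful_reads += 1
        except sqlite3.Error:
            pass  # a failed read does not count towards the threshold
        if successful_reads >= trigger_after_n:
            on_trigger()
            return successful_reads
        time.sleep(poll_interval)
    return successful_reads


# Stand-in for the PostgreSQL table used in this example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_db_trigger (trigger_at TEXT PRIMARY KEY)")
conn.execute("INSERT INTO test_db_trigger VALUES ('2023-10-04 14:04:33.406650')")

query = build_poll_query(
    "test_db_trigger", ["trigger_at = '2023-10-04 14:04:33.406650'"]
)
fired = []  # records each simulated dispatch
reads = poll_until_triggered(
    conn, query, trigger_after_n=2, poll_interval=0.01,
    on_trigger=lambda: fired.append("dispatch"),
)
```

Here the short poll_interval only keeps the demonstration fast; with the values used in this example the simulated dispatch would occur after two reads spaced 3 seconds apart.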
  7. Monitor the Covalent UI. Watch the Dashboard for new dispatches of my_workflow.

  8. In the Covalent UI, observe that a new my_workflow is dispatched after the table has been read two times, with a polling interval of 3 seconds.

  9. To disable triggers on the dispatch, call the ct.stop_triggers function, then drop the test_db_trigger table.

ct.stop_triggers(dispatch_id)
meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)
[2023-10-04 08:36:10,622] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:
[2023-10-04 08:36:10,627] [DEBUG] local.py: Line 336 in stop_triggers: de35492d-4f51-473e-b814-ad203939f85a
2023-10-04 08:36:10,630 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-04 08:36:10,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2023-10-04 08:36:10,637 INFO sqlalchemy.engine.Engine [cached since 100.2s ago] ('test_db_trigger',)
2023-10-04 08:36:10,643 INFO sqlalchemy.engine.Engine
DROP TABLE test_db_trigger
2023-10-04 08:36:10,645 INFO sqlalchemy.engine.Engine [no key 0.00186s] ()
2023-10-04 08:36:11,073 INFO sqlalchemy.engine.Engine COMMIT

Note that the stop_triggers function disables all triggers attached to the specified dispatch.

See Also