Adding a Database Trigger to a Lattice
This example illustrates how to use covalent.triggers.DatabaseTrigger
to dispatch a workflow automatically once a read of a table, filtered by the given conditions, has succeeded N times.
Prerequisites
- Install the recommended SQL drivers that support SQLAlchemy.
- Create an environment variable named COVALENT_DATABASE_URL and set it to the desired database file or URL. For a PostgreSQL instance, the database connection URL looks similar to the following:
export COVALENT_DATABASE_URL=postgresql+pg8000://<user name>:<pwd>@<host>:<port>/<DB>
- Run covalent db migrate to create the required tables in that database.
- Start Covalent with covalent start. The Covalent server now points to the new database.
- Import Covalent and the trigger:
import covalent as ct
from covalent.triggers import DatabaseTrigger
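If the user name or password contains characters such as @, : or /, they need to be percent-encoded before they are placed in the connection URL. The following is a minimal sketch using only the Python standard library. The helper name build_postgres_url and the sample credentials are hypothetical, and setting os.environ only affects the current Python process and its children, so it does not replace the export shown above when the server is started from a separate shell.

```python
import os
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port, database, driver="pg8000"):
    """Return a SQLAlchemy-style PostgreSQL URL with percent-encoded credentials.

    Hypothetical helper for illustration; it is not part of Covalent or SQLAlchemy.
    """
    return (
        f"postgresql+{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Sample credentials (assumed); the password contains "@" and "/" on purpose.
url = build_postgres_url("postgres", "p@ss/word", "localhost", 5432, "aqdb")

# Visible to this process and any child process it starts.
os.environ["COVALENT_DATABASE_URL"] = url
print(url)
```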
Procedure
- Create a new table, test_db_trigger.
db_path = "postgresql+pg8000://postgres:sam@localhost:5432/aqdb"
table_name = 'test_db_trigger'
#create table
from sqlalchemy import Table, Column, MetaData, DateTime, create_engine
meta = MetaData()
engine = create_engine(db_path, echo=True)
test_db_trigger = Table(
    table_name, meta,
    Column('trigger_at', DateTime, primary_key=True),
)
meta.create_all(engine)
2023-10-04 08:34:30,374 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-10-04 08:34:30,377 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,385 INFO sqlalchemy.engine.Engine select current_schema()
2023-10-04 08:34:30,389 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,391 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-10-04 08:34:30,392 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-04 08:34:30,397 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-04 08:34:30,405 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2023-10-04 08:34:30,406 INFO sqlalchemy.engine.Engine [generated in 0.00120s] ('test_db_trigger',)
2023-10-04 08:34:30,410 INFO sqlalchemy.engine.Engine
CREATE TABLE test_db_trigger (
trigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
PRIMARY KEY (trigger_at)
)
2023-10-04 08:34:30,411 INFO sqlalchemy.engine.Engine [no key 0.00107s] ()
2023-10-04 08:34:30,448 INFO sqlalchemy.engine.Engine COMMIT
- Load sample data into the newly created table
# load sample data.
from sqlalchemy import insert
from datetime import datetime
with engine.connect() as conn:
    values = [{"trigger_at": datetime.now()} for _ in range(10)]
    result = conn.execute(insert(test_db_trigger),[*values])
2023-10-04 08:34:33,407 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)
2023-10-04 08:34:33,409 INFO sqlalchemy.engine.Engine [generated in 0.00208s] ((datetime.datetime(2023, 10, 4, 14, 4, 33, 406624),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406636),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406638),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406639),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406641),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406643),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406645),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406646),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406648),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406650),))
2023-10-04 08:34:33,423 INFO sqlalchemy.engine.Engine COMMIT
/tmp/ipykernel_146652/3174457459.py:6: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
result = conn.execute(insert(test_db_trigger),[*values])#{"trigger_at": trigger_at}
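Because trigger_at is the primary key, the ten sample values must all be distinct. The code above relies on successive datetime.now() calls returning different timestamps, which may not hold on a platform with a coarse clock. The following is a minimal sketch of one way to make the distinctness explicit, by offsetting a single base timestamp by one microsecond per row; the one-microsecond spacing is an arbitrary choice made for illustration.

```python
from datetime import datetime, timedelta

# Take one base timestamp and offset each row by i microseconds,
# so the primary-key values cannot collide.
base = datetime.now()
values = [{"trigger_at": base + timedelta(microseconds=i)} for i in range(10)]

# Sanity check before inserting: every timestamp is unique.
assert len({row["trigger_at"] for row in values}) == len(values)
```

The resulting values list has the same shape as the one used in the insert statement above, so it can be passed to conn.execute in the same way.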
- Create a DatabaseTrigger object that performs a trigger. You can pass parameters such as db_path, table_name, trigger_after_n (the number of table reads after which the workflow is dispatched), and poll_interval (the interval between reads, in seconds). This illustration uses a PostgreSQL database.
database_trigger = DatabaseTrigger(
    db_path='postgresql+pg8000://postgres:sam@localhost:5432/aqdb',
    table_name=table_name,
    trigger_after_n=2,
    poll_interval=3,
)
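To make the roles of trigger_after_n and poll_interval more concrete, the following is a minimal sketch of a polling loop that counts successful reads and fires a callback on every N-th one. It is an illustration under stated assumptions, not Covalent's implementation: the function poll_and_count, its callbacks, the max_polls limit, and the choice to reset the counter after each trigger are all hypothetical.

```python
import time
from typing import Callable


def poll_and_count(
    read_succeeds: Callable[[], bool],
    on_trigger: Callable[[], None],
    trigger_after_n: int,
    poll_interval: float,
    max_polls: int,
) -> int:
    """Poll max_polls times and call on_trigger after every trigger_after_n successful reads.

    Returns the number of times on_trigger was called.
    """
    successes = 0
    fired = 0
    for _ in range(max_polls):
        if read_succeeds():
            successes += 1
            if successes == trigger_after_n:
                on_trigger()
                fired += 1
                successes = 0  # assumption: counting starts over after each trigger
        time.sleep(poll_interval)
    return fired


# Stand-in callbacks: every read succeeds, and a "dispatch" is just recorded in a list.
dispatched = []
fired = poll_and_count(
    read_succeeds=lambda: True,
    on_trigger=lambda: dispatched.append("my_workflow"),
    trigger_after_n=2,
    poll_interval=0,  # no waiting in this demo; the example above uses 3 seconds
    max_polls=6,
)
print(fired, dispatched)
```

With six successful polls and trigger_after_n=2, the callback in this sketch runs three times.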
- Create a workflow:
@ct.lattice
@ct.electron
def my_workflow():
    return 42
- Dispatch my_workflow, disabling its first execution using the disable_run parameter in ct.dispatch.
dispatch_id = ct.dispatch(my_workflow, disable_run=True)()
print(dispatch_id)
de35492d-4f51-473e-b814-ad203939f85a
- Attach the trigger to the dispatch_id, set a where clause that restricts the table read to the row with the last inserted trigger_at value, and register the trigger with the trigger server.
database_trigger.lattice_dispatch_id = dispatch_id
triggered_at = values[-1]["trigger_at"]
database_trigger.where_clauses = [f"trigger_at = '{str(triggered_at)}'"]
database_trigger.register()
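The where clause above is a plain SQL condition built from the string form of a datetime. The following is a minimal sketch of how such clauses could be combined into a polling query and checked against a table. It uses an in-memory SQLite database from the standard library instead of PostgreSQL, stores the timestamps as text, and joins multiple clauses with AND; the helper build_poll_query and these choices are assumptions made for illustration, not Covalent's actual query construction.

```python
import sqlite3
from datetime import datetime, timedelta


def build_poll_query(table_name, where_clauses):
    """Hypothetical helper: build a SELECT statement, joining the clauses with AND."""
    query = f"SELECT * FROM {table_name}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    return query


# In-memory stand-in for the test_db_trigger table; timestamps are stored as text.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_db_trigger (trigger_at TEXT PRIMARY KEY)")

base = datetime(2023, 10, 4, 14, 4, 33, 406624)
stamps = [base + timedelta(microseconds=i) for i in range(10)]
conn.executemany(
    "INSERT INTO test_db_trigger (trigger_at) VALUES (?)",
    [(str(stamp),) for stamp in stamps],
)

# Same pattern as in the step above: match only the last inserted row.
triggered_at = stamps[-1]
where_clauses = [f"trigger_at = '{triggered_at}'"]

query = build_poll_query("test_db_trigger", where_clauses)
rows = conn.execute(query).fetchall()
print(query)
print(len(rows))
```

Because the clause is interpolated into the statement as raw SQL, it should only ever be built from trusted values.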
- Monitor the Covalent UI. Watch the Dashboard for new dispatches of my_workflow.
- In the Covalent UI, observe that a new my_workflow is dispatched after the table has been read two times, with a polling interval of 3 seconds.
- To disable triggers on the dispatch, use the ct.stop_triggers function, then drop the test_db_trigger table.
ct.stop_triggers(dispatch_id)
meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)
[2023-10-04 08:36:10,622] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:
[2023-10-04 08:36:10,627] [DEBUG] local.py: Line 336 in stop_triggers: de35492d-4f51-473e-b814-ad203939f85a
2023-10-04 08:36:10,630 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-04 08:36:10,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s
2023-10-04 08:36:10,637 INFO sqlalchemy.engine.Engine [cached since 100.2s ago] ('test_db_trigger',)
2023-10-04 08:36:10,643 INFO sqlalchemy.engine.Engine
DROP TABLE test_db_trigger
2023-10-04 08:36:10,645 INFO sqlalchemy.engine.Engine [no key 0.00186s] ()
2023-10-04 08:36:11,073 INFO sqlalchemy.engine.Engine COMMIT
Note that the stop_triggers function disables all triggers attached to the specified dispatch.