Workflows are a simple yet powerful construct that takes a callable and returns elements. They are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.
This article will show how workflows can be used to push notifications upon certain event triggers. Using this method, an activity feed of content can be created.
Install dependencies
Install txtai
and all dependencies.
pip install txtai[pipeline,workflow]
Create workflow notification action
Workflows run a series of tasks to transform and process data. This section creates a callable object that can be used as a workflow action.
The action below pushes events to Slack. While Slack is used here, any notification service can easily be substituted in (Zapier, IFTT etc).
It is assumed there is a Slack workspace and application installed and ready to use. See this comprehensive example for more information on setting up a new Slack app and posting messages via the API.
The channel id can be found from the Slack web interface. Log into Slack and click on the channel where messages will be posted. The channel id
is the last part of the URL.
https://app.slack.com/client/<team id>/<channel id>
import logging
import requests
# Uncomment and set. The following are dummy parameters. Your parameters should not be publicly shared!
# AUTH = "xoxb-not-a-real-token-this-will-not-work"
# CHANNEL = "C0XXXXXXXXX"
# URL = "https://slack.com/api/chat.postMessage"
# Logging configuration
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Slack:
def __init__(self):
self.alerts = set()
def __call__(self, elements):
for alert in elements:
uid, text, _ = self.extract(alert)
if uid not in self.alerts:
logger.info("Sending alert: %s", alert)
self.alerts.add(uid)
headers = {
"Content-type": "application/json",
"Authorization": f"Bearer {AUTH}"
}
response = requests.post(URL, headers=headers, json={
"channel": CHANNEL,
"text": f"{text} {uid}"
}).json()
if not response["ok"]:
logger.error(response)
return elements
def extract(self, alert):
if isinstance(alert, dict):
return (alert["id"], alert["text"], None)
return alert
Build a semantic notification workflow
Next we'll create a notification workflow. The example below indexes the top trending Hacker News articles and pushes an alert when an article matches an embeddings query for software development library
.
from txtai.app import Application
workflow = """
writable: true
embeddings:
path: sentence-transformers/nli-mpnet-base-v2
content: true
tabular:
idcolumn: url
textcolumns:
- title
__main__.Slack:
workflow:
index:
schedule:
cron: "* * * * * 0/5"
elements:
- front_page
iterations: 1
tasks:
- batch: false
extract:
- hits
method: get
params:
tags: null
task: service
url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
- action: tabular
- action: upsert
alert:
schedule:
cron: 0/1 * * * *
elements:
- select id, text, score from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
iterations: 1
tasks:
- action: search
- action: __main__.Slack
"""
app = API(workflow)
app.wait()
2022-02-10 15:12:42,838 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 15:12:42,839 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 15:12:42,843 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:12:45+00:00
2022-02-10 15:12:42,851 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:13:00+00:00
2022-02-10 15:12:45,884 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 15:13:00,042 [INFO] __call__: Sending alert: {'id': 'https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'text': 'The world of PostgreSQL wire compatibility', 'score': 0.40123000741004944}
2022-02-10 15:13:00,254 [INFO] schedule: 'alert' max iterations (1) reached
The log above shows the indexing and alerting jobs each ran once. There was a single match and it was sent to Slack. The score threshold of 0.4 is relatively low, it can be raised if more strict matches are desired.
Build a notification workflow using SQL
The next example is similar but it instead runs a SQL search using another column.
This workflow indexes the top trending sports events as identified by neuspo. An alert is generated when an excitement factor of 40 or above is met (the field for excitement is called weight
).
from txtai.app import Application
workflow = """
writable: true
embeddings:
path: sentence-transformers/nli-mpnet-base-v2
content: true
tabular:
idcolumn: url
textcolumns:
- summary
content: true
__main__.Slack:
workflow:
index:
schedule:
cron: "* * * * * 0/5"
elements:
- 10
iterations: 1
tasks:
- batch: false
extract:
- rows
method: get
params:
size: null
task: service
url: https://neuspo.com/data/articles/list?category=Top&from=0
- action: tabular
- action: upsert
alert:
schedule:
cron: 0/1 * * * *
elements:
- select 'https://neuspo.com' || id id, text from txtai where weight >= 40
iterations: 1
tasks:
- action: search
- action: __main__.Slack
"""
app = Application(workflow)
app.wait()
2022-02-10 15:16:49,702 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 15:16:49,704 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:16:50+00:00
2022-02-10 15:16:49,704 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 15:16:49,714 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:17:00+00:00
2022-02-10 15:16:50,474 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 15:17:00,010 [INFO] __call__: Sending alert: {'id': 'https://neuspo.com/stream/be15c852925b53639b63feb7169a2842', 'text': 'Islanders 6, Canucks 3: Five-goal first period keys Islanders win in first game post-'}
2022-02-10 15:17:00,215 [INFO] schedule: 'alert' max iterations (1) reached
And the notification in Slack for this job.
Search-Summarize-Notify
The next section builds on the Hacker News example. Instead of just sending a notification on a match, this workflow will summarize the match first.
There are a number of alternative combinations. For example, the summaries could be built at index time. But this example will do everything on the fly when searching.
from txtai.app import Application
workflow = """
writable: true
embeddings:
path: sentence-transformers/nli-mpnet-base-v2
content: true
summary:
path: sshleifer/distilbart-cnn-12-6
tabular:
idcolumn: url
textcolumns:
- title
textractor:
join: true
minlength: 100
paragraphs: true
__main__.Slack:
workflow:
index:
schedule:
cron: "* * * * * 0/5"
elements:
- front_page
iterations: 1
tasks:
- batch: false
extract:
- hits
method: get
params:
tags: null
task: service
url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
- action: tabular
- action: upsert
alert:
schedule:
cron: 0/1 * * * *
elements:
- select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
iterations: 1
tasks:
- action: search
- action: tabular
- action: textractor
- action: summary
- action: __main__.Slack
unpack: false
"""
app = API(workflow)
app.wait()
2022-02-10 17:19:48,847 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 17:19:48,857 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:19:50+00:00
2022-02-10 17:19:48,848 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 17:19:48,864 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:20:00+00:00
2022-02-10 17:19:50,368 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 17:20:02,233 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'Every server-client database has a wire protocol. A wire protocol is the format for interactions between a database server and its clients. It does NOT encompass the actual query language itself, let alone database semantics. Proprietary databases like Oracle and IBM Db2 find value in developing their own drivers.', None)
2022-02-10 17:20:02,496 [INFO] schedule: 'alert' max iterations (1) reached
And the result in Slack. See how the text is now the article summary vs. the title.
Search-Summarize-Translate-Notify
One more example to really drive this home. Let's do the same as the last example and add a translate to French step.
from txtai.app import Application
workflow = """
writable: true
embeddings:
path: sentence-transformers/nli-mpnet-base-v2
content: true
summary:
path: sshleifer/distilbart-cnn-12-6
tabular:
idcolumn: url
textcolumns:
- title
textractor:
join: true
minlength: 100
paragraphs: true
translation:
__main__.Slack:
workflow:
index:
schedule:
cron: "* * * * * 0/5"
elements:
- front_page
iterations: 1
tasks:
- batch: false
extract:
- hits
method: get
params:
tags: null
task: service
url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
- action: tabular
- action: upsert
alert:
schedule:
cron: 0/1 * * * *
elements:
- select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
iterations: 1
tasks:
- action: search
- action: tabular
- action: textractor
- action: summary
- action: translation
args:
- fr
- action: __main__.Slack
unpack: false
"""
app = API(workflow)
app.wait()
2022-02-10 17:25:04,448 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 17:25:04,449 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 17:25:04,451 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:25:05+00:00
2022-02-10 17:25:04,457 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:26:00+00:00
2022-02-10 17:25:05,357 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 17:26:08,125 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', "Chaque base de données serveur-client a un protocole filaire. Un protocole filaire est le format pour les interactions entre un serveur de base de données et ses clients. Il n'inclut PAS le langage de requête réel lui-même, et encore moins la sémantique de la base de données.", None)
2022-02-10 17:26:08,310 [INFO] schedule: 'alert' max iterations (1) reached
And just like before, Slack has a summary and a link but this time in French!
Wrapping up
This article covered how to build workflow notifications with txtai. There are many directions one could go with this. Build an activity feed, alert when semantic events occur and more. More ideas can be found in the txtai application on Hugging Face Spaces.
Everything in this article can also be written in Python. The benefits of YAML workflows are that they require little to no-code. Work is ongoing as of txtai 4.1 to make workflows easier to containerize and ultimately run in serverless environments. Keep an eye on this!
Top comments (0)