Gửi Slack Alerts trên Airflow
Note: This post is over 6 years old. The information may be outdated.
Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
1. Slack Incoming Webhooks và Airflow Connection
Truy cập Slack App Directory tìm Incoming Webhooks: https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks
Ở mục Post to Channel chọn Channel, sau đó bấm Add Incoming Webhooks integration
Sau đó bạn sẽ nhận được 1 URL có dạng: https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2
Vào Airflow > Admin > Connections để thêm một connection mới
- Conn Id:
Slack
- Conn Type:
HTTP
- Host:
https://hooks.slack.com/services
- Password:
/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2
2. Slack alert Utils
Tạo file utils chứa function alert, ví dụ: /dags/utils/slack_alert.py
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag_id=context.get('dag').dag_id,
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
3. Config Slack alert cho từng DAG
Với mỗi DAG muốn alert, ta thêm thuộc tính on_failure_callback
cho mỗi DAG. Ví dụ như dưới dây:
example_dag.py
from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert
default_args = {
**params['default_args'],
'owner': DAG_OWNER,
'on_failure_callback': task_fail_slack_alert,
...
}
dag = DAG('dag_id', default_args=default_args)
...
Kết quả:
Tham khảo
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- airflow.operators.slack_operator: https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Chúc các bạn thành công.