728x90
반응형
SMALL
Slack API
slack API에서 어플리케이션을 생성한다. Create an app → From scratch을 선택한다.
앱 이름은 airflow_bot으로 입력하고, 봇을 띄우고자 하는 워크스페이스를 선택한다.
그 다음, Incoming Webhooks Off →On으로 변경한다.
그리고 페이지 하단의 Webhook URL에서 T로 시작하는 토큰값을 복사한다.
커넥션 등록
Slack의 경우 Airflow 설치시 기본 제공되는 Provider 중 하나로 Hook, Operator사용 가능하다. 커넥션에 Slack 관련 정보를 넣어주면 된다. 위에서 복사한 Webhook URL 토큰을 붙여 넣는다.
callback
plugins/collbacks 디렉토리에 on_failure_callback_to_slack.py를 생성한다.
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def on_failure_callback_to_slack(context):
ti = context.get('ti')
dag_id = ti.dag_id
task_id = ti.task_id
err_msg = context.get('exception')
batch_date = context.get('data_interval_end').in_timezone('Asia/Seoul')
slack_hook = SlackWebhookHook(slack_webhook_conn_id='conn_slack_airflow_bot')
text = "실패 알람"
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{dag_id}.{task_id} 실패 알람*"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*배치 시간*: {batch_date}"
},
{
"type": "mrkdwn",
"text": f"*에러 내용*: {err_msg}"
}
]
}
]
slack_hook.send(text=text, blocks=blocks)
DAG
dags_on_failure_callback_to_slack.py를 dags 디렉토리에 생성한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack
with DAG(
dag_id='dags_on_failure_callback_to_slack',
start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
schedule='0 * * * *',
catchup=False,
default_args={
'on_failure_callback':on_failure_callback_to_slack,
'execution_timeout': timedelta(seconds=60)
}
) as dag:
task_slp_90 = BashOperator(
task_id='task_slp_90',
bash_command='sleep 90',
)
task_ext_1 = BashOperator(
trigger_rule='all_done',
task_id='task_ext_1',
bash_command='exit 1'
)
task_slp_90 >> task_ext_1
DAG 실행 후 Slack 메시지 전송 확인한다.
728x90
반응형
LIST
'App Programming > Apache Airflow' 카테고리의 다른 글
[Apache Airflow] KaKao 연동 (0) | 2024.06.19 |
---|---|
[Apache Airflow] Connection & Hook (0) | 2024.06.18 |
[Apache Airflow] Postgre DB 추가 (0) | 2024.06.18 |
[Apache Airflow] Custom 오퍼레이터 (0) | 2024.06.18 |
[Apache Airflow] Python 오퍼레이터 (0) | 2024.06.17 |