본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] Slack 연동

by goatlab 2024. 6. 19.
728x90
반응형
SMALL

Slack API

 

slack API에서 어플리케이션을 생성한다. Create an app → From scratch을 선택한다.

 

 

앱 이름은 airflow_bot으로 입력하고, 봇을 띄우고자 하는 워크스페이스를 선택한다.

 

 

그 다음, Incoming Webhooks Off →On으로 변경한다.

 

 

그리고 페이지 하단의 Webhook URL에서 T로 시작하는 토큰값을 복사한다.

 

 

커넥션 등록

 

Slack의 경우 Airflow 설치시 기본 제공되는 Provider 중 하나로 Hook, Operator사용 가능하다. 커넥션에 Slack 관련 정보를 넣어주면 된다. 위에서 복사한 Webhook URL 토큰을 붙여 넣는다.

 

 

callback

 

plugins/collbacks 디렉토리에 on_failure_callback_to_slack.py를 생성한다.

 

from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def on_failure_callback_to_slack(context):
    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    err_msg = context.get('exception')
    batch_date = context.get('data_interval_end').in_timezone('Asia/Seoul')

    slack_hook = SlackWebhookHook(slack_webhook_conn_id='conn_slack_airflow_bot')
    text = "실패 알람"
    blocks = [
        {
			"type": "section",
			"text": {
				"type": "mrkdwn",
				"text": f"*{dag_id}.{task_id} 실패 알람*"
			}
		},
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*배치 시간*: {batch_date}"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*에러 내용*: {err_msg}"
                }
            ]
        }
    ]

    slack_hook.send(text=text, blocks=blocks)

 

 

DAG

 

dags_on_failure_callback_to_slack.py를 dags 디렉토리에 생성한다.

 

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack

with DAG(
    dag_id='dags_on_failure_callback_to_slack',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule='0 * * * *',
    catchup=False,
    default_args={
        'on_failure_callback':on_failure_callback_to_slack,
        'execution_timeout': timedelta(seconds=60)
    }

) as dag:
    task_slp_90 = BashOperator(
        task_id='task_slp_90',
        bash_command='sleep 90',
    )

    task_ext_1 = BashOperator(
        trigger_rule='all_done',
        task_id='task_ext_1',
        bash_command='exit 1'
    )

    task_slp_90 >> task_ext_1

 

DAG 실행 후 Slack 메시지 전송 확인한다.

 

728x90
반응형
LIST