App Programming/Apache Airflow

[Apache Airflow] KaKao 연동

by goatlab 2024. 6. 19.

카카오 Developer


카카오 Developer에서 다음과 같이 애플리케이션을 추가한다.





카카오 연동


제품 설정에서 카카오 로그인의 활성화 설정 상태를 ON으로 변경한다.



Redirect URI는 https://example.com/oauth로 입력한다.



task에 대한 메시지를 전송하기 위해 동의 항목에서 선택 동의로 저장한다.



인가 코드



client_id는 앱 키 →REST API 키로 해서 아래 주소를 웹 브라우저를 통해 접속하여 인가 코드를 발급받는다.




토큰 저장


로컬의 config 디렉토리에서 get_kakao_token.py 작성 후, 발급받은 client_id와 authorize_code을 수정하여 로컬에서 실행한다.


import requests

# client_id, authorize_code 노출 주의, 실제 값은 임시로만 넣고 Git에 올라가지 않도록 유의
client_id = '{REST API 키}'
redirect_uri = 'https://example.com/oauth'
authorize_code = '{oauth?code}'
token_url = 'https://kauth.kakao.com/oauth/token'

data = {
    'grant_type': 'authorization_code',
    'client_id': client_id,
    'redirect_uri': redirect_uri,
    'code': authorize_code,

response = requests.post(token_url, data=data)
tokens = response.json()


로컬에서 get_kakao_token.py를 실행시키면 아래와 같이 access_token과 refresh_token을 받을 수 있다.


  • Access_token:프로그램에서 카카오 메시지 전송할 때 필요한 토큰 (유효기간은 6시간)
  • Refreshtoken:access_token을 재발급받을 때 필요한 토큰 (유효시간은 60일)
{'access_token': '', 'token_type': '', 'refresh_token': '', 'expires_in': , 'scope': 'talk_message', 'refresh_token_expires_in': }


Airflow UI에서 Admin의 Variables에 다음 값을 추가한다.


  • key: kakao_client_secret / value: {kakao id}
  • key: kakao_tokens / value: {tokens 조회 결과 전체}


kakao api 호출 함수


plugins/apis 디렉토리에서 kakao api 호출하여 메시지 전송할 수 있는 함수 kakao_api.py를 작성한다.


import pendulum
import os
import json
import requests
from airflow.models import Variable

REDIRECT_URL = 'https://example.com/oauth'

def _refresh_token_to_variable():
    client_id = Variable.get("kakao_client_secret")
    tokens = eval(Variable.get("kakao_tokens"))
    refresh_token = tokens.get('refresh_token')
    url = "https://kauth.kakao.com/oauth/token"
    data = {
        "grant_type": "refresh_token",
        "client_id": f"{client_id}",
        "refresh_token": f"{refresh_token}"
    response = requests.post(url, data=data)
    rslt = response.json()
    new_access_token = rslt.get('access_token')
    new_refresh_token = rslt.get('refresh_token')         # Refresh 토큰 만료기간이 30일 미만이면 refresh_token 값이 포함되어 리턴됨.
    if new_access_token:
        tokens['access_token'] = new_access_token
    if new_refresh_token:
        tokens['refresh_token'] = new_refresh_token

    now = pendulum.now('Asia/Seoul').strftime('%Y-%m-%d %H:%M:%S')
    tokens['updated'] = now
    os.system(f'airflow variables set kakao_tokens "{tokens}"')
    print('variable 업데이트 완료(key: kakao_tokens)')

def send_kakao_msg(talk_title: str, content: dict):
    content:{'tltle1':'content1', 'title2':'content2'...}

    try_cnt = 0
    while True:
        ### get Access 토큰
        tokens = eval(Variable.get("kakao_tokens"))
        access_token = tokens.get('access_token')
        content_lst = []
        button_lst = []

        for title, msg in content.items():
                'title': f'{title}',
                'description': f'{msg}',
                'image_url': '',
                'image_width': 40,
                'image_height': 40,
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''
                'title': '',
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''

        list_data = {
            'object_type': 'list',
            'header_title': f'{talk_title}',
            'header_link': {
                'web_url': '',
                'mobile_web_url': '',
                'android_execution_params': 'main',
                'ios_execution_params': 'main'
            'contents': content_lst,
            'buttons': button_lst

        send_url = "https://kapi.kakao.com/v2/api/talk/memo/default/send"
        headers = {
            "Authorization": f'Bearer {access_token}'
        data = {'template_object': json.dumps(list_data)}
        response = requests.post(send_url, headers=headers, data=data)
        print(f'try횟수: {try_cnt}, reponse 상태:{response.status_code}')
        try_cnt += 1

        if response.status_code == 200:         # 200: 정상
            return response.status_code
        elif response.status_code == 400:       # 400: Bad Request (잘못 요청시), 무조건 break 하도록 return
            return response.status_code
        elif response.status_code == 401 and try_cnt <= 2:      # 401: Unauthorized (토큰 만료 등)
        elif response.status_code != 200 and try_cnt >= 3:      # 400, 401 에러가 아닐 경우 3회 시도때 종료
            return response.status_code


callback 함수


plugins/callbacks 디렉토리에서 task 실패시 kakao를 이용하여 callback을 실행할 수 있는 함수 on_failure_callback_to_kakao.py를 작성한다.


from apis.kakao_api import send_kakao_msg

def on_failure_callback_to_kakao(context):
    exception = context.get('exception') or 'exception 없음'
    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    data_interval_end = context.get('data_interval_end').in_timezone('Asia/Seoul')

    content = {f'{dag_id}.{task_id}': f'에러내용: {exception}', '':''}      # Content 길이는 2 이상
    send_kakao_msg(talk_title=f'task 실패 알람({data_interval_end})',




task 실패 유도 및 on_failure_callback을 실행시키는 dags_on_failure_callback_to_kakao.py를 작성한다.


from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack

with DAG(
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule='0 * * * *',
        'execution_timeout': timedelta(seconds=60)

) as dag:
    task_slp_90 = BashOperator(
        bash_command='sleep 90',

    task_ext_1 = BashOperator(
        bash_command='exit 1'

    task_slp_90 >> task_ext_1



로컬에서 docker-compose.yaml 파일 수정하여 config 디렉토리 연결 완료 후 git commit & push를 해준다.


    - /home/ubuntu/airflow/dags:/opt/airflow/dags
    - /home/ubuntu/logs:/opt/airflow/logs
    - /home/ubuntu/airflow/config:/opt/airflow/config
    - /home/ubuntu/airflow/plugins:/opt/airflow/plugins
    - /home/ubuntu/ingest:/opt/airflow/ingest


DAG 실행

