본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] KaKao 연동

by goatlab 2024. 6. 19.
728x90
반응형
SMALL

카카오 Developer

 

카카오 Developer에서 다음과 같이 애플리케이션을 추가한다.

 

 

 

 

카카오 연동

 

제품 설정에서 카카오 로그인의 활성화 설정 상태를 ON으로 변경한다.

 

 

Redirect URI는 https://example.com/oauth로 입력한다.

 

 

task에 대한 메시지를 전송하기 위해 동의 항목에서 선택 동의로 저장한다.

 

 

인가 코드

 

 

client_id는 앱 키 →REST API 키로 해서 아래 주소를 웹 브라우저를 통해 접속하여 인가 코드를 발급받는다.

 

https://kauth.kakao.com/oauth/authorize?response_type=code&client_id={client_id}&redirect_uri=https://example.com/oauth&response_type=code

 

토큰 저장

 

로컬의 config 디렉토리에서 get_kakao_token.py 작성 후, 발급받은 client_id와 authorize_code을 수정하여 로컬에서 실행한다.

 

import requests

# client_id, authorize_code 노출 주의, 실제 값은 임시로만 넣고 Git에 올라가지 않도록 유의
client_id = '{REST API 키}'
redirect_uri = 'https://example.com/oauth'
authorize_code = '{oauth?code}'
token_url = 'https://kauth.kakao.com/oauth/token'

data = {
    'grant_type': 'authorization_code',
    'client_id': client_id,
    'redirect_uri': redirect_uri,
    'code': authorize_code,
    }

response = requests.post(token_url, data=data)
tokens = response.json()
print(tokens)

 

로컬에서 get_kakao_token.py를 실행시키면 아래와 같이 access_token과 refresh_token을 받을 수 있다.

 

  • Access_token:프로그램에서 카카오 메시지 전송할 때 필요한 토큰 (유효기간은 6시간)
  • Refreshtoken:access_token을 재발급받을 때 필요한 토큰 (유효시간은 60일)
{'access_token': '', 'token_type': '', 'refresh_token': '', 'expires_in': , 'scope': 'talk_message', 'refresh_token_expires_in': }

 

Airflow UI에서 Admin의 Variables에 다음 값을 추가한다.

 

  • key: kakao_client_secret / value: {kakao id}
  • key: kakao_tokens / value: {tokens 조회 결과 전체}

 

kakao api 호출 함수

 

plugins/apis 디렉토리에서 kakao api 호출하여 메시지 전송할 수 있는 함수 kakao_api.py를 작성한다.

 

import pendulum
import os
import json
import requests
from airflow.models import Variable

REDIRECT_URL = 'https://example.com/oauth'

def _refresh_token_to_variable():
    client_id = Variable.get("kakao_client_secret")
    tokens = eval(Variable.get("kakao_tokens"))
    refresh_token = tokens.get('refresh_token')
    url = "https://kauth.kakao.com/oauth/token"
    data = {
        "grant_type": "refresh_token",
        "client_id": f"{client_id}",
        "refresh_token": f"{refresh_token}"
    }
    response = requests.post(url, data=data)
    rslt = response.json()
    new_access_token = rslt.get('access_token')
    new_refresh_token = rslt.get('refresh_token')         # Refresh 토큰 만료기간이 30일 미만이면 refresh_token 값이 포함되어 리턴됨.
    if new_access_token:
        tokens['access_token'] = new_access_token
    if new_refresh_token:
        tokens['refresh_token'] = new_refresh_token

    now = pendulum.now('Asia/Seoul').strftime('%Y-%m-%d %H:%M:%S')
    tokens['updated'] = now
    os.system(f'airflow variables set kakao_tokens "{tokens}"')
    print('variable 업데이트 완료(key: kakao_tokens)')

def send_kakao_msg(talk_title: str, content: dict):
    '''
    content:{'tltle1':'content1', 'title2':'content2'...}
    '''

    try_cnt = 0
    while True:
        ### get Access 토큰
        tokens = eval(Variable.get("kakao_tokens"))
        access_token = tokens.get('access_token')
        content_lst = []
        button_lst = []

        for title, msg in content.items():
            content_lst.append({
                'title': f'{title}',
                'description': f'{msg}',
                'image_url': '',
                'image_width': 40,
                'image_height': 40,
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''
                }
            })
            button_lst.append({
                'title': '',
                'link': {
                    'web_url': '',
                    'mobile_web_url': ''
                }
            })

        list_data = {
            'object_type': 'list',
            'header_title': f'{talk_title}',
            'header_link': {
                'web_url': '',
                'mobile_web_url': '',
                'android_execution_params': 'main',
                'ios_execution_params': 'main'
            },
            'contents': content_lst,
            'buttons': button_lst
        }

        send_url = "https://kapi.kakao.com/v2/api/talk/memo/default/send"
        headers = {
            "Authorization": f'Bearer {access_token}'
        }
        data = {'template_object': json.dumps(list_data)}
        response = requests.post(send_url, headers=headers, data=data)
        print(f'try횟수: {try_cnt}, reponse 상태:{response.status_code}')
        try_cnt += 1

        if response.status_code == 200:         # 200: 정상
            return response.status_code
        elif response.status_code == 400:       # 400: Bad Request (잘못 요청시), 무조건 break 하도록 return
            return response.status_code
        elif response.status_code == 401 and try_cnt <= 2:      # 401: Unauthorized (토큰 만료 등)
            _refresh_token_to_variable()
        elif response.status_code != 200 and try_cnt >= 3:      # 400, 401 에러가 아닐 경우 3회 시도때 종료
            return response.status_code

 

callback 함수

 

plugins/callbacks 디렉토리에서 task 실패시 kakao를 이용하여 callback을 실행할 수 있는 함수 on_failure_callback_to_kakao.py를 작성한다.

 

from apis.kakao_api import send_kakao_msg

def on_failure_callback_to_kakao(context):
    exception = context.get('exception') or 'exception 없음'
    ti = context.get('ti')
    dag_id = ti.dag_id
    task_id = ti.task_id
    data_interval_end = context.get('data_interval_end').in_timezone('Asia/Seoul')

    content = {f'{dag_id}.{task_id}': f'에러내용: {exception}', '':''}      # Content 길이는 2 이상
    send_kakao_msg(talk_title=f'task 실패 알람({data_interval_end})',
                   content=content)

 

DAG

 

task 실패 유도 및 on_failure_callback을 실행시키는 dags_on_failure_callback_to_kakao.py를 작성한다.

 

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack

with DAG(
    dag_id='dags_on_failure_callback_to_slack',
    start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
    schedule='0 * * * *',
    catchup=False,
    default_args={
        'on_failure_callback':on_failure_callback_to_slack,
        'execution_timeout': timedelta(seconds=60)
    }

) as dag:
    task_slp_90 = BashOperator(
        task_id='task_slp_90',
        bash_command='sleep 90',
    )

    task_ext_1 = BashOperator(
        trigger_rule='all_done',
        task_id='task_ext_1',
        bash_command='exit 1'
    )

    task_slp_90 >> task_ext_1

 

 

로컬에서 docker-compose.yaml 파일 수정하여 config 디렉토리 연결 완료 후 git commit & push를 해준다.

 

  volumes:
    - /home/ubuntu/airflow/dags:/opt/airflow/dags
    - /home/ubuntu/logs:/opt/airflow/logs
    - /home/ubuntu/airflow/config:/opt/airflow/config
    - /home/ubuntu/airflow/plugins:/opt/airflow/plugins
    - /home/ubuntu/ingest:/opt/airflow/ingest

 

DAG 실행

 

728x90
반응형
LIST