카카오 Developer
카카오 Developer에서 다음과 같이 애플리케이션을 추가한다.
카카오 연동
제품 설정에서 카카오 로그인의 활성화 설정 상태를 ON으로 변경한다.
Redirect URI는 https://example.com/oauth로 입력한다.
task에 대한 메시지를 전송하기 위해 동의 항목에서 선택 동의로 저장한다.
인가 코드
client_id는 앱 키 →REST API 키로 해서 아래 주소를 웹 브라우저를 통해 접속하여 인가 코드를 발급받는다.
https://kauth.kakao.com/oauth/authorize?response_type=code&client_id={client_id}&redirect_uri=https://example.com/oauth&response_type=code
토큰 저장
로컬의 config 디렉토리에서 get_kakao_token.py 작성 후, 발급받은 client_id와 authorize_code을 수정하여 로컬에서 실행한다.
import requests
# client_id, authorize_code 노출 주의, 실제 값은 임시로만 넣고 Git에 올라가지 않도록 유의
client_id = '{REST API 키}'
redirect_uri = 'https://example.com/oauth'
authorize_code = '{oauth?code}'
token_url = 'https://kauth.kakao.com/oauth/token'
data = {
'grant_type': 'authorization_code',
'client_id': client_id,
'redirect_uri': redirect_uri,
'code': authorize_code,
}
response = requests.post(token_url, data=data)
tokens = response.json()
print(tokens)
로컬에서 get_kakao_token.py를 실행시키면 아래와 같이 access_token과 refresh_token을 받을 수 있다.
|
{'access_token': '', 'token_type': '', 'refresh_token': '', 'expires_in': , 'scope': 'talk_message', 'refresh_token_expires_in': }
Airflow UI에서 Admin의 Variables에 다음 값을 추가한다.
|
kakao api 호출 함수
plugins/apis 디렉토리에서 kakao api 호출하여 메시지 전송할 수 있는 함수 kakao_api.py를 작성한다.
import pendulum
import os
import json
import requests
from airflow.models import Variable
REDIRECT_URL = 'https://example.com/oauth'
def _refresh_token_to_variable():
client_id = Variable.get("kakao_client_secret")
tokens = eval(Variable.get("kakao_tokens"))
refresh_token = tokens.get('refresh_token')
url = "https://kauth.kakao.com/oauth/token"
data = {
"grant_type": "refresh_token",
"client_id": f"{client_id}",
"refresh_token": f"{refresh_token}"
}
response = requests.post(url, data=data)
rslt = response.json()
new_access_token = rslt.get('access_token')
new_refresh_token = rslt.get('refresh_token') # Refresh 토큰 만료기간이 30일 미만이면 refresh_token 값이 포함되어 리턴됨.
if new_access_token:
tokens['access_token'] = new_access_token
if new_refresh_token:
tokens['refresh_token'] = new_refresh_token
now = pendulum.now('Asia/Seoul').strftime('%Y-%m-%d %H:%M:%S')
tokens['updated'] = now
os.system(f'airflow variables set kakao_tokens "{tokens}"')
print('variable 업데이트 완료(key: kakao_tokens)')
def send_kakao_msg(talk_title: str, content: dict):
'''
content:{'tltle1':'content1', 'title2':'content2'...}
'''
try_cnt = 0
while True:
### get Access 토큰
tokens = eval(Variable.get("kakao_tokens"))
access_token = tokens.get('access_token')
content_lst = []
button_lst = []
for title, msg in content.items():
content_lst.append({
'title': f'{title}',
'description': f'{msg}',
'image_url': '',
'image_width': 40,
'image_height': 40,
'link': {
'web_url': '',
'mobile_web_url': ''
}
})
button_lst.append({
'title': '',
'link': {
'web_url': '',
'mobile_web_url': ''
}
})
list_data = {
'object_type': 'list',
'header_title': f'{talk_title}',
'header_link': {
'web_url': '',
'mobile_web_url': '',
'android_execution_params': 'main',
'ios_execution_params': 'main'
},
'contents': content_lst,
'buttons': button_lst
}
send_url = "https://kapi.kakao.com/v2/api/talk/memo/default/send"
headers = {
"Authorization": f'Bearer {access_token}'
}
data = {'template_object': json.dumps(list_data)}
response = requests.post(send_url, headers=headers, data=data)
print(f'try횟수: {try_cnt}, reponse 상태:{response.status_code}')
try_cnt += 1
if response.status_code == 200: # 200: 정상
return response.status_code
elif response.status_code == 400: # 400: Bad Request (잘못 요청시), 무조건 break 하도록 return
return response.status_code
elif response.status_code == 401 and try_cnt <= 2: # 401: Unauthorized (토큰 만료 등)
_refresh_token_to_variable()
elif response.status_code != 200 and try_cnt >= 3: # 400, 401 에러가 아닐 경우 3회 시도때 종료
return response.status_code
callback 함수
plugins/callbacks 디렉토리에서 task 실패시 kakao를 이용하여 callback을 실행할 수 있는 함수 on_failure_callback_to_kakao.py를 작성한다.
from apis.kakao_api import send_kakao_msg
def on_failure_callback_to_kakao(context):
exception = context.get('exception') or 'exception 없음'
ti = context.get('ti')
dag_id = ti.dag_id
task_id = ti.task_id
data_interval_end = context.get('data_interval_end').in_timezone('Asia/Seoul')
content = {f'{dag_id}.{task_id}': f'에러내용: {exception}', '':''} # Content 길이는 2 이상
send_kakao_msg(talk_title=f'task 실패 알람({data_interval_end})',
content=content)
DAG
task 실패 유도 및 on_failure_callback을 실행시키는 dags_on_failure_callback_to_kakao.py를 작성한다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import timedelta
import pendulum
from callbacks.on_failure_callback_to_slack import on_failure_callback_to_slack
with DAG(
dag_id='dags_on_failure_callback_to_slack',
start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
schedule='0 * * * *',
catchup=False,
default_args={
'on_failure_callback':on_failure_callback_to_slack,
'execution_timeout': timedelta(seconds=60)
}
) as dag:
task_slp_90 = BashOperator(
task_id='task_slp_90',
bash_command='sleep 90',
)
task_ext_1 = BashOperator(
trigger_rule='all_done',
task_id='task_ext_1',
bash_command='exit 1'
)
task_slp_90 >> task_ext_1
로컬에서 docker-compose.yaml 파일 수정하여 config 디렉토리 연결 완료 후 git commit & push를 해준다.
volumes:
- /home/ubuntu/airflow/dags:/opt/airflow/dags
- /home/ubuntu/logs:/opt/airflow/logs
- /home/ubuntu/airflow/config:/opt/airflow/config
- /home/ubuntu/airflow/plugins:/opt/airflow/plugins
- /home/ubuntu/ingest:/opt/airflow/ingest
DAG 실행
'App Programming > Apache Airflow' 카테고리의 다른 글
[Apache Airflow] Slack 연동 (0) | 2024.06.19 |
---|---|
[Apache Airflow] Connection & Hook (0) | 2024.06.18 |
[Apache Airflow] Postgre DB 추가 (0) | 2024.06.18 |
[Apache Airflow] Custom 오퍼레이터 (0) | 2024.06.18 |
[Apache Airflow] Python 오퍼레이터 (0) | 2024.06.17 |