Custom 오퍼레이터
앞서, SimpleHttp는 기본적으로 1회 호출만 가능하다. 그리고 openAPI를 통해 얻은 데이터셋은 몇 개의 row가 존재할 지 미리 알기 어렵다. 하나의 task로 모든 row를 추출하면서 전처리하여 파일로 저장할 수 있는 기능을 가능케 하는 것은 커스텀 오퍼레이터이다. Airflow는 필요한 오퍼레이터를 직접 만들어 사용할 수 있도록 확장성을 지원한다. BaseOperator를 상속하며 원하는 기능은 파이썬으로 직접 구현 가능하다. 다음과 같이, BaseOperator 상속시 두 가지 메서드를 재정의해야 한다 (Overriding).
|
서울시 공공 데이터를 가져오기 위한 Custom 오퍼레이터 만들기
서울시 공공 자전거의 전체 row를 가져오고 결과를 csv로 저장할 수 있는 오퍼레이터 만들기 위해 plugins/operators 경로에 seoul_api_to_csv_operator.py를 다음과 같이 작성한다.
from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd
class SeoulApiToCsvOperator(BaseOperator):
template_fields = ('endpoint', 'path','file_name','base_dt')
def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs):
super().__init__(**kwargs)
self.http_conn_id = 'openapi.seoul.go.kr'
self.path = path
self.file_name = file_name
self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
self.base_dt = base_dt
def execute(self, context):
import os
connection = BaseHook.get_connection(self.http_conn_id)
self.base_url = f'http://{connection.host}:{connection.port}/{self.endpoint}'
total_row_df = pd.DataFrame()
start_row = 1
end_row = 1000
while True:
self.log.info(f'시작:{start_row}')
self.log.info(f'끝:{end_row}')
row_df = self._call_api(self.base_url, start_row, end_row)
total_row_df = pd.concat([total_row_df, row_df])
if len(row_df) < 1000:
break
else:
start_row = end_row + 1
end_row += 1000
if not os.path.exists(self.path):
os.system(f'mkdir -p {self.path}')
total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)
def _call_api(self, base_url, start_row, end_row):
import requests
import json
headers = {'Content-Type': 'application/json',
'charset': 'utf-8',
'Accept': '*/*'
}
request_url = f'{base_url}/{start_row}/{end_row}/'
if self.base_dt is not None:
request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
response = requests.get(request_url, headers)
contents = json.loads(response.text)
key_nm = list(contents.keys())[0]
row_data = contents.get(key_nm).get('row')
row_df = pd.DataFrame(row_data)
return row_df
위의 CustomOperator를 사용해서 task를 수행할 dags_seoul_bikelist.py DAG를 만든다.
from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum
with DAG(
dag_id='dags_seoul_bikelist',
schedule='0 7 * * *',
start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
catchup=False
) as dag:
'''서울시 공공자전거 실시간 대여 현황'''
seoul_api2csv_bike_list = SeoulApiToCsvOperator(
task_id='seoul_api2csv_bike_list',
dataset_nm='bikeList',
path='./opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
file_name='bikeList.csv'
)
CSV 확인
생성된 CSV 파일은 Worker 컨테이너 내부에 존재한다. 따라서, Worker 컨테이너 내부 접속 및 파일 확인한다.
ubuntu~$sudo docker ps
ubuntu~$sudo docker exec -it {worker 컨테이너 ID} bash
default@~:cd /opt/airflow/ingest
default@~:ls
그 다음, ingest 디렉토리를 컨테이너와 연결하기 위해 서버의 /home/ubuntu 디렉토리에서 ingest 디렉토리를 생성한다.
ubuntu~$mkdir ingest
로컬에서 docker-compose.yaml 파일을 다음과 같이 수정한다.
volumes:
- /home/ubuntu/airflow/dags:/opt/airflow/dags
- /home/ubuntu/logs:/opt/airflow/logs
- /home/ubuntu/config:/opt/airflow/config
- /home/ubuntu/airflow/plugins:/opt/airflow/plugins
- /home/ubuntu/ingest:/opt/airflow/ingest
수정 후 git commit & push을 한 뒤, 서버에서 docker compose 재시작한다.
sudo docker compose restart
dags_seoul_bikelist DAG 재수행 후 서버의 /home/ubuntu/ingest 디렉토리에 csv가 생성되었는지 확인한다.
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
'App Programming > Apache Airflow' 카테고리의 다른 글
[Apache Airflow] Connection & Hook (0) | 2024.06.18 |
---|---|
[Apache Airflow] Postgre DB 추가 (0) | 2024.06.18 |
[Apache Airflow] Python 오퍼레이터 (0) | 2024.06.17 |
[Apache Airflow] Email 오퍼레이터 (0) | 2024.06.17 |
[Apache Airflow] Cron Schedule (0) | 2024.06.17 |