본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] Custom 오퍼레이터

by goatlab 2024. 6. 18.
728x90
반응형
SMALL

Custom 오퍼레이터

 

앞서, SimpleHttp는 기본적으로 1회 호출만 가능하다. 그리고 openAPI를 통해 얻은 데이터셋은 몇 개의 row가 존재할 지 미리 알기 어렵다. 하나의 task로 모든 row를 추출하면서 전처리하여 파일로 저장할 수 있는 기능을 가능케 하는 것은 커스텀 오퍼레이터이다. Airflow는 필요한 오퍼레이터를 직접 만들어 사용할 수 있도록 확장성을 지원한다. BaseOperator를 상속하며 원하는 기능은 파이썬으로 직접 구현 가능하다. 다음과 같이, BaseOperator 상속시 두 가지 메서드를 재정의해야 한다 (Overriding).

 

  • def__init__ : 클래스에서 객체 생성시 객체에 대한 초기값 지정하는 함수
  • def execute(self, context) : 실제 로직을 담은 함수
  • Template 적용이 필요한 변수 : class 변수 template_fields에 지정 필요

 

서울시 공공 데이터를 가져오기 위한 Custom 오퍼레이터 만들기

 

서울시 공공 자전거의 전체 row를 가져오고 결과를 csv로 저장할 수 있는 오퍼레이터 만들기 위해 plugins/operators 경로에 seoul_api_to_csv_operator.py를 다음과 같이 작성한다.

 

from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd 

class SeoulApiToCsvOperator(BaseOperator):
    template_fields = ('endpoint', 'path','file_name','base_dt')

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs):
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.path = path
        self.file_name = file_name
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

    def execute(self, context):
        import os
        
        connection = BaseHook.get_connection(self.http_conn_id)
        self.base_url = f'http://{connection.host}:{connection.port}/{self.endpoint}'

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000
        while True:
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row)
            total_row_df = pd.concat([total_row_df, row_df])
            if len(row_df) < 1000:
                break
            else:
                start_row = end_row + 1
                end_row += 1000

        if not os.path.exists(self.path):
            os.system(f'mkdir -p {self.path}')
        total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)

    def _call_api(self, base_url, start_row, end_row):
        import requests
        import json 

        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'
                   }

        request_url = f'{base_url}/{start_row}/{end_row}/'
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        response = requests.get(request_url, headers)
        contents = json.loads(response.text)

        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row')
        row_df = pd.DataFrame(row_data)

        return row_df

 

위의 CustomOperator를 사용해서 task를 수행할 dags_seoul_bikelist.py DAG를 만든다.

 

from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_seoul_bikelist',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024,6,16, tz='Asia/Seoul'),
    catchup=False
) as dag:
    '''서울시 공공자전거 실시간 대여 현황'''
    seoul_api2csv_bike_list = SeoulApiToCsvOperator(
        task_id='seoul_api2csv_bike_list',
        dataset_nm='bikeList',
        path='./opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='bikeList.csv'
    )

 

CSV 확인

 

생성된 CSV 파일은 Worker 컨테이너 내부에 존재한다. 따라서, Worker 컨테이너 내부 접속 및 파일 확인한다.

 

ubuntu~$sudo docker ps
ubuntu~$sudo docker exec -it {worker 컨테이너 ID} bash
default@~:cd /opt/airflow/ingest
default@~:ls

 

그 다음, ingest 디렉토리를 컨테이너와 연결하기 위해 서버의 /home/ubuntu 디렉토리에서 ingest 디렉토리를 생성한다.

 

ubuntu~$mkdir ingest

 

로컬에서 docker-compose.yaml 파일을 다음과 같이 수정한다.

 

volumes:
	- /home/ubuntu/airflow/dags:/opt/airflow/dags
	- /home/ubuntu/logs:/opt/airflow/logs
	- /home/ubuntu/config:/opt/airflow/config
	- /home/ubuntu/airflow/plugins:/opt/airflow/plugins
	- /home/ubuntu/ingest:/opt/airflow/ingest

 

수정 후 git commit & push을 한 뒤, 서버에서 docker compose 재시작한다.

 

sudo docker compose restart

 

dags_seoul_bikelist DAG 재수행 후 서버의 /home/ubuntu/ingest 디렉토리에 csv가 생성되었는지 확인한다.

 

 

 

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

 

Creating a custom Operator — Airflow Documentation

 

airflow.apache.org

 

 

 

 

728x90
반응형
LIST