본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] Connection & Hook

by goatlab 2024. 6. 18.
728x90
반응형
SMALL

오퍼레이터 문제점

 

  • 접속 정보 노출 : postgre DB에 대한 User, Password 등
  • 접속 정보 변경시 대응 어려움 : 직접 접속하는 DAG이 많은데 접속 정보가 수정되는 경우

 

해결 방법으로는 Variable (User, Password 등을 Variable에 등록하고 꺼내오기)이나 Hook을 이용한다 (Variable 등록 필요없음).

 

Connection

 

Airflow를 사용하면 사용자 정의 연결 유형을 정의할 수 있다. Airflow UI 화면에서 등록한 커넥션 정보를 말한다.

 

Connection DB 등록

 

Admin Connection에서 postges_customDB 접속 정보를 등록한다.

 

 

Connection을 등록하고 나면 Postgre Hook을 이용해 접속 가능하다. 그리고 Admin Providers에서 새로운 Hook이나 오퍼레이터, 센서 등에 대해 사용시에는 반드시 문서를 통해 코드 레벨에서 기능 확인 필요하다.

 

Hook

 

Airflow에서 외부 솔루션에 연결, 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스이다. Hook의 특징은 다음과 같다.

 

  • Connection 정보를 통해 생성되는 객체 ☞ 접속 정보를 Connection을 통해 받아오므로 접속 정보가 코드상 노출되지 않음
  • 특정 솔루션을 다룰 수 있는 메서드가 구현되어 있음
  • 오퍼레이터나 센서와는 달리 Hook은 task를 만들어내지 못하므로 Custom 오퍼레이터 안에서나 Python 오퍼레이터 내 함수에서 사용

 

Postgre Hook

 

기존 DAG인 dags_python_with_postgres.py를 dags_python_with_postgres_hook.py로 변경하고 Hook을 사용하면서 접속 정보는 Connection에서 가져오는 것으로 수정한다.

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator

with DAG(
        dag_id='dags_python_with_postgres_hook',
        start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    def insrt_postgres(postgres_conn_id, **kwargs):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from contextlib import closing

        postgres_hook = PostgresHook(postgres_conn_id)
        with closing(postgres_hook.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'hook insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
                cursor.execute(sql, (dag_id, task_id, run_id, msg))
                conn.commit()

    insrt_postgres_with_hook = PythonOperator(
        task_id='insrt_postgres_with_hook',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom'}
    )
    insrt_postgres_with_hook

 

Postgre Hook bulk upload

 

Postgre Hook에는 bulk upload할 수 있는 기능이 존재한다. bulk_upload 기능을 이용해 csv파일을 postgres DB로 업로드하는 DAG를 작성한다.

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

with DAG(
        dag_id='dags_python_with_postgres_hook_bulk_load',
        start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
        schedule='0 7 * * *',
        catchup=False
) as dag:
    def insrt_postgres(postgres_conn_id, tbl_nm, file_nm, **kwargs):
        postgres_hook = PostgresHook(postgres_conn_id)
        postgres_hook.bulk_load(tbl_nm, file_nm)

    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
                   'tbl_nm':'seoul_bike_hist',
                   'file_nm':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'}
    )

 

 

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html

 

Connections & Hooks — Airflow Documentation

 

airflow.apache.org

 

728x90
반응형
LIST