오퍼레이터 문제점
|
해결 방법으로는 Variable (User, Password 등을 Variable에 등록하고 꺼내오기)이나 Hook을 이용한다 (Variable 등록 필요없음).
Connection
Airflow를 사용하면 사용자 정의 연결 유형을 정의할 수 있다. Airflow UI 화면에서 등록한 커넥션 정보를 말한다.
Connection DB 등록
Admin Connection에서 postges_customDB 접속 정보를 등록한다.
Connection을 등록하고 나면 Postgre Hook을 이용해 접속 가능하다. 그리고 Admin Providers에서 새로운 Hook이나 오퍼레이터, 센서 등에 대해 사용시에는 반드시 문서를 통해 코드 레벨에서 기능 확인 필요하다.
Hook
Airflow에서 외부 솔루션에 연결, 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스이다. Hook의 특징은 다음과 같다.
|
Postgre Hook
기존 DAG인 dags_python_with_postgres.py를 dags_python_with_postgres_hook.py로 변경하고 Hook을 사용하면서 접속 정보는 Connection에서 가져오는 것으로 수정한다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres_hook',
start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, **kwargs):
from airflow.providers.postgres.hooks.postgres import PostgresHook
from contextlib import closing
postgres_hook = PostgresHook(postgres_conn_id)
with closing(postgres_hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'hook insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
insrt_postgres_with_hook = PythonOperator(
task_id='insrt_postgres_with_hook',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom'}
)
insrt_postgres_with_hook
Postgre Hook bulk upload
Postgre Hook에는 bulk upload할 수 있는 기능이 존재한다. bulk_upload 기능을 이용해 csv파일을 postgres DB로 업로드하는 DAG를 작성한다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
with DAG(
dag_id='dags_python_with_postgres_hook_bulk_load',
start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
schedule='0 7 * * *',
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, tbl_nm, file_nm, **kwargs):
postgres_hook = PostgresHook(postgres_conn_id)
postgres_hook.bulk_load(tbl_nm, file_nm)
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
'tbl_nm':'seoul_bike_hist',
'file_nm':'/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/bikeList.csv'}
)
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html
'App Programming > Apache Airflow' 카테고리의 다른 글
[Apache Airflow] Slack 연동 (0) | 2024.06.19 |
---|---|
[Apache Airflow] KaKao 연동 (0) | 2024.06.19 |
[Apache Airflow] Postgre DB 추가 (0) | 2024.06.18 |
[Apache Airflow] Custom 오퍼레이터 (0) | 2024.06.18 |
[Apache Airflow] Python 오퍼레이터 (0) | 2024.06.17 |