본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] Python 오퍼레이터

by goatlab 2024. 6. 17.
728x90
반응형
SMALL

Python 오퍼레이터

 

파이썬 함수를 실행시킬수 있는 오퍼레이터이며, 가장 많이 사용되는 오퍼레이터 중 하나이다. dags 디렉토리를 기본적으로 sys.path에 추가해주고 있기 때문에 dags 디렉토리에 dag 파일을 가져다 놓기만 해도 airflow는 DAG 인식이 가능하다. 뿐만 아니라 config, plugins 디렉토리도 sys.path에 추가해 주고 있으므로 파이썬 공통 모듈이나 설정 파일은 plugins 디렉토리 또는 config 디렉토리 안에 만들어 놓으면 dag에서 import하여 사용 가능하다.

 

dags_python_operator.py

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def select_fruit():
        fruit = ['APPLE','BANANA','ORANGE','AVOCADO']
        rand_int = random.randint(0,3)
        print(fruit[rand_int])

    py_t1 = PythonOperator(
        task_id='py_t1',
        python_callable=select_fruit
    )

    py_t1

 

공통 함수 만들기

 

plugins 디렉토리 아래 common/common_func.py을 생성한다.

 

def get_sftp():
    print('sftp 작업 시작')

def regist(name, sex, *args):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타 옵션: {args}')

def regist2(name, sex, *args, **kwargs):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타 옵션: {args}')
    email = kwargs['email'] or 'empty'
    phone = kwargs['phone'] or 'empty'
    
    if email:
        print(email)
    if phone:
        print(phone)

 

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

 

PythonOperator — Airflow Documentation

 

airflow.apache.org

 

728x90
반응형
LIST