본문 바로가기
728x90
반응형
SMALL

App Programming/Apache Airflow11

[Apache Airflow] Slack 연동 Slack API slack API에서 어플리케이션을 생성한다. Create an app → From scratch을 선택한다.  앱 이름은 airflow_bot으로 입력하고, 봇을 띄우고자 하는 워크스페이스를 선택한다.  그 다음, Incoming Webhooks Off →On으로 변경한다.  그리고 페이지 하단의 Webhook URL에서 T로 시작하는 토큰값을 복사한다.  커넥션 등록 Slack의 경우 Airflow 설치시 기본 제공되는 Provider 중 하나로 Hook, Operator사용 가능하다. 커넥션에 Slack 관련 정보를 넣어주면 된다. 위에서 복사한 Webhook URL 토큰을 붙여 넣는다.  callback plugins/collbacks 디렉토리에 on_failure_callbac.. 2024. 6. 19.
[Apache Airflow] KaKao 연동 카카오 Developer 카카오 Developer에서 다음과 같이 애플리케이션을 추가한다.    카카오 연동 제품 설정에서 카카오 로그인의 활성화 설정 상태를 ON으로 변경한다.  Redirect URI는 https://example.com/oauth로 입력한다.  task에 대한 메시지를 전송하기 위해 동의 항목에서 선택 동의로 저장한다.  인가 코드  client_id는 앱 키 →REST API 키로 해서 아래 주소를 웹 브라우저를 통해 접속하여 인가 코드를 발급받는다. https://kauth.kakao.com/oauth/authorize?response_type=code&client_id={client_id}&redirect_uri=https://example.com/oauth&response_t.. 2024. 6. 19.
[Apache Airflow] Connection & Hook 오퍼레이터 문제점 접속 정보 노출 : postgre DB에 대한 User, Password 등접속 정보 변경시 대응 어려움 : 직접 접속하는 DAG이 많은데 접속 정보가 수정되는 경우 해결 방법으로는 Variable (User, Password 등을 Variable에 등록하고 꺼내오기)이나 Hook을 이용한다 (Variable 등록 필요없음). Connection Airflow를 사용하면 사용자 정의 연결 유형을 정의할 수 있다. Airflow UI 화면에서 등록한 커넥션 정보를 말한다. Connection DB 등록  Admin Connection에서 postges_customDB 접속 정보를 등록한다.  Connection을 등록하고 나면 Postgre Hook을 이용해 접속 가능하다. 그리고 Adm.. 2024. 6. 18.
[Apache Airflow] Postgre DB 추가 Postgre 인스턴스 추가 로컬에서 docker-compose.yaml 파일을 다음과 같이 수정한다. services: postgres_custom: image: postgres:13 environment: POSTGRES_USER: 이름 POSTGRES_PASSWORD: 비밀번호 POSGRES_DB: 데이터베이스 TZ: Asia/Seoul volumes: - postgres-custom-db-volume:/var/lib/postgresql/data ports: - 5432:5432 # 외부에서 접속할 수 있도록 포트 노출volumes: postgres-db-volume: postgres-custom-db-volume: #.. 2024. 6. 18.
[Apache Airflow] Custom 오퍼레이터 Custom 오퍼레이터 앞서, SimpleHttp는 기본적으로 1회 호출만 가능하다. 그리고 openAPI를 통해 얻은 데이터셋은 몇 개의 row가 존재할 지 미리 알기 어렵다. 하나의 task로 모든 row를 추출하면서 전처리하여 파일로 저장할 수 있는 기능을 가능케 하는 것은 커스텀 오퍼레이터이다. Airflow는 필요한 오퍼레이터를 직접 만들어 사용할 수 있도록 확장성을 지원한다. BaseOperator를 상속하며 원하는 기능은 파이썬으로 직접 구현 가능하다. 다음과 같이, BaseOperator 상속시 두 가지 메서드를 재정의해야 한다 (Overriding). def__init__ : 클래스에서 객체 생성시 객체에 대한 초기값 지정하는 함수def execute(self, context) : 실제 .. 2024. 6. 18.
[Apache Airflow] Python 오퍼레이터 Python 오퍼레이터 파이썬 함수를 실행시킬수 있는 오퍼레이터이며, 가장 많이 사용되는 오퍼레이터 중 하나이다. dags 디렉토리를 기본적으로 sys.path에 추가해주고 있기 때문에 dags 디렉토리에 dag 파일을 가져다 놓기만 해도 airflow는 DAG 인식이 가능하다. 뿐만 아니라 config, plugins 디렉토리도 sys.path에 추가해 주고 있으므로 파이썬 공통 모듈이나 설정 파일은 plugins 디렉토리 또는 config 디렉토리 안에 만들어 놓으면 dag에서 import하여 사용 가능하다. dags_python_operator.py  from airflow import DAGimport pendulumfrom airflow.operators.python import PythonOp.. 2024. 6. 17.
[Apache Airflow] Email 오퍼레이터 Email 오퍼레이터 Airflow에서 기본 제공하는 오퍼레이터 중 하나이며, Email 전송할 수 있는 오퍼레이터이다. 기존 airflow 컨테이너를 종료한다. ubuntu~$cd airflowubuntu~$sudo docker compose down 구 컨테이너의 restart policy 변경 후 종료한다. ubuntu~$sudo docker update --restart=no $(sudo docker ps -a -q)ubuntu~$sudo docker stop $(sudo docker ps -a -q) 새 컨테이너를 시작한다. ubuntu~$sudo docker compose up 구글 계정 설정 Gmail 계정 내 설정하기에서 G-mail → 설정 → 모든 설정보기 → 전달 및 POP/IMAP .. 2024. 6. 17.
[Apache Airflow] Cron Schedule Cron Schedule Cron Schedule은 다음과 같이 task가 실행되어야 하는 시간 (주기)을 정하기 위한 다섯개의 필드로 구성된 문자열이다. {분} {시} {일} {월} {요일} Airflow의 모든 DAG는 실행 주기를 결정해야 하며, 일반적으로 Cron Schedule을 활용하여 DAG이 실행되어야 할 주기를 명시한다. 특수 문자 특수 문자설명*모든 값-범위 지정,여러 값 지정/증가값 지정L마지막 값 (일, 요일에만 설정 가능)※ 일에 L입력시 해당 월의 마지막 일 의미※ 요일에 L 입력시 토요일 의미#몇 번째 요일인지 지정 Cron Schedule설명비고15 2 * * *매일 02시 15분 0 * * * *매시 정각 0 0 1 * *매월 1일 0시 0분 10 1 * * 1매주 월요일 .. 2024. 6. 17.
[Apache Airflow] 개발 환경 구성 개발 환경 구성 DAG 개발시 서버에서 직접 개발하지 않으며, 일반적으로 git을 활용한 CI/CD 환경을 주로 이용한다. Airflow 서버가 별도로 존재한다고 가정할 때, 코드 개발은 로컬에서 개발 후 완성된 코드를 서버로 배포하는 식으로 진행한다. 다음 명령어로 파이썬 버전을 확인하고 로컬에서 동일한 버전을 설치한다. sudo docker exec -it {스케줄러 컨테이너ID} bashdefault@~: python-V 가상 환경 파이참에서는 프로젝트마다 가상 환경 생성이 가능하다.  Git 구성 git 홈페이지에서 프로그램을 OS에 맞게 다운 받고, git 계정을 설정한다. git config --global --edit 그 다음, 코드를 올리기 위한 github Repository 만들고, 레.. 2024. 6. 14.
[Apache Airflow] DAG 개발 DAG  DAG는 AirFlow에서 실행할 작업들을 순서에 맞게 구성한 워크플로우 (workflow)를 의미하는 방향성 있는 비순환 그래프라고 불린다. 여러 Task가 의존 관계를 통해 연결되어 있으며, 의존 관계는 순환되지 않고 한 방향으로 연결된다. DAG 개발 DAG는 1개 이상의 오퍼레이터로 정의되며 오퍼레이터는 하고자 하는 기능에 대한 설계도라 할 수있다. Task는 오퍼레이터 (클래스)를 통해 생성된 객체로 실제 job을 수행하는 대상이 된다. EC2서버의 /home/ubuntu/dags 디렉토리에 DAG 파일을 다음과 정의한다. /home/ubuntu/홈 디렉토리dags/dag을 모아놓는 디렉토리해당 디렉토리에 dag 파일을 저장해 두면 일정 시간이 지난 후 web에서 확인 가능plugins.. 2024. 6. 14.
728x90
반응형
LIST