본문 바로가기
App Programming/Apache Airflow

[Apache Airflow] Postgre DB 추가

by goatlab 2024. 6. 18.
728x90
반응형
SMALL

Postgre 인스턴스 추가

 

로컬에서 docker-compose.yaml 파일을 다음과 같이 수정한다.

 

services:
  postgres_custom:
    image: postgres:13
    environment:
      POSTGRES_USER: 이름
      POSTGRES_PASSWORD: 비밀번호
      POSGRES_DB: 데이터베이스
      TZ: Asia/Seoul
    volumes:
      - postgres-custom-db-volume:/var/lib/postgresql/data
    ports:
      - 5432:5432 # 외부에서 접속할 수 있도록 포트 노출
volumes:
  postgres-db-volume:
  postgres-custom-db-volume: # 새 볼륨 할당

 

고정 IP 할당

 

기본적으로 컨테이너들은 유동 IP를 지니며 (재기동시 IP변경 가능), 고정 IP를 할당하려면 신규 networks를 만들어 할당해야 한다. 반면, networks를 지정하지 않은 기존 컨테이너들은 default network에 묶이게 되어 컨테이너간 network가 서로 분리된다. 따라서, 컨테이너간 통신이 가능하려면 컨테이너를 모두 동일 networks에 할당 필요하다. 컨테이너에 새로운 network 할당시 default network가 사용하고 있지 않은 네트워크 대역을 지정해야 한다. 다음 명령어로 기존 default network (ubuntu_default)가 기존 사용하고 있는 대역 확인한다.

 

sudo docker network ls

sudo docker network inspect {ubuntu_default NETWORK ID}

 

사용하고 있는 네트워크 대역은 EC2마다 다를 수 있다.

 

Network 추가

 

로컬에서 default 네트워크가 쓰고 있지 않은 서브넷으로 구성하기 위해 docker-compose.yaml 파일을 다음과 같이 수정한다.

 

networks:
  network_custom:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.28.0.0/16
          gateway: 172.28.0.1

 

그 다음, postgres_custom 컨테이너 뿐만 아니라 다른 컨테이너에도 network_custom 할당하고 IP를 부여한다.

 

postgres_custom: 172.28.0.3
postgres: 172.28.0.4+포트 노출 설정:(5431:5432)
redis: 172.28.0.5
airflow-webserver: 172.28.0.6
airflow-scheduler: 172.28.0.7
airflow-worker: 172.28.0.8
airflow-triggerer: 172.28.0.9
airflow-init: 172.28.0.10

 

docker-compose.yaml 파일 수정이 완료되면 git에 배포하고 restart한다.

 

ubuntu~$git pull
ubuntu~$cd airflow
ubuntu~$sudo docker compose restart

 

EC2 보안 그룹 수정

 

추가한 postgres_custom에 접속하기 위해 EC2 서버의 5432 포트, 메타 DB로 사용되는 기존 postgres 접속을 위해 5431 포트가 필요하다.

 

 

Postgre DB Dbeaver 접속

 

dbeaver을 설치하고, postgres_custom 접속은 EC2 퍼블릭 IP의 포트 5432로 한다.

 

 

postgres 메타 DB는 EC2 퍼블릭 IP의 포트:5431로 접속한다. 

 

 

SQL

 

CREATE TABLE py_opr_drct_insrt(
    dag_id varchar(100),
    task_id varchar(100),
    run_id varchar(100),
    msg text
)

 

오퍼레이터 작성

 

위에서 만든 테이블에 insert하는 DAG를 만든다.

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator

with DAG(
        dag_id='dags_python_with_postgres',
        start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
        import psycopg2
        from contextlib import closing
        with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
            with closing(conn.cursor()) as cursor:
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
                cursor.execute(sql, (dag_id, task_id, run_id, msg))
                conn.commit()

    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_args=['172.28.0.3', '5432', '1234', '1234', '1234']
    )

    insrt_postgres

 

 

728x90
반응형
LIST