
[MLops] Database

by goatlab 2024. 8. 13.

MySQL Container

 

Create the MySQL container on the same Docker network as the existing mlops container so the two containers can communicate.

 

docker run -itd --name my-mlops-db --network mlops -e MYSQL_ROOT_PASSWORD=root mysql:8.0.39

 

Next, enter the container and create the mlops database.

 

# Enter the container
docker exec -it my-mlops-db bash

# Log in to MySQL
mysql -u root -p  # enter the root password

# Create the database
create database mlops;

# Confirm it was created
show databases;

# Change the password authentication method (for compatibility with the Python mysqlclient library)
alter user 'root'@'%' identified with mysql_native_password by 'root';
flush privileges;

 

Installing Libraries

 

Install the required packages in the Python container with the following command.

 

pip install mysqlclient sqlalchemy python-dotenv fastapi uvicorn pydantic
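
For a quick sanity check that the installation succeeded, a short snippet like the following can be run inside the Python container (the version prints are only illustrative):

# Quick import check for the packages installed above.
# mysqlclient is imported as MySQLdb, python-dotenv as dotenv.
import MySQLdb
import sqlalchemy
import fastapi
import uvicorn
import pydantic
import dotenv

print("mysqlclient:", ".".join(map(str, MySQLdb.version_info[:3])))
print("SQLAlchemy :", sqlalchemy.__version__)
print("FastAPI    :", fastapi.__version__)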

 

.env

 

Add the database settings to the .env environment variable file. Because the containers share the bridge network configured earlier, the application can reach the DB container by its container name (DB_HOST).

 

/opt/mlops-movie-predictor# vi .env
DB_USER=root
DB_PASSWORD=root
DB_HOST=my-mlops-db
DB_PORT=3306
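
The following is a minimal connectivity check, assuming the .env above has been written and the packages are installed; it opens a connection to the mlops database and prints its name:

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()

# Build the same mysql+mysqldb URL used in postprocess.py below
url = (
    f"mysql+mysqldb://{os.environ['DB_USER']}:{os.environ['DB_PASSWORD']}"
    f"@{os.environ['DB_HOST']}:{os.environ['DB_PORT']}/mlops"
)
engine = create_engine(url)
with engine.connect() as conn:
    print(conn.execute(text("SELECT DATABASE()")).scalar())  # expected output: mlops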

 

src/postprocess/postprocess.py

 

import os
from sqlalchemy import create_engine, text
import pandas as pd

def get_engine(db_name):
    engine = create_engine(url=(
        f"mysql+mysqldb://"
        f"{os.environ.get('DB_USER')}:"
        f"{os.environ.get('DB_PASSWORD')}@"
        f"{os.environ.get('DB_HOST')}:"
        f"{os.environ.get('DB_PORT')}/"
        f"{db_name}"))
    return engine

def write_db(data: pd.DataFrame, db_name, table_name):
    engine = get_engine(db_name)
    connect = engine.connect()
    data.to_sql(table_name, connect, if_exists="append")
    connect.close()

def read_db(db_name, table_name, k=10):
    engine = get_engine(db_name)
    connect = engine.connect()
    result = connect.execute(
        statement=text(
            f"select recommend_content_id from {table_name} order by `index` desc limit :k"
        ),
        parameters={"k": k}
    )
    contents = [row[0] for row in result]  # consume the result before closing the connection
    connect.close()
    return contents
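
As a rough usage sketch (assuming the .env variables are loaded and the mlops database exists), write_db appends a DataFrame to a table and read_db returns the most recent recommend_content_id values:

import pandas as pd
from dotenv import load_dotenv
from src.postprocess.postprocess import write_db, read_db

load_dotenv()

# to_sql writes the DataFrame index as an `index` column, which read_db sorts on
df = pd.DataFrame({"recommend_content_id": [101, 102, 103]})
write_db(df, "mlops", "recommend")
print(read_db("mlops", "recommend", k=3))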

 

src/inference/inference.py

 

import os
import sys
import glob

sys.path.append(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(
                os.path.abspath(__file__)
            )
        )
    )
)

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
import pandas as pd
import onnx
import onnxruntime
from icecream import ic
from dotenv import load_dotenv
from src.utils.utils import model_dir, calculate_hash, read_hash
from src.model.movie_predictor import MoviePredictor
from src.dataset.watch_log import WatchLogDataset, get_datasets
from src.evaluate.evaluate import evaluate
from src.postprocess.postprocess import write_db

def model_validation(model_path):
    original_hash = read_hash(model_path)
    current_hash = calculate_hash(model_path)
    if original_hash == current_hash:
        ic("validation success")
        return True
    else:
        return False

def load_checkpoint():
    target_dir = model_dir(MoviePredictor.name)
    models_path = os.path.join(target_dir, "*.pth")
    latest_model = sorted(glob.glob(models_path))[-1]  # glob order is not guaranteed, so sort first

    if model_validation(latest_model):
        checkpoint = torch.load(latest_model)
        return checkpoint
    else:
        raise FileNotFoundError("Not found or invalid model file")

def init_model(checkpoint):
    model = MoviePredictor(**checkpoint["model_params"])
    model.load_state_dict(checkpoint["model_state_dict"])
    criterion = nn.CrossEntropyLoss()
    scaler = checkpoint["scaler"]
    contents_id_map = checkpoint["contents_id_map"]
    return model, criterion, scaler, contents_id_map


def make_inference_df(data):
    columns = "user_id content_id watch_seconds rating popularity".split()
    return pd.DataFrame(
        data=[data],
        columns=columns,
    )

def inference(model, criterion, scaler, contents_id_map, data: np.ndarray, batch_size=1):
    if data.size > 0:
        df = make_inference_df(data)
        dataset = WatchLogDataset(df, scaler=scaler)
    else:
        _, _, dataset = get_datasets()

    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=False)
    loss, predictions = evaluate(model, dataloader, criterion)
    ic(loss)
    ic(predictions)
    return [dataset.decode_content_id(idx) for idx in predictions]


def inference_onnx(scaler, contents_id_map, data):
    df = make_inference_df(data)
    dataset = WatchLogDataset(df, scaler=scaler)
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=False)

    latest_model = get_latest_model(model_ext="onnx")  # get_latest_model: assumed available from the earlier model-inference step
    ort_session = onnxruntime.InferenceSession(latest_model, providers=["CPUExecutionProvider"])

    predictions = []
    for data, labels in dataloader:
        ort_inputs = {ort_session.get_inputs()[0].name: data.numpy()}
        ort_outs = [ort_session.get_outputs()[0].name]

        output = ort_session.run(ort_outs, ort_inputs)
        predicted = np.argmax(output[0], 1)[0]
        predictions.append(predicted)

    return dataset.decode_content_id(predictions[0])

def recommend_to_df(recommend: list):
    return pd.DataFrame(
        data=recommend,
        columns="recommend_content_id".split()
    )

if __name__ == "__main__":
    load_dotenv()
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)
    data = np.array([1, 1209290, 4508, 7.577, 1204.764])
    # recommend = inference(model, criterion, scaler, contents_id_map, data)
    recommend = inference(model, criterion, scaler, contents_id_map, data=np.array([]), batch_size=64)
    ic(recommend)
    recommend_df = recommend_to_df(recommend)
    write_db(recommend_df, "mlops", "recommend")

 

src/main.py

 

import os
import sys

sys.path.append(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)

import fire
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
from src.dataset.watch_log import get_datasets
from src.model.movie_predictor import MoviePredictor, model_save
from src.utils.utils import init_seed, model_dir, auto_increment_run_suffix
from src.utils.constant import Optimizers, Models
from src.train.train import train
from src.evaluate.evaluate import evaluate
import wandb
import numpy as np
from src.inference.inference import load_checkpoint, init_model, inference, recommend_to_df
from src.postprocess.postprocess import write_db
from dotenv import load_dotenv

load_dotenv()
init_seed()

def get_runs(project_name):
    return wandb.Api().runs(path=project_name, order="-created_at")


def get_latest_run(project_name):
    runs = get_runs(project_name)
    if not runs:
        return f"{project_name}-000"

    return runs[0].name

def run_train(model_name, optimizer, num_epochs=10, lr=0.001, model_ext="pth"):
    api_key = os.environ["WANDB_API_KEY"]
    wandb.login(key=api_key)

    project_name = model_name.replace("_", "-")
    wandb.init(
        project=project_name,
        notes="content-based movie recommend model",
        tags=["content-based", "movie", "recommend"],
        config=locals(),
    )

    Models.validation(model_name)
    Optimizers.validation(optimizer)

    # Create datasets and DataLoaders
    train_dataset, val_dataset, test_dataset = get_datasets()
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=False)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=False)

    # Initialize the model
    model_params = {
        "input_dim": train_dataset.features_dim,
        "num_classes": train_dataset.num_classes
    }
    model_class = Models[model_name.upper()].value
    model = model_class(**model_params)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer_class = Optimizers[optimizer.upper()].value
    optimizer = optimizer_class(model.parameters(), lr=lr)

    # Training loop
    train_loss = 0
    for epoch in tqdm(range(num_epochs)):
        train_loss = train(model, train_loader, criterion, optimizer)
        val_loss, _ = evaluate(model, val_loader, criterion)
        wandb.log({"Loss/Train": train_loss})
        wandb.log({"Loss/Valid": val_loss})
        print(f"Epoch {epoch + 1}/{num_epochs}, "
              f"Train Loss: {train_loss:.4f}, "
              f"Val Loss: {val_loss:.4f}, "
              f"Val-Train Loss : {val_loss-train_loss:.4f}")

    model_save(
        model=model,
        model_params=model_params,
        epoch=num_epochs,
        optimizer=optimizer,
        loss=train_loss,
        scaler=train_dataset.scaler,
        contents_id_map=train_dataset.contents_id_map,
        ext=model_ext,
    )

    # Evaluate on the test set
    model.eval()
    test_loss, predictions = evaluate(model, test_loader, criterion)
    print(f"Test Loss : {test_loss:.4f}")
    # print([train_dataset.decode_content_id(idx) for idx in predictions])

def run_inference(data=None, batch_size=64):
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)

    if data is None:
        data = []

    data = np.array(data)

    recommend = inference(model, criterion, scaler, contents_id_map, data, batch_size)
    print(recommend)

    write_db(recommend_to_df(recommend), "mlops", "recommend")

if __name__ == '__main__':
    fire.Fire({
        "preprocessing": run_preprocessing,
        "train": run_train,
        "inference": run_inference,
    })
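
For reference, main.py is driven through fire, and run_train expects WANDB_API_KEY to be present in the environment (for example in .env). A hedged sketch of invoking the inference entry point programmatically, using the same sample feature values as inference.py:

# CLI form (roughly): python src/main.py inference --data "[1, 1209290, 4508, 7.577, 1204.764]" --batch_size 1
from src.main import run_inference

run_inference(data=[1, 1209290, 4508, 7.577, 1204.764], batch_size=1)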

 

src/webapp.py

 

import os
import sys

sys.path.append(
    os.path.dirname(
        os.path.dirname(
            os.path.abspath(__file__)
        )
    )
)

import numpy as np
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from src.inference.inference import load_checkpoint, init_model, inference
from src.postprocess.postprocess import read_db

app = FastAPI()

load_dotenv()
checkpoint = load_checkpoint()
model, criterion, scaler, contents_id_map = init_model(checkpoint)

class InferenceInput(BaseModel):
    user_id: int
    content_id: int
    watch_seconds: int
    rating: float
    popularity: float

# https://api.endpoint/movie-predictor/inference?user_id=12345&content_id=456...

@app.post("/predict")
async def predict(input_data: InferenceInput):
    try:
        data = np.array([
            input_data.user_id,
            input_data.content_id,
            input_data.watch_seconds,
            input_data.rating,
            input_data.popularity
        ])
        recommend = inference(model, criterion, scaler, contents_id_map, data)
        return {"recommended_content_id": recommend}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/batch-predict")
async def batch_predict(k: int = 5):
    try:
        recommend = read_db("mlops", "recommend", k=k)
        return {"recommended_content_id": recommend}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

 

Writing the API Server Startup Script

 

# Inside the my-mlops container: create the startup script
vi start_api_server.sh

# start_api_server.sh
#!/bin/bash
nohup python src/webapp.py &

# Make the script executable
chmod +x start_api_server.sh

# On the host: commit the container as a new image, then run it as the API container,
# mapping host port 9999 to the FastAPI port 8000
docker commit my-mlops my-mlops:v1

docker run -itd --name my-mlops-api --network mlops -p 9999:8000 my-mlops:v1

# Check the server log
cat nohup.out

 

Running the FastAPI Server

 

Allow port 9999 in the inbound rules.

 

 

Connect to the server using the EC2 public IP.

 

 

You can also test it by clicking Try it out on the POST endpoint.
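
Alternatively, the endpoints can be called from Python; a minimal sketch, assuming the API container above is running and the requests package is installed (replace localhost with the EC2 public IP when calling from outside):

import requests

payload = {
    "user_id": 1,
    "content_id": 1209290,
    "watch_seconds": 4508,
    "rating": 7.577,
    "popularity": 1204.764,
}
resp = requests.post("http://localhost:9999/predict", json=payload)
print(resp.json())  # {"recommended_content_id": [...]}

resp = requests.get("http://localhost:9999/batch-predict", params={"k": 5})
print(resp.json())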

 

 

Querying the Database
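
To check that the recommendations were stored, a small sketch using the get_engine helper from postprocess.py (assuming .env is loaded):

import pandas as pd
from dotenv import load_dotenv
from src.postprocess.postprocess import get_engine

load_dotenv()
engine = get_engine("mlops")
print(pd.read_sql("select * from recommend order by `index` desc limit 5", engine))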

 

