
[MLops] Model Inference

goatlab 2024. 8. 13. 14:36

src/main.py

 

The model file extension is set to pth, and the model is retrained with the command below.

 

model_ext = "pth"

python src/main.py train --model_name movie_predictor --optimizer adam --num_epochs 20 --lr 0.002
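For reference, a minimal sketch of how model_ext could be used inside src/main.py when the checkpoint is written. The save_checkpoint helper, its arguments, and the file-name pattern are assumptions, not the actual code; only the checkpoint keys match init_model in src/inference/inference.py below, and save_hash is shown in src/utils/utils.py.

import os
import torch

from src.utils.utils import model_dir, save_hash

model_ext = "pth"

def save_checkpoint(model, model_params, scaler, contents_id_map, epoch, model_name):
    # Assumed helper: build the checkpoint path under models/{model_name} and save there
    target_dir = model_dir(model_name)
    os.makedirs(target_dir, exist_ok=True)
    dst = os.path.join(target_dir, f"E{epoch:02d}_{model_name}.{model_ext}")  # assumed naming
    torch.save(
        {
            "model_state_dict": model.state_dict(),
            "model_params": model_params,
            "scaler": scaler,
            "contents_id_map": contents_id_map,
        },
        dst,
    )
    save_hash(dst)  # writes the matching .sha256 file next to the checkpoint
    return dst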

 

 

src/utils/utils.py

 

torch.save internally serializes (marshals) the object with pickle before writing it to disk. This is a security weak point, because unpickling an untrusted file can execute arbitrary code. As a minimal safeguard, a SHA-256 hash of the saved file is recorded and later compared to detect whether the file has been tampered with.

 

import os
import random
import hashlib
from datetime import datetime
import numpy as np
import torch

def parse_date(date: str):
    date_format = "%y%m%d"
    parsed_date = datetime.strptime(str(date).replace("-", ""), date_format)
    return parsed_date

def init_seed(seed=0):
    np.random.seed(seed)
    torch.manual_seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def project_path():
    return os.path.join(
        os.path.dirname(
            os.path.abspath(__file__)  # /opt/mlops-movie-predictor/src/utils/utils.py
        ),
        "..",
        ".."
    )  # /opt/mlops-movie-predictor

def model_dir(model_name):
    return os.path.join(
        project_path(),
        "models",
        model_name
    )  # /opt/mlops-movie-predictor/models/{model_name}

def auto_increment_run_suffix(name: str, pad=3):
    suffix = name.split("-")[-1]  # 001
    next_suffix = str(int(suffix) + 1).zfill(pad)  # 002
    return name.replace(suffix, next_suffix)

def calculate_hash(filename):
    sha256_hash = hashlib.sha256()
    with open(filename, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def save_hash(dst):
    hash_ = calculate_hash(dst)
    dst, _ = os.path.splitext(dst)
    with open(f"{dst}.sha256", "w") as f:
        f.write(hash_)

def read_hash(dst):
    dst, _ = os.path.splitext(dst)
    with open(f"{dst}.sha256", "r") as f:
        return f.read()

 

src/inference/inference.py

 

To load the trained model and run real-time inference on incoming data, inference is defined as follows.

 

import os
import sys
import glob

sys.path.append(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
import pandas as pd
from src.dataset.watch_log import WatchLogDataset, get_datasets
from src.evaluate.evaluate import evaluate
from src.model.movie_predictor import MoviePredictor
from src.utils.utils import model_dir, calculate_hash, read_hash
import onnx
import onnxruntime

def model_validation(model_path):
    original_hash = read_hash(model_path)
    current_hash = calculate_hash(model_path)
    if original_hash == current_hash:
        print("validation success")
        return True
    else:
        return False

def init_model(checkpoint):
    model = MoviePredictor(**checkpoint["model_params"])
    model.load_state_dict(checkpoint["model_state_dict"])
    criterion = nn.CrossEntropyLoss()
    scaler = checkpoint["scaler"]
    contents_id_map = checkpoint["contents_id_map"]
    return model, criterion, scaler, contents_id_map

def load_checkpoint():
    target_dir = model_dir(MoviePredictor.name)
    models_path = os.path.join(target_dir, "*.pth")
    latest_model = sorted(glob.glob(models_path))[-1]  # lexicographic sort; assumes newer checkpoints sort last

    if model_validation(latest_model):
        checkpoint = torch.load(latest_model)
        return checkpoint
    else:
        raise FileNotFoundError("Not found or invalid model file")

def make_inference_df(data):
    columns = "user_id content_id watch_seconds rating popularity".split()
    return pd.DataFrame(
        data=[data],
        columns=columns
    )

def inference(model, criterion, scaler, contents_id_map, data):
    df = make_inference_df(data)
    dataset = WatchLogDataset(df, scaler=scaler)
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=False)
    loss, predictions = evaluate(model, dataloader, criterion)
    print(loss, predictions[0])
    return dataset.decode_content_id(predictions[0])

def inference_onnx(scaler, contents_id_map, data):
    df = make_inference_df(data)
    dataset = WatchLogDataset(df, scaler=scaler)
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=False)

    latest_model = get_latest_model(model_ext="onnx")  # helper not shown in this file; returns the newest .onnx model path
    ort_session = onnxruntime.InferenceSession(latest_model, providers=["CPUExecutionProvider"])

    predictions = []
    for data, labels in dataloader:
        ort_inputs = {ort_session.get_inputs()[0].name: data.numpy()}
        ort_output_names = [ort_session.get_outputs()[0].name]

        output = ort_session.run(ort_output_names, ort_inputs)
        predicted = np.argmax(output[0], 1)[0]
        predictions.append(predicted)

    return dataset.decode_content_id(predictions[0])

if __name__ == '__main__':
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)
    data = np.array([1, 1209290, 4508, 7.577, 1204.764])
    recommend = inference(model, criterion, scaler, contents_id_map, data)
    print(recommend)
python src/inference/inference.py
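inference_onnx calls get_latest_model, which is not shown in this file. A minimal sketch of what such a helper could look like, assuming that lexicographic order of the file names tracks recency:

import glob
import os

from src.model.movie_predictor import MoviePredictor
from src.utils.utils import model_dir

def get_latest_model(model_ext="pth"):
    # Return the most recent model file with the given extension
    target_dir = model_dir(MoviePredictor.name)
    models_path = os.path.join(target_dir, f"*.{model_ext}")
    candidates = sorted(glob.glob(models_path))  # assumes newer files sort last
    if not candidates:
        raise FileNotFoundError(f"no .{model_ext} model found in {target_dir}")
    return candidates[-1]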

 

Batch-size inference is defined as follows: when an empty array is passed, the test split from get_datasets is used instead of building a single-row DataFrame.

 

def inference(model, criterion, scaler, contents_id_map, data: np.ndarray, batch_size=1):
    if data.size > 0:
        df = make_inference_df(data)
        dataset = WatchLogDataset(df, scaler=scaler)
    else:
        _, _, dataset = get_datasets()

    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(
        dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=False
    )
    loss, predictions = evaluate(model, dataloader, criterion)
    print(loss, predictions)
    return [dataset.decode_content_id(idx) for idx in predictions]

if __name__ == '__main__':
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)
    data = np.array([1, 1209290, 4508, 7.577, 1204.764])
    recommend = inference(
        model, criterion, scaler, contents_id_map, data=np.array([]), batch_size=64
    )
    print(recommend)
python src/inference/inference.py
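The empty-array path above scores the test split returned by get_datasets. If several raw rows should be scored in one call instead, make_inference_df can be generalized to accept a 2-D array; a minimal sketch (the second row below is hypothetical):

import numpy as np
import pandas as pd

def make_inference_df(data: np.ndarray) -> pd.DataFrame:
    columns = "user_id content_id watch_seconds rating popularity".split()
    rows = np.atleast_2d(data)  # (5,) -> (1, 5); an (N, 5) batch is kept as-is
    return pd.DataFrame(data=rows, columns=columns)

batch = np.array([
    [1, 1209290, 4508, 7.577, 1204.764],
    [2, 1209290, 1800, 7.577, 1204.764],  # hypothetical second row
])
# recommend = inference(model, criterion, scaler, contents_id_map, data=batch, batch_size=2)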
