src/main.py
Set the model file extension to pth:
model_ext = "pth"
python src/main.py train --model_name movie_predictor --optimizer adam --num_epochs 20 --lr 0.002
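The saving logic inside src/main.py is not shown in this post; the sketch below is only an assumption of how model_ext, torch.save, and the save_hash helper (defined next) fit together. save_checkpoint and run_name are hypothetical names.

import os
import torch
from src.utils.utils import model_dir, save_hash

def save_checkpoint(checkpoint, model_name, run_name, model_ext="pth"):
    # torch.save pickles the checkpoint dict to models/{model_name}/{run_name}.pth
    dst = os.path.join(model_dir(model_name), f"{run_name}.{model_ext}")
    torch.save(checkpoint, dst)
    save_hash(dst)  # writes {run_name}.sha256 alongside the model file
    return dst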
src/utils/utils.py
torch.save internally serializes (marshals) the object with pickle before writing it to disk. This is a security weakness, since unpickling a tampered file can execute arbitrary code. As a minimal safeguard, a SHA-256 hash of the file is therefore recorded and checked for tampering before the model is loaded.
import os
import random
import hashlib
from datetime import datetime

import numpy as np
import torch


def parse_date(date: str):
    date_format = "%y%m%d"
    parsed_date = datetime.strptime(str(date).replace("-", ""), date_format)
    return parsed_date


def init_seed(seed=0):
    np.random.seed(seed)
    torch.manual_seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def project_path():
    return os.path.join(
        os.path.dirname(
            os.path.abspath(__file__)  # /opt/mlops-movie-predictor/src/utils/utils.py
        ),
        "..",
        ".."
    )  # /opt/mlops-movie-predictor


def model_dir(model_name):
    return os.path.join(
        project_path(),
        "models",
        model_name
    )  # /opt/mlops-movie-predictor/models/{model_name}


def auto_increment_run_suffix(name: str, pad=3):
    suffix = name.split("-")[-1]  # 001
    next_suffix = str(int(suffix) + 1).zfill(pad)  # 002
    return name.replace(suffix, next_suffix)


def calculate_hash(filename):
    sha256_hash = hashlib.sha256()
    with open(filename, "rb") as f:
        # Read in 4 KB blocks so large model files are not loaded into memory at once
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()


def save_hash(dst):
    hash_ = calculate_hash(dst)
    dst, _ = os.path.splitext(dst)
    with open(f"{dst}.sha256", "w") as f:
        f.write(hash_)


def read_hash(dst):
    dst, _ = os.path.splitext(dst)
    with open(f"{dst}.sha256", "r") as f:
        return f.read()
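For reference, a hypothetical round trip of the helpers above (the path is made up for illustration):

model_file = "models/movie_predictor/movie-predictor-001.pth"  # hypothetical path
save_hash(model_file)  # writes movie-predictor-001.sha256 next to the model
assert read_hash(model_file) == calculate_hash(model_file)
print(auto_increment_run_suffix("movie-predictor-001"))  # movie-predictor-002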
src/inference/inference.py
To load the trained model and run real-time inference on incoming data, inference is defined as follows.
import os
import sys
import glob

sys.path.append(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
import pandas as pd

from src.dataset.watch_log import WatchLogDataset, get_datasets
from src.evaluate.evaluate import evaluate
from src.model.movie_predictor import MoviePredictor
from src.utils.utils import model_dir, calculate_hash, read_hash

import onnx
import onnxruntime


def model_validation(model_path):
    # Compare the stored hash against a freshly computed one to detect tampering
    original_hash = read_hash(model_path)
    current_hash = calculate_hash(model_path)
    if original_hash == current_hash:
        print("validation success")
        return True
    else:
        return False


def init_model(checkpoint):
    model = MoviePredictor(**checkpoint["model_params"])
    model.load_state_dict(checkpoint["model_state_dict"])
    criterion = nn.CrossEntropyLoss()
    scaler = checkpoint["scaler"]
    contents_id_map = checkpoint["contents_id_map"]
    return model, criterion, scaler, contents_id_map


def load_checkpoint():
    target_dir = model_dir(MoviePredictor.name)
    models_path = os.path.join(target_dir, "*.pth")
    # glob does not guarantee ordering, so sort before taking the latest file
    latest_model = sorted(glob.glob(models_path))[-1]
    if model_validation(latest_model):
        checkpoint = torch.load(latest_model)
        return checkpoint
    else:
        raise FileNotFoundError("Not found or invalid model file")


def make_inference_df(data):
    columns = "user_id content_id watch_seconds rating popularity".split()
    return pd.DataFrame(
        data=[data],
        columns=columns
    )


def inference(model, criterion, scaler, contents_id_map, data):
    df = make_inference_df(data)
    dataset = WatchLogDataset(df, scaler=scaler)
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=False)
    loss, predictions = evaluate(model, dataloader, criterion)
    print(loss, predictions[0])
    return dataset.decode_content_id(predictions[0])


def inference_onnx(scaler, contents_id_map, data):
    df = make_inference_df(data)
    dataset = WatchLogDataset(df, scaler=scaler)
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=False)
    latest_model = get_latest_model(model_ext="onnx")  # defined elsewhere in the project
    ort_session = onnxruntime.InferenceSession(latest_model, providers=["CPUExecutionProvider"])
    predictions = []
    for features, labels in dataloader:
        ort_inputs = {ort_session.get_inputs()[0].name: features.numpy()}
        ort_outs = [ort_session.get_outputs()[0].name]
        output = ort_session.run(ort_outs, ort_inputs)
        predicted = np.argmax(output[0], 1)[0]
        predictions.append(predicted)
    return dataset.decode_content_id(predictions[0])


if __name__ == '__main__':
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)
    data = np.array([1, 1209290, 4508, 7.577, 1204.764])
    recommend = inference(model, criterion, scaler, contents_id_map, data)
    print(recommend)
python src/inference/inference.py
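get_latest_model is called by inference_onnx but not shown in this post; the sketch below is a plausible implementation, assuming it mirrors the glob-based lookup in load_checkpoint (the signature is inferred from the call site).

def get_latest_model(model_ext="pth"):
    # Assumed implementation: pick the newest model file by sorted name
    target_dir = model_dir(MoviePredictor.name)
    candidates = sorted(glob.glob(os.path.join(target_dir, f"*.{model_ext}")))
    if not candidates:
        raise FileNotFoundError(f"No .{model_ext} model found in {target_dir}")
    return candidates[-1]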
For batched inference, inference is redefined to take a batch_size argument; passing an empty array runs inference over the whole test dataset instead of a single record.
def inference(model, criterion, scaler, contents_id_map, data: np.array, batch_size=1):
    if data.size > 0:
        # Non-empty input: build a single-row DataFrame for one-off inference
        df = make_inference_df(data)
        dataset = WatchLogDataset(df, scaler=scaler)
    else:
        # Empty input: fall back to the test split for batch inference
        _, _, dataset = get_datasets()
    dataset.contents_id_map = contents_id_map
    dataloader = DataLoader(
        dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=False
    )
    loss, predictions = evaluate(model, dataloader, criterion)
    print(loss, predictions)
    return [dataset.decode_content_id(idx) for idx in predictions]


if __name__ == '__main__':
    checkpoint = load_checkpoint()
    model, criterion, scaler, contents_id_map = init_model(checkpoint)
    # Passing an empty array triggers batch inference over the test dataset
    recommend = inference(
        model, criterion, scaler, contents_id_map, data=np.array([]), batch_size=64
    )
    print(recommend)
python src/inference/inference.py
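inference_onnx assumes an .onnx export of the trained model already exists in the model directory. The export step is not covered in this post; below is a minimal sketch using torch.onnx.export, where the dummy input shape and file path are assumptions about MoviePredictor rather than project code.

# Sketch only: export the trained model so inference_onnx can load it
model.eval()
dummy_input = torch.randn(1, 5)  # assumed feature dimension; must match the model's input
torch.onnx.export(
    model,
    dummy_input,
    "models/movie_predictor/movie-predictor-001.onnx",  # hypothetical path
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}},  # allow variable batch sizes
)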