# File name: evaluate_step1.py
# Loads the trained step-1 model and regenerates only the evaluation metrics and the confusion matrix.
import os
import pandas as pd
from dataclasses import dataclass
import json
import re
import torch
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import platform
import matplotlib.pyplot as plt
import seaborn as sns
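
# Assumed environment (not pinned in the original source): transformers, torch, scikit-learn,
# pandas plus openpyxl (needed by pd.read_excel for .xlsx files), matplotlib, and seaborn.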
# Same classes and helper functions as in train_final.py
# (data loading, metric computation, etc.)
# -----------------------------------------------------------------
@dataclass
class TrainingConfig:
    mode: str = "emotion"
    data_dir: str = "./data"
    output_dir: str = "./results1024"
    # [Edit] Path to the first-stage NSMC model (change this to your own path)
    base_model_name: str = r"E:\Emotion\results\nsmc_model"
    eval_batch_size: int = 64
    max_length: int = 128

    def get_step1_model_dir(self) -> str:
        # [Edit] Point to the "best_model" folder of the already-trained step-1 model
        return os.path.join(self.output_dir, 'emotion_model_step1_3class', 'best_model')
class EmotionDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx].clone().detach() for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    acc = accuracy_score(labels, preds)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted', zero_division=0)
    return {'accuracy': acc, 'f1': f1}
def clean_text(text: str) -> str:
    # Keep only Hangul syllables, ASCII letters, digits, and spaces
    return re.sub(r'[^가-힣a-zA-Z0-9 ]', '', str(text))
def map_ecode_to_6class(e_code_str):
    # Map emotion codes of the form 'E<nn>' to one of six basic emotion labels
    if not isinstance(e_code_str, str) or not e_code_str.startswith('E'): return None
    try: code_num = int(e_code_str[1:])
    except (ValueError, TypeError): return None
    if 0 <= code_num <= 9: return '기쁨'        # joy
    elif 10 <= code_num <= 19: return '분노'    # anger
    elif 20 <= code_num <= 29: return '슬픔'    # sadness
    elif 30 <= code_num <= 39: return '불안'    # anxiety
    elif 40 <= code_num <= 49: return '상처'    # hurt
    elif 50 <= code_num <= 59: return '당황'    # embarrassment
    else: return None

def map_6_to_3_groups(emotion_6_class):
    # Collapse the six emotions into the three coarse groups used by the step-1 model
    if emotion_6_class == '슬픔': return '그룹1(슬픔)'
    elif emotion_6_class in ['불안', '상처']: return '그룹2(불안,상처)'
    elif emotion_6_class in ['분노', '당황', '기쁨']: return '그룹3(분노,당황,기쁨)'
    else: return None
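
# Example of the label pipeline above (derived from the code, not from the original docs):
#   'E18' -> '분노' (anger) -> '그룹3(분노,당황,기쁨)'; codes outside E00-E59 map to None and are dropped later.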
def load_and_process(text_file, label_file, data_dir):
    text_path = os.path.join(data_dir, text_file)
    label_path = os.path.join(data_dir, label_file)
    try:
        df_text = pd.read_excel(text_path, header=0)
        with open(label_path, 'r', encoding='utf-8') as f:
            labels_raw = json.load(f)
    except FileNotFoundError as e:
        print(f"Error: required file not found: {e}")
        return pd.DataFrame()
    # Pull the emotion code (e.g. 'E18') out of each dialogue's profile
    e_codes = []
    for dialogue in labels_raw:
        try: e_codes.append(dialogue['profile']['emotion']['type'])
        except KeyError: e_codes.append(None)
    # Align lengths if the text and label files disagree
    if len(df_text) != len(e_codes):
        min_len = min(len(df_text), len(e_codes))
        df_text = df_text.iloc[:min_len]
        e_codes = e_codes[:min_len]
    df_labels = pd.DataFrame({'e_code': e_codes})
    df_combined = pd.concat([df_text, df_labels], axis=1)
    # Dialogue columns are those whose header contains '문장' ("sentence")
    dialogue_cols = [col for col in df_combined.columns if '문장' in str(col)]
    for col in dialogue_cols:
        df_combined[col] = df_combined[col].astype(str).fillna('')
    df_combined['text'] = df_combined[dialogue_cols].apply(lambda row: ' '.join(row), axis=1)
    df_combined['cleaned_text'] = df_combined['text'].apply(clean_text)
    df_combined['major_emotion'] = df_combined['e_code'].apply(map_ecode_to_6class)
    df_combined.dropna(subset=['major_emotion'], inplace=True)
    df_combined['group_emotion'] = df_combined['major_emotion'].apply(map_6_to_3_groups)
    df_combined.dropna(subset=['group_emotion', 'cleaned_text'], inplace=True)
    df_combined = df_combined[df_combined['cleaned_text'].str.strip() != '']
    return df_combined
def get_test_data(config: TrainingConfig) -> pd.DataFrame:
    """[Edit] Load only the test set."""
    print("Loading TEST set (from validation-origin.xlsx + test.json)...")
    df_test = load_and_process(
        "validation-origin.xlsx",
        "test.json",
        config.data_dir
    )
    if df_test.empty:
        print("Error: failed to load the test data.")
        return pd.DataFrame()
    print(f"Test data loaded: {len(df_test)} rows")
    return df_test
# -----------------------------------------------------------------
def run_evaluation():
    # --- 1. Korean font setup (so the Korean group labels render in the plot) ---
    try:
        if platform.system() == 'Windows':
            plt.rc('font', family='Malgun Gothic')
        elif platform.system() == 'Darwin':  # macOS
            plt.rc('font', family='AppleGothic')
        else:  # Linux (e.g. Colab)
            plt.rc('font', family='NanumBarunGothic')
        plt.rcParams['axes.unicode_minus'] = False
        print("Korean font configured.")
    except Exception as e:
        print(f"Korean font warning: {e}. Labels in the confusion matrix may not render correctly.")

    config = TrainingConfig()

    # --- 2. Load the model and tokenizer ---
    model_dir = config.get_step1_model_dir()
    output_dir = os.path.dirname(model_dir)  # .../emotion_model_step1_3class
    if not os.path.exists(model_dir):
        print(f"Error: trained model not found: {model_dir}")
        print("Run train_final.py first.")
        return
    print(f"Loading the saved step-1 model: {model_dir}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AutoModelForSequenceClassification.from_pretrained(model_dir).to(device)
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    # --- 3. Load and preprocess the test data ---
    df_test = get_test_data(config)
    if df_test.empty: return

    # Label encoding (taken from the model config so it matches training)
    label_to_id = model.config.label2id
    id_to_label = model.config.id2label
    print(f"Label map loaded from the model: {label_to_id}")
    df_test['label'] = df_test['group_emotion'].map(label_to_id)

    # Check for NaN labels (in case test.json contains labels unseen during training)
    if df_test['label'].isnull().any():
        print("Warning: the test set contains labels unseen during training. Those rows are excluded from evaluation.")
        df_test.dropna(subset=['label'], inplace=True)
    df_test['label'] = df_test['label'].astype(int)

    test_encodings = tokenizer(list(df_test['cleaned_text']), max_length=config.max_length, padding=True, truncation=True, return_tensors="pt")
    test_dataset = EmotionDataset(test_encodings, df_test['label'].tolist())
    # --- 4. Trainer setup (evaluation only; no training hyperparameters needed) ---
    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_eval_batch_size=config.eval_batch_size,
        report_to="none"
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics
    )
    # --- 5. Run the evaluation and build the confusion matrix ---
    print("\n--- (2) Final evaluation on the Test Set ---")
    test_predictions = trainer.predict(test_dataset)
    test_metrics = test_predictions.metrics
    print(f"Final test evaluation results: {test_metrics}")

    # (This step already succeeded in an earlier run; the results are saved again here)
    results_path = os.path.join(output_dir, "TEST_evaluation_results.json")
    with open(results_path, "w", encoding='utf-8') as f:
        json.dump(test_metrics, f, indent=4, ensure_ascii=False)
    print(f"*** Final test evaluation results saved to {results_path}. ***")
    print("\n--- Generating confusion matrix (Test Set) ---")
    try:
        y_pred = test_predictions.predictions.argmax(-1)
        y_true = test_predictions.label_ids
        labels = [id_to_label[i] for i in sorted(id_to_label.keys())]
        cm = confusion_matrix(y_true, y_pred, labels=[label_to_id[l] for l in labels])
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
        plt.xlabel('예측 라벨 (Predicted Label)')
        plt.ylabel('실제 라벨 (True Label)')
        plt.title('Confusion Matrix (TEST Set - 3 Groups)')
        cm_path = os.path.join(output_dir, "TEST_confusion_matrix.png")
        plt.savefig(cm_path)
        print(f"Test-set confusion matrix saved to {cm_path}.")
        print("--- Evaluation and confusion matrix generation complete ---")
    except Exception as e:
        print("\n!!! Fatal error: confusion matrix generation failed !!!")
        print(f"Error message: {e}")
        print("Check matplotlib, seaborn, and the Korean font setup.")

if __name__ == "__main__":
    run_evaluation()
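
# Usage sketch (assumed invocation; adjust the paths in TrainingConfig first):
#   python evaluate_step1.py
# Outputs are written next to the step-1 'best_model' directory:
#   TEST_evaluation_results.json  - accuracy / weighted F1 on the test set
#   TEST_confusion_matrix.png     - 3-group confusion matrix heatmap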