Spaces:
Running
Running
| #newtrain.py | |
| import os | |
| import pandas as pd | |
| import json | |
| import re | |
| import torch | |
| import numpy as np | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| Trainer, | |
| TrainingArguments | |
| ) | |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix | |
| from sklearn.model_selection import train_test_split # ๋ฐ์ดํฐ ๋ถ๋ฆฌ | |
| from sklearn.utils import class_weight | |
| from torch.nn import CrossEntropyLoss | |
| from typing import Dict, List, Tuple | |
| from dataclasses import dataclass | |
| import platform | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
# --- Matplotlib Korean font setup (for local PCs) ---
# Select a platform-specific font family so Korean class names can be drawn
# on the confusion-matrix plot produced at the end of training.
try:
    if platform.system() == 'Windows':
        plt.rc('font', family='Malgun Gothic')
    elif platform.system() == 'Darwin':  # macOS
        plt.rc('font', family='AppleGothic')
    else:  # Linux (Colab, etc.)
        plt.rc('font', family='NanumBarunGothic')
    plt.rcParams['axes.unicode_minus'] = False
except Exception:
    # Best-effort only: a font problem must not abort training. Catching
    # Exception (rather than a bare except) lets KeyboardInterrupt and
    # SystemExit propagate normally.
    print("ํ๊ธ ํฐํธ ์ค์ ์ ์คํจํ์ต๋๋ค. ํผ๋ ํ๋ ฌ์ ๋ผ๋ฒจ์ด ๊นจ์ง ์ ์์ต๋๋ค.")
# --- 1. Configuration ---
@dataclass
class TrainingConfig:
    """Settings for one fine-tuning run.

    Every field has a default, so ``TrainingConfig()`` keeps working as
    before; as a dataclass, individual settings can also be overridden per
    instance, e.g. ``TrainingConfig(learning_rate=3e-5)``.
    """

    # Dataset selector; get_data() only accepts "emotion".
    mode: str = "emotion"
    # Directory containing the JSON data files.
    data_dir: str = "./data"
    # Root directory for training outputs.
    output_dir: str = "./results"
    # Pretrained checkpoint to fine-tune.
    base_model_name: str = "klue/roberta-base"
    eval_batch_size: int = 64
    num_train_epochs: int = 10
    learning_rate: float = 1e-5
    train_batch_size: int = 16
    weight_decay: float = 0.01
    # Maximum token length passed to the tokenizer.
    max_length: int = 128
    warmup_ratio: float = 0.1

    def get_model_name(self) -> str:
        """Return the name of the pretrained checkpoint to load."""
        return self.base_model_name

    def get_output_dir(self) -> str:
        """Return the directory under ``output_dir`` where the v2 model is saved."""
        return os.path.join(self.output_dir, 'emotion_model_v2_manual')
# --- 2. Custom classes and functions ---
class EmotionDataset(torch.utils.data.Dataset):
    """Dataset pairing pre-tokenized encodings with integer class labels.

    ``encodings`` is a mapping whose values are indexable by sample position
    and yield tensors; ``labels`` is a sequence with one label per sample.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of samples is defined by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field_name, column in self.encodings.items():
            # Copy the row so the returned tensor is independent of the
            # stored encodings.
            sample[field_name] = column[idx].clone().detach()
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample
class CustomTrainer(Trainer):
    """Trainer that computes the loss with (optionally class-weighted) cross-entropy."""

    def __init__(self, *args, class_weights=None, **kwargs):
        """Create the trainer.

        Args:
            *args, **kwargs: forwarded unchanged to ``Trainer.__init__``.
            class_weights: optional per-class weight tensor handed to
                ``CrossEntropyLoss``. When omitted, the loss is unweighted.
        """
        super().__init__(*args, **kwargs)
        # Always define loss_fct: previously it was only set when weights
        # were supplied, so compute_loss raised AttributeError without them.
        # CrossEntropyLoss(weight=None) is the plain unweighted loss.
        self.loss_fct = CrossEntropyLoss(weight=class_weights)

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the cross-entropy loss for one batch.

        ``labels`` is removed from ``inputs`` before the forward pass, so the
        model is called without labels. Extra keyword arguments are accepted
        and ignored.

        Returns:
            ``(loss, outputs)`` if ``return_outputs`` is true, else ``loss``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Flatten to (N, num_labels) logits against (N,) targets.
        loss = self.loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss
def compute_metrics(pred):
    """Compute accuracy and weighted F1 for a prediction object.

    ``pred`` must expose ``label_ids`` (true class ids) and ``predictions``
    (per-class scores; the arg-max over the last axis is the predicted class).

    Returns:
        A dict with the keys ``'accuracy'`` and ``'f1'``.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)
    accuracy = accuracy_score(y_true, y_hat)
    # Precision, recall and support are computed by the same call but only
    # the weighted F1 is reported.
    _, _, weighted_f1, _ = precision_recall_fscore_support(
        y_true, y_hat, average='weighted', zero_division=0
    )
    return {'accuracy': accuracy, 'f1': weighted_f1}
def clean_text(text: str) -> str:
    """Remove every character that is not Hangul, an ASCII letter/digit, or a space.

    The input is converted with ``str()`` first, so non-string values (for
    example ``None`` or numbers) are accepted and cleaned as their string
    form.

    The Hangul range is written with explicit escapes (U+AC00 to U+D7A3, the
    precomposed syllables) so it cannot be corrupted by a mis-encoded copy of
    this file; a garbled range silently strips all Korean text instead.
    """
    return re.sub(r'[^\uAC00-\uD7A3a-zA-Z0-9 ]', '', str(text))
# --- 3. Data loading (Train/Val/Test split) ---
# Inclusive numeric ranges of the fine-grained emotion code and the major
# emotion each range maps to. The labels are the Korean class names; they are
# later sorted to assign class ids, so their exact spelling matters.
_MAJOR_EMOTION_RANGES = (
    (10, 19, '분노'),  # anger
    (20, 29, '슬픔'),  # sadness
    (30, 39, '불안'),  # anxiety
    (40, 49, '상처'),  # hurt
    (50, 59, '당황'),  # embarrassment
    (60, 69, '기쁨'),  # joy
)


def _map_ecode_to_major_emotion(ecode):
    """Return the major-emotion label for an emotion code, or None if unmappable.

    The first character of ``ecode`` is skipped and the remainder parsed as an
    integer (presumably codes look like "E10" - confirm against the dataset).
    """
    try:
        code_num = int(ecode[1:])
    except (TypeError, ValueError):
        # Non-subscriptable value (e.g. None) or a non-numeric remainder.
        return None
    for low, high, major_emotion in _MAJOR_EMOTION_RANGES:
        if low <= code_num <= high:
            return major_emotion
    return None


def _load_and_map_labels(data_dir, file_name):
    """Load one JSON file into a DataFrame with text, label and cleaned text columns."""
    with open(os.path.join(data_dir, file_name), 'r', encoding='utf-8') as f:
        raw = json.load(f)
    records = [
        {
            # All utterances of one dialogue are joined into a single string.
            'text': " ".join(d['talk']['content'].values()),
            'emotion': d['profile']['emotion']['type'],
        }
        for d in raw
    ]
    # Explicit columns keep the frame well-formed even when the file is empty.
    df = pd.DataFrame(records, columns=['text', 'emotion'])
    df['major_emotion'] = df['emotion'].apply(_map_ecode_to_major_emotion)
    # Rows whose code maps to no major emotion are discarded.
    df.dropna(subset=['major_emotion'], inplace=True)
    df['cleaned_text'] = df['text'].apply(clean_text)
    return df


def get_data(config: "TrainingConfig") -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load the emotion corpus and return ``(train, validation, test)`` frames.

    The test frame comes from ``test.json``. ``training-label.json`` is split
    90/10 (stratified on the major emotion, fixed seed) into the train and
    validation frames. Both files are read from ``config.data_dir``.

    Raises:
        ValueError: if ``config.mode`` is anything other than ``'emotion'``.
    """
    # NOTE(review): the user-facing Korean messages below appear mis-encoded
    # in this copy of the file; they are intentionally left untouched here.
    if config.mode == 'nsmc':
        raise ValueError("์ด ์คํฌ๋ฆฝํธ๋ 'emotion' ๋ชจ๋ ์ ์ฉ์ ๋๋ค.")
    if config.mode != 'emotion':
        raise ValueError(f"์ง์ํ์ง ์๋ ๋ชจ๋์ ๋๋ค: {config.mode}")

    print("--- ๊ฐ์ ๋ฐ์ดํฐ ๋ก๋ฉ (Train/Val/Test ๋ถ๋ฆฌ) ---")

    # 1. Load the test set.
    # NOTE(review): the original comment said this reuses the former
    # validation-label.json, but the code reads test.json - confirm the
    # file was renamed.
    df_test = _load_and_map_labels(config.data_dir, "test.json")

    # 2. Load the full training set (training-label.json).
    df_train_full = _load_and_map_labels(config.data_dir, "training-label.json")

    # 3. Split the training set 9:1 into the new train / validation sets.
    label_column_str = 'major_emotion'
    df_train, df_val = train_test_split(
        df_train_full,
        test_size=0.1,  # 10% is held out for validation
        random_state=42,  # fixed for reproducibility
        stratify=df_train_full[label_column_str],  # keep class proportions
    )

    print(f" ์ด ์๋ณธ ํ๋ จ ๋ฐ์ดํฐ: {len(df_train_full)}๊ฐ")
    print(f" [์ ๊ท] ํ๋ จ(Train)์ฉ: {len(df_train)}๊ฐ (90%)")
    print(f" [์ ๊ท] ๊ฒ์ฆ(Validation)์ฉ: {len(df_val)}๊ฐ (10%)")
    print(f" [์ต์ข ] ํ ์คํธ(Test)์ฉ: {len(df_test)}๊ฐ ")
    return df_train, df_val, df_test
# --- 4. Main entry point ---
def run_training():
    """Fine-tune the classifier, then evaluate it on validation and test data.

    Steps: load the three data splits, encode labels as ids (sorted label
    order), tokenize, train with class-weighted cross-entropy, save the model
    and tokenizer, print validation metrics, then compute test metrics and
    write them (JSON) and a confusion-matrix image into the output directory.

    Raises:
        ValueError: if the number of manual class weights differs from the
            number of labels, or if the validation/test split contains a
            label that does not occur in the training split.
    """
    config = TrainingConfig()
    df_train, df_val, df_test = get_data(config)
    text_column = 'cleaned_text'
    label_column_str = 'major_emotion'

    # 2. Tokenizer and label encoding
    tokenizer = AutoTokenizer.from_pretrained(config.get_model_name())
    unique_labels = sorted(df_train[label_column_str].unique())
    label_to_id = {label: i for i, label in enumerate(unique_labels)}
    id_to_label = {i: label for label, i in label_to_id.items()}
    print("\n--- ์์ฑ๋ ๋ผ๋ฒจ ์์ (0~5) ---")
    print(unique_labels)
    print("------------------------------")

    def with_label_ids(frame, split_name):
        """Return a copy of ``frame`` with an integer ``label`` column."""
        encoded = frame[label_column_str].map(label_to_id)
        if encoded.isna().any():
            # A NaN id would become a float label and break the loss later.
            raise ValueError(
                f"{split_name} split contains labels that are absent from the training split"
            )
        # assign() builds a new frame instead of writing into a slice
        # returned by train_test_split.
        return frame.assign(label=encoded.astype(int))

    df_train = with_label_ids(df_train, "train")
    df_val = with_label_ids(df_val, "validation")
    df_test = with_label_ids(df_test, "test")

    def build_dataset(frame):
        """Tokenize the text column of ``frame`` and wrap it with its labels."""
        encodings = tokenizer(
            list(frame[text_column]),
            max_length=config.max_length,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )
        return EmotionDataset(encodings, frame['label'].tolist())

    # 3. Datasets and class weights
    train_dataset = build_dataset(df_train)
    val_dataset = build_dataset(df_val)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\nUsing device: {device}")

    # Manually chosen class weights. They are positional: weight i applies to
    # unique_labels[i], i.e. to the labels in sorted order.
    manual_weights_list = [6.00, 4.50, 0.85, 1.80, 1.80, 0.92]
    if len(manual_weights_list) != len(unique_labels):
        raise ValueError(
            f"Expected {len(unique_labels)} class weights (one per label), "
            f"got {len(manual_weights_list)}"
        )
    class_weights = torch.tensor(manual_weights_list, dtype=torch.float).to(device)
    print("--- ์๋ ์ ์ฉ๋ ํด๋์ค ๊ฐ์ค์น ---")
    print(class_weights.tolist())
    print("---------------------------------")

    # 4. Model loading
    model = AutoModelForSequenceClassification.from_pretrained(
        config.get_model_name(),
        num_labels=len(unique_labels),
        id2label=id_to_label,
        label2id=label_to_id,
        ignore_mismatched_sizes=True,
    ).to(device)

    # 5. Training
    training_args = TrainingArguments(
        output_dir=config.get_output_dir(),
        num_train_epochs=config.num_train_epochs,
        per_device_train_batch_size=config.train_batch_size,
        per_device_eval_batch_size=config.eval_batch_size,
        learning_rate=config.learning_rate,
        weight_decay=config.weight_decay,
        warmup_ratio=config.warmup_ratio,
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        lr_scheduler_type="cosine",
        report_to="none",
    )
    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        class_weights=class_weights,
    )
    print(f"\n '[์ ๊ท ๋ถ๋ฆฌ ๋ฐ์ดํฐ]'๋ก ๋ชจ๋ธ ํ๋ จ์ ์์ํฉ๋๋ค...")
    trainer.train()
    print("\n ๋ชจ๋ธ ํ๋ จ ์๋ฃ!")

    output_dir = config.get_output_dir()
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"์ต์ข ๋ชจ๋ธ๊ณผ ํ ํฌ๋์ด์ ๊ฐ {output_dir} ๊ฒฝ๋ก์ ์ ์ฅ๋์์ต๋๋ค.")

    # Metrics on the validation data (10%) that was used during training.
    print("\n--- ์ ๊ท Validation Set(10%) ํ๊ฐ ๊ฒฐ๊ณผ (์ฐธ๊ณ ์ฉ) ---")
    results = trainer.evaluate()  # default: the trainer's eval_dataset
    print(results)

    # --- Final evaluation on the held-out test set ---
    print("\n" + "="*50)
    print("--- ์ต์ข Test Set์ผ๋ก '์ง์ง ์ฑ๋ฅ' ํ๊ฐ ์์ ---")
    print("="*50)

    # Build the test dataset and predict on it with the trained model.
    test_dataset = build_dataset(df_test)
    test_predictions = trainer.predict(test_dataset)

    # Reuse compute_metrics so test numbers are comparable to validation.
    final_metrics = compute_metrics(test_predictions)
    print(f"*** ์ต์ข Test Set '์ง์ง' ์ฑ๋ฅ ๊ฒฐ๊ณผ ***")
    print(f" - ์ต์ข Accuracy: {final_metrics['accuracy']:.4f}")
    print(f" - ์ต์ข F1-Score (Weighted): {final_metrics['f1']:.4f}")
    print("="*50)

    results_path = os.path.join(output_dir, "final_test_results.json")
    with open(results_path, "w", encoding='utf-8') as f:
        json.dump(final_metrics, f, indent=4, ensure_ascii=False)
    print(f"์ต์ข ํ ์คํธ ๊ฒฐ๊ณผ๊ฐ {results_path}์ ์ ์ฅ๋์์ต๋๋ค.")

    # --- Confusion matrix on the test set ---
    print("\n--- Test Set ๊ธฐ์ค ํผ๋ ํ๋ ฌ ์์ฑ ---")
    y_pred = test_predictions.predictions.argmax(-1)
    y_true = test_predictions.label_ids
    # Rows/columns follow ascending class id; tick labels are the class names.
    class_ids = sorted(id_to_label.keys())
    labels = [id_to_label[i] for i in class_ids]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    figure = plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
    plt.xlabel('์์ธก ๋ผ๋ฒจ (Predicted Label)')
    plt.ylabel('์ค์ ๋ผ๋ฒจ (True Label)')
    plt.title('Test Set Confusion Matrix')
    cm_path = os.path.join(output_dir, "final_test_confusion_matrix.png")
    plt.savefig(cm_path)
    # Release the figure once it is on disk.
    plt.close(figure)
    print(f"์ต์ข ํผ๋ ํ๋ ฌ์ด {cm_path}์ ์ ์ฅ๋์์ต๋๋ค.")
# Run the full training and evaluation pipeline only when this file is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    run_training()