import math

import numpy as np  # required by save_ckpt


class MaxEntropyModel:
    """Maximum-entropy text classifier trained with Generalized Iterative Scaling (GIS)."""

    def __init__(self):
        self.train_set = []  # list of [label, token, token, ...] records
        self.features = {}   # (label, token) -> count; reused as (label, token) -> index after init
        self.labels = {'1': 1, '2': 1}  # known class labels

    def load_data(self, fn):
        """Read 'label,token token ...' lines and collect (label, token) feature counts."""
        with open(fn, "r") as rf:
            for line in rf:
                label, review = line.strip().split(',')
                label = label[1:-1]          # strip surrounding quotes from the label
                review = review.split(' ')
                fields = [str(int(label))] + review
                if review != ['']:           # skip records with an empty review
                    label = str(int(label))
                    self.labels[label] = 1
                    # Count each distinct token once per record, keyed by (label, token).
                    for s in set(fields[1:]):
                        if (label, s) not in self.features:
                            self.features[(label, s)] = 1
                        else:
                            self.features[(label, s)] += 1
                    self.train_set.append(fields)

    def initialize_parameters(self):
        """Compute empirical feature expectations and zero-initialize the weights."""
        self.train_set_size = len(self.train_set)
        # M: GIS constant, the maximum number of active features in any record.
        self.M = max(len(record) - 1 for record in self.train_set)
        self.ep = [0.0 for _ in range(len(self.features))]
        for i_f, feat in enumerate(self.features):
            # Empirical expectation of the feature, then reuse the dict entry as its index.
            self.ep[i_f] = float(self.features[feat]) / float(self.train_set_size)
            self.features[feat] = i_f
        self.weights = [0.0 for _ in range(len(self.features))]
        self.last_weights = self.weights[:]  # keep a copy, not an alias

    def get_prob_weight(self, features, label):
        """Return exp(sum of weights of the (label, token) features that fire)."""
        weight = 0.0
        for feat in features:
            if (label, feat) in self.features:
                weight += self.weights[self.features[(label, feat)]]
        return math.exp(weight)

    def get_expected_features(self):
        """Expected feature counts under the current model distribution."""
        expected_features = [0.0 for _ in range(len(self.features))]
        for record in self.train_set:
            features = record[1:]
            prob = self.calculate_probability(features)
            for feat in features:
                for w, l in prob:
                    if (l, feat) in self.features:
                        idx = self.features[(l, feat)]
                        expected_features[idx] += w * (1.0 / self.train_set_size)
        return expected_features

    def calculate_probability(self, features):
        weights = [(self.get_prob_weight(features, l), l) for l in self.labels]
        Z = sum(w for w, l in weights)
        prob = [(w / Z, l) for w, l in weights]
        return prob

    def train(self, max_iter=10000):
        """GIS training loop: each weight update is (1/M) * log(empirical / expected)."""
        self.initialize_parameters()
        for i in range(max_iter):
            print(f"[Training] iter {i + 1} ...")
            self.new_ep = self.get_expected_features()
            self.last_weights = self.weights[:]
            for j, w in enumerate(self.weights):
                delta = 1.0 / self.M * math.log(self.ep[j] / self.new_ep[j])
                self.weights[j] = self.weights[j] + delta
            if i % 10 == 0:  # evaluate every 10 iterations
                test_data_path = "../preprocessed_data/yelp_test.txt"
                print("Start testing...")
                self.test(test_data_path)

    def test(self, test_data_path):
        """Classify each test line and report accuracy."""
        tot_test_nn = 0
        correct_test_nn = 0
        with open(test_data_path, "r") as f:
            for line in f:
                label, review = line.strip().split(',')
                label = label[1:-1]          # strip surrounding quotes from the label
                review = review.split(' ')   # input: tokenized review, output: predicted label
                prob = self.calculate_probability(review)
                prob.sort(reverse=True)      # highest-probability label first
                print(label, prob)
                # Count the prediction as correct when the top label matches the gold label.
                max_prob_idx = int(prob[0][1])
                label_idx = int(label)
                if max_prob_idx == label_idx:
                    correct_test_nn += 1
                tot_test_nn += 1
        acc = float(correct_test_nn) / float(tot_test_nn)
        print(f"[Test] Acc: {acc}")

    def save_ckpt(self, sv_ckpt_path):
        """Save the feature index and the last weight vector as a pickled dict via numpy."""
        sv_ckpt = {
            'features': self.features,
            'weights': self.last_weights
        }
        np.save(sv_ckpt_path, sv_ckpt)
        print(f"ckpt with features and weights saved to {sv_ckpt_path}")