#!/usr/bin/env python3 import argparse import heapq import numpy as np import sklearn.datasets from sklearn.metrics import mean_squared_error, accuracy_score from sklearn.model_selection import train_test_split parser = argparse.ArgumentParser() parser.add_argument("--dataset", default="wine", type=str, help="Použitý dataset; buď `wine` nebo `diabetes`", choices=["wine", "diabetes"]) parser.add_argument("--max_depth", default=None, type=int, help="Maximální hloubka rozhodovacího stromu") parser.add_argument("--max_leaves", default=None, type=int, help="Maximální počet listů stromu") parser.add_argument("--min_to_split", default=2, type=int, help="Minimální počet dat pro rozdělení vrcholu (listu)") parser.add_argument("--seed", default=42, type=int, help="Náhodný seed") parser.add_argument("--test_size", default=0.25, type=float, help="Velikost testovací množiny") class DecisionTree: root: 'Node' = None def __init__( self, args: argparse.Namespace, ): self.args = args self.max_depth = args.max_depth self.max_leaves = args.max_leaves self.min_to_split = args.min_to_split def fit(self, X, y): self.root = Node(X, y, depth=0, args=self.args) min_heap = [] time_created = 0 heapq.heappush(min_heap, (self.root.split_criteria_improve, time_created, self.root)) time_created += 1 number_leaves = 1 while len(min_heap) > 0: node: 'Node' _, _, node = heapq.heappop(min_heap) if self.max_depth is not None and node.depth == self.max_depth: continue if self.max_leaves is not None and number_leaves == self.max_leaves: break if self.min_to_split is not None and node.instances < self.min_to_split: continue if node.split_criteria_improve == 0: continue left, right = node.split_node(self.args) # POZOR: toto není rekurzivní přístup a pro random forest je tato # část změněna heapq.heappush(min_heap, (left.split_criteria_improve, time_created, left)) heapq.heappush(min_heap, (right.split_criteria_improve, time_created+1, right)) number_leaves += 1 time_created += 2 def predict(self, X): if self.root is None: raise Exception('You must call fit before predict') predictions = [] for data in X: predictions.append(self.root.predict(data, self.args)) return predictions class Node: depth: int data: np.ndarray target: np.ndarray leaf: bool = True left: 'Node' = None right: 'Node' = None criterion_value: float split_feature_idx: int split_feature_value: float split_criteria_improve: float _most_frequent_class: int = None def __init__(self, data, target, depth, args: argparse.Namespace): self.data = data self.target = target self.criterion_value = count_criteria(args, target) self.depth = depth self.find_best_split(args) def find_best_split(self, args: argparse.Namespace): best_criterion = None for feature_idx in range(self.data.shape[1]): features = sorted( self.data[:, feature_idx]) split_points = [] for i in range(1, len(features)): if features[i - 1] == features[i]: continue else: split_points.append( (features[i] + features[i - 1]) / 2) if len(split_points) == 0: # v bodech pro rozdělení budeme mít alespoň jeden bod split_points = [ features[0] ] for split_point in split_points: left_targets = self.target[ self.data[:, feature_idx] <= split_point] right_targets = self.target[ self.data[:, feature_idx] > split_point] left_criteria = count_criteria(args, left_targets) right_criteria = count_criteria(args, right_targets) criterion_improve = (left_criteria + right_criteria) - self.criterion_value if best_criterion == None or criterion_improve < best_criterion: self.split_feature_idx = feature_idx self.split_feature_value = split_point best_criterion = criterion_improve self.split_criteria_improve = best_criterion return best_criterion def split_node(self, args: argparse.Namespace): left_data_selector = self.data[:, self.split_feature_idx] <= self.split_feature_value right_data_selector = self.data[:, self.split_feature_idx] > self.split_feature_value left_data = self.data[ left_data_selector, : ] right_data = self.data[ right_data_selector, :] self.leaf = False self.left = Node(left_data, self.target[left_data_selector], self.depth + 1, args) self.right = Node(right_data, self.target[right_data_selector], self.depth + 1, args) return (self.left, self.right) def predict(self, data_to_predict, args) -> int: if self.leaf: if args.criterion == 'se': return np.mean(self.target) return self.most_frequent_class if data_to_predict[ self.split_feature_idx ] <= self.split_feature_value: return self.left.predict(data_to_predict, args) else: return self.right.predict(data_to_predict, args) @property def most_frequent_class(self) -> int: if self._most_frequent_class == None: classes = list(set(self.target)) sums = [] for class_t in classes: number_class_t = len(self.target[ self.target == class_t ]) sums.append(number_class_t) self._most_frequent_class = classes[ np.argmax( sums ) ] return self._most_frequent_class @property def instances(self): return len(self.data) def count_criteria(args: argparse.Namespace, targets_vector: np.ndarray) -> float: if args.criterion == 'se': if len(targets_vector) == 0: return 0 return ((targets_vector - np.mean(targets_vector))**2).sum() _, counts = np.unique(targets_vector, return_counts=True) propability = counts / len(targets_vector) if args.criterion == 'entropy': return propability @ np.log(propability) * len(targets_vector) * -1 else: raise Exception(f'Unknown criteria {args.criterion}') def main(args: argparse.Namespace): # Načtení datasetu. data, target = getattr(sklearn.datasets, "load_{}".format(args.dataset))(return_X_y=True) # TODO: Rozdělte dataset na trénovací a testovací část, funkci z knihovny sklearn # předejte argumenty `test_size=args.test_size, random_state=args.seed`. train_data, test_data, train_target, test_target = train_test_split( data, target, test_size=args.test_size, random_state=args.seed) # TODO: Implementace binárního rozhodovacího stromu # # - Budete implementovat strom pro klasifikaci i regresi podle typu # datasetu. `wine` dataset je pro klasifikaci a `diabetes` pro regresi. # # - Pro klasifikaci: Pro každý list predikujte třídu, která je nejčastější # (a pokud je těchto tříd několik, vyberte tu s nejmenším číslem). # - Pro regresi: Pro každý list predikujte průměr target (cílových) hodnot. # # - Pro klasifikaci použijte jako kritérium entropy kritérium a pro regresi # použijte jako kritérium SE (squared error). # # - Pro rozdělení vrcholu vyzkoušejte postupně všechny featury. Pro každou # featuru vyzkoušejte všechna možná místa na rozdělení seřazená vzestupně # a rozdělte vrchol na místě, který nejvíce sníží kritérium. # Pokud existuje takových míst několik, vyberte první z nich. # Každé možné místo na rozdělení je průměrem dvou nejbližších unikátních hodnot # featur z dat odpovídající danému vrcholu. # Např. pro čtyři data s hodnotami featur 1, 7, 3, 3 jsou místa na rozdělení 2 a 5. # # - Rozdělení vrcholu povolte pouze pokud: # - pokud hloubka vrcholu je menší než `args.max_depth` # (hloubka kořenu je nula). Pokud `args.max_depth` je `None`, # neomezujte hloubku stromu. # - pokud je méně než `args.max_leaves` listů ve stromě # (list je vrchol stromu bez synů). Pokud `args.max_leaves` je `None`, # neomezujte počet listů. # - je alespoň `args.min_to_split` dat v daném vrcholu. # - hodnota kritéria není nulová. # # - Pokud `args.max_leaves` není `None`: opakovaně rozdělujte listové vrcholy, # které splňují podmínky na rozdělení vrcholu a celková hodnota kritéria # ($c_{levý syn} + c_{pravý syn} - c_{rodič}$) nejvíce klesne. # Pokud je několik takových vrcholů, vyberte ten, který byl vytvořen dříve # (levý syn je považován, že je vytvořený dříve než pravý syn). # # Pokud `args.max_leaves` je `None`, použijte rekurzivní přístup (nejdříve rozdělujte # levé syny, poté pravé syny) a rozdělte každý vrchol, který splňuje podmínky. # Tento rekurzivní přístup není důležitý v této úloze, ale až v nasledující # úloze - Random Forest. # # - Nakonec vypočítejte trénovací a testovací chybu # - RMSE, root mean squared error, pro regresi # - accuracy pro klasifikaci if args.dataset == 'wine': args.criterion = 'entropy' elif args.dataset == 'diabetes': args.criterion = 'se' tree = DecisionTree(args) tree.fit(train_data, train_target) train_prediction = tree.predict(train_data) test_prediction = tree.predict(test_data) if args.dataset == 'diabetes': train_rmse = mean_squared_error(train_target, train_prediction, squared=False) test_rmse = mean_squared_error(test_target, test_prediction, squared=False) print("Train RMSE: {:.5f}".format(train_rmse)) print("Test RMSE: {:.5f}".format(test_rmse)) else: train_accuracy = accuracy_score(train_target, train_prediction) test_accuracy = accuracy_score(test_target, test_prediction) print("Train accuracy: {:.1f}%".format(100 * train_accuracy)) print("Test accuracy: {:.1f}%".format(100 * test_accuracy)) if __name__ == "__main__": args = parser.parse_args() main(args)