#!/usr/bin/env python3 import argparse from typing import List import numpy as np import sklearn.datasets from sklearn.metrics import accuracy_score from sklearn.model_selection import train_test_split parser = argparse.ArgumentParser() parser.add_argument("--bagging", default=False, action="store_true", help="Použít bagging") parser.add_argument("--dataset", default="digits", type=str, help="Použitý dataset") parser.add_argument("--feature_subsampling", default=1, type=float, help="Pravděpodobnost vybrání jednolivých featur") parser.add_argument("--max_depth", default=None, type=int, help="Maximální hloubka rozhodovacího stromu") parser.add_argument("--seed", default=42, type=int, help="Náhodný seed") parser.add_argument("--test_size", default=0.25, type=float, help="Velikost testovací množiny") parser.add_argument("--trees", default=1, type=int, help="Počet stromů použitých v random forestu") class DecisionTree: root: 'Node' = None def __init__( self, args: argparse.Namespace, subsample_features ): self.args = args self.max_depth = args.max_depth self.subsample_features = subsample_features def fit(self, X, y): self.root = Node(X, y, depth=0, args=self.args) self.split_node(self.root) def split_node(self, node: 'Node'): if self.max_depth is not None and node.depth == self.max_depth: return node.find_best_split(self.args, self.subsample_features) if node.split_criteria_improve == 0: return left, right = node.split_node(self.args) self.split_node(left) self.split_node(right) def predict(self, X): if self.root is None: raise Exception('You must call fit before predict') predictions = [] for data in X: predictions.append(self.root.predict(data)) return predictions class Node: depth: int data: np.ndarray target: np.ndarray leaf: bool = True left: 'Node' = None right: 'Node' = None criterion_value: float split_feature_idx: int split_feature_value: float split_criteria_improve: float _most_frequent_class: int = None def __init__(self, data, target, depth, args: argparse.Namespace): self.data = data self.target = target self.criterion_value = count_criteria(args, target) self.depth = depth def find_best_split(self, args: argparse.Namespace, subsample_features): best_criterion = None selection = subsample_features( self.data.shape[1] ) for feature_idx, used in enumerate(selection): if not used: continue features = sorted( self.data[:, feature_idx] ) split_points = [] for i in range(1, len(features)): if features[i - 1] == features[i]: continue else: split_points.append( (features[i] + features[i - 1]) / 2) if len(split_points) == 0: # v bodech pro rozdělení budeme mít alespoň jeden bod split_points = [ features[0] ] for split_point in split_points: left_targets = self.target[ self.data[:, feature_idx] <= split_point] right_targets = self.target[ self.data[:, feature_idx] > split_point] left_criteria = count_criteria(args, left_targets) right_criteria = count_criteria(args, right_targets) criterion_improve = (left_criteria + right_criteria) - self.criterion_value if best_criterion == None or criterion_improve < best_criterion: self.split_feature_idx = feature_idx self.split_feature_value = split_point best_criterion = criterion_improve self.split_criteria_improve = best_criterion return best_criterion def split_node(self, args: argparse.Namespace): left_data_selector = self.data[:, self.split_feature_idx] <= self.split_feature_value right_data_selector = self.data[:, self.split_feature_idx] > self.split_feature_value left_data = self.data[ left_data_selector, : ] right_data = self.data[ right_data_selector, :] self.leaf = False self.left = Node(left_data, self.target[left_data_selector], self.depth + 1, args) self.right = Node(right_data, self.target[right_data_selector], self.depth + 1, args) return (self.left, self.right) def predict(self, data_to_predict) -> int: if self.leaf: return self.most_frequent_class if data_to_predict[ self.split_feature_idx ] <= self.split_feature_value: return self.left.predict(data_to_predict) else: return self.right.predict(data_to_predict) @property def most_frequent_class(self) -> int: if self._most_frequent_class == None: classes = list(set(self.target)) sums = [] for class_t in classes: number_class_t = len(self.target[ self.target == class_t ]) sums.append(number_class_t) self._most_frequent_class = classes[ np.argmax( sums ) ] return self._most_frequent_class @property def instances(self): return len(self.data) def count_criteria(args: argparse.Namespace, targets_vector: np.ndarray) -> float: _, counts = np.unique(targets_vector, return_counts=True) propability = counts / len(targets_vector) if args.criterion == 'entropy': return propability @ np.log(propability) * len(targets_vector) * -1 else: raise Exception(f'Unknown criteria {args.criterion}') def main(args: argparse.Namespace): # Načtení datasetu. data, target = getattr(sklearn.datasets, "load_{}".format(args.dataset))(return_X_y=True) # TODO: Rozdělte dataset na trénovací a testovací část, funkci z knihovny sklearn # předejte argumenty `test_size=args.test_size, random_state=args.seed`. train_data, test_data, train_target, test_target = train_test_split( data, target, test_size=args.test_size, random_state=args.seed) # Vytvoření náhodných generátorů. generator_feature_subsampling = np.random.RandomState(args.seed) def subsample_features(number_of_features: int) -> np.ndarray: '''Vrátí masku featur, které mají být použity při hledání nejlepšího rozdělení.''' return generator_feature_subsampling.uniform(size=number_of_features) <= args.feature_subsampling generator_bootstrapping = np.random.RandomState(args.seed) def bootstrap_dataset(train_data: np.ndarray) -> np.ndarray: '''Vrátí indexy dat, které mají být použity pro trénování rozhodovacího stromu.''' return generator_bootstrapping.choice(len(train_data), size=len(train_data), replace=True) # TODO: Implementace Random Forest # # - použijte upravený rozhodovací strom z předchozího úkolu: # - použijte jen strom pro klasifikaci # - jednotlivé vrcholy (listy) stromů je povoleno rozdělit pouze pokud: # - jejich hloubka je menší než `args.max_depth` # - hodnota kritéria není nulová # # Při rozdělování vrcholů postupujte rekurzivně, rozdělujte všechny vrcholy # v levém podstromu před vrcholy v pravém podstromu. # # Dále implementujte: # - feature subsampling: při hledání nejlepšího místa na rozdělení vybíráte # pouze z podmnožiny featur. Při rozdělování vrcholu # (tj. pokud jsou splněny podmínky pro rozdělení [hloubka, kritérium != 0]), # si vygenerujete masku featur pomocí # subsample_features(number_of_features) # která vrací pro každou featuru boolean hodnotu, kde `True` znamená, že featura # má být použita při hledání nejlepšího místa na rozdělení, # a `False` že nemá být použita. # Pokud `feature_subsampling == 1`, použijí se všechny featury. # # - natrénujte náhodný les skládající se z `args.trees` rozhodovacích stromů # # - pokud je `args.bagging` zapnuto, před trénováním každého rozhodovacího stromu # vytvořte bootstrap dat z trénovacích dat voláním # dataset_indices = bootstrap_dataset(train_data) # a pokud `args.bagging` vypnuto, použijte původní trénovací data. # # Pro výslednou predikci použijte hlasovací techniku pro nalezení nejčastější třídy # pro daný vstup, při výskytu remízy vyberte tu s nejmenším číslem třídy. # # Nakonec vypočítejte trénovací a testovací přesnost. class RandomForest: trees: List[DecisionTree] = None def fit(self, X, y): nonlocal args self.trees = [] for _ in range(args.trees): data = X target = y if args.bagging: indices = bootstrap_dataset(X) data = X[ indices ] target = y[ indices ] newTree = DecisionTree(args, subsample_features) newTree.fit(data, target) self.trees.append( newTree ) def predict(self, X): predictions = [] for tree in self.trees: predictions.append(tree.predict( X )) predictions = np.array(predictions) voted_predictions = [] for i in range(len(X)): values, counts = np.unique(predictions[:, i], return_counts=True) voted_predictions.append(values[ np.argmax(counts) ]) return voted_predictions args.criterion = 'entropy' forest = RandomForest() forest.fit(train_data, train_target) train_predictions = forest.predict(train_data) test_predictions = forest.predict(test_data) train_accuracy = accuracy_score(train_target, train_predictions) test_accuracy = accuracy_score(test_target, test_predictions) print("Train accuracy: {:.1f}%".format(100 * train_accuracy)) print("Test accuracy: {:.1f}%".format(100 * test_accuracy)) if __name__ == "__main__": args = parser.parse_args() main(args)