import numpy as np


class DTLearner:
    """Decision-tree regression learner stored as a flat NumPy record array.

    The tree lives in ``self.rel_tree``, a 1-D structured array in which each
    record is ``(factor, value, left, right)``:

    * internal node: ``factor`` is the feature column to test, ``value`` is the
      split threshold, and ``left`` / ``right`` are offsets *relative to the
      node's own index* of the sub-tree roots (rows with
      ``x[factor] <= value`` go left).
    * leaf node: ``factor`` is ``LEAF``, ``value`` is the prediction, and both
      offsets are ``NA``.
    """

    LEAF = -1  # marker stored in the ``factor`` field of a leaf node
    NA = -1    # placeholder for the unused child offsets of a leaf node

    def __init__(self, leaf_size=1, verbose=False):
        """
        @summary: Create an untrained learner.
        @param leaf_size: a node holding this many rows or fewer becomes a leaf
        @param verbose: stored on the instance; not read by this class
        """
        self.leaf_size = leaf_size
        self.verbose = verbose

    def author(self):
        """Return the author's user name."""
        return 'felixm'

    def create_node(self, factor, split_value, left, right):
        """
        @summary: Build a one-record tree fragment.
        @param factor: feature index to split on, or LEAF
        @param split_value: split threshold (internal node) or prediction (leaf)
        @param left: relative offset of the left child, or NA
        @param right: relative offset of the right child, or NA
        @returns a structured array of length one.
        """
        # The value is kept in double precision (f8). With single precision
        # the stored threshold can round below a training value that sat
        # exactly on the split, sending that row down the wrong branch at
        # query time.
        return np.array([(factor, split_value, left, right), ],
                        dtype='i4, f8, i4, i4')

    def query_point(self, point):
        """
        @summary: Predict the value for a single sample.
        @param point: 1-D sequence of feature values
        @returns the value stored in the leaf the sample ends up in.
        """
        node_index = 0
        node = self.rel_tree[node_index]
        while node[0] != self.LEAF:
            split_factor, split_value = node[0], node[1]
            # Child positions are stored relative to the current node.
            if point[split_factor] <= split_value:
                node_index += node[2]
            else:
                node_index += node[3]
            node = self.rel_tree[node_index]
        return node[1]

    def query(self, points):
        """
        @summary: Estimate a set of test points given the model we built.
        @param points: should be a numpy array with each row corresponding
            to a specific query.
        @returns the estimated values according to the saved model.
        """
        points = np.asarray(points)
        if points.shape[0] == 0:
            # np.apply_along_axis refuses to iterate over zero rows.
            return np.empty(0, dtype=float)
        return np.apply_along_axis(self.query_point, 1, points)

    def build_tree(self, xs, y):
        """
        @summary: Build a decision tree from the training data.
        @param xs: 2-D array of X values, one row per sample
        @param y: the Y training values, one per row of xs
        @returns the tree as a structured array of nodes; the root is at
            index 0, followed by the left sub-tree and then the right one.
        """
        assert xs.shape[0] == y.shape[0]
        assert xs.shape[0] > 0  # If this is 0 something went wrong.

        if xs.shape[0] <= self.leaf_size:
            return self.create_node(self.LEAF, np.mean(y), self.NA, self.NA)

        if np.all(y[0] == y):
            return self.create_node(self.LEAF, y[0], self.NA, self.NA)

        split = self._find_split(xs, y)
        if split is None:
            # Every feature is constant over these rows (e.g. duplicated
            # samples with different targets): nothing can be separated, so
            # aggregate the rows into a single leaf.
            return self.create_node(self.LEAF, np.mean(y), self.NA, self.NA)

        i, split_value = split
        select_l = xs[:, i] <= split_value
        select_r = xs[:, i] > split_value
        lt = self.build_tree(xs[select_l], y[select_l])
        rt = self.build_tree(xs[select_r], y[select_r])

        # The left sub-tree starts right after the root; the right sub-tree
        # starts after the whole left sub-tree.
        root = self.create_node(i, split_value, 1, lt.shape[0] + 1)
        return np.concatenate([root, lt, rt])

    def addEvidence(self, data_x, data_y):
        """
        @summary: Add training data to learner
        @param data_x: X values of data to add
        @param data_y: the Y training values
        """
        self.rel_tree = self.build_tree(np.asarray(data_x),
                                        np.asarray(data_y))

    def get_correlations(self, xs, y):
        """
        Return a list of sorted 2-tuples where the first element is the
        absolute correlation between a feature column and y, and the second
        element is the column index. Sorted by highest correlation first.

        A column whose correlation is undefined (for example a constant
        column) is reported with a correlation of 0.0.
        """
        correlations = []
        # A constant column has zero variance, which makes np.corrcoef divide
        # by zero and yield NaN; silence the expected floating-point warnings.
        with np.errstate(divide='ignore', invalid='ignore'):
            for i in range(xs.shape[1]):
                c = abs(np.corrcoef(xs[:, i], y=y)[0, 1])
                if np.isnan(c):
                    # NaN compares false against everything and would leave
                    # the sort order below undefined.
                    c = 0.0
                correlations.append((c, i))
        correlations.sort(reverse=True)
        return correlations

    def _find_split(self, xs, y):
        """Return ``(feature_index, split_value)`` for the best usable feature,
        or ``None`` when no feature leaves rows on both sides of a split."""
        # Features are tried from highest to lowest correlation. The median is
        # the preferred split value. If every row is <= the median, the split
        # value is pulled towards the smaller values by repeated averaging.
        # If that still does not separate the rows (the column is constant),
        # the next feature is tried.
        for _, i in self.get_correlations(xs, y):
            column = xs[:, i]
            split_value = np.median(column)
            if (column <= split_value).all():
                for value in column:
                    if value < split_value:
                        split_value = (value + split_value) / 2.0
            select = column <= split_value
            # Both sub-trees need at least one row.
            if select.any() and not select.all():
                return i, split_value
        return None

    def get_i_and_split_value(self, xs, y):
        """
        @summary: Choose the feature and threshold to split a node on.
        @param xs: 2-D array of X values, one row per sample
        @param y: the Y training values, one per row of xs
        @returns a tuple (feature index, split value) such that at least one
            row is <= the split value and at least one row is greater.
        @raises AssertionError: if no feature can separate the rows.
        """
        split = self._find_split(xs, y)
        if split is None:
            raise AssertionError(
                'no feature separates the rows into two non-empty groups')
        return split