一、k近邻算法的定义

二、KD树结点信息封装

kdtree_node.py

class KDTreeNode:"""KD树结点信息封装"""def __init__(self, instance_node=None, instance_label=None, instance_idx=None, split_feature=None, left_child=None, right_child=None, kdt_depth=None):"""用于封装kd树的结点信息结构:param instance_node: 实例点,一个样本:param instance_label: 实例点对应的类别标记:param instance_idx: 该实例点对应的样本索引,用于kd树的可视化:param split_feature: 划分的特征属性,x^(i):param left_child: 左子树,小于划分点的:param right_child: 右子树,大于切分点的:param kdt_depth: kd树的深度"""self.instance_node = instance_nodeself.instance_label = instance_labelself.instance_idx = instance_idxself.split_feature = split_featureself.left_child = left_childself.right_child = right_childself.kdt_depth = kdt_depth

三、距离度量的工具类

distUtils.py

import numpy as npclass DistanceUtils:"""距离度量的工具类,此处仅实现闵可夫斯基距离"""def __init__(self, p=2):self.p = p# 默认欧式距离,p=1曼哈顿距离,p=np。inf是切比雪夫距离def distance_func(self, xi, xj):"""特征空间中两个样本示例的距离计算:param xi: k维空间某个样本示例:param xj: k维空间某个样本示例:return:"""xi, xj = np.asarray(xi), np.asarray(xj)if self.p == 1 or self.p == 2:return (((np.abs(xi - xj)) ** self.p).sum()) ** (1 / self.p)elif self.p == np.inf:return np.max(np.abs(xi - xj))elif self.p == "cos":# 余弦距离或余弦相似度return xi.dot(xj) / np.sqrt((xi ** 2).sum()) / np.sqrt((xj ** 2).sum())else:raise ValueError("目前仅支持p=1、p=2、p=np.inf或余弦距离四种距离...")

四、K近邻算法的实现

knn_kdtree.py

import numpy as npfrom kdtree_node import KDTreeNodefrom distUtils import DistanceUtilsimport heapq# 堆结构,实现堆排序from collections import Counter# 集合中的计数功能import networkx as nx# 网络图,可视化import matplotlib.pyplot as pltclass KNearestNeighborKDTree:"""K近邻算法的实现,基于KD树结构1. fit: 特征向量空间的划分,即构建KD树(建立KNN算法模型)2. predict: 预测,近邻搜索3. 可视化kd树"""def __init__(self, k: int=5, p=2, view_kdt=False):"""KNN算法的初始化必要参数:param k: 近邻数:param p: 距离度量标准:param view_kdt: 是否可视化KD树"""self.k = k# 预测,近邻搜索时,使用的参数,表示近邻树self.p = p# 预测,近邻搜索时,使用的参数,表示样本的近邻度self.view_kdt = view_kdtself.dis_utils = DistanceUtils(self.p)# 距离度量的类对象self.kdt_root: KDTreeNode() = None# KD树的根节点self.k_dimension = 0# 特征空间维度,即样本的特征属性数self.k_neighbors = []# 用于记录某个测试样本的近邻实例点def fit(self, x_train, y_train):"""递归创建KD树,即对特征向量空间进行划分,递归调用进行创建:param x_train: 训练样本集:param y_train: 训练样本目标集合:return:"""if self.k < 1:raise ValueError("k must be greater than 0 and be int.")x_train, y_train = np.asarray(x_train), np.asarray(y_train)self.k_dimension = x_train.shape[1]# 特征维度idx_array = np.arange(x_train.shape[0])# 训练样本索引编号self.kdt_root = self._build_kd_tree(x_train, y_train, idx_array, 0)if self.view_kdt:self.draw_kd_tree()# 可视化kd树def _build_kd_tree(self, x_train, y_train, idx_array, kdt_depth):"""递归创建KD树,KD树是二叉树,严格区分左子树右子树,表示对k维空间的一个划分:param x_train: 递归划分的训练样本子集:param y_train: 递归划分的训练样本目标子集:param idx_array: 递归划分的样本索引:param depth: kd树的深度:return:"""if x_train.shape[0] == 0:# 递归出口returnsplit_dimension = kdt_depth % self.k_dimension# 数据的划分维度x^(i)sorted(x_train, key=lambda x: x[split_dimension])# 按某个划分维度排序median_idx = x_train.shape[0] // 2# 中位数所对应的数据的索引median_node = x_train[median_idx]# 切分点作为当前子树的根节点# 划分左右子树区域left_instances, right_instances = x_train[:median_idx], x_train[median_idx + 1:]left_labels, right_labels = y_train[:median_idx], y_train[median_idx + 1:]left_idx, right_idx = idx_array[:median_idx], idx_array[median_idx + 1:]# 递归调用left_child = self._build_kd_tree(left_instances, left_labels, left_idx, kdt_depth + 1)right_child = self._build_kd_tree(right_instances, right_labels, right_idx, kdt_depth + 1)kdt_new_node = KDTreeNode(median_node, y_train[median_idx], idx_array[median_idx],split_dimension, left_child, right_child, kdt_depth)return kdt_new_nodedef _search_kd_tree(self, kd_tree: KDTreeNode, x_test):"""kd树的递归搜索算法,后序遍历,搜索k个最近邻实例点数据结构:堆排序,搜索过程中,维护一个小根堆:param kd_tree: 已构建的kd树:param x_test: 单个测试样本:return:"""if kd_tree is None:# 递归出口return# 计算测试样本与当前kd子树的根结点的距离(相似度)distance = self.dis_utils.distance_func(kd_tree.instance_node, x_test)# 1. 如果不够k个样本,继续递归# 2. 如果搜索了k个样本,但是k个样本未必是最近邻的。# 当计算的当前实例点的距离小于k个样本的最大距离,则递归,大于最大距离,没必要递归if (len(self.k_neighbors) < self.k) or (distance < self.k_neighbors[-1]["distance"]):self._search_kd_tree(kd_tree.left_child, x_test)# 递归左子树self._search_kd_tree(kd_tree.right_child, x_test)# 递归右子树# 在整个搜索路径上的kd树的结点,存储在self.k_neighbors中,包含三个值# 当前实例点,类别,距离self.k_neighbors.append({"node": kd_tree.instance_node,# 结点"label": kd_tree.instance_label,# 当前实例的类别"distance": distance# 当前实例点与测试样本的距离})# 按照距离进行排序,选择最小的k个最近邻样本实例,更新最近邻距离# 小根堆,k_neighbors中第一个结点是距离测试样本最近的self.k_neighbors = heapq.nsmallest(self.k, self.k_neighbors, key=lambda d: d["distance"])def predict(self, x_test):"""KD树的近邻搜索,即测试样本的预测:param x_test: 测试样本,ndarray: (n * k):return:"""x_test = np.asarray(x_test)if self.kdt_root is None:raise ValueError("KDTree is None, Please fit KDTree...")elif x_test.shape[1] != self.k_dimension:raise ValueError("Test Sample dimension unmatched KDTree's dimension.")else:y_test_hat = []# 用于存储测试样本的预测类别for i in range(x_test.shape[0]):self.k_neighbors = []# 调用递归搜索,则包含了k个最近邻的实例点self._search_kd_tree(self.kdt_root, x_test[i])# print(self.k_neighbors)y_test_labels = []# 取每个近邻样本的类别标签for k in range(self.k):y_test_labels.append(self.k_neighbors[k]["label"])# 按分类规则(多数表决法)# print(y_test_labels)counter = Counter(y_test_labels)idx = int(np.argmax(list(counter.values())))y_test_hat.append(list(counter.keys())[idx])return np.asarray(y_test_hat)def _create_kd_tree(self, graph, kdt_node: KDTreeNode, pos=None, x=0, y=0, layer=1):"""递归可视化KD树,递归构造树的结点、边。:param graph: 有向图对象,递归中逐步增加结点和左子树右子树:param kdt_node: 递归创建KD树的结点:param pos: 可视化中树结点位置,初始化(0, 0)绘制根结点:param x: 对应pos中的横坐标,随着递归,更新:param y: 对应pos中的纵坐标,随着递归,更新:param layer: kd树的层次:return:"""if pos is None:pos = {}pos[str(kdt_node.instance_idx)] = (x, y)if kdt_node.left_child:# 父结点指向左子树graph.add_edge(str(kdt_node.instance_idx), str(kdt_node.left_child.instance_idx))l_x, l_y = x - 1 / 2 ** layer, y - 1# 下一个树结点位置的计算l_layer = layer + 1# 树的层次 + 1self._create_kd_tree(graph, kdt_node.left_child, x=l_x, y=l_y, pos=pos, layer=l_layer)# 递归if kdt_node.right_child:# 父结点指向右子树graph.add_edge(str(kdt_node.instance_idx), str(kdt_node.right_child.instance_idx))r_x, r_y = x + 1 / 2 ** layer, y - 1r_layer = layer + 1self._create_kd_tree(graph, kdt_node.right_child, x=r_x, y=r_y, pos=pos, layer=r_layer)# 递归return graph, posdef draw_kd_tree(self):"""可视化kd树:return:"""directed_graph = nx.DiGraph()# 初始化一个有向图,树graph, pos = self._create_kd_tree(directed_graph, self.kdt_root)fig, ax = plt.subplots(figsize=(20, 10))# 比例可以根据树的深度适当调节nx.draw_networkx(graph, pos, ax=ax, node_size=500, font_color="w", font_size=15, arrowsize=20)plt.tight_layout()plt.show()

五、K近邻算法的测试

test_knn_1.py

import numpy as npfrom sklearn.datasets import load_iris, load_breast_cancerfrom knn_kdtree import KNearestNeighborKDTreefrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import classification_report, accuracy_scoreimport matplotlib.pyplot as pltfrom sklearn.model_selection import StratifiedKFoldfrom sklearn.preprocessing import StandardScaleriris = load_iris()X, y = iris.data, iris.target# bc_data = load_breast_cancer()# X, y = bc_data.data, bc_data.targetX = StandardScaler().fit_transform(X)X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0, stratify=y)k_neighbors = np.arange(3, 21)# acc = []# for k in k_neighbors:# knn = KNearestNeighborKDTree(k=k)# knn.fit(X_train, y_train)# y_test_hat = knn.predict(X_test)# # print(classification_report(y_test, y_test_hat))# acc.append(accuracy_score(y_test, y_test_hat))accuracy_scores = []# 存储每个alpha阈值下的交叉验证均分for k in k_neighbors:scores = []k_fold = StratifiedKFold(n_splits=10).split(X, y)for train_idx, test_idx in k_fold:# knn = KNearestNeighborKDTree(k=k, p="cos")knn = KNearestNeighborKDTree(k=k)knn.fit(X[train_idx], y[train_idx])y_test_pred = knn.predict(X[test_idx])scores.append(accuracy_score(y[test_idx], y_test_pred))del knnprint("k = %d:" % k, np.mean(scores))accuracy_scores.append(np.mean(scores))plt.figure(figsize=(7, 5))plt.plot(k_neighbors, accuracy_scores, "ko-", lw=1)plt.grid(ls=":")plt.xlabel("K Neighbors", fontdict={"fontsize": 12})plt.ylabel("Accuracy Scores", fontdict={"fontsize": 12})plt.title("KNN(KDTree) Testing Scores under different K Neighbors", fontdict={"fontsize": 14})plt.show()# knn = KNearestNeighborKDTree(k=3)# knn.fit(X_train, y_train)# knn.draw_kd_tree()