import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error


class LRClosedFormSol:
    def __init__(self, fit_intercept=True, normalize=True):
        """
        :param fit_intercept: whether to fit the intercept (bias) term
        :param normalize: whether to standardize the data
        """
        self.theta = None  # trained weight coefficients
        self.fit_intercept = fit_intercept  # constant term of the linear model, i.e. the bias theta0
        self.normalize = normalize  # whether to standardize the data
        if normalize:
            self.feature_mean, self.feature_std = None, None  # per-feature mean and standard deviation
        self.mse = np.inf  # mean squared error of the training samples
        self.r2, self.r2_adj = 0.0, 0.0  # coefficient of determination and adjusted R^2
        self.n_samples, self.n_features = 0, 0  # number of samples and number of features

    def fit(self, x_train, y_train):
        """
        Model training; handles standardization and the intercept term as configured.
        :param x_train: training samples
        :param y_train: training targets
        :return:
        """
        if self.normalize:
            self.feature_mean = np.mean(x_train, axis=0)  # per-feature mean
            self.feature_std = np.std(x_train, axis=0) + 1e-8  # per-feature std; epsilon avoids division by zero
            x_train = (x_train - self.feature_mean) / self.feature_std  # standardize
        if self.fit_intercept:
            x_train = np.c_[x_train, np.ones_like(y_train)]  # append a column of ones for the intercept
        # train the model
        self._fit_closed_form_solution(x_train, y_train)  # solve the closed-form solution

    def _fit_closed_form_solution(self, x_train, y_train):
        """
        Closed-form solution of linear regression, kept as a separate method for later extension.
        :param x_train: training samples
        :param y_train: training targets
        :return:
        """
        # pinv is the pseudo-inverse, i.e. (A^T * A)^(-1) * A^T
        self.theta = np.linalg.pinv(x_train).dot(y_train)  # non-regularized solution
        # xtx = np.dot(x_train.T, x_train) + 0.01 * np.eye(x_train.shape[1])  # written out by the normal equation
        # self.theta = np.dot(np.linalg.inv(xtx), x_train.T).dot(y_train)

    def get_params(self):
        """
        Return the trained coefficients of the linear model.
        :return:
        """
        if self.fit_intercept:  # intercept term exists
            weight, bias = self.theta[:-1], self.theta[-1]
        else:
            weight, bias = self.theta, np.array([0])
        if self.normalize:  # coefficients were learned on the standardized scale
            weight = weight / self.feature_std.reshape(-1)  # recover the original-scale coefficients
            bias = bias - weight.T.dot(self.feature_mean.reshape(-1))
        return np.r_[weight.reshape(-1), bias.reshape(-1)]

    def predict(self, x_test):
        """
        Predict on test data. x_test: samples to predict, without the intercept column of ones.
        :param x_test:
        :return:
        """
        try:
            self.n_samples, self.n_features = x_test.shape[0], x_test.shape[1]
        except IndexError:
            self.n_samples, self.n_features = x_test.shape[0], 1  # number of test samples and features
        if self.normalize:
            x_test = (x_test - self.feature_mean) / self.feature_std  # standardize the test data
        if self.fit_intercept:
            x_test = np.c_[x_test, np.ones(shape=x_test.shape[0])]  # intercept exists, append a column of ones
        return x_test.dot(self.theta)

    def cal_mse_r2(self, y_pred, y_test):
        """
        Compute the mean squared error, the coefficient of determination R^2
        and the adjusted R^2 of the fit.
        :param y_pred: model predictions
        :param y_test: true test targets
        :return:
        """
        self.mse = ((y_test - y_pred) ** 2).mean()  # mean squared error
        # coefficient of determination and adjusted R^2 on the test samples
        self.r2 = 1 - ((y_test - y_pred) ** 2).sum() / ((y_test - y_test.mean()) ** 2).sum()
        self.r2_adj = 1 - (1 - self.r2) * (self.n_samples - 1) / (self.n_samples - self.n_features - 1)
        return self.mse, self.r2, self.r2_adj

    def plt_predict(self, y_pred, y_test, is_show=True, is_sort=True):
        """
        Plot the predicted values against the true test values.
        :return:
        """
        if np.isinf(self.mse):  # metrics not computed yet
            self.cal_mse_r2(y_pred, y_test)
        if is_show:
            plt.figure(figsize=(7, 5))
        if is_sort:
            idx = np.argsort(y_test)  # sort by the true values for a cleaner comparison
            plt.plot(y_pred[idx], "r:", lw=1.5, label="Predictive Val")
            plt.plot(y_test[idx], "k--", lw=1.5, label="Test True Val")
        else:
            plt.plot(y_pred, "r:", lw=1.5, label="Predictive Val")
            plt.plot(y_test, "k--", lw=1.5, label="Test True Val")
        plt.xlabel("Test sample observation serial number", fontdict={"fontsize": 12})
        plt.ylabel("Predicted sample value", fontdict={"fontsize": 12})
        plt.title("The predictive values of test samples \n MSE = %.5e, R2 = %.5f, R2_adj = %.5f"
                  % (self.mse, self.r2, self.r2_adj), fontdict={"fontsize": 14})
        plt.legend(frameon=False)
        plt.grid(ls=":")
        if is_show:
            plt.show()
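# --- Optional sanity check (illustrative sketch, not part of the original model) ---
# The pseudo-inverse used in _fit_closed_form_solution satisfies
# pinv(X) = (X^T X)^(-1) X^T when X has full column rank, so the two
# formulations below should agree on a small synthetic design matrix.
# The names _rng, _X_demo, _y_demo are ad hoc and only used here.
_rng = np.random.default_rng(0)
_X_demo = np.c_[_rng.normal(size=(20, 3)), np.ones(20)]  # 3 features plus an intercept column
_y_demo = _rng.normal(size=20)
_theta_pinv = np.linalg.pinv(_X_demo).dot(_y_demo)  # pseudo-inverse solution
_theta_normal = np.linalg.inv(_X_demo.T.dot(_X_demo)).dot(_X_demo.T).dot(_y_demo)  # normal equation
print("pinv solution matches the normal equation:", np.allclose(_theta_pinv, _theta_normal))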
housing = fetch_california_housing()
X, y = housing.data, housing.target  # feature matrix and target values
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, test_size=0.3, random_state=1, shuffle=True)

lgcfs_obj = LRClosedFormSol(normalize=True, fit_intercept=True)
lgcfs_obj.fit(X_train, y_train)
theta = lgcfs_obj.get_params()  # fetch the fitted coefficients
print("Coefficients of the fitted linear regression model:")
for i, fn in enumerate(housing.feature_names):
    print(fn + ":", theta[i])
print("Const:", theta[-1])

# model prediction on the test samples
y_pred = lgcfs_obj.predict(X_test)
lgcfs_obj.plt_predict(y_pred, y_test, is_sort=True)

# linear regression and prediction with sklearn for comparison
lr = LinearRegression().fit(X_train, y_train)
print("sklearn intercept:", lr.intercept_)  # print the intercept
print("sklearn coefficients:", lr.coef_)  # print the model coefficients
y_test_predict = lr.predict(X_test)
mse = mean_squared_error(y_test, y_test_predict)
r2 = r2_score(y_test, y_test_predict)
print("sklearn MSE and R^2:", mse, r2)
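# --- Regularized variant (illustrative sketch, assumptions noted below) ---
# The commented-out lines in _fit_closed_form_solution hint at a ridge-style closed
# form, theta = (X^T X + alpha * I)^(-1) X^T y. The helper below is a hypothetical
# extension (ridge_closed_form is not part of the class above); it assumes x has
# already been standardized and has the intercept column of ones appended.
def ridge_closed_form(x, y, alpha=0.01):
    xtx = x.T.dot(x) + alpha * np.eye(x.shape[1])  # add alpha to the diagonal for numerical stability
    return np.linalg.inv(xtx).dot(x.T).dot(y)

# Example usage on a design matrix built the same way as in fit():
x_design = np.c_[(X_train - X_train.mean(axis=0)) / (X_train.std(axis=0) + 1e-8),
                 np.ones(X_train.shape[0])]
print("Ridge closed-form coefficients (standardized scale):", ridge_closed_form(x_design, y_train))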