文章目录

  • 前言
  • 环境准备
  • 完整代码
  • 配置文件(conf.json)
  • 获取数据集(datasets.py)
  • 获取PyTorch中自带深度学习网络预训练模型(models.py)
  • 客户端(client.py)
  • 服务端(server.py)
  • main.py
  • 运行
  • 知识点补充
    • argparse基本用法
    • tensor.copy_()
    • [Python dict() 函数](https://www.runoob.com/python/python-func-dict.html)
    • 什么是状态字典:state_dict?
    • view_as()函数

前言

本文通过阅读《联邦学习实战—杨强》中第3章“用Python实现横向联邦图像分类”入门横向联邦。

核心思想: 使用Python在本地模拟多个客户端,然后由服务器统一管理进行联邦学习,客户端在本地用自己的数据对模型进行训练,服务器将训练结果聚合更新模型并分发给客户端,客户端继续训练。

用横向联邦学习来实现对cifar10图像数据集的分类,模型使用的是ResNet-18。方式为在本地以循环的方式来模拟。

多多调试来理解代码

环境准备

本实验基于Python实现,使用机器学习库PyTorch。

  • Python、PyTorch
  • 编译器使用PyCharm
  • 数据集:cifar10
  • 模型:ResNet-18

基本流程:

  1. 服务器按配置文件生成初始化模型,客户端按照自己的ID将数据集横向不重叠切割
  2. 服务器将全局模型发送给客户端
  3. 客户端接收全局模型(来自服务器)并通过本地多次迭代计算本地参数差值返回给服务器
  4. 服务器聚合各个客户端差值更新模型,再评估当前模型性能
  5. 如果性能未达标,则重复2过程,否则结束

CIFAR10数据集:

每个批处理文件都包含一个包含以下元素的字典,加起来有6万张图片:

  • 数据 —— 60000×3072(32x32x3)的uint8 numpy数组。数组的每一行存储一个 32×32 彩色图像。前 1024 个条目包含红色通道值,接下来的 1024 个条目包含绿色通道值,最后 1024 个条目包含蓝色通道值。图像以行优先顺序存储,因此数组的前 32 个条目是图像第一行的红色通道值。
  • labels —— 0-9 范围内的 60000 个数字的列表。索引i处的数字表示数组数据中第i个图像的标签。

完整代码

链接: https://pan.baidu.com/s/1PPCd_YB6w537OYif9TQZPg” />

获取数据集(datasets.py)

import torch from torchvision import datasets, transforms# 获取数据集def get_dataset(dir, name):if name=='mnist':# root(这里是dir): 数据路径# train参数表示是否是训练集(True)或者测试集(False)# download=true表示从互联网上下载数据集并把数据集放在root路径中# transform:图像类型的转换# 将图片转化为张量对象。通过ToTensor实例将图像数据从PIL类型变换成32位浮点数格式,并除以255使得所有像素的数值均在 0 到 1 之间train_dataset = datasets.MNIST(dir, train=True, download=True, transform=transforms.ToTensor())eval_dataset = datasets.MNIST(dir, train=False, transform=transforms.ToTensor())elif name=='cifar':# transforms.Compose(图片预处理步骤)是将多个transform组合起来使用(由transform构成的列表)transform_train = transforms.Compose([transforms.RandomCrop(32, padding=4), # 从图片中随机裁剪出尺寸为size的图片。size: 所需裁剪出的图片尺寸。padding: 设置填充大小,当为a时,上下左右均填充a个像素。transforms.RandomHorizontalFlip(), # 水平翻转(左右翻转图像通常不会改变对象的类别。这是最早且最广泛使用的图像增广方法)transforms.ToTensor(), # 将图片转化为张量对象。通过ToTensor实例将图像数据从PIL类型变换成32位浮点数格式,并除以255使得所有像素的数值均在 0 到 1 之间# transforms.Normalize: 标准化图像的每个通道。第一个参数: 均值; 第二个参数: 方差# 对于RGB(红、绿、蓝)颜色通道,我们分别标准化每个通道。具体而言,该通道的每个值减去该通道的平均值,然后将结果除以该通道的标准差。transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),])transform_test = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),])train_dataset = datasets.CIFAR10(dir, train=True, download=True,transform=transform_train)eval_dataset = datasets.CIFAR10(dir, train=False, transform=transform_test)return train_dataset, eval_dataset

获取PyTorch中自带深度学习网络预训练模型(models.py)

import torchfrom torchvision import modelsdef get_model(name="vgg16", pretrained=True):# 卷积神经网络的训练是耗时的,很多场合不可能每次都从随机初始化参数开始训练网络。# pytorch中自带几种常用的深度学习网络预训练模型,如VGG、ResNet等。往往为了加快学习的进度,在训练的初期我们直接加载pre-train模型中预先训练好的参数if name == "resnet18":# 使用在ImageNet数据集上预训练的ResNet-18作为源模型。指定pretrained=True自动下载预训练的模型参数model = models.resnet18(pretrained=pretrained) elif name == "resnet50":# 使用在ImageNet数据集上预训练的ResNet-50作为源模型。指定pretrained=True自动下载预训练的模型参数model = models.resnet50(pretrained=pretrained)elif name == "densenet121":model = models.densenet121(pretrained=pretrained)elif name == "alexnet":model = models.alexnet(pretrained=pretrained)elif name == "vgg16":model = models.vgg16(pretrained=pretrained)elif name == "vgg19":model = models.vgg19(pretrained=pretrained)elif name == "inception_v3":model = models.inception_v3(pretrained=pretrained)elif name == "googlenet":model = models.googlenet(pretrained=pretrained)if torch.cuda.is_available():return model.cuda()else:return model 

客户端(client.py)

横向联邦学习的客户端的主要功能是接收服务端的下发指令和全局模型(参数),并利用本地数据进行局部模型训练

# 在项目文件夹下创建client.py文件,客户端的主要功能是接受服务器传来的全局模型,并利用本地数据对模型进行训练后返回"差值",包括构造函数、本地训练函数。import models, torch, copy# 客户端类class Client(object): # clients.append(Client(conf, server.global_model, train_datasets, c))# 构造函数def __init__(self, conf, model, train_dataset, id = -1):# 读取配置文件self.conf = conf# 根据配置文件获取客户端本地模型(一般由服务器传输)self.local_model = models.get_model(self.conf["model_name"])# 客户端IDself.client_id = id# 客户端本地数据集self.train_dataset = train_dataset# 按ID对数据集集合进行拆分all_range = list(range(len(self.train_dataset))) # all_range列表: 0~49999data_len = int(len(self.train_dataset) / self.conf['no_models']) # 50000/总客户端数量=data_len:5000train_indices = all_range[id * data_len: (id + 1) * data_len] # 将切分数据集,每一份有5000张图片# 若id为0: [0:5000]; 若id为1: [5000, 10000]; 若id为2: [10000,15000],......; 若id为9: [45000, 50000]self.train_loader = torch.utils.data.DataLoader(self.train_dataset, batch_size=conf["batch_size"], sampler=torch.utils.data.sampler.SubsetRandomSampler(train_indices))# SubsetRandomSampler用来打乱数据# 模型本地训练函数def local_train(self, model):# 客户端获取服务器的模型,然后通过部分本地数据集进行训练for name, param in model.state_dict().items():# 用服务器下发的全局模型参数(weight、bias)覆盖本地模型参数(weight、bias)。# 本文服务端、客户端采用的都是ResNet-18模型,所以其实参数时一样的,但是按照代码完整性,还是覆盖一下。self.local_model.state_dict()[name].copy_(param.clone())#print(id(model))# 定义最优化函数器用户本地模型训练optimizer = torch.optim.SGD(self.local_model.parameters(), lr=self.conf['lr'],momentum=self.conf['momentum'])#print(id(self.local_model))# 本地训练模型# 设置开启模型训练,如果模型中有BN层(Batch Normalization)和Dropout,需要在训练时添加local_model.train(),在测试时添加local_model.eval()self.local_model.train()# 开始训练模型,本文每个客户端都是本地训练3次for e in range(self.conf["local_epochs"]):# 每次(共3次)本地训练训练5000张图片,每次下面的循环需要循环157次for batch_id, batch in enumerate(self.train_loader): # self.train_loader有5000张图片,分成了5000/32=157份data, target = batchif torch.cuda.is_available(): # 如果可以的话加载到gpudata = data.cuda()target = target.cuda()optimizer.zero_grad() # 在计算参数的梯度之前,通常需要清零梯度信息。清零模型参数的梯度值output = self.local_model(data) # # 训练预测loss = torch.nn.functional.cross_entropy(output, target) # 计算损失函数cross_entropy交叉熵误差loss.backward() # 利用自动求导函数loss.backward()求出模型中所有参数的梯度信息"loss对权重或偏置求导",这些梯度会自动保存在每个张量的grad成员变量中optimizer.step() # 根据梯度下降算法更新参数。w'=w-lr*grad; b'=b-lr*gradprint(f'客户端: {self.client_id} 的', "Epoch %d done." % e)# 创建差值字典(结构与模型参数同规格),用于记录差值diff = dict()# 此时self.local_model.state_dict()和model.state_dict()不一样了for name, data in self.local_model.state_dict().items(): # ResNet-18有122个层需要更新参数,所以这里执行122次循环(通过调试理解)# 计算训练后与训练前的差值diff[name] = (data - model.state_dict()[name])#print(diff[name])print("客户端 %d 本地训练结束" % self.client_id)#客户端返回差值return diff

服务端(server.py)

横向联邦学习的服务端的主要功能是将被选择的客户端上传的本地模型进行模型聚合(使用FedAvg算法)

import models, torch# 服务器类class Server(object):def __init__(self, conf, eval_dataset): # 定义构造函数self.conf = conf # 将配置信息拷贝到服务端中self.global_model = models.get_model(self.conf["model_name"]) # 按照配置中的模型信息获取模型,返回类型为model(nn.Module)# self.eval_loader有 313 份测试集batch(32张图片为一份batch)self.eval_loader = torch.utils.data.DataLoader(eval_dataset, batch_size=self.conf["batch_size"], shuffle=True)# 模型聚合函数# weight_accumulator 存储了每个客户端上传参数的变化值def model_aggregate(self, weight_accumulator):# 遍历服务器的全局模型for name, data in self.global_model.state_dict().items(): # ResNet-18有122个层需要更新参数,所以这里执行122次循环(通过调试理解)# 书第44页FedAvg算法公式的右半部分update_per_layer = weight_accumulator[name] * self.conf["lambda"]# 累加if data.type() != update_per_layer.type():# 因为update_per_layer的type是floatTensor,所以将其转换为模型的LongTensor(损失精度)# 这里就是整个书44页的FedAvg算法公式,结果为聚合之后的全局模型的weight、biasdata.add_(update_per_layer.to(torch.int64))else:data.add_(update_per_layer)# 模型评估函数def model_eval(self):# 开启模型评估模式self.global_model.eval()total_loss = 0.0correct = 0dataset_size = 0# 遍历评估数据集合for batch_id, batch in enumerate(self.eval_loader): # self.eval_loader有 313 份测试集batch(32张图片为一份batch)data, target = batch# 获取所有样本总量大小dataset_size += data.size()[0] # data.size()=torch.Size([32, 3, 32, 32])if torch.cuda.is_available(): # 如果可以的话存储到gpudata = data.cuda()target = target.cuda()# 加载到模型中训练output = self.global_model(data) # output的shape为(32, 1000)# 聚合所有损失 cross_entropy 交叉熵函数计算损失total_loss += torch.nn.functional.cross_entropy(output, target,  reduction='sum').item() # sum up batch loss# 获取最大的对数概率的索引值,即在所有预测结果中选择可能性最大的作为最终结果# 解读output.data.max(1): 首先output的shape是(32, 1000)。通过output的第二个维度找到每行最大值(共32个),返回是值和索引(通过调试理解)# 解读output.data.max(1)[1]: 提取出"索引"pred = output.data.max(1)[1]  # get the index of the max log-probability# 统计预测结果与真实标签的匹配个数correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()# 计算准确率acc = 100.0 * (float(correct) / float(dataset_size))# 计算总损失值total_l = total_loss / dataset_sizereturn acc, total_l

server.py涉及的调试截图:

  1. output.data.max(1)
    解读output.data.max(1): 首先output的shape是(32, 1000)。通过output的第二个维度找到每行最大值(共32个),返回是值和索引(通过调试理解)
  2. 解读output.data.max(1)[1]: 提取出”索引”

    验证一下:

main.py

import argparse, jsonimport datetimeimport osimport loggingimport torch, randomfrom server import *from client import *import models, datasetsif __name__ == '__main__':# 设置命令行程序parser = argparse.ArgumentParser(description='Federated Learning')parser.add_argument('-c', '--conf', dest='conf') # argparse默认的变量名是--或-后面的字符串,# 但是你也可以通过dest=xxx来设置参数的变量名,然后在代码中用args.xxx来获取参数的值。args = parser.parse_args()# 终端执行python main.py -c ./utils/conf.json。此时,args.conf = ./utils/conf.json# 这里我使用文件路径地址替换了"arg.conf",可以直接运行main.pywith open('./utils/conf.json', 'r') as f: # args.confconf = json.load(f)# conf = {"model_name" : "resnet18","no_models" : 10,"type" : "cifar",# "global_epochs" : 20,"local_epochs" : 3,"k" : 5,"batch_size" : 32,# "lr" : 0.001,"momentum" : 0.0001,"lambda" : 0.1}# eval_datasets是"字典",键为"数据"——一个 10000x3072(32*32*3=3072)的uint8 numpy数组,值为"labels" -- 0-9 范围内的 10000 个数字的列表。# conf中的 "type" : "cifar"train_datasets, eval_datasets = datasets.get_dataset("./data/", conf["type"])server = Server(conf, eval_datasets) # Server类实例化。self.eval_loader有 10000/32=313 份测试集batch(32张图片为一份batch)clients = [] # 定义客户端列表# clients列表里面存放10个客户端类的实例for c in range(conf["no_models"]): # c = 0~9。"c"是客户端的id号clients.append(Client(conf, server.global_model, train_datasets, c))print("\n\n")# 全局训练for e in range(conf["global_epochs"]): # e = 0~19print(f"当前是第 {e} 次大循环Global Epoch")# 每次训练从clients列表中随机抽取k个进行训练。candidates也是列表,里面有5个客户端实例candidates = random.sample(clients, conf["k"])print("selected clients are: ")for c in candidates:print('client_id: ', c.client_id)# 累加参数变化。把每一个大循环(global_epochs)的差值加起来weight_accumulator = {}# 初始化空模型参数weight_accumulatorfor name, params in server.global_model.state_dict().items():# 这里务必调试理解!!!# name分别为: 'conv1.weight'、'bn1.weight'、'bn1.bias'等等(调试看看);# params分别为对应各个name的参数值weight_accumulator[name] = torch.zeros_like(params) # 生成一个和参数矩阵大小相同的零矩阵# 遍历选中的客户端,每个客户端本地进行训练for c in candidates:diff = c.local_train(server.global_model)# 根据客户端返回的参数差值字典更新总体权重for name, params in server.global_model.state_dict().items(): # ResNet-18有122个层需要更新参数,所以这里执行122次循环(通过调试理解)weight_accumulator[name].add_(diff[name])# 模型参数聚合server.model_aggregate(weight_accumulator) # 执行完这行代码后,模型全局参数就更新了# 模型评估acc, loss = server.model_eval()print("Epoch %d, acc: %f, loss: %f\n" % (e, acc, loss))

main.py涉及的调试截图:

  1. 初始化candidates列表(从clients列表里随机选择5个)

  2. server.global_model.state_dict().items():

  3. 初始化字典weight_accumulator
    weight_accumulator[name] = torch.zeros_like(params) # 生成一个和参数矩阵大小相同的零矩阵

运行

在PyCharm里直接运行main.py便可

知识点补充

argparse基本用法

  1. 可以看出经过view_as()操作后,t2 Tensor转变为了与t1 相同的形状。(需要重新对t2赋值,这是因为不是进行的原地操作)