本篇文章主要是PBFT共识的简单实现,其中有许多地方都做了简化。PBFT的原理已在上篇文章中描述过,如果对PBFT的原理不太清晰的的可以进行查看。文章地址:共识算法学习总结。

代码实现的主要功能有:通过客户端添加区块,使用libp2p的mdns进行节点发现,动态的添加节点。

客户端

在启动客户端时,首先根据端口号创建一个客户端,然后启动客户端。

var clientCmd = &cobra.Command{Use: "client",Short: "client manage",Run: func(cmd *cobra.Command, args []string) {// 获取客户端的端口port, err := cmd.Flags().GetInt("port")if err != nil {log.Println("get param error: ", err)}// 客户端传递的数据data, err := cmd.Flags().GetString("data")if err != nil{log.Println("get param error: ", err)}client := NewClient(port)client.Start(data)},}

创建客户端

创建的客户端为libp2p节点,并设置节点的私钥。这里的加密算法使用的是Ed25519算法,不但效率更高并且可以在别的节点获取当前节点的公钥。

// 创建客户端func NewClient(listenPort int) *Client {// 生成密钥对r := rand.ReaderprvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)if err != nil{log.Println(err)}pubKey := prvKey.GetPublic()sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", listenPort))// 创建libp2p节点h, err := libp2p.New(libp2p.ListenAddrs(sourceMultiAddr),libp2p.Identity(prvKey),)if err != nil { log.Println("创建的客户端节点失败:", err)}h.SetStreamHandler(protocol.ID(protocolID), handleStream)fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\n", "0.0.0.0", listenPort, h.ID().Pretty())keyPair := Keypair{privkey: prvKey,pubkey: pubKey,}// 创建客户端client := &Client{h,keyPair,[]KnownNode{},sync.Mutex{},make(map[string]*common.ReplyMsg),}fmt.Println(">>> 创建客户端成功...")return client}

启动客户端

客户端启动时,首先不断获取网络中的节点,然后发送request请求,并等待回应。
这里做了简化,在客户端启动时就直接发送request请求。

func (c *Client) Start (data string){fmt.Println(">>> 开始启动客户端...")ctx := context.Background()// 通过协程获取网络中的节点,使用libp2p的mdns节点发现go c.getAllKonwons(c.client)// 发送客户端请求c.sendRequest(ctx, data)// 处理响应go c.handleConnection()select {}}

客户端发送请求

客户端首先创建request消息,消息格式为。o: 请求的具体操作,t: 请求时客户端追加的时间戳,c:客户端标识。REQUEST: 包含消息内容m,以及消息摘要d(m)。客户端对请求进行签名。

客户端创建完request请求后就可以向主节点发送该请求。

func (c *Client) sendRequest(ctx context.Context, data string){fmt.Println(">>> 客户端准备request消息...")// 构建requestreq := common.Request{data,hex.EncodeToString(common.GenerateDigest(data)),}// 序列化pubKeymarshalPubkey, err := crypto.MarshalPublicKey(c.keypair.pubkey)sendClient := common.SendClient{c.client.ID().Pretty(),marshalPubkey,}// 构建request消息reqMsg := common.RequestMsg{"solve",int(time.Now().Unix()),sendClient,req,}// 对发送的消息进行签名sig, err := c.signMessage(reqMsg)if err != nil{fmt.Printf("%v\n", err)}// 组合并发送消息c.send(ctx, common.ComposeMsg(common.HRequest, reqMsg, sig), c.findPrimaryNode())fmt.Println(">>> 客户端发送消息完成...")}

数据发送

客户端发送数据时,首先连接到主节点,然后打开与主节点的stream。再打开数据发送的通道,最后序列化数据并发数据添加到发送数据的通道中。

func (c *Client) send(ctx context.Context, msg []byte, node KnownNode) {// 开始连接到主节点if err := c.client.Connect(ctx, node.h); err != nil{log.Println(">>> 连接到主节点失败")}// 打开streams, err := c.client.NewStream(context.Background(), node.h.ID, protocol.ID(protocolID))if err != nil {fmt.Println(">>> 打开stream失败", err)}fmt.Println(">>> 开始连接到: ", node.h)rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))// 准备发送数据的通道go sendData(rw)// 序列化数据data, err := json.Marshal(msg)if err != nil{fmt.Println("序列化数据错误", err)}sendDataChan <- dataclose(sendDataChan)}

服务端命令行

服务端启动启动时,首先创建一个server,然后启动server。

var serverCmd = &cobra.Command{Use: "server",Short: "server manage",Run: func(cmd *cobra.Command, args []string) {port, err := cmd.Flags().GetInt("port")if err != nil {log.Println("get param error: ", err)}// 创建serverserver := NewServer(port)// 开始serverserver.start()},}

创建新的节点

创建服务端与创建客户端类似。

func NewNode(port int) *Node {// 生成密钥对r := rand.ReaderprvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)if err != nil{log.Println(err)}pubKey := prvKey.GetPublic()sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", port))// 创建libp2p节点h, err := libp2p.New(libp2p.ListenAddrs(sourceMultiAddr),libp2p.Identity(prvKey),)if err != nil {log.Println(err)}h.SetStreamHandler(protocol.ID(protocolID), handleStream)fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\n", "0.0.0.0", port, h.ID().Pretty())keyPair := Keypair{privkey: prvKey,pubkey: pubKey,}// 创建nodereturn &Node{[]KnownNode{},ClientNode{},0,h,ViewID,make(chan []byte),keyPair,&MsgLog{make(map[string]map[string]bool),make(map[string]map[string]bool),make(map[string]map[string]bool),make(map[string]bool),},make(map[string]*common.RequestMsg),sync.Mutex{},}}

启动节点

服务端的启动仅仅开启一个消息处理协程。 通过消息处理协程,会把服务端接收到的消息分配到对应的逻辑中进行处理。

func (node *Node)Start(){// 处理消息go node.handleMsg()}func (node *Node) handleMsg() {fmt.Println(">>> 启动节点,等待接收消息...")for{//待改进 todorawData := <- receiveChan// 反序列得到的数据var data []byteerr := json.Unmarshal([]byte(rawData), &data)if err != nil {fmt.Println("反序列消息化失败:", err)return}// 分割消息,分别处理不同的消息header, payload, sign:= common.SplitMsg(data)switch header {case common.HRequest:node.handleRequest(payload, sign)case common.HPrePrepare:node.handlePrePrepare(payload, sign)case common.HPrepare:node.handlePrepare(payload, sign)case common.HCommit:node.handleCommit(payload, sign)default:fmt.Println("===============无法处理对应的消息============")}}}

request处理

节点接收到消息后,首先反序列化request消息,然后设置客户端。接着校验request消息的摘要及签名。通过验证后放入请求池,接着创建pre-prepare消息并进行签名。最组合消息进行发送。剩下的三个函数类似,这里就不再叙述。

func (node *Node) handleRequest(payload []byte, sig []byte){fmt.Println(">>> 主节点接收request消息...")var request common.RequestMsgvar prePrepareMsg common.PrePrepareMsg// 反序列化请求消息err := json.Unmarshal(payload, &request)if err != nil{log.Println("反序列化request错误: ", err)return}// 设置节点的客户端clientPubKey, err := crypto.UnmarshalPublicKey(request.Client.PubKey)if err != nil{fmt.Println(">>> 反序列化客户端公钥失败", err)}clientNode := ClientNode{request.Client.ID,clientPubKey,}node.clientNode = clientNode// 校验request的摘要vdig := common.VerifyDigest(request.CRequest.Message, request.CRequest.Digest)if vdig == false {fmt.Printf("验证摘要错误\n")return}// 校验request的签名_, err= common.VerifySignatrue(request, sig, clientPubKey)if err != nil{fmt.Printf("验证签名错误:%v\n", err)return}// 添加进请求池node.mutex.Lock()node.requestPool[request.CRequest.Digest] = &requestseqID := node.getSequenceID()node.mutex.Unlock()// 构建pre-Prepare消息prePrepareMsg = common.PrePrepareMsg{request,request.CRequest.Digest,ViewID,seqID,}// 消息签名msgSig, err:= node.signMessage(prePrepareMsg)if err != nil{fmt.Printf("%v\n", err)return}// 消息组合msg := common.ComposeMsg(common.HPrePrepare, prePrepareMsg, msgSig)// 日志处理node.mutex.Lock()if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil {node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)}node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = truenode.mutex.Unlock()// 序列化消息data, err := json.Marshal(msg)if err != nil{fmt.Println("序列化request消息出错", err)return}fmt.Println(">>> 主节点广播prePrepare消息...")// 广播消息node.broadcast(data)}

pre-prepare消息处理

func (node *Node) handlePrePrepare(payload []byte, sig []byte) {fmt.Println(">>> 副节点开始接收prePrepare消息...")// 反序列化prePrepare消息var prePrepareMsg common.PrePrepareMsgerr := json.Unmarshal(payload,&prePrepareMsg)if err != nil {fmt.Printf("error happened:%v", err)return}// 找到主节点的公钥pnodeId := node.findPrimaryNode()pubKey, err := pnodeId.h.ID.ExtractPublicKey()if err != nil {fmt.Println("获取主节点的公钥失败", err)return}// 校验消息签名_, err = common.VerifySignatrue(prePrepareMsg, sig, pubKey)if err != nil{fmt.Printf("验证主节点签名错误:%v\n", err)return}// 校验消息的摘要if prePrepareMsg.Digest != prePrepareMsg.Request.CRequest.Digest {fmt.Printf("校验摘要错误\n")return}node.mutex.Lock()node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Requestnode.mutex.Unlock()// 校验request的摘要err = node.verifyRequestDigest(prePrepareMsg.Digest)if err != nil{fmt.Printf("%v\n", err)return}node.mutex.Lock()node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Requestnode.mutex.Unlock()err = node.verifyRequestDigest(prePrepareMsg.Digest)if err != nil{fmt.Printf("%v\n", err)return}node.mutex.Lock()if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil {node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)}node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = truenode.mutex.Unlock()// 构建prePare消息prepareMsg := common.PrepareMsg{prePrepareMsg.Digest,ViewID,prePrepareMsg.SequenceID,node.node.ID(),}// 签名msgSig, err := common.SignMessage(prepareMsg, node.keypair.privkey)if err != nil{fmt.Printf("%v\n", err)return}// 消息组合sendMsg := common.ComposeMsg(common.HPrepare,prepareMsg,msgSig)// 序列化消息data, err := json.Marshal(sendMsg)if err != nil{fmt.Println("序列化prepare消息出错", err)return}fmt.Println(">>> 副节点广播prepare消息...")node.broadcast(data)}

prepare消息处理

func (node *Node) handlePrepare(payload []byte, sig []byte) {fmt.Println(">>> 副节点开始接收prepare消息...")// 反序列化prepare消息var prepareMsg common.PrepareMsgerr := json.Unmarshal(payload,&prepareMsg)if err != nil {fmt.Printf("error happened:%v", err)return}// 得到节点的公钥pnodeID := prepareMsg.NodeIDpubKey, err:= findNodePubkey(pnodeID)if err != nil {fmt.Println("获取主节点的公钥失败", err)return}_, err = common.VerifySignatrue(prepareMsg, sig, pubKey)if err != nil{fmt.Printf("校验签名prepare消息错误:%v\n", err)return}err = node.verifyRequestDigest(prepareMsg.Digest)if err != nil{fmt.Printf("%v\n", err)return}// 日记记录node.mutex.Lock()if node.msgLog.prepareLog[prepareMsg.Digest] == nil {node.msgLog.prepareLog[prepareMsg.Digest] = make(map[string]bool)}node.msgLog.prepareLog[prepareMsg.Digest][prepareMsg.NodeID.String()] = truenode.mutex.Unlock()// if receive prepare msg >= 2f +1, then broadcast commit msglimit := node.countNeedReceiveMsgAmount()sum, err:= node.findVerifiedPrepareMsgCount(prepareMsg.Digest)if err != nil {fmt.Printf("error happened:%v", err)return}if sum >= limit {//send commit msgcommitMsg := common.CommitMsg{prepareMsg.Digest,prepareMsg.ViewID,prepareMsg.SequenceID,node.node.ID(),}sig, err := node.signMessage(commitMsg)if err != nil{fmt.Printf("sign message happened error:%v\n", err)}sendMsg := common.ComposeMsg(common.HCommit,commitMsg,sig)data, err := json.Marshal(sendMsg)if err != nil{fmt.Println("序列化commit消息出错", err)}node.broadcast(data)fmt.Println(">>> 副节点广播commit消息成功")}}

commit 消息处理

func (node *Node) handleCommit(payload []byte, sig []byte) {fmt.Println(">>> 副节点开始接收commit消息")// 反序列化消息var commitMsg common.CommitMsgerr := json.Unmarshal(payload,&commitMsg)if err != nil {fmt.Printf("error happened:%v", err)}msgPubKey, err := findNodePubkey(commitMsg.NodeID)if err != nil{fmt.Println(err)return}verify, err := common.VerifySignatrue(commitMsg, sig, msgPubKey)if err != nil{fmt.Printf("verify signature failed:%v\n", err)return}if verify == false {fmt.Printf("verify signature failed\n")return}err = node.verifyRequestDigest(commitMsg.Digest)if err != nil{fmt.Printf("%v\n", err)return}node.mutex.Lock()if node.msgLog.commitLog[commitMsg.Digest] == nil {node.msgLog.commitLog[commitMsg.Digest] = make(map[string]bool)}node.msgLog.commitLog[commitMsg.Digest][commitMsg.NodeID.String()] = truenode.mutex.Unlock()// if receive commit msg >= 2f +1, then send reply msg to clientlimit := node.countNeedReceiveMsgAmount()sum, err := node.findVerifiedCommitMsgCount(commitMsg.Digest)if err != nil{fmt.Printf("error happened:%v", err)return}if sum >= limit {// if already send reply msg, then do nothingnode.mutex.Lock()exist := node.msgLog.replyLog[commitMsg.Digest]node.mutex.Unlock()if exist == true {return}// send reply msgnode.mutex.Lock()requestMsg := node.requestPool[commitMsg.Digest]node.mutex.Unlock()fmt.Printf("operstion:%smessage:%s executed... \n",requestMsg.Operation, requestMsg.CRequest.Message)done := fmt.Sprintf("operstion:%smessage:%s done ",requestMsg.Operation, requestMsg.CRequest.Message)replyMsg := common.ReplyMsg{node.View,int(time.Now().Unix()),requestMsg.Client.ID,node.node.ID().String(),done,}hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", requestMsg.Client.ID))clientNode, err := peer.AddrInfoFromString(hostAddr.String())fmt.Println(">>> 客户端地址:", clientNode.ID.Pretty())if err != nil{fmt.Println(err)return}fmt.Println(">>> 开始向客户端回复数据...")sendMsg := common.ComposeMsg(common.HReply,replyMsg,[]byte{})data, err := json.Marshal(sendMsg)if err != nil{fmt.Println("序列化commit消息出错", err)}node.reply(context.Background(), data, *clientNode)node.mutex.Lock()node.msgLog.replyLog[commitMsg.Digest] = truenode.mutex.Unlock()fmt.Println(">>> 回复客户端成功...")}}

运行结果

最后

项目中做了很多简化并且有很多设计不合理的地方,以后会继续进行改进。源码:https://github.com/blockchainGuide/Consensus_Algorithm