小乌龟今天学习的是 Spark的 通讯框架。因为Spark 毕竟是分布式的,各模块之间需要进行通信,那么就必然用到通信框架。

Spark 通信架构概述

Spark1.6 之前使用的是 Akka 作为内部通讯组件,Spark1.6 之后将 Akka 换成了 Netty 。但是它借鉴了 Akka 中的设计,即 Actor 模型。
Spark 是一个分布式计算系统,因此节点间存在很多通信,那么Spark 就会借助这些通讯框架进行RPC 通信。
Spark 很多节点间都存在通信,例如:

  • driver 和 master 之间的通信,比如 driver 回向 master 申请计算资源
  • masetr 和 worker 之间进行通信,比如 worker 会向 master 上报 worker 上运行的 Executor 信息
  • exexutor 和 driver 通信,executor 向 driver 汇报任务运行结果
  • worker 和 worker 之间的通信,task之间需要互相拉取数据

Akka 介绍

Actor 模型

Akka 基于 Actor 模型,Actor 模型如下图所示。

简单案例

模拟yarn的 ResourceManager 和 NodeManager 互相通信。

  • MyResourceManager.scala
class MyResourceManager(var hostname: String, var port: Int) extends Actor {// 用来存储每个注册的NodeManager节点的信息private var id2nodemanagerinfo = new mutable.HashMap[String, NodeManagerInfo]()// 对所有注册的NodeManager进行去重,其实就是一个HashSetprivate var nodemanagerInfoes = new mutable.HashSet[NodeManagerInfo]()// actor在最开始的时候,会执行一次override def preStart(): Unit = {import scala.concurrent.duration._import context.dispatcher// 调度一个任务, 每隔五秒钟执行一次context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOut)}override def receive: Receive = {case RegisterNodeManager(nodemanagerid, memory, cpu) => {val nodeManagerInfo = new NodeManagerInfo(nodemanagerid, memory, cpu)println(s"节点 ${nodemanagerid} 上线")// 对注册的NodeManager节点进行存储管理id2nodemanagerinfo.put(nodemanagerid, nodeManagerInfo)nodemanagerInfoes += nodeManagerInfo//把信息存到zookeepersender() ! RegisteredNodeManager(hostname + ":" + port)}case Heartbeat(nodemanagerid) => {val currentTime = System.currentTimeMillis()val nodeManagerInfo = id2nodemanagerinfo(nodemanagerid)nodeManagerInfo.lastHeartBeatTime = currentTimeid2nodemanagerinfo(nodemanagerid) = nodeManagerInfonodemanagerInfoes += nodeManagerInfo}// 检查过期失效的 NodeManagercase CheckTimeOut => {val currentTime = System.currentTimeMillis()// 15 秒钟失效nodemanagerInfoes.filter(nm => {val bool = currentTime - nm.lastHeartBeatTime > 15000if (bool) {println(s"节点 ${nm.nodemanagerid} 下线")}bool}).foreach(deadnm => {nodemanagerInfoes -= deadnmid2nodemanagerinfo.remove(deadnm.nodemanagerid)})println("当前注册成功的节点数" + nodemanagerInfoes.size + "\t分别是:" + nodemanagerInfoes.map(x => x.toString).mkString(","));}}}object MyResourceManager {def main(args: Array[String]): Unit = {//val RESOURCEMANAGER_HOSTNAME="localhost" //解析的配置的日志//val RESOURCEMANAGER_PORT=6789val str =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = localhost|akka.remote.netty.tcp.port = 6789""".stripMarginval conf = ConfigFactory.parseString(str)// TODO_MA 注释:ActorSystemval actorSystem = ActorSystem(Constant.RMAS, conf)// TODO_MA 注释:启动了一个actor : MyResourceManageractorSystem.actorOf(Props(new MyResourceManager("localhost", 6789)), Constant.RMA)}}
  • MyNodeManager
class MyNodeManager(val nmhostname: String, val resourcemanagerhostname: String, val resourcemanagerport: Int,val memory: Int, val cpu: Int) extends Actor {var nodemanagerid: String = nmhostnamevar rmRef: ActorSelection = _override def preStart(): Unit = {// 远程path  akka.tcp://(ActorSystem的名称)@(远程地址的IP) : (远程地址的端口)/user/(Actor的名称)rmRef = context.actorSelection(s"akka.tcp://${Constant.RMAS}@${resourcemanagerhostname}:${resourcemanagerport}/user/${Constant.RMA}")// val nodemanagerid:String// val memory:Int// val cpu:Int//nodemanagerid = UUID.randomUUID().toString//nodemanagerid = "hadoop05"println(nodemanagerid + " 正在注册")rmRef ! RegisterNodeManager(nodemanagerid, memory, cpu)}override def receive: Receive = {case RegisteredNodeManager(masterURL) => {println(masterURL);/** * initialDelay: FiniteDuration, 多久以后开始执行 * interval: FiniteDuration, 每隔多长时间执行一次 * receiver: ActorRef, 给谁发送这个消息 * message:Any发送的消息是啥 */import scala.concurrent.duration._import context.dispatchercontext.system.scheduler.schedule(0 millis, 4000 millis, self, SendMessage)}case SendMessage => {//向主节点发送心跳信息rmRef ! Heartbeat(nodemanagerid)println(Thread.currentThread().getId)}}}object MyNodeManager {def main(args: Array[String]): Unit = {val HOSTNAME = args(0)val RM_HOSTNAME = args(1)val RM_PORT = args(2).toIntval NODEMANAGER_MEMORY = args(3).toIntval NODEMANAGER_CORE = args(4).toIntvar NODEMANAGER_PORT = args(5).toIntvar NMHOSTNAME = args(6)val str =s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname =${HOSTNAME} |akka.remote.netty.tcp.port=${NODEMANAGER_PORT}""".stripMarginval conf = ConfigFactory.parseString(str)val actorSystem = ActorSystem(Constant.NMAS, conf)actorSystem.actorOf(Props(new MyNodeManager(NMHOSTNAME, RM_HOSTNAME, RM_PORT, NODEMANAGER_MEMORY, NODEMANAGER_CORE)), Constant.NMA)}}

Spark 通信架构解析

组件间的关系

Spark 通讯架构中各个组件(Client/Master/Worker)可独立认为是一个个独立的实体,各个实体之间通过消息来通信。具体各个组件之间的关系如下:

EndPoint 有1个InBox 和 N 个 OutBox(N>=1,N取决于当前 EndPoint 与多少其他的 EndPoint 进行通信,一个与其通讯的其他 EndPoint 对应一个 OutBox),EndPoint 接收到的消息被写入 InBox,发送出去的消息写入 OutBox 并被发送到其他的EndPoint 的 InBox 中。

通讯架构

Spark的通讯架构如下图所示:

  1. RpcEndPoint
RPC 端点,Spark针对每个节点(Client/Master/Worker)都称为Rpc 端点,且都实现 RpcEndPoint 接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。

RpcEndPoint类似于 Akka 中的 Actor。一个RpcEndpoint 经历的过程依次是:构建 -> onStart -> receive -> onStop。其中 onStart 在接收任务消息前调用,reveive 和 receiveAndReply 分别用来接收另外一个 RpcEndpoint(也可以是本身) send 和 ask 过来的消息,应答通过 RpcContext 回调。

  1. RpcEndpointRef
类似于 Akka 中的 ActorRef,是 RpcEndPoint 的引用,持有远程 RpcEndPoint 的地址名称等,提供了send 方法和 ask 方法用来发送请求。

RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一个具体的 RpcEndPoint 发送消息时,一般我们需要获取到该 RpcEndpoint 的引用,然后通过该引用发送消息。

  1. RpcEnv 和 NettyRpcEnv
RPC 上下文环境,RpcEnv 类似于 ActorSystem, 服务端和客户端都可以使用它来做通信。

对于 server 端来说,RpcEnv 是 RpcEndpoint 的运行环境,负责 RpcEndPoint 的生命周期管理,解析 Tcp 层的数据包以及反序列化数据封装成 RpcMessage,然后根据路由传送到对应的 Endpoint;

对于 client 端来说,可以通过 RpcEnv 获取 RpcEnvpoint 的引用,也就是 RpcEndpointRef,然后通过 RpcEndpointRef 与对应的 Endpoint 通信。

RpcEnv 为 RpcEndpoint 提供处理消息处理的环境,RpcEnv 负责 RpcEndpoint 整个生命周期的管理,包括:注册 endpoint。endpoint 之间消息的路由,以及停止 endpoint。

  1. Dispathcer 、InBox以及 OutBox
NettyRpcEnv 中包含 Dispatcher,主要针对服务端,帮助路由器到指定的 RpcEndPoint,并调用起业务逻辑。

Dispathcer : 消息分发器,针对RPC 断点需要发送消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方手机自己则存入收件箱,如果指令接受方不是自己,则放入发件箱;

Inbox:指令消息收件箱,一个本地RpcEndPoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入消息时,都将对应 EndpointData 加入内部 ReceiverQueue中;另外 Dispatcher 创建时会启动一个单独的线程进行轮询 Receiver Queue,进行收件箱消息消费。

OutBox: 指令消息发送箱,对于当前 RpcEndpoint 来说,一个目标 RpcEndPoint 对应一个发件箱,如果多个目标 RpcEndpoint 发送消息,则有多个OutBox。当消息放入 Outbox 后,紧接着通过 TransportClient 将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行,这样做得主要原因是远程消息分为 RpcOutboxMessage,OneWayOutBoxMessage 两种消息,而针对需要应答的消息直接发送且需要得到结果进行处理。

  1. RpcAddress、TransportClient、TransportServer
RpcAddress: 表示远程的RpcEndPointRef 的地址,Host + Port。

TransportClient:Netty 通信客户端,一个OutBox 对应一个 TransportClient,TransportClient 不断轮询 OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer;

TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个TransportServer,接受远程消息后调用 Dispatcher 分发消息至对应收发件箱。

根据上面的分析, Spark 通信架构的高层视图如下图所示:

Spark 的 Rpc 类图关系


核心要点如下:

  1. 核心的 RpcEnv 是一个特质(trait),它主要提供了停止、注册,获取 endpoint 等方法的定义,而NettyRpcEnv 提供了该特质的一个具体的实现。
  2. 通过工厂 RpcEnvFactory 来生产一个 RpcEnv,而 NettyRpcEnvFactory 用来生成 NettyRpcEnv 的一个对象。
  3. 当我们调用RpcEnv 中的 SetupEndpoint 来注册一个 endpoint 到 rpcEnv 的时候,在 NettyRpcEnv 内部,会将该 endpoint 的名称与其本身的映射关系,rpcEndpoint 与 rpcEndpointRef 之间映射关系保存在 dispatcher 对应的成员变量中。