当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的。
这时就可以引入状态机模型,帮助代码结构变得清晰。

一、状态机库概述一)简介

状态机由一组状态组成:
【初始状态 -> 中间状态 -> 最终状态】。
在一个状态机中,每个状态会接收一组特定的事件,根据事件类型进行处理,并转换到下一个状态。当转换到最终状态时则退出。

二)状态转换方式

状态间转换会有下面这三种类型:

三)Yarn 状态机类

在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机。如何使用,我们直接写个 demo。

二、案例 demo

在上一篇文章《Yarn 服务库和事件库》案例基础上进行扩展,增加状态机库的内容。如果还不了解服务库和事件库的同学,建议先学习下上一篇文章。
案例已上传至 github,有帮助可以点个 ⭐️
https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo

一)状态机实现

状态机实现,可以直接嵌入到上篇文章中的 AsyncDispatcher使用。
这里仅给出状态机JobStateMachine以及各种事件处理的代码。完整的代码项目执行,请到 github demo 中查看。

import com.shuofxz.event.JobEvent;import com.shuofxz.event.JobEventType;import org.apache.hadoop.yarn.event.EventHandler;import org.apache.hadoop.yarn.state.*;import java.util.EnumSet;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** 可参考 Yarn 中实现的状态机对象:* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等* */@SuppressWarnings({"rawtypes", "unchecked"})public class JobStateMachine implements EventHandler {    private final String jobID;    private EventHandler eventHandler;    private final Lock writeLock;    private final Lock readLock;    // 定义状态机    protected static final StateMachineFactory            stateMachineFactory = new StateMachineFactory(JobStateInternal.NEW)            .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())            .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())            .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())            .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())            .installTopology();    private final StateMachine stateMachine;    public JobStateMachine(String jobID, EventHandler eventHandler) {        this.jobID = jobID;        // 多线程异步处理,state 有可能被同时读写,使用读写锁来避免竞争        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();        this.readLock = readWriteLock.readLock();        this.writeLock = readWriteLock.writeLock();        this.eventHandler = eventHandler;        stateMachine = stateMachineFactory.make(this);    }    protected StateMachine getStateMachine() {        return stateMachine;    }    public static class InitTransition implements SingleArcTransition {        @Override        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {            System.out.println("Receiving event " + jobEvent);            // do something...            // 完成后发送新的 Event —— JOB_START            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));        }    }    public static class StartTransition implements SingleArcTransition {        @Override        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {            System.out.println("Receiving event " + jobEvent);            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));        }    }    public static class SetupCompletedTransition implements SingleArcTransition {        @Override        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {            System.out.println("Receiving event " + jobEvent);            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));        }    }    public static class JobTasksCompletedTransition implements MultipleArcTransition {        @Override        public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {            System.out.println("Receiving event " + jobEvent);            // 这是多结果状态部分,因此需要人为制定后续状态            // 这里整个流程结束,设置一下对应的状态            boolean flag = true;            if (flag) {                return JobStateInternal.SUCCEEDED;            } else {                return JobStateInternal.KILLED;            }        }    }    @Override    public void handle(JobEvent jobEvent) {        try {            // 注意这里为了避免静态条件,使用了读写锁            writeLock.lock();            JobStateInternal oldState = getInternalState();            try {                getStateMachine().doTransition(jobEvent.getType(), jobEvent);            } catch (InvalidStateTransitionException e) {                System.out.println("Can't handle this event at current state!");            }            if (oldState != getInternalState()) {                System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());            }        } finally {            writeLock.unlock();        }    }    public JobStateInternal getInternalState() {        readLock.lock();        try {            return getStateMachine().getCurrentState();        } finally {            readLock.unlock();        }    }    public enum JobStateInternal {        NEW,        SETUP,        INITED,        RUNNING,        SUCCEEDED,        KILLED    }}

二)状态机可视化

hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java,可以拷贝到我们的工程中使用。
根据提示,运行需要三个参数:

Usage: %s   %n

运行后会在项目根目录生成图文件 jsm.gv
需要使用 graphviz工具将 gv 文件转换成 png 文件:

# linux 安装yum install graphviz# mac 安装brew install graphviz

转换:

dot -Tpng jsm.gv > jsm.png

可视化状态机展示:

再使用这个工具对 Yarn 中的 Application 状态进行展示:

三)如果不用状态机库

【思考】
如果不用状态机,代码结构会是什么样呢?
下面这样的代码,如果要增加或修改逻辑可能就是很痛苦的一件事情了。

// 一堆的函数调用// 一堆的 if 嵌套// 或者 switch case

三、总结

本节对 Yarn 状态机库进行了介绍。实际使用时会结合事件库、服务库一同使用。
状态机库的使用帮助代码结构更加的清晰,新增状态处理逻辑只需要增加一个状态类别,或者增加一个方法处理对应类型的事件即可。将整个处理逻辑进行了拆分,便于编写和维护。


参考文章:
源码|Yarn的事件驱动模型与状态机