本文主要研究一下PowerJob的ServerDeployContainerRequest

ServerDeployContainerRequest

tech/powerjob/common/request/ServerDeployContainerRequest.java

@Data@NoArgsConstructor@AllArgsConstructorpublic class ServerDeployContainerRequest implements PowerSerializable {/** * 容器ID */private Long containerId;/** * 容器名称 */private String containerName;/** * 文件名(MD5值),用于做版本校验和文件下载 */private String version;/** * 下载地址 */private String downloadURL;}

ServerDeployContainerRequest定义了containerId、containerName、version、downloadURL属性

onReceiveServerDeployContainerRequest

tech/powerjob/worker/actors/WorkerActor.java

@Handler(path = WORKER_HANDLER_DEPLOY_CONTAINER)public void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {OmsContainerFactory.deployContainer(request);}

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer

deployContainer

tech/powerjob/worker/container/OmsContainerFactory.java

public static synchronized void deployContainer(ServerDeployContainerRequest request) {Long containerId = request.getContainerId();String containerName = request.getContainerName();String version = request.getVersion();log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());OmsContainer oldContainer = CARGO.get(containerId);if (oldContainer != null && version.equals(oldContainer.getVersion())) {log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);return;}String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";// 下载Container到本地File jarFile = new File(filePath);try {if (!jarFile.exists()) {FileUtils.forceMkdirParent(jarFile);FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());}// 创建新容器OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);newContainer.init();// 替换容器CARGO.put(containerId, newContainer);log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);if (oldContainer != null) {// 销毁旧容器oldContainer.destroy();}} catch (Exception e) {log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);// 如果部署失败,则删除该 jar(本次失败可能是下载jar出错导致,不删除会导致这个版本永久无法重新部署)CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));}}

deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法

OmsContainer

tech/powerjob/worker/container/OmsContainer.java

public interface OmsContainer extends LifeCycle {/** * 获取处理器 * @param className 全限定类名 * @return 处理器(可以是 MR、BD等处理器) */BasicProcessor getProcessor(String className);/** * 获取容器的类加载器 * @return 类加载器 */OhMyClassLoader getContainerClassLoader();Long getContainerId();Long getDeployedTime();String getName();String getVersion();/** * 尝试释放容器资源 */void tryRelease();}

OmsContainer接口定义了getProcessor、getContainerClassLoader、getContainerId、getDeployedTime、getName、getVersion、tryRelease方法

OmsJarContainer

tech/powerjob/worker/container/OmsJarContainer.java

@Slf4jpublic class OmsJarContainer implements OmsContainer {private final Long containerId;private final String name;private final String version;private final File localJarFile;private final Long deployedTime;// 引用计数器private final AtomicInteger referenceCount = new AtomicInteger(0);private OhMyClassLoader containerClassLoader;private ClassPathXmlApplicationContext container;private final Map processorCache = Maps.newConcurrentMap();public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {this.containerId = containerId;this.name = name;this.version = version;this.localJarFile = localJarFile;this.deployedTime = System.currentTimeMillis();}//......}

OmsJarContainer实现了OmsContainer接口

getProcessor

public BasicProcessor getProcessor(String className) {BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> {Class targetClass;try {targetClass = containerClassLoader.loadClass(className);} catch (ClassNotFoundException cnf) {log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className);return null;}// 先尝试从 Spring IOC 容器加载try {return (BasicProcessor) container.getBean(targetClass);} catch (BeansException be) {log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId);} catch (ClassCastException cce) {log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className);return null;} catch (Exception e) {log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e);return null;}// 直接实例化try {Object obj = targetClass.getDeclaredConstructor().newInstance();return (BasicProcessor) obj;} catch (Exception e) {log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e);}return null;});if (basicProcessor != null) {// 引用计数 + 1referenceCount.getAndIncrement();}return basicProcessor;}

getProcessor方法会先通过containerClassLoader.loadClass去加载对应的processor类,加载不到则直接返回,之后根据targetClass去spring容器查找,若查找不到则直接通过targetClass.getDeclaredConstructor().newInstance()尝试实例化

init

public void init() throws Exception {log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());URL jarURL = localJarFile.toURI().toURL();// 创建类加载器(父类加载为 Worker 的类加载)this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());// 解析 PropertiesProperties properties = new Properties();try (InputStream propertiesURLStream = containerClassLoader.getResourceAsStream(ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME)) {if (propertiesURLStream == null) {log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);}properties.load(propertiesURLStream);log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);}String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);if (StringUtils.isEmpty(packageName)) {log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);throw new PowerJobException("invalid jar file");}// 加载用户类containerClassLoader.load(packageName);// 创建 Spring IOC 容器(Spring配置文件需要填相对路径)// 需要切换线程上下文类加载器以加载 JDBC 类驱动(SPI)ClassLoader oldCL = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(containerClassLoader);try {this.container = new ClassPathXmlApplicationContext(new String[]{ContainerConstant.SPRING_CONTEXT_FILE_NAME}, false);this.container.setClassLoader(containerClassLoader);this.container.refresh();}finally {Thread.currentThread().setContextClassLoader(oldCL);}log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());}

init方法根据jar包地址创建OhMyClassLoader,然后先解析oms-worker-container.properties,执行properties.load(propertiesURLStream),接着获取配置的packageName,执行containerClassLoader.load(packageName)加载类,然后根据oms-worker-container-spring-context.xml创建spring的ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法

destroy

public void destroy() throws Exception {// 没有其余引用时,才允许执行 destroyif (referenceCount.get() <= 0) {try {if (localJarFile.exists()) {FileUtils.forceDelete(localJarFile);}}catch (Exception e) {log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e);}try {processorCache.clear();container.close();containerClassLoader.close();log.info("[OmsJarContainer-{}] container destroyed successfully", containerId);}catch (Exception e) {log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e);}return;}log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get());}

destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close

JarContainerProcessorFactory

tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java

@Slf4jpublic class JarContainerProcessorFactory implements ProcessorFactory {private final WorkerRuntime workerRuntime;public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {this.workerRuntime = workerRuntime;}@Overridepublic Set supportTypes() {return Sets.newHashSet(ProcessorType.EXTERNAL.name());}@Overridepublic ProcessorBean build(ProcessorDefinition processorDefinition) {String processorInfo = processorDefinition.getProcessorInfo();String[] split = processorInfo.split("#");String containerName = split[0];String className = split[1];log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);if (omsContainer != null) {return new ProcessorBean().setProcessor(omsContainer.getProcessor(className)).setClassLoader(omsContainer.getContainerClassLoader());} else {log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);}return null;}}

JarContainerProcessorFactory的build方法它根据#来解析出containerId及className,然后通过OmsContainerFactory.fetchContainer去查找容器,然后通过omsContainer.getProcessor(className)获取对应的processor;JarContainerProcessorFactory的supportTypes为EXTERNAL(外部处理器(动态加载))

小结

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer;deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法;init方法根据jar包地址创建OhMyClassLoader,创建ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法;destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close。