基于Netty实现,位于package org.apache.flink.runtime.rest

启动服务 入口类

StandaloneSessionClusterEntrypoint

资源分发工厂类 DefaultDispatcherResourceManagerComponentFactory

WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint");

WebMonitorEndpoint 构造方法

public WebMonitorEndpoint(GatewayRetriever leaderRetriever,Configuration clusterConfiguration,RestHandlerConfiguration restConfiguration,GatewayRetriever resourceManagerRetriever,TransientBlobService transientBlobService,ScheduledExecutorService executor,MetricFetcher metricFetcher,LeaderElectionService leaderElectionService,ExecutionGraphCache executionGraphCache,FatalErrorHandler fatalErrorHandler)throws IOException, ConfigurationException {super(clusterConfiguration);this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);this.restConfiguration = Preconditions.checkNotNull(restConfiguration);this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);this.transientBlobService = Preconditions.checkNotNull(transientBlobService);this.executor = Preconditions.checkNotNull(executor);this.executionGraphCache = executionGraphCache;this.checkpointStatsCache =new CheckpointStatsCache(restConfiguration.getMaxCheckpointStatisticCacheEntries());this.metricFetcher = metricFetcher;this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);}

初始化API

protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers( final CompletableFuture<String> localAddressFuture)

ClusterOverviewHandler DashboardConfigHandler JobIdsHandler ……

抽象类 RestServerEndpoint 构造方法

public RestServerEndpoint(Configuration configuration)

获取 restAddress、restBindAddress、restBindPortRange 等初始化参数

注册具体的处理请求

private static void registerHandler(
        Router router,
        String handlerURL,
        HttpMethodWrapper httpMethod,
        ChannelInboundHandler handler) {
    switch (httpMethod) {
        case GET:
            router.addGet(handlerURL, handler);
            break;
        case POST:
            router.addPost(handlerURL, handler);
            break;
        case DELETE:
            router.addDelete(handlerURL, handler);
            break;
        case PATCH:
            router.addPatch(handlerURL, handler);
            break;
        default:
            throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
    }
}

初始化管道

if (isHttpsEnabled()) {
    ch.pipeline()
            .addLast(
                    "ssl",
                    new RedirectingSslHandler(
                            restAddress, restAddressFuture, sslHandlerFactory));
}

 ch.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(uploadDir)).addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders));

开启多事件循环

NioEventLoopGroup bossGroup =
        new NioEventLoopGroup(
                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
        new NioEventLoopGroup(
                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

服务事件绑定

bootstrap = new ServerBootstrap();
bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(initializer);

端口绑定

while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}

Web实现类

ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

继承的抽象类 AbstractRestHandler

文章转自:Flink Rest服务器端点实现_Java-答学网

作者:答学网,转载请注明原文链接:http://www.dxzl8.com/