回顾上期的问题,当我们搭建完成Skywalking的搭建,顺利完成应用监控之后,就会面临一类问题,怎么利用获取的监控数据,包括三方面:
1 应用的Trace和SW收集Service/Endpoint不一定完全一致,可能定位不到,更无法在UI展示
2 按Trace-Span进行下钻分析,SW并不支持,更别说,对于按Trace不同Span特征(可以理解为一项业务在不同阶段的特征数据)进行分析
3 业务本身要求监控展示统一技术标准

好在SkyWalking提供了GraphQL数据接口,并配合OAL观察查询语句,使得用户可以直接通过简单的GraphQL查询语言获得数据,
其原理和提供原生查询介绍可以参考官方文档:SW 官方文档
或者网络材料Skywalking-11:Skywalking查询协议——案例分析

我们监控使用grafana8, 因此选择 ,在grafana8通过GraphQL数据接口,接入SW监控数据,接入的过程参考
Linux环境安装开发grafana插件
以及grafana结合Skywalking追踪Trace

但是grafana本身的数据处理能力太弱,于是可以选择在grafana和Skywalking之间增加一个java开发的数据处理模块TraceProcessor,通过TraceProcessor获取SW的trace和Span数据,然后进行加工处理后在ES进行持久化,然后由grafana直接展示ES的数据。

TraceProcessor的主要架构是基于多线程多任务的定时任务,定时获取,计算Trace数据,并支持Graphql,ES接口,以及按配置定制任务的能力,架构如下

我们先从配置工具 config tools入手,希望通过配置文件完成配置数据源(graqhQL)和持久化工具(ES)以及各类定时任务的配置关联,运行时通过反射方式加载各个操作类和定时任务(参考java以SSL方式连ES),以满足敏捷灵活的开发需求

{ "datasource" : {"name": "datasource.GraphQLServiceImp","para": {"url":"http://127.0.0.1:8090/graphql" } }, "targetdb" : {"name": "target.EsServiceImp","para": {"url":"http://127.0.0.1:9200"} }, "tasks" : [{"name": "task.QueryTraces","para" : {"serviceName" : "TradeService","endpointName" : "OrderSend","businessTag" : { "key": "businessTag", "value": "Auto"},"tags" : {},"traces_index" :"traces_-"},"switch" : "on","interval" : "60"},...{ "name": "task.Caculator", "para" : { "businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "RealTime"}], "traces_index" :"traces-", "stat_index" : "traces_index-" }, "switch" : "on", "interval" : "60", "delay" : 10 },... ]}

config.json的总体风格定义 执行类(包括路径)和参数,例如数据源graphql,连接,查询执行类就是datasource.GraphQLServiceImp,参数只有一个url http://127.0.0.1:8090/graphql 具体连接参考备忘:python和 java graphql client连Sky walking Server查询数据的联通性中的java部分

除了联通性,datasource.GraphQLServiceImp包含方法还有(以接口形式罗列)

//GraphQLServiceImp对应的interfacepublic interface DatasourceService {//联通及初始化方法public void initConnect(String url);//按ServiceName查询ServiceIdpublic String queryServiceId(String ServiceName);//按ServiceName和EndpointName查询EndpointIdpublic String queryEndPointId(String endpointName,String serviceName);//单页查询,按ServiceId,EndpointId,start_time和End_time以及tags查询trace (page=1)public ArrayNode getTotalTraces(String serviceId,String endpointId,String start_time, String end_time,JsonNode tags);//多页查询,按ServiceId,EndpointId,start_time和End_time以及tags,和pages 查询trace public ArrayNode getTotalTraces2(String serviceId,String endpointId,String start_time,String end_time,JsonNode tags,int pageNum); //按TraceId查spanpublic ArrayNode getTraceSpans(String traceId);}

同理,ES执行类是target.EsServiceImp,参数url为http://127.0.0.1:9200,同样按接口方式罗列操作方法
具体连接可以参考java以SSL方式连ES

public interface TargetdbService {//初始化连接public void initConnect(String url,String userName,String password);//按索引名判断是否存在public boolean isExisted(String indexName);//按key value判断健值对是否在指定索引中存在public boolean isNotInTheIndex(String indexName,String key,String value);//按索引名和mapping创建索引public boolean createForm(String indexName, XContentBuilder mapping);//按索引名删除索引public boolean deleteForm(String indexName);//按索引名和关键值seqNo,插入Mappublic boolean insertDate(String indexName, String seqNo,Map dataMap); //按索引名,批量更新map(List)public boolean updateDate(String indexName,Map<String, List<Map<String,Object>>> resultMap);//按关键字(startTime,endTime和tag标签)查询索引public Map<String, Object> queryData(String indexName, ArrayNode businessTags, String startTime, String endTime, String resultTag);}

对于定时任务,分为两类:
1) tarce查询任务:按需求和Skywalking查询条件,定时查询并筛选Trace,加上业务标签,并持久化

2) trace的指标计算任务:根据查询数据,按业务标签定时计算指标,例如每分钟请求数,平均/最大/百分位数延时、并持久化

这些任务可以根据定时框架调用,分为定时任务类TaskManager

public class TaskManager {private ScheduledExecutorService executorService;public TaskManager() {executorService = Executors.newScheduledThreadPool(10);}public void addTask(Runnable task, long delay, long period, TimeUnit timeUnit) {executorService.scheduleAtFixedRate(task, delay, period, timeUnit);}public void shutdown() {executorService.shutdown();}}

和调度类MyTaskProcess

public class MyTaskProcess {private final static Logger logger = LoggerFactory.getLogger(MyTaskProcess.class);public static void main(String[] args) {TaskManager taskManager = new TaskManager(); //任务管理器try{// 读入配置文件ConfigParser config=new ConfigParser("config.json");// 连接SW Server 数据接口DatasourceService datasourceInstance=config.getDatasource();String datasourceUrl= config.getGraphqlUrl();datasourceInstance.initConnect(datasourceUrl);// 连接ES,获得可用的数据库TargetdbService targetdbInstance=config.getTargetdb();String targetdbUrl=config.getTargetDBUrl();targetdbInstance.initConnect(targetdbUrl);logger.info("start:: {} ...",new Date());//读入任务列表,并且遍历ArrayNode taskList=config.getTaskList();taskList.forEach(JsonNode->{String taskName=JsonNode.get("name").asText();String switch_on=JsonNode.get("switch").asText();logger.info("taskName:: {} switch_on:: {}",taskName,switch_on);if(switch_on.equals("on")){//判断开关是否打开int interval=JsonNode.get("interval").asInt();int delay=1; // 默认延迟if(null!=JsonNode.get("delay"))if(JsonNode.get("delay").asInt()>0){delay=JsonNode.get("delay").asInt();logger.info("delay:: {}",delay);}try {TaskService task=(TaskService)config.getClass(taskName);task.init(JsonNode.get("para"),datasourceInstance,targetdbInstance);taskManager.addTask((Runnable) task, delay, interval, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}}});}catch (Exception e){e.printStackTrace();}finally {// 注册钩子线程Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("Shutting down SWTraceProcessor...");taskManager.shutdown();}));}//taskManager.shutdown();}}

后续我们会给出一个例子,探讨对trace数据深加工的目标和具体实现