传奇简单网站模板,江苏常州烽火台网络推广,温州网站开发定制,网站权限设计前言
随着越来越多的应用逐渐微服务化后#xff0c;分布式服务之间的RPC调用使得异常排查的难度骤增#xff0c;最明显的一个问题#xff0c;就是整个调用链路的日志不在一台机器上#xff0c;往往定位问题就要花费大量时间。如何在一个分布式网络中把单次请求的整个调用日…前言
随着越来越多的应用逐渐微服务化后分布式服务之间的RPC调用使得异常排查的难度骤增最明显的一个问题就是整个调用链路的日志不在一台机器上往往定位问题就要花费大量时间。如何在一个分布式网络中把单次请求的整个调用日志给串起来变得刻不容缓。
笔者基于 Dubbo 框架的 Filter 扩展点实现了一个分布式日志跟踪工具 dubbo-tracing源码地址https://github.com/panchanghe/dubbo-tracing
实现思路
Dubbo 作为国内最热门的 RPC 框架之一对外提供了丰富的功能扩展点日志跟踪就需要用到其中org.apache.dubbo.rpc.Filter扩展点。
Filter 扩展点可以在 Consumer 发起 RPC 调用前和 Provider 处理请求前发起拦截执行我们特定的业务逻辑来对 Dubbo 做增强。另外Dubbo RPC 调用除了方法入参还额外提供了 Map 类型的 attachments 来隐式的传递参数。
有了这些前提要实现分布式日志跟踪就简单了。通过实现 Filter 扩展点拦截 RPC 调用最早的 Consumer 端生成一个唯一的 TraceId 进行透传TraceId 在整个调用链路里保持一致TraceId 会被写到日志上下文 MDC 中最终和业务日志一起打印到日志文件里这样通过 TraceId 检索就能获取整个调用链路的所有日志。一个完整的 RPC 调用链路是一个树状结构最早发起调用的节点是根节点一直向下延伸为了把整个链路的日志构造成树状结构展示我们还需要一个 SpanId它代表了当前日志在整个调用链路中的层级。有了这些日志数据再搭配日志检索服务 图形化展示分布式问题的排查就会简单很多。
TraceId和SpanId生成规则
这里借鉴一下阿里的做法。
TraceId一般由接受请求的第一个服务器产生具有唯一性且在整个调用链路中保持不变。
TraceId的生成规则是服务器IP 时间戳 自增序列 进程号比如
c0a861711731309291125100068524前8位c0a86171是生成TraceId的服务器IP它被编码为十六进制每2位代表IP地址中的一段转换成十进制结果就是192.168.97.113可以根据该号段快速定位到生成TraceId的服务器。
后面的13位1731309291125是生成TraceId的毫秒级时间戳之后的4位1000是一个自增的序列从 1000 开始涨到 9999 后又会回到 1000最后的部分68524是当前进程的ID主要是为了防止单机多进程间产生的TraceId发生冲突。
SpanId 代表本次调用在整个调用链路树中的位置。
假设一个 Web 系统 A 接收了一次用户请求那么在这个系统的 MVC 日志中记录下的 SpanId 是 0代表是整个调用的根节点如果 A 系统处理这次请求需要通过 RPC 依次调用 B、C、D 三个系统那么在 A 系统的 RPC 客户端日志中SpanId 分别是 0.10.2 和 0.3在 B、C、D 三个系统的 RPC 服务端日志中SpanId 也分别是 0.10.2 和 0.3如果 C 系统在处理请求的时候又调用了 EF 两个系统那么 C 系统中对应的 RPC 客户端日志是 0.2.1 和 0.2.2E、F 两个系统对应的 RPC 服务端日志也是 0.2.1 和 0.2.2。
根据上面的描述可以知道如果把一次调用中所有的 SpanId 收集起来可以组成一棵完整的链路树。
假设一次分布式调用中产生的 TraceId 是 0a1234实际不会这么短那么根据上文 SpanId 的产生过程如下图所示 具体实现
1、首先是实现一个根据 机器IP、时间戳、自增序列、进程ID 生成 TraceId 的方案
public class IdUtils {private static final String PROCESS_ID;private static final String IP_HEX_CODE;private static final AtomicInteger COUNTER;private static final int COUNT_INIT_VALUE 1000;private static final int COUNT_MAX_VALUE 9999;private static long lastTimestamp 0L;static {PROCESS_ID ProcessIdUtil.getProcessId();IP_HEX_CODE getIpHexCode();COUNTER new AtomicInteger(COUNT_INIT_VALUE);}/*** 8位 13位 4位* 服务器 IP ID 产生的时间 自增序列 当前进程号** return*/public static synchronized String newTraceId() {final long timestamp System.currentTimeMillis();long count;if (timestamp lastTimestamp) {COUNTER.set(COUNT_INIT_VALUE);count COUNT_INIT_VALUE;lastTimestamp timestamp;} else {count COUNTER.incrementAndGet();if (count COUNT_MAX_VALUE) {COUNTER.set(COUNT_INIT_VALUE - 1);}}return IP_HEX_CODE timestamp count PROCESS_ID;}private static String getIpHexCode() {final StringBuilder builder new StringBuilder();String host NetUtils.getLocalHost();String[] split host.split(\\.);for (String s : split) {String hex Integer.toHexString(Integer.valueOf(s));if (hex.length() 1) {hex 0 hex;}builder.append(hex);}return builder.toString();}
}2、为了方便本地透传 TraceId 等信息必然要用到 ThreadLocal 来记录所以我们创建一个 TraceContext 类来读写当前线程的 Trace 信息。
public class TraceContext {private static final ThreadLocalMapString, Object TRACE_THREAD_LOCAL new ThreadLocal() {Overrideprotected Object initialValue() {return new HashMap();}};public static boolean isStarted() {return !get().isEmpty();}public static void start(String traceId) {start(traceId, 0);}public static void start(String traceId, String spanId) {get().put(TracingConstant.TRACE_ID, traceId);get().put(TracingConstant.SPAN_ID, spanId);get().put(TracingConstant.LOGIC_ID, new AtomicInteger(0));}public static String getTraceId() {return (String) get().get(TracingConstant.TRACE_ID);}public static String getSpanId() {String s (String) get().get(TracingConstant.SPAN_ID);return s;}public static int nextLogicId() {return ((AtomicInteger) get().get(TracingConstant.LOGIC_ID)).incrementAndGet();}private static MapString, Object get() {return TRACE_THREAD_LOCAL.get();}public static void clear() {TRACE_THREAD_LOCAL.remove();}
}3、Consumer 端的 Filter 扩展判断当前线程是否已经生成 TraceId如果没有则生成新的 TraceId 和 SpanId 写入到 ThreadLocal 同时通过 attachments 透传到 Provider。
Activate(group {consumer})
public class ConsumerTraceFilter implements Filter {Overridepublic Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {if (!TraceContext.isStarted()) {TraceContext.start(getTraceId());}ThreadContext.put(TracingConstant.TRACE_ID, TraceContext.getTraceId());ThreadContext.put(TracingConstant.SPAN_ID, TraceContext.getSpanId());invocation.setAttachment(TracingConstant.DUBBO_TRACE_ID, TraceContext.getTraceId());invocation.setAttachment(TracingConstant.DUBBO_SPAN_ID, TraceContext.getSpanId() . TraceContext.nextLogicId());return invoker.invoke(invocation);}private String getTraceId() {String traceId ThreadContext.get(TracingConstant.TRACE_ID);if (StringUtils.isEmpty(traceId)) {traceId IdUtils.newTraceId();}return traceId;}
}4、Provider 端的 Filter 扩展读取 attachments 透传过来的 TraceId 和 SpanId如果能读到就将它们写入本地 ThreadLocal 里开启 TraceContext后续如果自己再发起下游的 RPC 调用则会以它们为基础数据发给下游节点整个链路就能串起来了。
Activate(group {provider})
public class ProviderTraceFilter implements Filter {Overridepublic Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {final String traceId RpcContext.getServerAttachment().getAttachment(TracingConstant.DUBBO_TRACE_ID);final String spanId RpcContext.getServerAttachment().getAttachment(TracingConstant.DUBBO_SPAN_ID);if (StringUtils.isAnyEmpty(traceId, spanId)) {return invoker.invoke(invocation);}TraceContext.start(traceId, spanId);ThreadContext.put(TracingConstant.TRACE_ID, TraceContext.getTraceId());ThreadContext.put(TracingConstant.SPAN_ID, TraceContext.getSpanId());try {return invoker.invoke(invocation);} catch (Throwable e) {throw e;} finally {TraceContext.clear();ThreadContext.remove(TracingConstant.TRACE_ID);ThreadContext.remove(TracingConstant.SPAN_ID);}}
}5、为了让我们自定义的 Filter 能被 Dubbo 加载并执行还需要在 META-INF/dubbo/org.apache.dubbo.rpc.Filter文件里配置一下
ProviderTraceFiltertop.javap.dubbo.tracing.ProviderTraceFilter
ConsumerTraceFiltertop.javap.dubbo.tracing.ConsumerTraceFilter