参考内容:
Java 服务调用全流程追踪 简易实现方案
自实现分布式链路追踪 方案&实践
概述
系统不需要收集太多的指标,但是需要实现链路追踪。升级SpringBoot3.2之后官方推荐使用micorometer组件,使用比较麻烦。
所以采用拦截器的方式,主要的方式是通过MDC中增加traceId字段,并且在上下游对traceId进行传递。
需要特殊处理的地方主要有:远程调用(restTemplate)、异步、多线程、消息队列(kafka)
工具类
public class TraceIdUtil { public static final String REGEX = "-"; public static final String TRACE_ID= "X-B3-TraceId"; /** * 从header和参数中获取traceId * 从网关传入数据 * * @param request HttpServletRequest * @return traceId */ public static String getTraceIdByRequest(HttpServletRequest request) { String traceId = request.getParameter(TRACE_ID); if (StringUtils.isBlank(traceId)) { traceId = request.getHeader(TRACE_ID); } return traceId; } public static String getTraceIdByLocal() { return MDC.get(TRACE_ID); } /** * 传递traceId至MDC * * @param traceId 链路id */ public static void setTraceId(String traceId) { if (StringUtils.isNotBlank(traceId)) { MDC.put(TRACE_ID, traceId); } } /** * 清理traceId */ public static void cleanTraceId() { MDC.clear(); } public static void setTraceIdIfAbsent() { if (MDC.get(TRACE_ID) == null) { MDC.put(TRACE_ID, createTraceId()); } } public static String createTraceId() { String uuid = UUID.randomUUID().toString(); return DigestUtils.md5Hex(uuid).substring(8, 24); } public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } setTraceIdIfAbsent(); try { return callable.call(); } finally { MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } setTraceIdIfAbsent(); try { runnable.run(); } finally { MDC.clear(); } }; } }
网关
在网关过滤器中生成并设置traceId到MDC(此刻网关服务的日志中将会打印traceId) ,同时通过header传递到下游服务。
@Slf4j @Configuration @EnableDiscoveryClient public class AuthGatewayFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest serverHttpRequest = exchange.getRequest().mutate() .headers(h->h.add(TraceIdUtil.REQUEST_TRACE_ID,traceId)).build(); return chain.filter(exchange.mutate().request(serverHttpRequest) .build())); }
拦截器
public class AuthenticationFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest req, HttpServletResponse rsp, FilterChain filterChain) throws ServletException, IOException { String traceId = TraceIdUtil.getTraceIdByRequest(req); TraceIdUtil.setTraceId(traceId); } }
远程调用
在restTemplate中添加拦截器,以下是拦截器中的部分代码
public class AuthorizationInterceptor implements ClientHttpRequestInterceptor { @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { String traceId = TraceIdUtil.getTraceIdByLocal(); HttpHeaders headers = request.getHeaders(); if(ObjectUtil.isNotEmpty(traceId)){ headers.add(TraceIdUtil.TRACE_ID,traceId); } return execution.execute(request, body); } }
异步、多线程
重写ThreadPoolTaskExecutor
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor { public ThreadPoolExecutorMdcWrapper(){ super(); } @Override public void execute(Runnable task) { super.execute(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap())); } }
修改原来的线程,使用重写了的ThreadPoolExecutorMdcWrapper
@Bean("myExecutor") public ThreadPoolTaskExecutor taskExecutor() { //ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 这里就是修改的地方 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolExecutorMdcWrapper(); // 最大线程数一般设置为cpu 数 + 1 taskExecutor.setCorePoolSize(8); taskExecutor.setMaxPoolSize(16); taskExecutor.setQueueCapacity(1000); taskExecutor.setKeepAliveSeconds(3000); // 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务 taskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadNamePrefix("claimTask-"); return taskExecutor; }
kafka
系统使用的SteamBridge,目前尝试了使用ChannelInterrupter,并重写preSend方法,但是提示header是不可修改的,所以就在每次发送前在header中增加traceId信息,这样代码侵入性太大了,但是目前没有找到更好的方法,所以暂时先采用这种,后续有更好的方法了在修改。
public void sendMessage(MessageObject content) { boolean sendbool = streamBridge.send(ShopChannel.SHOP_OUTPUT, MessageBuilder.withPayload(content) .setHeader(TraceIdUtil.TRACE_ID,TraceIdUtil .getTraceIdByLocal()).build()); } public void receive(Message<MessageObject> message) { MDC.put(TraceIdUtil.TRACE_ID, (String) message.getHeaders().get(TraceIdUtil.TRACE_ID)); MDC.clear(); }