参考内容:
Java 服务调用全流程追踪 简易实现方案
自实现分布式链路追踪 方案&实践

概述

系统不需要收集太多的指标,但是需要实现链路追踪。升级SpringBoot3.2之后官方推荐使用micorometer组件,使用比较麻烦。
所以采用拦截器的方式,主要的方式是通过MDC中增加traceId字段,并且在上下游对traceId进行传递。
需要特殊处理的地方主要有:远程调用(restTemplate)、异步、多线程、消息队列(kafka)

工具类

public class TraceIdUtil {
    public static final String REGEX = "-";

    public static final String TRACE_ID=  "X-B3-TraceId";

    /**
     * 从header和参数中获取traceId
     * 从网关传入数据
     *
     * @param request  HttpServletRequest
     * @return traceId
     */
    public static String getTraceIdByRequest(HttpServletRequest request) {
        String traceId = request.getParameter(TRACE_ID);
        if (StringUtils.isBlank(traceId)) {
            traceId = request.getHeader(TRACE_ID);
        }
        return traceId;
    }

    public static String getTraceIdByLocal() {
        return MDC.get(TRACE_ID);
    }

    /**
     * 传递traceId至MDC
     *
     * @param traceId  链路id
     */
    public static void setTraceId(String traceId) {
        if (StringUtils.isNotBlank(traceId)) {
            MDC.put(TRACE_ID, traceId);
        }
    }


    /**
     * 清理traceId
     */
    public static void cleanTraceId() {
        MDC.clear();
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, createTraceId());
        }
    }

    public static String createTraceId() {
        String uuid = UUID.randomUUID().toString();
        return DigestUtils.md5Hex(uuid).substring(8, 24);
    }

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

网关

在网关过滤器中生成并设置traceId到MDC(此刻网关服务的日志中将会打印traceId) ,同时通过header传递到下游服务。

@Slf4j
@Configuration
@EnableDiscoveryClient
public class AuthGatewayFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
       ServerHttpRequest serverHttpRequest = exchange.getRequest().mutate()
               .headers(h->h.add(TraceIdUtil.REQUEST_TRACE_ID,traceId)).build();
            return chain.filter(exchange.mutate().request(serverHttpRequest)
                    .build()));
    }

拦截器

public class AuthenticationFilter extends OncePerRequestFilter {
   @Override
    protected void doFilterInternal(HttpServletRequest req, HttpServletResponse rsp, FilterChain filterChain)
            throws ServletException, IOException {
        String traceId = TraceIdUtil.getTraceIdByRequest(req);
        TraceIdUtil.setTraceId(traceId);
   }
}

远程调用

在restTemplate中添加拦截器,以下是拦截器中的部分代码

public class AuthorizationInterceptor implements ClientHttpRequestInterceptor {
  @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
            ClientHttpRequestExecution execution) throws IOException {
            String traceId = TraceIdUtil.getTraceIdByLocal();
            HttpHeaders headers = request.getHeaders();
            if(ObjectUtil.isNotEmpty(traceId)){
                headers.add(TraceIdUtil.TRACE_ID,traceId);
            }
            return execution.execute(request, body);
    }
}

异步、多线程

重写ThreadPoolTaskExecutor

public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor {
    public ThreadPoolExecutorMdcWrapper(){
        super();
    }

    @Override
    public void execute(Runnable task) {
        super.execute(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
    }



    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

}

修改原来的线程,使用重写了的ThreadPoolExecutorMdcWrapper

    @Bean("myExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        //ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();  这里就是修改的地方
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolExecutorMdcWrapper();
        // 最大线程数一般设置为cpu 数 + 1
        taskExecutor.setCorePoolSize(8);
        taskExecutor.setMaxPoolSize(16);
        taskExecutor.setQueueCapacity(1000);
        taskExecutor.setKeepAliveSeconds(3000);
        // 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务
        taskExecutor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.setThreadNamePrefix("claimTask-");
        return taskExecutor;
    }

kafka

系统使用的SteamBridge,目前尝试了使用ChannelInterrupter,并重写preSend方法,但是提示header是不可修改的,所以就在每次发送前在header中增加traceId信息,这样代码侵入性太大了,但是目前没有找到更好的方法,所以暂时先采用这种,后续有更好的方法了在修改。

  public void sendMessage(MessageObject content) {
            boolean sendbool = streamBridge.send(ShopChannel.SHOP_OUTPUT, 
               MessageBuilder.withPayload(content)
               .setHeader(TraceIdUtil.TRACE_ID,TraceIdUtil
               .getTraceIdByLocal()).build());
    }

  public void receive(Message<MessageObject> message)
    {
           MDC.put(TraceIdUtil.TRACE_ID, (String) message.getHeaders().get(TraceIdUtil.TRACE_ID));
           MDC.clear();
    }