参考内容: Java 服务调用全流程追踪 简易实现方案自实现分布式链路追踪 方案&实践
概述
系统不需要收集太多的指标,但是需要实现链路追踪。升级SpringBoot3.2之后官方推荐使用micorometer组件,使用比较麻烦。 所以采用拦截器的方式,主要的方式是通过MDC中增加traceId字段,并且在上下游对traceId进行传递。 需要特殊处理的地方主要有:远程调用(restTemplate)、异步、多线程、消息队列(kafka)
工具类
public class TraceIdUtil {
public static final String REGEX = "-";
public static final String TRACE_ID= "X-B3-TraceId";
/**
* 从header和参数中获取traceId
* 从网关传入数据
*
* @param request HttpServletRequest
* @return traceId
*/
public static String getTraceIdByRequest(HttpServletRequest request) {
String traceId = request.getParameter(TRACE_ID);
if (StringUtils.isBlank(traceId)) {
traceId = request.getHeader(TRACE_ID);
}
return traceId;
}
public static String getTraceIdByLocal() {
return MDC.get(TRACE_ID);
}
/**
* 传递traceId至MDC
*
* @param traceId 链路id
*/
public static void setTraceId(String traceId) {
if (StringUtils.isNotBlank(traceId)) {
MDC.put(TRACE_ID, traceId);
}
}
/**
* 清理traceId
*/
public static void cleanTraceId() {
MDC.clear();
}
public static void setTraceIdIfAbsent() {
if (MDC.get(TRACE_ID) == null) {
MDC.put(TRACE_ID, createTraceId());
}
}
public static String createTraceId() {
String uuid = UUID.randomUUID().toString();
return DigestUtils.md5Hex(uuid).substring(8, 24);
}
public static Callable wrap(final Callable callable, final Map context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
网关
在网关过滤器中生成并设置traceId到MDC(此刻网关服务的日志中将会打印traceId) ,同时通过header传递到下游服务。
@Slf4j
@Configuration
@EnableDiscoveryClient
public class AuthGatewayFilter implements GlobalFilter, Ordered {
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest serverHttpRequest = exchange.getRequest().mutate()
.headers(h->h.add(TraceIdUtil.REQUEST_TRACE_ID,traceId)).build();
return chain.filter(exchange.mutate().request(serverHttpRequest)
.build()));
}
拦截器
public class AuthenticationFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest req, HttpServletResponse rsp, FilterChain filterChain)
throws ServletException, IOException {
String traceId = TraceIdUtil.getTraceIdByRequest(req);
TraceIdUtil.setTraceId(traceId);
}
}
远程调用
在restTemplate中添加拦截器,以下是拦截器中的部分代码
public class AuthorizationInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body,
ClientHttpRequestExecution execution) throws IOException {
String traceId = TraceIdUtil.getTraceIdByLocal();
HttpHeaders headers = request.getHeaders();
if(ObjectUtil.isNotEmpty(traceId)){
headers.add(TraceIdUtil.TRACE_ID,traceId);
}
return execution.execute(request, body);
}
}
异步、多线程
重写ThreadPoolTaskExecutor
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor {
public ThreadPoolExecutorMdcWrapper(){
super();
}
@Override
public void execute(Runnable task) {
super.execute(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future submit(Callable task) {
return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future submit(Runnable task) {
return super.submit(TraceIdUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
修改原来的线程,使用重写了的ThreadPoolExecutorMdcWrapper
@Bean("myExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
//ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 这里就是修改的地方
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolExecutorMdcWrapper();
// 最大线程数一般设置为cpu 数 + 1
taskExecutor.setCorePoolSize(8);
taskExecutor.setMaxPoolSize(16);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setKeepAliveSeconds(3000);
// 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务
taskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setThreadNamePrefix("claimTask-");
return taskExecutor;
}
kafka
系统使用的SteamBridge,目前尝试了使用ChannelInterrupter,并重写preSend方法,但是提示header是不可修改的,所以就在每次发送前在header中增加traceId信息,这样代码侵入性太大了,但是目前没有找到更好的方法,所以暂时先采用这种,后续有更好的方法了在修改。
public void sendMessage(MessageObject content) {
boolean sendbool = streamBridge.send(ShopChannel.SHOP_OUTPUT,
MessageBuilder.withPayload(content)
.setHeader(TraceIdUtil.TRACE_ID,TraceIdUtil
.getTraceIdByLocal()).build());
}
public void receive(Message message)
{
MDC.put(TraceIdUtil.TRACE_ID, (String) message.getHeaders().get(TraceIdUtil.TRACE_ID));
MDC.clear();
}