SpringBoot @Async 异步注解最佳使用实践

SpringBoot @Async 异步注解最佳使用实践

一、线程池配置

生产环境绝对不能使用 @Async 默认的 SimpleAsyncTaskExecutor,它每次调用都新建线程,无上限,高并发下直接 OOM。

标准配置模板:

@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("Async method threw exception. method: {}, params: {}, exception: {}",
                method.getName(),
                paramsToString(params),
                ex.getMessage(),
                ex
        );
    }

    private String paramsToString(Object[] params) {
        if (params == null || params.length == 0) {
            return "[]";
        }
        try {
            return Arrays.toString(params);
        } catch (Exception e) {
            return "[unprintable params]";
        }
    }
}
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    /**
     * 默认异步线程池,所有不指定线程池的 @Async 都走这里
     */
    @Override
    @Bean("asyncExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数:建议设置为 CPU 核心数(IO密集型可适当放大,如 2N)
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // 最大线程数
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        // 队列容量:防止任务堆积,设置合理上限
        executor.setQueueCapacity(500);
        // 线程名前缀:方便日志追踪
        executor.setThreadNamePrefix("async-default-");
        // 拒绝策略:CallerRunsPolicy 让调用方线程执行,起到限速作用
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 优雅关闭:等待队列中的任务执行完再关闭
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }

    /**
     * 异步任务异常处理器
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

按业务隔离线程池:

核心业务与非核心业务必须使用不同线程池,防止发短信、写日志等非核心任务把核心业务线程全部占满。

@Configuration
public class ExecutorConfig {

    /** 订单核心业务线程池 */
    @Bean("orderExecutor")
    public Executor orderExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("order-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /** 通知/消息推送线程池(非核心,允许丢弃) */
    @Bean("notifyExecutor")
    public Executor notifyExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("notify-async-");
        // 非核心任务满载直接丢弃,不影响主流程
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }
}

使用时指定线程池:

@Async("orderExecutor")
public void processOrder(Order order) { ... }

@Async("notifyExecutor")
public void sendSms(String phone, String content) { ... }

二、异常处理

@Async 方法在独立线程中运行,主线程感知不到异常。必须显式处理,否则异常会被静默吞掉。

2.1 void 方法:实现全局异常处理器

@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("[Async异常] method={}, params={}, error={}",
                method.getName(), Arrays.toString(params), ex.getMessage(), ex);
        // 可接入告警:钉钉、飞书、Sentry 等
        alertService.sendAlert("异步任务异常: " + method.getName(), ex);
    }
}

2.2 有返回值:通过 CompletableFuture 捕获

@Async("orderExecutor")
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
    try {
        OrderResult result = orderService.process(order);
        return CompletableFuture.completedFuture(result);
    } catch (Exception e) {
        log.error("[订单异步处理失败] orderId={}", order.getId(), e);
        // 返回异常的 Future,调用方通过 exceptionally 处理
        return CompletableFuture.failedFuture(e);
    }
}

调用方处理:

orderService.processOrderAsync(order)
    .thenAccept(result -> log.info("订单处理完成: {}", result))
    .exceptionally(ex -> {
        log.error("订单处理失败,触发补偿逻辑", ex);
        compensationService.compensate(order);
        return null;
    });

三、TraceId 透传(生产必备)

异步线程是新线程,MDC 中的 TraceId 不会自动传递,日志会断链,无法串联一次请求的完整调用链。

方案:装饰 ThreadPoolTaskExecutor,提交任务时捕获并传递 MDC。

public class MdcAwareExecutor extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable task) {
        // 提交任务时,在当前线程(主线程)捕获 MDC 快照
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        super.execute(() -> {
            // 在异步线程中恢复 MDC
            if (mdcContext != null) {
                MDC.setContextMap(mdcContext);
            }
            try {
                task.run();
            } finally {
                MDC.clear();  // 线程池线程复用,必须清理,防止污染下一个任务
            }
        });
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return super.submit(() -> {
            if (mdcContext != null) {
                MDC.setContextMap(mdcContext);
            }
            try {
                return task.call();
            } finally {
                MDC.clear();
            }
        });
    }
}

AsyncConfig 中替换使用:

@Bean("asyncExecutor")
public Executor getAsyncExecutor() {
    MdcAwareExecutor executor = new MdcAwareExecutor();  // 替换这里
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("async-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(30);
    executor.initialize();
    return executor;
}

四、优雅关闭

Spring 容器关闭时(发版、重启),队列中还没执行完的异步任务需要等待完成,否则会造成数据不一致。

executor.setWaitForTasksToCompleteOnShutdown(true);
// 最长等待时间,超过则强制关闭(根据业务最长耗时设置)
executor.setAwaitTerminationSeconds(30);

同时确保应用能收到关闭信号(Kubernetes 场景需配置 terminationGracePeriodSeconds):

# application.yml
server:
  shutdown: graceful # Spring Boot 2.3+ 支持优雅关闭

spring:
  lifecycle:
    timeout-per-shutdown-phase: 30s

五、监控线程池状态

生产环境需要对线程池核心指标进行监控,及时发现任务堆积、线程耗尽等问题。

@Slf4j
@Component
@RequiredArgsConstructor
public class ExecutorMonitor {

    @Qualifier("asyncExecutor")
    private final Executor asyncExecutor;

    /**
     * 每分钟打印线程池状态,接入 Prometheus/Grafana 可改为上报 Gauge
     */
    @Scheduled(fixedRate = 60_000)
    public void monitor() {
        if (asyncExecutor instanceof ThreadPoolTaskExecutor taskExecutor) {
            ThreadPoolExecutor pool = taskExecutor.getThreadPoolExecutor();
            log.info("[线程池监控] name={} active={} poolSize={} coreSize={} maxSize={} queueSize={} completedTasks={}",
                    taskExecutor.getThreadNamePrefix(),
                    pool.getActiveCount(),
                    pool.getPoolSize(),
                    pool.getCorePoolSize(),
                    pool.getMaximumPoolSize(),
                    pool.getQueue().size(),
                    pool.getCompletedTaskCount()
            );
            // 队列使用率超过 80% 告警
            double queueUsage = (double) pool.getQueue().size() / (pool.getQueue().size() + pool.getQueue().remainingCapacity());
            if (queueUsage > 0.8) {
                log.warn("[线程池告警] 队列使用率={:.1f}%,请关注任务堆积", queueUsage * 100);
            }
        }
    }
}

六、典型使用场景

场景一:下单后并行触发多个后续流程

@Service
@RequiredArgsConstructor
public class OrderFacadeService {

    private final InventoryService inventoryService;
    private final NotificationService notificationService;
    private final PointService pointService;

    public OrderVO createOrder(OrderRequest request) {
        // 核心逻辑:同步执行,保证事务
        Order order = orderService.create(request);

        // 非核心后续流程:异步并行,不阻塞主流程响应
        notificationService.sendOrderConfirm(order);  // 发短信/推送
        pointService.grantPoints(order);               // 积分发放
        inventoryService.syncToWms(order);             // 同步 WMS

        return OrderVO.from(order);
    }
}

@Service
public class NotificationService {
    @Async("notifyExecutor")
    public void sendOrderConfirm(Order order) { ... }
}

场景二:异步任务编排(并行请求多个接口)

三个接口串行调用总耗时 = 三者之和;改为并行后总耗时 = max(三者耗时),性能提升显著。

@Service
@RequiredArgsConstructor
public class ProductDetailService {

    private final PriceService priceService;
    private final StockService stockService;
    private final ReviewService reviewService;

    public ProductDetailVO getDetail(Long productId) throws Exception {
        CompletableFuture<Price> priceFuture = priceService.getPriceAsync(productId);
        CompletableFuture<Stock> stockFuture = stockService.getStockAsync(productId);
        CompletableFuture<List<Review>> reviewFuture = reviewService.getReviewsAsync(productId);

        // 等待全部完成
        CompletableFuture.allOf(priceFuture, stockFuture, reviewFuture).join();

        return ProductDetailVO.builder()
                .price(priceFuture.get())
                .stock(stockFuture.get())
                .reviews(reviewFuture.get())
                .build();
    }
}

@Service
public class PriceService {
    @Async("orderExecutor")
    public CompletableFuture<Price> getPriceAsync(Long productId) {
        return CompletableFuture.completedFuture(fetchPrice(productId));
    }
}

七、生产 Checklist

检查项 说明
✅ 配置自定义线程池 禁止使用默认 SimpleAsyncTaskExecutor
✅ 按业务隔离线程池 核心业务与非核心业务分开,防止资源争抢
✅ 实现异常处理器 AsyncUncaughtExceptionHandler + 告警接入
✅ MDC TraceId 透传 装饰 Executor,保证日志链路完整
✅ 配置优雅关闭 waitForTasksToCompleteOnShutdown = true
✅ 监控线程池指标 队列深度、活跃线程数接入 Prometheus/日志
@Async 方法必须 public private/protected 方法代理失效
✅ 避免同类内部调用 this 引用绕过代理,注解失效
✅ 异步方法不叠加 @Transactional 线程切换后事务上下文丢失
本文作者:
本文链接: https://hgnulb.github.io/blog/2026/81701465
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处!