SpringBoot @Async 异步注解最佳使用实践
SpringBoot @Async 异步注解最佳使用实践
一、线程池配置
生产环境绝对不能使用
@Async默认的SimpleAsyncTaskExecutor,它每次调用都新建线程,无上限,高并发下直接 OOM。
标准配置模板:
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("Async method threw exception. method: {}, params: {}, exception: {}",
method.getName(),
paramsToString(params),
ex.getMessage(),
ex
);
}
private String paramsToString(Object[] params) {
if (params == null || params.length == 0) {
return "[]";
}
try {
return Arrays.toString(params);
} catch (Exception e) {
return "[unprintable params]";
}
}
}
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
/**
* 默认异步线程池,所有不指定线程池的 @Async 都走这里
*/
@Override
@Bean("asyncExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:建议设置为 CPU 核心数(IO密集型可适当放大,如 2N)
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// 最大线程数
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 队列容量:防止任务堆积,设置合理上限
executor.setQueueCapacity(500);
// 线程名前缀:方便日志追踪
executor.setThreadNamePrefix("async-default-");
// 拒绝策略:CallerRunsPolicy 让调用方线程执行,起到限速作用
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 优雅关闭:等待队列中的任务执行完再关闭
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
/**
* 异步任务异常处理器
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
按业务隔离线程池:
核心业务与非核心业务必须使用不同线程池,防止发短信、写日志等非核心任务把核心业务线程全部占满。
@Configuration
public class ExecutorConfig {
/** 订单核心业务线程池 */
@Bean("orderExecutor")
public Executor orderExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("order-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/** 通知/消息推送线程池(非核心,允许丢弃) */
@Bean("notifyExecutor")
public Executor notifyExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("notify-async-");
// 非核心任务满载直接丢弃,不影响主流程
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
使用时指定线程池:
@Async("orderExecutor")
public void processOrder(Order order) { ... }
@Async("notifyExecutor")
public void sendSms(String phone, String content) { ... }
二、异常处理
@Async方法在独立线程中运行,主线程感知不到异常。必须显式处理,否则异常会被静默吞掉。
2.1 void 方法:实现全局异常处理器
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("[Async异常] method={}, params={}, error={}",
method.getName(), Arrays.toString(params), ex.getMessage(), ex);
// 可接入告警:钉钉、飞书、Sentry 等
alertService.sendAlert("异步任务异常: " + method.getName(), ex);
}
}
2.2 有返回值:通过 CompletableFuture 捕获
@Async("orderExecutor")
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
try {
OrderResult result = orderService.process(order);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
log.error("[订单异步处理失败] orderId={}", order.getId(), e);
// 返回异常的 Future,调用方通过 exceptionally 处理
return CompletableFuture.failedFuture(e);
}
}
调用方处理:
orderService.processOrderAsync(order)
.thenAccept(result -> log.info("订单处理完成: {}", result))
.exceptionally(ex -> {
log.error("订单处理失败,触发补偿逻辑", ex);
compensationService.compensate(order);
return null;
});
三、TraceId 透传(生产必备)
异步线程是新线程,MDC 中的 TraceId 不会自动传递,日志会断链,无法串联一次请求的完整调用链。
方案:装饰 ThreadPoolTaskExecutor,提交任务时捕获并传递 MDC。
public class MdcAwareExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable task) {
// 提交任务时,在当前线程(主线程)捕获 MDC 快照
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
super.execute(() -> {
// 在异步线程中恢复 MDC
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
try {
task.run();
} finally {
MDC.clear(); // 线程池线程复用,必须清理,防止污染下一个任务
}
});
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Map<String, String> mdcContext = MDC.getCopyOfContextMap();
return super.submit(() -> {
if (mdcContext != null) {
MDC.setContextMap(mdcContext);
}
try {
return task.call();
} finally {
MDC.clear();
}
});
}
}
在
AsyncConfig中替换使用:
@Bean("asyncExecutor")
public Executor getAsyncExecutor() {
MdcAwareExecutor executor = new MdcAwareExecutor(); // 替换这里
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
四、优雅关闭
Spring 容器关闭时(发版、重启),队列中还没执行完的异步任务需要等待完成,否则会造成数据不一致。
executor.setWaitForTasksToCompleteOnShutdown(true);
// 最长等待时间,超过则强制关闭(根据业务最长耗时设置)
executor.setAwaitTerminationSeconds(30);
同时确保应用能收到关闭信号(Kubernetes 场景需配置
terminationGracePeriodSeconds):
# application.yml
server:
shutdown: graceful # Spring Boot 2.3+ 支持优雅关闭
spring:
lifecycle:
timeout-per-shutdown-phase: 30s
五、监控线程池状态
生产环境需要对线程池核心指标进行监控,及时发现任务堆积、线程耗尽等问题。
@Slf4j
@Component
@RequiredArgsConstructor
public class ExecutorMonitor {
@Qualifier("asyncExecutor")
private final Executor asyncExecutor;
/**
* 每分钟打印线程池状态,接入 Prometheus/Grafana 可改为上报 Gauge
*/
@Scheduled(fixedRate = 60_000)
public void monitor() {
if (asyncExecutor instanceof ThreadPoolTaskExecutor taskExecutor) {
ThreadPoolExecutor pool = taskExecutor.getThreadPoolExecutor();
log.info("[线程池监控] name={} active={} poolSize={} coreSize={} maxSize={} queueSize={} completedTasks={}",
taskExecutor.getThreadNamePrefix(),
pool.getActiveCount(),
pool.getPoolSize(),
pool.getCorePoolSize(),
pool.getMaximumPoolSize(),
pool.getQueue().size(),
pool.getCompletedTaskCount()
);
// 队列使用率超过 80% 告警
double queueUsage = (double) pool.getQueue().size() / (pool.getQueue().size() + pool.getQueue().remainingCapacity());
if (queueUsage > 0.8) {
log.warn("[线程池告警] 队列使用率={:.1f}%,请关注任务堆积", queueUsage * 100);
}
}
}
}
六、典型使用场景
场景一:下单后并行触发多个后续流程
@Service
@RequiredArgsConstructor
public class OrderFacadeService {
private final InventoryService inventoryService;
private final NotificationService notificationService;
private final PointService pointService;
public OrderVO createOrder(OrderRequest request) {
// 核心逻辑:同步执行,保证事务
Order order = orderService.create(request);
// 非核心后续流程:异步并行,不阻塞主流程响应
notificationService.sendOrderConfirm(order); // 发短信/推送
pointService.grantPoints(order); // 积分发放
inventoryService.syncToWms(order); // 同步 WMS
return OrderVO.from(order);
}
}
@Service
public class NotificationService {
@Async("notifyExecutor")
public void sendOrderConfirm(Order order) { ... }
}
场景二:异步任务编排(并行请求多个接口)
三个接口串行调用总耗时 = 三者之和;改为并行后总耗时 = max(三者耗时),性能提升显著。
@Service
@RequiredArgsConstructor
public class ProductDetailService {
private final PriceService priceService;
private final StockService stockService;
private final ReviewService reviewService;
public ProductDetailVO getDetail(Long productId) throws Exception {
CompletableFuture<Price> priceFuture = priceService.getPriceAsync(productId);
CompletableFuture<Stock> stockFuture = stockService.getStockAsync(productId);
CompletableFuture<List<Review>> reviewFuture = reviewService.getReviewsAsync(productId);
// 等待全部完成
CompletableFuture.allOf(priceFuture, stockFuture, reviewFuture).join();
return ProductDetailVO.builder()
.price(priceFuture.get())
.stock(stockFuture.get())
.reviews(reviewFuture.get())
.build();
}
}
@Service
public class PriceService {
@Async("orderExecutor")
public CompletableFuture<Price> getPriceAsync(Long productId) {
return CompletableFuture.completedFuture(fetchPrice(productId));
}
}
七、生产 Checklist
| 检查项 | 说明 |
|---|---|
| ✅ 配置自定义线程池 | 禁止使用默认 SimpleAsyncTaskExecutor
|
| ✅ 按业务隔离线程池 | 核心业务与非核心业务分开,防止资源争抢 |
| ✅ 实现异常处理器 |
AsyncUncaughtExceptionHandler + 告警接入 |
| ✅ MDC TraceId 透传 | 装饰 Executor,保证日志链路完整 |
| ✅ 配置优雅关闭 | waitForTasksToCompleteOnShutdown = true |
| ✅ 监控线程池指标 | 队列深度、活跃线程数接入 Prometheus/日志 |
✅ @Async 方法必须 public |
private/protected 方法代理失效 |
| ✅ 避免同类内部调用 | 走 this 引用绕过代理,注解失效 |
✅ 异步方法不叠加 @Transactional
|
线程切换后事务上下文丢失 |
本博客所有文章除特别声明外,均采用
CC BY-NC-SA 4.0
许可协议,转载请注明出处!