Java 虚拟线程并发最佳实践:高并发编程新范式
今天我们来聊聊 Java 虚拟线程的并发最佳实践,这是 Java 21+ 带来的革命性特性。
一、虚拟线程概述
虚拟线程(Virtual Threads)是 Java 21 引入的轻量级线程实现,它彻底改变了 Java 的并发编程模型。与传统的操作系统线程(平台线程)相比,虚拟线程具有以下特点:
- 极低的创建成本:可以轻松创建数百万个虚拟线程
- 自动管理:由 JVM 自动调度,无需手动管理线程池
- 阻塞友好:阻塞操作不会占用操作系统线程
- 兼容性好:与现有 Thread API 完全兼容
二、虚拟线程 vs 平台线程
1. 性能对比
public class ThreadComparison { public static void main(String[] args) throws InterruptedException { // 测试平台线程 long startPlatform = System.currentTimeMillis(); try (var executor = Executors.newFixedThreadPool(1000)) { IntStream.range(0, 10_000).forEach(i -> { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return i; }); }); } long platformTime = System.currentTimeMillis() - startPlatform; System.out.println("Platform threads: " + platformTime + "ms"); // 测试虚拟线程 long startVirtual = System.currentTimeMillis(); try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0, 10_000).forEach(i -> { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return i; }); }); } long virtualTime = System.currentTimeMillis() - startVirtual; System.out.println("Virtual threads: " + virtualTime + "ms"); } }2. 资源占用对比
| 特性 | 平台线程 | 虚拟线程 |
|---|---|---|
| 内存占用 | ~1 MB | ~几百字节 |
| 创建时间 | ~毫秒级 | ~微秒级 |
| 最大数量 | ~几千 | ~数百万 |
| 上下文切换 | 内核态 | 用户态 |
三、虚拟线程的核心用法
1. 创建虚拟线程
public class VirtualThreadCreation { // 方式 1:使用 Thread.ofVirtual() public void createVirtualThread1() { Thread virtualThread = Thread.ofVirtual() .name("my-virtual-thread") .unstarted(() -> { System.out.println("Running in virtual thread: " + Thread.currentThread()); }); virtualThread.start(); } // 方式 2:使用 Thread.startVirtualThread() public void createVirtualThread2() { Thread virtualThread = Thread.startVirtualThread(() -> { System.out.println("Running in virtual thread: " + Thread.currentThread()); }); } // 方式 3:使用 Executors.newVirtualThreadPerTaskExecutor() public void createVirtualThread3() { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { executor.submit(() -> { System.out.println("Running in virtual thread pool"); }); } } }2. 虚拟线程与同步代码
虚拟线程特别适合处理阻塞操作:
@Service public class UserService { private final RestTemplate restTemplate; private final UserRepository userRepository; public UserService(RestTemplate restTemplate, UserRepository userRepository) { this.restTemplate = restTemplate; this.userRepository = userRepository; } // 虚拟线程下,阻塞操作不会占用 OS 线程 public User fetchUserWithDetails(String userId) { // 数据库查询 - 阻塞操作 User user = userRepository.findById(userId) .orElseThrow(() -> new UserNotFoundException(userId)); // HTTP 调用 - 阻塞操作 UserProfile profile = restTemplate.getForObject( "https://api.example.com/users/{id}/profile", UserProfile.class, userId); user.setProfile(profile); return user; } }四、最佳实践
1. 避免使用 synchronized 和 ReentrantLock
虚拟线程在阻塞时会释放载体线程,但使用synchronized或ReentrantLock时会钉住(pin)载体线程:
// 不推荐:会钉住载体线程 public synchronized void synchronizedMethod() { // 长时间操作 } // 推荐:使用 ReentrantLock 配合 try-lock private final ReentrantLock lock = new ReentrantLock(); public void betterLocking() { lock.lock(); try { // 长时间操作 } finally { lock.unlock(); } } // 更好的选择:使用 java.util.concurrent 中的并发集合 private final ConcurrentHashMap<String, User> userCache = new ConcurrentHashMap<>(); public User getUser(String userId) { return userCache.computeIfAbsent(userId, this::fetchUserFromDatabase); }2. 合理使用线程局部变量
public class ThreadLocalBestPractice { // 不推荐:ThreadLocal 在虚拟线程下会有性能问题 private static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); // 推荐:使用 DateTimeFormatter(线程安全) private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // 或者使用 ScopedValue(Java 21+) private static final ScopedValue<String> requestId = ScopedValue.newInstance(); public void processRequest(String reqId) { ScopedValue.where(requestId, reqId).run(() -> { // 在作用域内使用 requestId System.out.println("Processing request: " + requestId.get()); // 调用其他方法 processData(); }); } private void processData() { // 可以直接获取 requestId System.out.println("Request ID in sub-method: " + requestId.get()); } }3. 正确处理异常
public class VirtualThreadExceptionHandling { public void handleExceptions() { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { Future<String> future = executor.submit(() -> { if (someCondition()) { throw new BusinessException("Something went wrong"); } return "Success"; }); try { String result = future.get(); System.out.println("Result: " + result); } catch (ExecutionException e) { // 处理业务异常 Throwable cause = e.getCause(); if (cause instanceof BusinessException) { System.err.println("Business error: " + cause.getMessage()); } else { System.err.println("Unexpected error: " + cause.getMessage()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("Operation interrupted"); } } } }4. 资源管理
public class VirtualThreadResourceManagement { // 使用 try-with-resources 确保资源释放 public void processWithResources() { try (var executor = Executors.newVirtualThreadPerTaskExecutor(); var connection = dataSource.getConnection()) { List<Future<Result>> futures = new ArrayList<>(); for (Query query : queries) { Future<Result> future = executor.submit(() -> { try (var statement = connection.prepareStatement(query.sql())) { // 执行查询 return executeQuery(statement); } }); futures.add(future); } // 收集结果 List<Result> results = new ArrayList<>(); for (Future<Result> future : futures) { results.add(future.get()); } } catch (Exception e) { throw new ProcessingException("Failed to process queries", e); } } }五、Spring Boot 中的虚拟线程
1. 启用虚拟线程
spring: threads: virtual: enabled: true2. 自定义虚拟线程执行器
@Configuration public class VirtualThreadConfig { @Bean(name = "virtualThreadExecutor") public Executor virtualThreadExecutor() { ThreadFactory factory = Thread.ofVirtual() .name("virtual-thread-", 0) .factory(); return Executors.newThreadPerTaskExecutor(factory); } @Bean public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() { return protocolHandler -> { protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }; } }3. 异步方法使用虚拟线程
@Service public class AsyncUserService { @Async("virtualThreadExecutor") public CompletableFuture<User> fetchUserAsync(String userId) { // 这个方法会在虚拟线程中执行 User user = userRepository.findById(userId) .orElseThrow(() -> new UserNotFoundException(userId)); return CompletableFuture.completedFuture(user); } @Async("virtualThreadExecutor") public CompletableFuture<List<Order>> fetchUserOrdersAsync(String userId) { List<Order> orders = orderRepository.findByUserId(userId); return CompletableFuture.completedFuture(orders); } }六、实践案例:高并发 Web 服务
场景描述
构建一个高并发的用户服务,需要处理大量并发请求。
实现方案
@SpringBootApplication public class HighConcurrencyApplication { public static void main(String[] args) { SpringApplication.run(HighConcurrencyApplication.class, args); } } @RestController @RequestMapping("/api/users") public class UserController { @Autowired private UserService userService; @GetMapping("/{userId}") public User getUser(@PathVariable String userId) { return userService.getUser(userId); } @GetMapping("/{userId}/details") public UserDetails getUserDetails(@PathVariable String userId) { return userService.getUserDetails(userId); } } @Service public class UserService { @Autowired private UserRepository userRepository; @Autowired private OrderServiceClient orderServiceClient; @Autowired private InventoryServiceClient inventoryServiceClient; public User getUser(String userId) { return userRepository.findById(userId) .orElseThrow(() -> new UserNotFoundException(userId)); } public UserDetails getUserDetails(String userId) { // 并行获取用户信息、订单信息和库存信息 try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Future<User> userFuture = scope.fork(() -> getUser(userId)); Future<List<Order>> ordersFuture = scope.fork(() -> orderServiceClient.getUserOrders(userId)); Future<InventoryStatus> inventoryFuture = scope.fork(() -> inventoryServiceClient.getUserInventory(userId)); scope.join(); scope.throwIfFailed(); return new UserDetails( userFuture.resultNow(), ordersFuture.resultNow(), inventoryFuture.resultNow() ); } catch (Exception e) { throw new UserServiceException("Failed to get user details", e); } } }性能测试
使用虚拟线程后,系统可以轻松处理数万个并发连接,而内存占用保持在较低水平。
七、常见陷阱与避免方法
1. 避免在虚拟线程中使用 ThreadLocal
// 不推荐 private static final ThreadLocal<RequestContext> context = new ThreadLocal<>(); // 推荐:使用 ScopedValue private static final ScopedValue<RequestContext> context = ScopedValue.newInstance();2. 避免长时间占用锁
// 不推荐 public synchronized void longOperation() { // 长时间操作会钉住载体线程 Thread.sleep(Duration.ofMinutes(5)); } // 推荐:使用非阻塞方式 public void betterLongOperation() { // 分段处理,避免长时间持有锁 processInChunks(); }3. 注意数据库连接池配置
spring: datasource: hikari: # 虚拟线程下可以配置更大的连接池 maximum-pool-size: 100 minimum-idle: 20 connection-timeout: 30000 idle-timeout: 600000 max-lifetime: 1800000八、总结与建议
虚拟线程是 Java 并发编程的重大突破,它让高并发编程变得更加简单。以下是一些关键建议:
- 拥抱虚拟线程:在 IO 密集型应用中优先使用虚拟线程
- 避免钉住线程:避免使用 synchronized 和 ThreadLocal
- 合理配置资源:根据虚拟线程特性调整连接池等配置
- 充分测试:确保应用在虚拟线程下正常工作
这其实可以更优雅一点,通过结合结构化并发和虚拟线程,我们可以构建出既高效又易维护的并发程序。
希望这篇文章能帮助你掌握 Java 虚拟线程的并发最佳实践。欢迎在评论区分享你的使用经验!