news 2026/5/14 8:43:26

Java 并发编程模式:构建高效的并发系统

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java 并发编程模式:构建高效的并发系统

Java 并发编程模式:构建高效的并发系统

一、引言

在现代软件开发中,并发编程已经成为必备技能。Java 提供了丰富的并发工具和 API,帮助开发者构建高效、可靠的并发系统。掌握并发编程模式能够帮助我们更好地利用多核处理器的性能,提升应用的吞吐量和响应速度。

本文将深入探讨 Java 中常用的并发编程模式,包括线程池、生产者-消费者、读写锁、信号量、屏障等,并结合实际案例展示如何在项目中应用这些模式。

二、线程池模式

2.1 线程池基础

public class ThreadPoolExample { private static final ExecutorService executor = Executors.newFixedThreadPool(10); public static void main(String[] args) { for (int i = 0; i < 100; i++) { final int taskId = i; executor.submit(() -> { System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName()); }); } executor.shutdown(); } }

2.2 自定义线程池配置

@Configuration public class ThreadPoolConfig { @Bean("taskExecutor") public ExecutorService taskExecutor() { ThreadFactory threadFactory = new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("task-executor-" + counter.incrementAndGet()); thread.setDaemon(false); return thread; } }; return new ThreadPoolExecutor( 10, // corePoolSize 20, // maximumPoolSize 60L, // keepAliveTime TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy() ); } }

2.3 线程池监控

public class ThreadPoolMonitor { private final ThreadPoolExecutor executor; public ThreadPoolMonitor(ThreadPoolExecutor executor) { this.executor = executor; } public Map<String, Object> getPoolStats() { Map<String, Object> stats = new HashMap<>(); stats.put("poolSize", executor.getPoolSize()); stats.put("activeCount", executor.getActiveCount()); stats.put("queueSize", executor.getQueue().size()); stats.put("completedTaskCount", executor.getCompletedTaskCount()); stats.put("largestPoolSize", executor.getLargestPoolSize()); return stats; } }

三、生产者-消费者模式

3.1 使用 BlockingQueue

public class ProducerConsumerPattern { private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); static class Producer implements Runnable { @Override public void run() { try { for (int i = 0; i < 20; i++) { String message = "Message " + i; queue.put(message); System.out.println("Produced: " + message); Thread.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } static class Consumer implements Runnable { @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { String message = queue.take(); System.out.println("Consumed: " + message); Thread.sleep(200); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Producer()); executor.submit(new Consumer()); executor.submit(new Consumer()); executor.shutdown(); } }

3.2 带信号量的生产者-消费者

public class BoundedProducerConsumer { private final Semaphore emptySlots; private final Semaphore filledSlots; private final Object[] buffer; private int putIndex, takeIndex; public BoundedProducerConsumer(int capacity) { this.buffer = new Object[capacity]; this.emptySlots = new Semaphore(capacity); this.filledSlots = new Semaphore(0); this.putIndex = 0; this.takeIndex = 0; } public void put(Object item) throws InterruptedException { emptySlots.acquire(); synchronized (this) { buffer[putIndex] = item; putIndex = (putIndex + 1) % buffer.length; } filledSlots.release(); } public Object take() throws InterruptedException { filledSlots.acquire(); Object item; synchronized (this) { item = buffer[takeIndex]; takeIndex = (takeIndex + 1) % buffer.length; } emptySlots.release(); return item; } }

四、读写锁模式

4.1 ReadWriteLock 使用

public class Cache<K, V> { private final Map<K, V> cache = new HashMap<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); public V get(K key) { rwLock.readLock().lock(); try { return cache.get(key); } finally { rwLock.readLock().unlock(); } } public void put(K key, V value) { rwLock.writeLock().lock(); try { cache.put(key, value); } finally { rwLock.writeLock().unlock(); } } public void remove(K key) { rwLock.writeLock().lock(); try { cache.remove(key); } finally { rwLock.writeLock().unlock(); } } }

4.2 读写锁优化

public class OptimizedCache<K, V> { private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private static final int THRESHOLD = 1000; public V get(K key) { V value = cache.get(key); if (value != null) { return value; } rwLock.readLock().lock(); try { value = cache.get(key); if (value != null) { return value; } rwLock.readLock().unlock(); rwLock.writeLock().lock(); try { value = cache.get(key); if (value == null) { value = loadFromDatabase(key); cache.put(key, value); } rwLock.readLock().lock(); } finally { rwLock.writeLock().unlock(); } return value; } finally { rwLock.readLock().unlock(); } } private V loadFromDatabase(K key) { return null; } }

五、信号量模式

5.1 资源限流

public class ResourceLimiter { private final Semaphore semaphore; public ResourceLimiter(int maxConcurrent) { this.semaphore = new Semaphore(maxConcurrent); } public void acquire() throws InterruptedException { semaphore.acquire(); } public void release() { semaphore.release(); } public int availablePermits() { return semaphore.availablePermits(); } } // 使用示例 public class ApiRateLimiter { private final ResourceLimiter limiter = new ResourceLimiter(100); public void processRequest(Runnable task) throws InterruptedException { limiter.acquire(); try { task.run(); } finally { limiter.release(); } } }

5.2 多资源协调

public class MultiResourceCoordinator { private final Semaphore databaseSemaphore = new Semaphore(10); private final Semaphore networkSemaphore = new Semaphore(20); public void executeTask(Runnable task) throws InterruptedException { databaseSemaphore.acquire(); networkSemaphore.acquire(); try { task.run(); } finally { networkSemaphore.release(); databaseSemaphore.release(); } } }

六、屏障模式

6.1 CyclicBarrier

public class DataProcessingPipeline { private final CyclicBarrier barrier; private final List<DataProcessor> processors; public DataProcessingPipeline(List<DataProcessor> processors) { this.processors = processors; this.barrier = new CyclicBarrier(processors.size(), this::aggregateResults); } public void process(Data data) { for (DataProcessor processor : processors) { executor.submit(() -> { processor.process(data); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } }); } } private void aggregateResults() { System.out.println("All processors completed, aggregating results..."); } interface DataProcessor { void process(Data data); } }

6.2 CountDownLatch

public class TaskCoordinator { public void executeTasks(List<Runnable> tasks) throws InterruptedException { CountDownLatch latch = new CountDownLatch(tasks.size()); for (Runnable task : tasks) { executor.submit(() -> { try { task.run(); } finally { latch.countDown(); } }); } latch.await(); System.out.println("All tasks completed"); } }

七、原子操作模式

7.1 AtomicReference

public class NonBlockingStack<T> { private final AtomicReference<Node<T>> top = new AtomicReference<>(); public void push(T value) { Node<T> newNode = new Node<>(value); Node<T> oldTop; do { oldTop = top.get(); newNode.next = oldTop; } while (!top.compareAndSet(oldTop, newNode)); } public T pop() { Node<T> oldTop; Node<T> newTop; do { oldTop = top.get(); if (oldTop == null) { return null; } newTop = oldTop.next; } while (!top.compareAndSet(oldTop, newTop)); return oldTop.value; } private static class Node<T> { final T value; Node<T> next; Node(T value) { this.value = value; } } }

7.2 AtomicInteger 计数器

public class ConcurrentCounter { private final AtomicInteger count = new AtomicInteger(0); public int increment() { return count.incrementAndGet(); } public int decrement() { return count.decrementAndGet(); } public int get() { return count.get(); } public int add(int delta) { return count.addAndGet(delta); } }

八、CompletableFuture 异步模式

8.1 异步任务组合

public class AsyncService { public CompletableFuture<User> fetchUser(Long userId) { return CompletableFuture.supplyAsync(() -> userRepository.findById(userId)); } public CompletableFuture<Order> fetchOrder(Long orderId) { return CompletableFuture.supplyAsync(() -> orderRepository.findById(orderId)); } public CompletableFuture<UserOrderSummary> getUserOrderSummary(Long userId) { return fetchUser(userId) .thenCompose(user -> fetchOrder(user.getLatestOrderId()) .thenApply(order -> new UserOrderSummary(user, order))); } public CompletableFuture<List<User>> fetchUsers(List<Long> userIds) { List<CompletableFuture<User>> futures = userIds.stream() .map(this::fetchUser) .collect(Collectors.toList()); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); } }

8.2 异常处理

public class AsyncExceptionHandling { public CompletableFuture<String> processAsync() { return CompletableFuture.supplyAsync(this::computeValue) .thenApply(this::transformValue) .exceptionally(this::handleException) .thenApply(this::finalizeResult); } private String computeValue() { if (Math.random() > 0.5) { throw new RuntimeException("Computation failed"); } return "success"; } private String transformValue(String value) { return "transformed-" + value; } private String handleException(Throwable e) { log.error("Error occurred", e); return "fallback-value"; } private String finalizeResult(String value) { return "final-" + value; } }

九、并发集合模式

9.1 ConcurrentHashMap

public class ConcurrentCache<K, V> { private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>(); private final Function<K, V> loader; public ConcurrentCache(Function<K, V> loader) { this.loader = loader; } public V get(K key) { return cache.computeIfAbsent(key, loader); } public void invalidate(K key) { cache.remove(key); } public void invalidateAll() { cache.clear(); } }

9.2 CopyOnWriteArrayList

public class ThreadSafeList<E> { private final CopyOnWriteArrayList<E> list = new CopyOnWriteArrayList<>(); public boolean add(E element) { return list.add(element); } public boolean remove(Object o) { return list.remove(o); } public E get(int index) { return list.get(index); } public void forEach(Consumer<? super E> action) { list.forEach(action); } public int size() { return list.size(); } }

十、实战案例:并发订单处理系统

@Service public class OrderProcessingService { private final ExecutorService executorService; private final AtomicInteger processingCount = new AtomicInteger(0); private final Semaphore semaphore = new Semaphore(50); public OrderProcessingService(@Qualifier("taskExecutor") ExecutorService executorService) { this.executorService = executorService; } public CompletableFuture<Order> processOrderAsync(OrderRequest request) { return CompletableFuture.supplyAsync(() -> { try { semaphore.acquire(); processingCount.incrementAndGet(); Order order = validateOrder(request); order = createOrder(order); order = processPayment(order); order = sendNotification(order); return order; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Order processing interrupted", e); } finally { semaphore.release(); processingCount.decrementAndGet(); } }, executorService); } private Order validateOrder(OrderRequest request) { Order order = new Order(); order.setUserId(request.getUserId()); order.setTotalAmount(request.getTotalAmount()); return order; } private Order createOrder(Order order) { return orderRepository.save(order); } private Order processPayment(Order order) { paymentService.process(order.getId(), order.getTotalAmount()); order.setStatus("PAID"); return order; } private Order sendNotification(Order order) { notificationService.sendOrderConfirmation(order); order.setNotificationSent(true); return order; } }

十一、并发编程最佳实践

11.1 线程安全原则

// 1. 最小化共享状态 // 2. 使用线程安全的数据结构 // 3. 避免不必要的同步 // 4. 使用原子操作替代同步块 // 5. 合理使用线程池

11.2 死锁避免

// 1. 按顺序获取锁 public void transferMoney(Account from, Account to, double amount) { Account first = from.getId() < to.getId() ? from : to; Account second = from.getId() < to.getId() ? to : from; synchronized (first) { synchronized (second) { from.withdraw(amount); to.deposit(amount); } } }

11.3 性能优化

// 1. 使用无锁数据结构 // 2. 减少锁的粒度 // 3. 使用读写锁分离读操作 // 4. 批量操作减少同步次数

十二、总结

本文详细介绍了 Java 中常用的并发编程模式:

  1. 线程池模式:管理线程生命周期,提高资源利用率
  2. 生产者-消费者模式:解耦生产和消费逻辑
  3. 读写锁模式:优化读多写少场景的性能
  4. 信号量模式:控制并发访问数量
  5. 屏障模式:协调多个任务同步执行
  6. 原子操作模式:无锁并发编程
  7. CompletableFuture:异步任务组合与编排
  8. 并发集合:线程安全的数据结构

通过合理应用这些模式,可以构建高效、可靠的并发系统。在实际项目中,需要根据业务场景选择合适的并发策略,平衡性能和复杂度。

掌握并发编程模式不仅能提升代码质量,还能帮助开发者更好地理解 Java 并发库的设计思想,从而在面对复杂并发问题时能够灵活应对。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/14 8:43:00

ARM系统寄存器架构与核心功能组解析

1. ARM系统寄存器架构概述作为ARM处理器架构的核心控制单元&#xff0c;系统寄存器承担着处理器状态管理、内存控制、异常处理等关键功能。与x86架构不同&#xff0c;ARM采用精简的寄存器设计理念&#xff0c;通过功能分组实现高效管理。在ARMv8/v9架构中&#xff0c;系统寄存器…

作者头像 李华
网站建设 2026/5/14 8:34:04

3分钟告别文献混乱:Zotero Duplicates Merger新手实战手册

3分钟告别文献混乱&#xff1a;Zotero Duplicates Merger新手实战手册 【免费下载链接】ZoteroDuplicatesMerger A zotero plugin to automatically merge duplicate items 项目地址: https://gitcode.com/gh_mirrors/zo/ZoteroDuplicatesMerger 你是否曾经在深夜整理参…

作者头像 李华
网站建设 2026/5/14 8:32:29

第70篇:Vibe Coding时代:AI Coding 平台运维手册,解决 Agent 上线后故障排查没有 SOP 的问题

第70篇:Vibe Coding时代:AI Coding 平台运维手册,解决 Agent 上线后故障排查没有 SOP 的问题 一、问题场景:Agent 平台上线了,但一出问题大家都不知道怎么查 当 AI Coding Agent 进入团队使用后,常见故障会越来越多: 1. 用户说任务一直不动 2. 任务状态卡在 RUNNING 3…

作者头像 李华
网站建设 2026/5/14 8:23:05

FakeLocation:你的手机位置自由指南,3个场景让位置掌控更简单

FakeLocation&#xff1a;你的手机位置自由指南&#xff0c;3个场景让位置掌控更简单 【免费下载链接】FakeLocation Xposed module to mock locations per app. 项目地址: https://gitcode.com/gh_mirrors/fak/FakeLocation 还在为社交软件的位置限制烦恼吗&#xff1f…

作者头像 李华
网站建设 2026/5/14 8:20:32

创业团队如何利用Taotoken管理多模型成本与用量

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 创业团队如何利用Taotoken管理多模型成本与用量 对于资源有限的创业技术团队而言&#xff0c;在拥抱大模型能力的同时&#xff0c;…

作者头像 李华