1. 响应式编程与Reactor Core初探
第一次接触响应式编程时,我盯着满屏的"Flux"和"Mono"发懵,这感觉就像突然被扔进了外语课堂。但当我真正理解Reactor Core后,发现它其实是处理异步数据流的瑞士军刀。简单来说,响应式编程就是用声明式的方式处理数据流,而Reactor Core就是Java生态中最成熟的实现工具之一。
为什么选择3.7.2版本?这个版本在性能优化和API稳定性上达到了很好的平衡。我去年在电商秒杀系统里就用这个版本处理过每秒10万+的订单事件,全程CPU占用率不到40%。相比传统线程池方案,资源消耗直接降了60%。
2. 项目配置:5分钟快速集成
2.1 Maven配置实战
在pom.xml里添加依赖时,建议把BOM(物料清单)也加上,这样可以避免版本冲突。这是我常用的配置模板:
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.7.2</version> </dependency> </dependencies>注意那个2023.0.2的BOM版本,它专门为3.7.2做了兼容性测试。有次我忘记加BOM,结果Sinks的API居然报NoSuchMethodError,排查了整整一下午。
2.2 Gradle极简配置
用Gradle的话更简单,Kotlin DSL的写法是这样的:
dependencies { implementation(platform("io.projectreactor:reactor-bom:2023.0.2")) implementation("io.projectreactor:reactor-core:3.7.2") }3. 核心API实战手册
3.1 Flux与Mono的七十二变
Flux就像个会变魔术的水管:可以装0-N个数据元素。创建方式至少有十几种,我最常用的是这些:
// 从集合创建 Flux<String> fromList = Flux.fromIterable(Arrays.asList("A", "B", "C")); // 动态生成(类似for循环) Flux<Integer> range = Flux.range(1, 10); // 定时发射(做心跳检测超好用) Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 错误处理示例 Flux<String> withError = Flux.error(new RuntimeException("故意的"));Mono则是单元素容器,特别适合返回单个结果的场景。比如从Redis查用户数据:
Mono<User> getUser(String id) { return Mono.fromCallable(() -> redisTemplate.opsForValue().get(id)); }3.2 Sinks的防坑指南
Sinks是3.7.2新增的API,用来替代旧的Processor。但新手容易踩两个坑:
- 忘记设置背压策略:
// 正确写法 Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); // 错误写法(可能内存溢出) Sinks.Many<String> dangerSink = Sinks.many().replay().all();- 多线程安全问题:
// 线程安全写法 Sinks.One<Result> resultSink = Sinks.one(); // 在异步回调中 executorService.submit(() -> { resultSink.tryEmitValue(computeResult()); });4. 调度器性能调优
4.1 四种调度器对比
通过表格直观对比不同调度器特性:
| 调度器类型 | 适用场景 | 线程行为 | 资源消耗 |
|---|---|---|---|
| Schedulers.immediate() | 快速同步执行 | 当前线程 | 最低 |
| Schedulers.single() | 顺序任务队列 | 单线程池 | 低 |
| Schedulers.parallel() | CPU密集型计算 | 固定大小线程池 | 中 |
| Schedulers.elastic() | IO阻塞操作 | 无界弹性线程池 | 高 |
4.2 实战性能优化
在物流跟踪系统里,我发现这样的组合效果最好:
Flux.fromIterable(packageIds) .parallel(10) // 并行度=CPU核心数*2 .runOn(Schedulers.parallel()) .flatMap(id -> queryShippingInfo(id).subscribeOn(Schedulers.boundedElastic())) .sequential();关键点在于:
- CPU密集型用parallel()
- IO密集型用boundedElastic()(比elastic()更安全)
- 通过subscribeOn切换上下文
5. 错误处理与重试机制
5.1 错误处理三板斧
- 立即恢复:
flux.onErrorReturn("defaultValue");- 优雅降级:
mono.onErrorResume(e -> { log.error("查询失败", e); return getFromCache(); });- 重试策略(带指数退避):
flux.retryWhen(Retry.backoff(3, Duration.ofMillis(100)));5.2 自定义重试逻辑
有次对接第三方支付接口,需要根据错误码决定是否重试。我的实现方案:
Retry customRetry = Retry.from(companion -> companion .handle((retrySignal, sink) -> { if (retrySignal.failure() instanceof PaymentException) { PaymentException e = (PaymentException) retrySignal.failure(); if (e.getCode() == 5001) { // 可重试错误码 sink.next(retrySignal.totalRetries() + 1); } else { sink.error(e); } } else { sink.error(retrySignal.failure()); } }) .withBackoff(Duration.ofMillis(200), Duration.ofSeconds(5)));