最近在学习reactor,在学习背压时,看到有onBackpressureBuffer方法,但是配置了 buffer后好像不起效。代码如下,给定了一个生产速度远大于消费速度的情况,且buffer值较小,预期是抛出异常,但是却没有,请问是哪里的使用姿势不对吗?
@Test
public void test_onBackpressureBuffer() throws InterruptedException {
Flux.interval(Duration.of(10, ChronoUnit.MILLIS)) //每毫秒产生10个数
.onBackpressureBuffer(10) //设置buffer为10
.subscribe( //每秒消费一个数
i -> {
System.out.println(Thread.currentThread().getName() + " "+i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e.getMessage());
}
},
System.out::println
);
TimeUnit.MINUTES.sleep(10);
}
类似的逻辑,使用RxJava2,则会正常爆出异常:
Observable.interval(1, TimeUnit.MILLISECONDS)
.toFlowable(BackpressureStrategy.BUFFER)
.onBackpressureBuffer(256)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) {
}
});
TimeUnit.SECONDS.sleep(1);
异常信息如下:
0
1
2
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
在Spring Cloud WebFlux中,ReactorBackpressureBuffer
是一个用于处理微服务间阻塞连接的工具。然而,你提到的onBackpressureBuffer
方法并没有在ReactorBackpressureBuffer
接口中定义。
这个方法通常是在生产环境中使用的,用于处理服务器的负载。如果你没有使用生产环境,你可能需要在你的微服务之间添加一个异步处理逻辑,然后在你的服务内部调用onBackpressureBuffer
方法。
如果你使用的是Spring Cloud WebFlux,你可以参考Spring Cloud WebFlux的官方文档,找到相关的实现。或者,你也可以查阅Spring Cloud的相关项目,例如Spring Cloud Netflix Ribbon
、Spring Cloud Config
等,它们的实现中可能包含了onBackpressureBuffer
方法。