账号密码登录
微信安全登录
微信扫描二维码登录

登录后绑定QQ、微信即可实现信息互通

手机验证码登录
找回密码返回
邮箱找回手机找回
注册账号返回
其他登录方式
分享
  • 收藏
    X
    Webflux 中,Reactor背压onBackpressureBuffery不起作用
    35
    0

    最近在学习reactor,在学习背压时,看到有onBackpressureBuffer方法,但是配置了 buffer后好像不起效。代码如下,给定了一个生产速度远大于消费速度的情况,且buffer值较小,预期是抛出异常,但是却没有,请问是哪里的使用姿势不对吗?

        @Test
        public void test_onBackpressureBuffer() throws InterruptedException {
    
            Flux.interval(Duration.of(10, ChronoUnit.MILLIS))  //每毫秒产生10个数
                .onBackpressureBuffer(10)                              //设置buffer为10
                .subscribe(                                            //每秒消费一个数 
                        i -> {
                            System.out.println(Thread.currentThread().getName() + " "+i);
                            try {
                               Thread.sleep(1000);
                            } catch (Exception e) {
                                System.out.println(e.getMessage());
                            }
                        },
                        System.out::println
                );
            TimeUnit.MINUTES.sleep(10);
        }

    类似的逻辑,使用RxJava2,则会正常爆出异常:

     Observable.interval(1, TimeUnit.MILLISECONDS)
                    .toFlowable(BackpressureStrategy.BUFFER)
                    .onBackpressureBuffer(256)
                    .observeOn(Schedulers.newThread())
                    .subscribe(
                            i -> {
                                System.out.println(i);
                                try {
                                    Thread.sleep(100);
                                } catch (Exception e) {
                                }
                            });
            TimeUnit.SECONDS.sleep(1);

    异常信息如下:

    0
    1
    2
    io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
        at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
        at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
        at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    0
    打赏
    收藏
    点击回答
        全部回答
    • 0
    • 嗜血超神 普通会员 1楼

      在Spring Cloud WebFlux中,ReactorBackpressureBuffer是一个用于处理微服务间阻塞连接的工具。然而,你提到的onBackpressureBuffer方法并没有在ReactorBackpressureBuffer接口中定义。

      这个方法通常是在生产环境中使用的,用于处理服务器的负载。如果你没有使用生产环境,你可能需要在你的微服务之间添加一个异步处理逻辑,然后在你的服务内部调用onBackpressureBuffer方法。

      如果你使用的是Spring Cloud WebFlux,你可以参考Spring Cloud WebFlux的官方文档,找到相关的实现。或者,你也可以查阅Spring Cloud的相关项目,例如Spring Cloud Netflix RibbonSpring Cloud Config等,它们的实现中可能包含了onBackpressureBuffer方法。

    更多回答
    扫一扫访问手机版