RxJava:使用基于Single的累加器进行的扫描,该扫描器处理中间的Single - java

我正在写一个基于Rx的单向UI流,其中每个状态的减少都是一个Single。通常,此类流程是使用scan完成的(它们需要先前的状态),但是当涉及到Single时,会有些棘手。我设法使它像这样工作:

val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducer = { currentState: List<String>, event: String ->
  Single.fromCallable { /* do work */ currentState.plus(event) }
}

events
  .scan(
    initialState,
    { currentStateSingle, event ->
      val nextStateSingle = currentStateSingle
        .flatMap { curState -> reducer(curState, event) }
      // cache is required to avoid resubscription 
      // to all previously emitted single's on each new scan iteration
      nextStateSingle.cache()
    }
  )
  .flatMapSingle { it }
  .subscribe { state -> println("state updated to $state") }

让我感到困扰的是,每个事件(在UI环境中可能有很多事件)将创建一个nextStateSingle.cache()并永远将其添加到现有链中,并且曾经发出的所有这些Single都将保留在那里,无休止地消耗内存,并且在之后不会被处置他们发出了一个新状态,根本不需要它们。

我一直在思考如何使用某种switchMap用法,甚至使用一些外部原子变量来保持状态(而不是扫描),但是我找不到方法。

我看到的唯一其他选择是编写一个定制操作符,该操作符将订阅内部Single,等待结果然后处理它,但是我想避免编写定制操作符。

参考方案

根据缓存的文档,由于您无法处理原点,也无法清除缓存的值,因此可以使用此变通办法,该方法可以控制缓存并通过忘记所有引用来清除有壳的值:

AtomicBoolean shouldStop = new AtomicBoolean();

source.takeUntil(v -> shouldStop.get())
        .onTerminateDetach()
        .cache()
        .takeUntil(v -> shouldStop.get())
        .onTerminateDetach()
        .subscribe(...);

然后,也许您可​​以保存状态并偶尔使引用无效。

RxJava-我什么时候应该取消订阅? - java

我想确保使用RxJava时不会造成任何内存泄漏。请让我知道这些是否是处理每种情况的正确方法。 案例1 如果我在同一范围内创建Observable和Subscription,GC会照顾好它们的处置吗?我必须在这里致电unsubscribe()吗?public static void createObservableAndSubscribe() { Observ…

RxJava-为什么执行程序只使用一个线程 - java

我创建了一个固定的线程池来处理每300毫秒发出的事件,并假定该过程需要1000毫秒。假设多线程可以工作,但是只有一个线程被重用。如果我将sleepTime设置为小于300ms,则处理线程会更改,但这没有用。问题:如何使其并发?为什么程序重用线程?先感谢您public static void main(String[] args) throws Interru…

android-android studio模拟器中的SSL问题,在手机上可以正常工作 - java

我有一个可以通过https调用网络服务的应用程序。当我在手机上运行apk时,效果很好。但是,在模拟器中,所有通过POST的SSL请求均失败,并显示以下信息: 读取错误:ssl = 0xb402be00:SSL库失败,通常是一个协议 错误 错误:100c50bf:SSL例程:ssl3_read_bytes:NO_RENEGOTIATION(外部/无聊的sl /…

绑定Java库Xamarin.Android - java

我花了两天时间在每个论坛,文档,tuto,博客等上寻找答案。我为实习生启动了一个Android应用程序,因为我不懂Java,所以用xamarin C#开发了它。直到最近一切都还不错,但现在我需要集成一个SDK才能在应用程序中使用POS(销售点),但是该库是用Java编写的,即使跟随文档或辅导老师,我也无法将其与xamarin绑定(我什至无法调试)。这里有人已…

Java:正则表达式模式匹配器是否有大小限制? - java

我的模式类似于OR:“word1 | word2 | word3”我大约有800个字。可能有问题吗? 参考方案 您仅受记忆和理智的限制。 :)