在Andoid中如何使用RxJava 2进行多线程编程?(6)

在上面的代码片段中,源observable在I/O线程生成其条目,因为这里组合使用了subscribeOn()和Schedulers.io()。现在,我们想要使用map()操作符来转换每个条目,只不过需要在计算线程中进行。为了实现这一点,我们可以在map()操作符之前组合使用observeOn()和Schedulers.computation(),这样的话就能切换线程并将产出物传递到计算线程中了。

接下来,需要过滤条目,但是因为某些原因,我们想在一个全新的线程中进行操作。这样的话,我们可以在filter()操作符前面组合使用observeOn()和Schedulers.newThread(),这样的话,就能为每个条目切换至一个新的线程。

最后,我们希望订阅者消费最终处理后的条目并在UI上展现结果,为了实现这一点,我们需要再次切换线程,不过这一次需要切换至主线程,这里需要组合使用observeOn()和AndroidSchedulers.mainThread() Scheduler。

但是,如果我们多次地连续使用observeOn()会怎样呢?在下面的代码片段中,最终的订阅者在消费结果的时候,到底使用的是哪个线程呢?是第一个还是最后一个observeOn()能发挥作用呢?

getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

如果你运行这个代码片段的话,你会发现所有的条目都是在RxComputationThreadPool-1线程中消费的,这意味着最后的observeOn()和Schedulers.computation()发挥了作用。但是,为什么呢?

背后的原理

你可能已经猜到,为何是最后的observeOn(),而不是其他的observeOn()发挥作用了。我们已经知道,订阅只能针对上游才能发生,另外一方面,产出物只能面对下游才能发挥作用。它们从源observable开始,一路沿着链向下,直至最后的sink subscriber。

observeOn()操作符只能针对下游observable才有效,所以最后的observeOn()和Schedulers.computation()覆盖了前面声明的observeOn()操作符。因此,每当你想为特定的observable切换线程时,需要做的就是指定observeOn()操作符。同步、状态不一致性、竞态条件以及其他线程相关的问题都会在幕后自动处理好了。

到目前为止,相信你已经对如何使用RxJava编写多线程的应用并保证App的快速和流畅有了很好的理解。

如果你还没有充分理解和吸收文中的知识的话,其实也不要紧,读者可以多读几遍并亲自尝试一下样例代码,这样的话,会有更深刻的理解。对于一篇文章来说,这些内容确实有些多,你可以在这上面多花一点时间。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/d1e341f4e14e0d4b847772148355d24a.html