Kotlin多平台响应式框架reaktive

Kotlin跨平台Reactive Extensions实现。

安装配置

这里有几个已经发布到 Maven Central 的模块:

  • reaktive:Reaktive 的主要库(多平台)
  • reaktive-annotations:注解集合(多平台)
  • reaktive-testing:测试工具(多平台)
  • utils:一些实用工具,例如 Clock、AtomicReference、Lock 等(多平台)
  • coroutines-interop:Kotlin 协程的互操作性辅助工具(多平台)
  • rxjava2-interop:RxJava v2 的互操作性辅助工具(JVM 和 Android)
  • rxjava3-interop:RxJava v3 的互操作性辅助工具(JVM 和 Android)

配置依赖

kotlin {    sourceSets {        commonMain {            dependencies {                implementation 'com.badoo.reaktive:reaktive:<version>'                implementation 'com.badoo.reaktive:reaktive-annotations:<version>'                implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines                implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2                implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3            }        }        commonTest {            dependencies {                implementation 'com.badoo.reaktive:reaktive-testing:<version>'            }        }    }}

特征

跨平台支持:JVM、Android、iOS、macOS、watchOS、tvOS、JavaScript、Linux X64、Linux ARM 32 hfp
调度程序支持:

  • computationScheduler – 固定线程池,线程数等于核心数
  • ioScheduler – 无限制线程池,使用缓存策略
  • newThreadScheduler – 为每个任务创建一个新线程
  • singleScheduler – 在单个共享后台线程上执行任务
  • trampolineScheduler – 将任务排队,并在参与的线程之一上执行它们
  • mainScheduler – 在主线程上执行任务
  • Kotlin/Native的真正多线程支持(有一些限制)
  • 针对Kotlin/Native的线程本地订阅而不会冻结
  • 支持的来源:Observable、Maybe、Single、Completable
  • Subjects:PublishSubject、BehaviorSubject、ReplaySubject、UnicastSubject
  • 与Kotlin协程的互操作性:在Reaktive和协程之间(包括Flow)之间进行转换
  • 与RxJava2和RxJava3的互操作性:在Reaktive和RxJava之间进行源的转换,能够重用RxJava的调度程序

Reaktive和旧(严格的)Kotlin/Native内存模型

旧的(严格的)Kotlin Native内存模型和并发是非常特殊的。一般来说,线程之间不允许共享可变状态。由于Reaktive支持Kotlin Native中的多线程,请在使用之前阅读以下文档:

  • 并发性

  • 不变性
    对象分离相对较难实现,当对象从外部创建且不完全由库管理时非常容易出错。这就是为什么Reaktive更喜欢冻结状态的原因。以下是一些提示:

  • 任何提交给调度程序的回调(以及任何被捕获的对象)都将被冻结

  • subscribeOn会冻结其上游源和下游观察者,所有的Disposables(上游和下游的)也都被冻结,所有的值(包括错误)都不是由该操作符冻结的

  • observeOn仅会冻结其下游观察者和通过它传递的所有值(包括错误),加上所有的Disposables,上游源不会被该操作符冻结

  • 使用调度程序的其他操作符(如debouncetimerdelay等)在大多数情况下与observeOn的行为相同

线程本地技巧以避免冻结

有时候不能接受冻结,例如我们可能希望在后台加载一些数据,然后更新UI。显然,UI 不能被冻结。使用Reaktive可以通过两种方式实现这种行为:

使用threadLocal运算符:

val values = mutableListOf<Any>()var isFinished = falseobservable<Any> { emitter ->    // Background job}    .subscribeOn(ioScheduler)    .observeOn(mainScheduler)    .threadLocal()    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag    .subscribe()

subscribe 运算符中将 isThreadLocal 标志设置为 true

val values = mutableListOf<Any>()var isComplete = falseobservable<Any> { emitter ->    // Background job}    .subscribeOn(ioScheduler)    .observeOn(mainScheduler)    .subscribe(        isThreadLocal = true,        onNext = { values += it }, // Callback is not frozen, we can update the mutable list        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag    )

在这两种情况下,订阅(subscribe调用)必须在主线程上执行。

Reaktive和新的(宽松的)Kotlin/Native内存模型

新的(宽松的)Kotlin/Native内存模型允许在线程之间传递对象而无需冻结。使用此内存模型时,不再需要使用threadLocal运算符/参数。请确保按照文档中所述禁用冻结。

协程互操作性

这个功能由coroutines-interop模块提供,分为两个版本发布:

coroutines-interop:基于稳定的kotlinx.coroutines – 使用稳定的协程版本和旧的(严格)内存模型使用此变体。
coroutines-interop:-nmtc基于正在进行的多线程kotlinx.coroutines – 使用此变体,无论是协程的多线程版本还是新的(放松的)内存模型。

基于稳定kotlinx.coroutines的协程互操作性

有一些重要的限制:

直到多线程协程的发布,JobCoroutineContext都无法被冻结。
由于第一个限制,在Kotlin/Native中,所有的xxxFromCoroutine {}构建器和Flow.asObservable()转换器都在runBlocking块内执行,并应该在后台Scheduler上进行订阅。
考虑以下coroutines-interop示例:

singleFromCoroutine {    // This block will be executed inside `runBlocking` in Kotlin/Native}    .subscribeOn(ioScheduler) // Switching to a background thread is necessary    .observeOn(mainScheduler)    .subscribe { /* Get the result here */ }

请注意,Ktor默认使用多线程协程。如果您正在使用Ktor,请使用基于多线程协程的coroutines-interop模块,并继续阅读下一个自述文件部分。

基于多线程kotlinx.coroutines的协程Interop

多线程kotlinx.coroutines变体消除了一些不愉快的限制- JobCoroutineContext都可以被冻结。

因此,有一个关键的区别-所有的xxxFromCoroutine {}构建器和Flow.asObservable()转换器在所有目标中(包括Kotlin/Native)都是异步执行的,因此可以在任何调度器上进行订阅。

注:

  • 由于多线程协程仍在进行中,可能会出现问题。
  • Ktor可以直接使用,没有已知的限制

协程Interop的通用限制
转换器Scheduler.asCoroutineDispatcher()CoroutineContext.asScheduler()目前仅在JVM和JS中可用。

使用DisposableScope进行订阅管理

Reaktive提供了一种方便的订阅管理方式:DisposableScope

请查看以下示例:

val scope =    disposableScope {        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed        doOnDispose {            // Will be called when the scope is disposed        }        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed    }// At some point laterscope.dispose()
class MyPresenter(    private val view: MyView,    private val longRunningAction: Completable) : DisposableScope by DisposableScope() {    init {        doOnDispose {            // Will be called when the presenter is disposed        }    }    fun load() {        view.showProgressBar()        // Subscription will be disposed when the presenter is disposed        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)    }}class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {    override fun onCreate(savedInstanceState: Bundle?) {        super.onCreate(savedInstanceState)        MyPresenter(...).scope()    }    override fun onDestroy() {        dispose()        super.onDestroy()    }}

Reaktive 和 Swift 的互操作性

请参阅相应的文档页面:Reaktive 和 Swift 的互操作性。

https://github.com/badoo/Reaktive/blob/master/docs/SwiftInterop.md

插件
Reaktive 提供了 Plugin API,类似于 RxJava 插件。Plugin API 提供了一种装饰 Reaktive 源的方式。插件应该实现 ReaktivePlugin 接口,并可以使用 registerReaktivePlugin 函数进行注册,并使用 unregisterReaktivePlugin 函数进行取消注册。

object MyPlugin : ReaktivePlugin {    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> =        object : Observable<T> {            private val traceException = TraceException()            override fun subscribe(observer: ObservableObserver<T>) {                observable.subscribe(                    object : ObservableObserver<T> by observer {                        override fun onError(error: Throwable) {                            observer.onError(error, traceException)                        }                    }                )            }        }    override fun <T> onAssembleSingle(single: Single<T>): Single<T> =        TODO("Similar to onAssembleSingle")    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> =         TODO("Similar to onAssembleSingle")    override fun onAssembleCompletable(completable: Completable): Completable =        TODO("Similar to onAssembleSingle")    private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) {        if (error.suppressedExceptions.lastOrNull() !is TraceException) {            error.addSuppressed(traceException)        }        onError(error)    }    private class TraceException : Exception()}

github项目主页

https://github.com/badoo/Reaktive

一些代码示例

MPP module

https://github.com/badoo/Reaktive/tree/master/sample-mpp-module

Android app

https://github.com/badoo/Reaktive/tree/master/sample-android-app

iOS app

https://github.com/badoo/Reaktive/tree/master/sample-ios-app

JavaScript browser app

https://github.com/badoo/Reaktive/tree/master/sample-js-browser-app

Linux x64 app

https://github.com/badoo/Reaktive/tree/master/sample-linuxx64-app