• 日常搜索
  • 百度一下
  • Google
  • 在线工具
  • 搜转载

使用RxJava和RxKotlin进行Kotlin反应式编程

从成为 android 开发的官方支持语言以来,kotlin在 Android 开发人员中的流行度迅速增长,据 Google 报告,使用Kotlin创建的应用程序增加了 6 倍。

如果您以前使用过 Rxjava 或 RxAndroid 并想切换到 Kotlin,或者想开始使用 Kotlin 进行响应式编程,那么本教程适合您。我们将介绍在 Kotlin 中创建 RxJava 2.0 和数据流的基本要素ObserversObservables然后再了解如何通过将 RxJava 与 Kotlin 扩展函数相结合来从项目中删除大量样板代码。

将 RxJava 与 Kotlin 结合使用可以帮助您以更少的代码创建高度响应的应用程序,但没有一种编程语言是完美的,因此我还将分享许多开发人员在第一次开始使用带有 Kotlin 的 RxJava 2.0 时遇到的 SAM 转换问题的解决方法。

最后,我们将创建一个应用程序来演示如何使用 RxJava 来解决您在实际 Android 项目中遇到的一些问题。

如果这是您第一次体验 RxJava,那么在此过程中,我还将提供您理解核心 RxJava 概念所需的所有背景信息。即使您以前从未尝试过 RxJava,在本文结束时,您也会对如何在您的项目中使用这个库有一个深刻的理解,并且您将使用 RxJava、RxKotlin、RxAndroid 创建了几个工作应用程序和rxbinding

到底什么是 RxJava?

RxJava 是 reactiveX 库的开源实现,可帮助您以反应式编程风格创建应用程序。尽管RxJava旨在处理同步和异步数据流,但它并不局限于“传统”数据类型。RxJava 对“数据”的定义非常广泛,包括缓存、变量、属性,甚至是点击和滑动等用户输入事件。仅仅因为您的应用程序不处理大量数字或执行复杂的数据转换,并不意味着它不能从 RxJava 中受益!

那么 RxJava 是如何工作的呢?

RxJava 扩展了基于 Observers 和 Observables 概念的 Observer 软件设计模式。要创建基本的 RxJava 数据管道,您需要:

  • 创建一个可观察的。

  • 给 Observable 一些数据以发出。  

  • 创建一个观察者。

  • 订阅 Observer 到 Observable。

一旦 Observable 至少有一个 Observer,它就会开始发送数据。每次 Observable 发出一条数据时,它都会通过调用该onNext()方法通知其分配的 Observer,然后 Observer 通常会执行一些操作来响应该数据发出。一旦 Observable 完成发送数据,它会通过调用来通知 Observer onComplete()然后 Observable 将终止,数据流将结束。

如果发生异常, thenonError()将被调用,并且 Observable 将立即终止,不再发出任何数据或调用onComplete().

但是 RxJava不仅仅是将数据从 Observable 传递给 Observer!RxJava 有大量的操作符集合,您可以使用它们来过滤、合并和转换这些数据。例如,假设您的应用程序有一个立即付款按钮来检测onClick事件,并且您担心不耐烦的用户可能会多次点击该按钮,从而导致您的应用程序处理多次付款。  

RxJava 允许您将这些onClick事件转换为数据流,然后您可以使用 RxJava 的各种运算符对其进行操作。在这个特定的示例中,您可以使用debounce()运算符来过滤快速连续发生的数据排放,因此即使用户猛击“立即付款”按钮,您的应用程序也只会注册一次付款。

使用 RxJava 有什么好处?

我们已经了解了 RxJava 如何帮助您解决特定应用程序中的特定问题,但总的来说,它能为 Android 项目提供什么?

RxJava 可以通过为您提供一种编写您想要实现的目标的方法来帮助简化您的代码,而不是编写您的应用程序必须完成的指令列表。例如,如果您想忽略在同一 500 毫秒内发生的所有数据发射,那么您可以编写:

.debounce(500, TimeUnit.MILLISECONDS)

此外,由于 RxJava几乎将所有内容都视为数据,因此它提供了一个模板,您可以将其应用于各种事件:创建 Observable、创建 Observer、为 Observer 订阅 Observable、冲洗和重复。这种公式化的方法产生了更直接、更易读的代码。

Android 开发人员的另一个主要好处是,RxJava 可以减轻 Android 多线程带来的痛苦。今天的移动用户希望他们的应用程序能够进行多任务处理,即使只是在后台下载数据同时保持对用户输入的响应这样简单。

Android 有多种用于创建和管理多线程的内置解决方案,但这些解决方案都不是特别容易实现的,而且它们会很快导致复杂、冗长、难以阅读且容易出错的代码。

在 RxJava 中,您可以使用运算符和调度程序的组合来创建和管理其他线程。您可以使用subscribeOn运算符和调度程序轻松更改执行工作的线程。例如,这里我们正在安排要在新线程上执行的工作:

.subscribeOn(Schedulers.newThread())

您可以使用运算符指定应在何处发布此工作的结果observeOn在这里,我们使用调度程序将结果发布到 Android 最重要的主 UI 线程,该AndroidSchedulers.mainThread调度程序作为 RxAndroid 库的一部分提供:

.observeOn(AndroidSchedulers.mainThread())

相比 Android 内置的多线程解决方案,RxJava 的做法更加简洁易懂。

同样,您可以在我的 Android 版 RxJava 2 入门文章中了解更多关于 RxJava 的工作原理以及将这个库添加到您的项目的好处  。

我应该使用 RxJava 还是 RxKotlin?

由于 Kotlin 与 Java 100% 可互操作,因此您可以在 Kotlin 项目中毫无困难地使用大多数 Java 库——RxJava 库也不例外。

一个专用的RxKotlin 库,它是围绕常规 RxJava 库的 Kotlin 包装器。这个包装器提供了针对 Kotlin 环境优化 RxJava 的扩展,并且可以进一步减少您需要编写的样板代码量。

由于您可以在 Kotlin 中使用 RxJava 而无需 RxKotlin,因此除非另有说明,否则我们将在本文中使用 RxJava。

在 Kotlin 中创建简单的观察者和可观察对象

Observers 和 Observables 是 RxJava 的构建块,所以让我们从创建开始:

  • 一个简单的 Observable,它发出一个短的数据流来响应按钮点击事件。

  • 通过将不同的消息打印到 Android Studio 的Logcat来对此数据做出反应的 Observable 。

使用您选择的设置创建一个新项目,但请确保在出现提示时选中Include Kotlin support复选框。接下来,打开项目的build.gradle文件并将 RxJava 库添加为项目依赖项:

dependencies {
  implementation fileTree(dir: 'libs', include: ['*.jar'])
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'

}

然后,打开项目的activity_main.xml文件并添加将启动数据流的按钮:

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
  xmlns:tools="http://schemas.android.com/tools"
  android:layout_width="match_parent"
  android:layout_height="match_parent"
  android:orientation="vertical"
  tools:context=".MainActivity" >

  <Button
      android:id="@+id/button"
      android:layout_width="wrap_content"
      android:layout_height="wrap_content"
      android:text="Start RxJava stream" />

</LinearLayout>

创建 Observable 有多种不同的方法,但最简单的方法之一是使用just()运算符将对象或对象列表转换为 Observable。

在下面的代码中,我们将创建一个 Observable ( myObservable) 并为其指定要发射的项目 1、2、3、4 和 5。我们还创建了一个 Observer ( myObserver),将其订阅到myObservable,然后告诉它每次收到新的发射时都向Logcat打印一条消息。

import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

  private var TAG = "MainActivity"

  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)

//Start the stream when the button is clicked//

      button.setOnClickListener { startRStream() }

  }

  private fun startRStream() {

//Create an Observable//

      val myObservable = getObservable()

//Create an Observer//

      val myObserver = getObserver()

//Subscribe myObserver to myObservable//

      myObservable
              .subscribe(myObserver)
  }

  private fun getObserver(): Observer<String> {
      return object : Observer<String> {
          override fun onSubscribe(d: Disposable) {
          }

//Every time onNext is called, print the value to Android Studio’s Logcat//

          override fun onNext(s: String) {
              Log.d(TAG, "onNext: $s")
          }

//Called if an exception is thrown//

          override fun onError(e: Throwable) {
              Log.e(TAG, "onError: " + e.message)
          }

//When onComplete is called, print the following to Logcat//

          override fun onComplete() {
              Log.d(TAG, "onComplete")
          }
      }
  }

//Give myObservable some data to emit//

  private fun getObservable(): Observable<String> {
      return Observable.just("1", "2", "3", "4", "5")
  }
}

您现在可以对这个应用程序进行测试:

  • 在物理 Android 智能手机或平板电脑或 Android 虚拟设备 (AVD) 上安装您的项目。

  • 单击 Start RxJava按钮。

  • 通过选择Android Monitor选项卡(光标位于以下屏幕截图中的位置)打开 Android Studio 的 Logcat Monitor ,然后选择Logcat选项卡。

此时,Observable 将开始发送其数据,并且 Observer 将其消息打印到 Logcat。您的 Logcat 输出应如下所示:

使用RxJava和RxKotlin进行Kotlin反应式编程  第1张

 如果您想亲自尝试,可以从 GitHub 下载此项目。

RxJava 的 Kotlin 扩展

现在我们已经了解了如何在 Kotlin 中设置一个简单的 RxJava 管道,让我们看看如何使用 RxKotlin 的扩展函数以更少的代码实现这一点。

要使用 RxKotlin 库,您需要将其添加为项目依赖项:

dependencies {
  implementation fileTree(dir: 'libs', include: ['*.jar'])
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'

//Add the following//

  implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'

}

在下面的示例中,我们使用 RxKotlin 的toObservable()扩展函数将 aList转换为 Observable。我们还使用了subscribeBy()扩展函数,因为它允许我们使用命名参数构造一个观察者,这会产生更清晰的代码。

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)

//Start the stream when the button is clicked//

      button.setOnClickListener { startRStream() }

  }

  private fun startRStream() {

      val list = listOf("1", "2", "3", "4", "5")

//Apply the toObservable() extension function//

      list.toObservable()

//Construct your Observer using the subscribeBy() extension function//

              .subscribeBy(

                      onNext = { println(it) },
                      onError = { it.printStackTrace() },
                      onComplete = { println("onComplete!") }

              )
  }
}

这是您应该看到的输出:

使用RxJava和RxKotlin进行Kotlin反应式编程  第2张

解决 RxJava 的 SAM 歧义问题

RxKotlin 还为给定 Java 方法上存在多个 SAM 参数重载时可能发生的SAM 转换问题提供了重要的解决方法。这种 SAM 歧义让 Kotlin 编译器感到困惑,因为它无法确定应该转换哪个接口,因此您的项目将无法编译。

当使用带有 Kotlin 的 RxJava 2.0 时,这种 SAM 歧义是一个特殊的问题,因为许多 RxJava 运算符采用多种 SAM 兼容类型。

让我们看看实际的 SAM 转换问题。在下面的代码中,我们使用zip()运算符来组合两个 Observable 的输出:  

import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)

//Start the stream when the button is clicked//

      button.setOnClickListener { startRStream() }
      
  }

  private fun startRStream() {

      val numbers = Observable.range(1, 6)

      val strings = Observable.just("One", "Two", "Three",

              "Four", "Five", "Six" )

      val zipped = Observable.zip(strings, numbers) { s, n -> "$s $n" }
      zipped.subscribe(::println)
  }
}

这将导致 Kotlin 编译器抛出类型推断错误。但是,RxKotlin 为受影响的运算符提供了辅助方法和扩展函数,包括Observables.zip()我们在以下代码中使用的 :

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import kotlinx.android.synthetic.main.activity_main.*

class MainActivity : AppCompatActivity() {

  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)

//Start the stream when the button is clicked//

      button.setOnClickListener { startRStream() }

  }

  private fun startRStream() {

      val numbers = Observable.range(1, 6)

      val strings = Observable.just("One", "Two", "Three",

              "Four", "Five", "Six" )

      val zipped = Observables.zip(strings, numbers) { s, n -> "$s $n" }
      zipped.subscribe(::println)
  }


}

这是此代码的输出:

使用RxJava和RxKotlin进行Kotlin反应式编程  第3张

结论

在本教程中,我向您展示了如何在 Kotlin 项目中开始使用 RxJava 库,包括使用许多其他支持库,例如 RxKotlin 和 RxBinding。我们研究了如何在 Kotlin 中创建简单的 Observers 和 Observables,一直到使用扩展函数为 Kotlin 平台优化 RxJava。

到目前为止,我们已经使用 RxJava 创建了简单的可发出数据的 Observable,以及将这些数据打印到 Android Studio 的 Logcat 的 Observer——但这不是您在现实世界中使用 RxJava 的方式!


文章目录
  • 到底什么是 RxJava?
  • 使用 RxJava 有什么好处?
  • 我应该使用 RxJava 还是 RxKotlin?
  • 在 Kotlin 中创建简单的观察者和可观察对象
  • RxJava 的 Kotlin 扩展
    • 解决 RxJava 的 SAM 歧义问题
  • 结论