如何在Android上使用响应式建模简化并发

本文概述

并发性和异步性是移动编程固有的。

通过命令式编程处理并发可能是许多问题的原因, 而命令式编程是Android上通常涉及的编程。将响应式编程与RxJava结合使用, 可以通过提供更简洁且更不易出错的解决方案来避免潜在的并发问题。

除了简化并发, 异步任务之外, RxJava还提供了执行功能样式操作的功能, 这些功能可以转换, 合并和聚合Observable的发射, 直到获得所需的结果。

如何在Android上使用响应式建模简化并发1

通过结合RxJava的反应性范例和功能样式操作, 即使在Android的非反应性世界中, 我们也可以以反应性方式对各种并发构造进行建模。在本文中, 你将学习如何做到这一点。你还将学习如何逐步将RxJava引入现有项目。

如果你不熟悉RxJava, 建议你在这里阅读有关RxJava的一些基础知识的文章。

桥接非反应式进入反应式世界

将RxJava作为项目的库之一添加的挑战之一是, 它从根本上改变了你对代码进行推理的方式。

RxJava要求你将数据视为被推入而不是被拉入。虽然概念本身很简单, 但是更改基于pull范式的完整代码库可能会有些艰巨。尽管一致性始终是理想的选择, 但你可能并不总是拥有一次在整个代码库中一次进行此转换的特权, 因此可能需要更多的增量方法。

考虑以下代码:

/**
* @return a list of users with blogs
*/
public List<User> getUsersWithBlogs() {
   final List<User> allUsers = UserCache.getAllUsers();
   final List<User> usersWithBlogs = new ArrayList<>();
   for (User user : allUsers) {
       if (user.blog != null && !user.blog.isEmpty()) {
           usersWithBlogs.add(user);
       }
   }
   Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name));
   return usersWithBlogs;
}

此函数从缓存中获取用户对象列表, 根据用户是否有博客对其进行过滤, 并按用户名对它们进行排序, 最后将其返回给调用者。查看此片段, 我们注意到其中许多操作都可以利用RxJava运算符。例如filter()和sorted()。

重写此代码段可以给我们:

/**
* @return a list of users with blogs
*/
public Observable<User> getUsersWithBlogs() {
   return Observable.fromIterable(UserCache.getAllUsers())
                    .filter(user -> user.blog != null && !user.blog.isEmpty())
                    .sorted((user1, user2) -> user1.name.compareTo(user2.name));
}

函数的第一行通过fromIterable()将UserCache.getAllUsers()返回的List <User>转换为Observable <User>。这是使我们的代码具有反应性的第一步。现在我们正在对Observable进行操作, 这使我们能够在RxJava工具箱中执行任何Observable运算符–在这种情况下为filter()和sorted()。

关于此更改, 还有其他几点要注意。

首先, 方法签名不再相同。如果仅在少数几个地方使用此方法调用, 并且很容易将更改传播到堆栈的其他区域, 则这可能并不重要。但是, 如果它破坏了依赖此方法的客户端, 则将出现问题, 并且应还原方法签名。

其次, RxJava在设计时考虑了惰性。也就是说, 在没有可观察对象的订户时, 不应执行任何长时间的操作。进行此修改后, 该假设不再成立, 因为甚至在有任何订阅者之前, 都会调用UserCache.getAllUsers()。

离开反应世界

为了解决我们更改中的第一个问题, 我们可以使用Observable可用的任何阻塞运算符, 例如blockingFirst()和blockingNext()。本质上, 这两个运算符都将阻塞, 直到向下游发射一个项目为止:blockingFirst()将返回发射的第一个元素并完成, 而blockingNext()将返回一个Iterable, 该迭代器允许你对基础数据执行for-each循环(循环中的每次迭代都会阻止)。

但是, 使用阻塞操作的一个副作用是要注意的一个重要方面, 那就是异常会抛出在调用线程上, 而不是传递给观察者的onError()方法。

使用阻塞运算符将方法签名更改回List <User>, 我们的代码段现在看起来像这样:

/**
* @return a list of users with blogs
*/
public List<User> getUsersWithBlogs() {
   return Observable.fromIterable(UserCache.getAllUsers())
           .filter(user -> user.blog != null && !user.blog.isEmpty())
           .sorted((user1, user2) -> user1.name.compareTo(user2.name))
           .toList()
           .blockingGet();
}

在调用阻塞运算符(即blockingGet())之前, 我们首先需要将聚合运算符链接到List(), 以便将流从Observable <User>修改为Single <List <User >>(Single是一种特殊类型)。 Observable的值仅在onSuccess()中发出单个值, 或者通过onError()发出错误。

然后, 我们可以调用阻塞操作符blockingGet(), 该操作解开Single并返回List <User>。

尽管RxJava支持此功能, 但应尽可能避免这种情况, 因为这不是惯用的反应式编程。但是, 在绝对必要时, 阻塞运算符是退出被动世界的一种不错的初始方法。

懒惰的方法

如前所述, RxJava在设计时考虑了惰性。即, 应将长时间运行的操作尽可能地延迟(即, 直到在Observable上调用订阅)。为了使我们的解决方案变得懒惰, 我们使用defer()运算符。

如何在Android上使用响应式建模简化并发2

defer()包含一个ObservableSource工厂, 该工厂为每个订阅的新观察者创建一个Observable。在我们的例子中, 我们希望每当观察者订阅时返回Observable.fromIterable(UserCache.getAllUser())。

/**
* @return a list of users with blogs
*/
public Observable<User> getUsersWithBlogs() {
   return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers()))
                    .filter(user -> user.blog != null && !user.blog.isEmpty())
                    .sorted((user1, user2) -> user1.name.compareTo(user2.name));
}

现在, 长时间运行的操作已包装在defer()中, 只需在subscribeOn()中指定适当的Scheduler, 我们就可以完全控制应在哪个线程中运行。进行此更改后, 我们的代码将完全响应, 并且仅应在需要数据时进行订阅。

/**
* @return a list of users with blogs
*/
public Observable<User> getUsersWithBlogs() {
   return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers()))
                    .filter(user -> user.blog != null && !user.blog.isEmpty())
                    .sorted((user1, user2) -> user1.name.compareTo(user2.name))
                    .subscribeOn(Schedulers.io());
}

另一个用于延迟计算的非常有用的运算符是fromCallable()方法。与defer()期望在lambda函数中返回一个Observable并依次”平展”返回的Observable不同, fromCallable()将调用lambda并向下游返回值。

/**
* @return a list of users with blogs
*/
public Observable<User> getUsersWithBlogs() {
   final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers());
   final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users));
   return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty())
                        .sorted((user1, user2) -> user1.name.compareTo(user2.name));
}

现在, 在列表上使用fromCallable()单个将返回Observable <List <User >>, 我们需要使用flatMap()将其扁平化。

反应一切

从前面的示例中, 我们已经看到可以使用阻塞操作和defer()/ fromCallable()将任何对象包装在Observable中, 并在非反应性和反应性状态之间跳转。使用这些构造, 我们可以开始将Android应用程序的区域转换为可响应的。

长期运营

最初考虑使用RxJava的一个好地方是, 只要你有一个需要花费一些时间来执行的过程, 例如网络调用(请查看以前的示例), 磁盘读写等等。以下示例说明了一个简单的函数, 该函数可以将文本写入文件系统:

/**
* Writes {@code text} to the file system.
*
* @param context a Context
* @param filename the name of the file
* @param text the text to write
* @return true if the text was successfully written, otherwise, false
*/
public boolean writeTextToFile(Context context, String filename, String text) {
   FileOutputStream outputStream;
   try {
       outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE);
       outputStream.write(text.getBytes());
       outputStream.close();
       return true;
   } catch (Exception e) {
       e.printStackTrace();
       return false;
   }
}

调用此函数时, 由于此操作正在阻塞, 因此需要确保在单独的线程上完成此操作。对调用者施加这样的限制会使开发人员的事情复杂化, 这增加了错误的可能性, 并有可能减慢开发速度。

在函数中添加注释当然可以帮助避免调用者的错误, 但这仍然离防弹还差得远。

但是, 使用RxJava, 我们可以轻松地将其包装到Observable中, 并指定应在其上运行的Scheduler。这样, 调用者根本不必担心在单独的线程中调用该函数。该功能将自行处理。

/**
* Writes {@code text} to the filesystem.
*
* @param context a Context
* @param filename the name of the file
* @param text the text to write
* @return An Observable emitting a boolean indicating whether or not the text was successfully written.
*/
public Observable<Boolean> writeTextToFile(Context context, String filename, String text) {
   return Observable.fromCallable(() -> {
       FileOutputStream outputStream;
       outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE);
       outputStream.write(text.getBytes());
       outputStream.close();
       return true;
   }).subscribeOn(Schedulers.io());
}

使用fromCallable(), 将文本写入文件的时间推迟到订阅时间。

由于异常是RxJava中的一流对象, 因此更改的另一个好处是, 我们不再需要将操作包装在try / catch块中。异常将简单地传播到下游, 而不是被吞噬。这使调用者可以处理他/她认为合适的异常(例如, 根据抛出的异常向用户显示错误等)。

我们可以执行的另一种优化是返回Completable而不是Observable。 Completable本质上是Observable的一种特殊类型(类似于Single), 它仅指示通过onComplete()成功执行还是通过onError()成功执行计算。在这种情况下, 返回Completable似乎更有意义, 因为在Observable流中返回单个true似乎很愚蠢。

/**
* Writes {@code text} to the filesystem.
*
* @param context a context
* @param filename the name of the file
* @param text the text to write
* @return A Completable
*/
public Completable writeTextToFile(Context context, String filename, String text) {
   return Completable.fromAction(() -> {
       FileOutputStream outputStream;
       outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE);
       outputStream.write(text.getBytes());
       outputStream.close();
   }).subscribeOn(Schedulers.io());
}

要完成该操作, 我们将使用Completable的fromAction()操作, 因为返回值不再是我们所需要的。如果需要, 如Observable一样, Completable也支持fromCallable()和defer()函数。

取代回呼

到目前为止, 我们查看的所有示例都发出一个值(即可以建模为Single), 或者告诉我们操作是成功还是失败(即可以建模为Completable)。

但是, 我们如何在应用程序中转换接收连续更新或事件(例如位置更新, 查看点击事件, 传感器事件等)的区域?

我们将研究两种方法, 使用create()和使用Subjects。

create()允许我们显式调用观察者的onNext()| onComplete()|我们从数据源接收更新时使用onError()方法。要使用create(), 我们传入一个ObservableOnSubscribe, 当观察者订阅时, 该ObservableOnSubscribe会接收一个ObservableEmitter。然后, 使用接收到的发射器, 我们可以执行所有必要的设置调用以开始接收更新, 然后调用适当的Emitter事件。

如果是位置更新, 我们可以注册接收此位置的更新, 并在收到时发出位置更新。

public class LocationManager {

   /**
    * Call to receive device location updates.
    * @return An Observable emitting location updates
    */
   public Observable<Location> observeLocation() {
       return Observable.create(emitter -> {
           // Make sure that the following conditions apply and if not, call the emitter's onError() method
           // (1) googleApiClient is connected
           // (2) location permission is granted
           final LocationRequest locationRequest = new LocationRequest();
           locationRequest.setInterval(1000);
           locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY);

           LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() {
               @Override public void onLocationChanged(Location location) {
                   if (!emitter.isDisposed()) {
                       emitter.onNext(location);
                   }
               }
           });
       });
   }
}

create()调用中的函数请求位置更新, 并传递一个回调, 该回调在设备位置更改时被调用。如我们在这里看到的, 我们基本上替换了回调样式的接口, 而是在创建的Observable流中发出接收到的位置(出于教育目的, 如果你想研究, 我通过构造位置请求跳过了一些细节。你可以在此处阅读更多详细信息)。

关于create()的另一件事要注意的是, 每当调用subscribe()时, 都会提供一个新的发射器。换句话说, create()返回一个冷的Observable。这意味着在上面的功能中, 我们可能会多次请求位置更新, 这不是我们想要的。

要解决此问题, 我们想在Subjects的帮助下更改函数以返回热的Observable。

输入主题

一个Subject扩展一个Observable并同时实现Observer。每当我们想同时向多个订阅者发出或投射同一事件时, 此功能特别有用。在实现方面, 我们希望向客户公开该主题为可观察对象, 同时将其作为提供者的主题。

public class LocationManager {

   private Subject<Location> locationSubject = PublishSubject.create();
   
   /**
    * Invoke this method when this LocationManager should start listening to location updates.
    */
   public void connect() {
       final LocationRequest locationRequest = new LocationRequest();
       locationRequest.setInterval(1000);
       locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY);

       LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() {
           @Override public void onLocationChanged(Location location) {
               locationSubject.onNext(location);
           }
       });
   }
   
   /**
    * Call to receive device location updates.
    * @return An Observable emitting location updates
    */
   public Observable<Location> observeLocation() {
       return locationSubject;
   }
}

在此新实现中, 使用子类型PublishSubject, 该子类型在事件从订阅时间开始到达时发出事件。因此, 如果在已经发出位置更新的时间点进行订阅, 则观察者将不会接收到过去的发射, 而只会接收后续的发射。如果不需要这种行为, 则可以使用RxJava工具包中的几个其他Subject子类型。

如何在Android上使用响应式建模简化并发3

此外, 我们还创建了一个单独的connect()函数, 该函数启动请求以接收位置更新。 observeLocation()仍然可以进行connect()调用, 但是为了清楚/简单起见, 我们将其从函数中重构出来。

本文总结

我们研究了多种机制和技术:

  • defer()及其变体将计算的执行延迟到订阅为止
  • 通过create()生成的冷Observable
  • 使用主题的热门可观察物
  • 当我们想离开反应世界时, blockX操作

希望本文提供的示例启发了有关你应用中不同领域的一些想法, 这些想法可以转换为反应性。我们已经介绍了很多内容, 如果你有任何问题, 建议或不清楚的地方, 请在下面发表评论!

如果你有兴趣了解有关RxJava的更多信息, 我正在研究一本深入的书, 其中介绍了如何使用Android示例以反应方式查看问题。如果你想接收更新, 请在这里订阅。

微信公众号
手机浏览(小程序)
0
分享到:
没有账号? 忘记密码?