使用RxJava中遇到的一些”坑“

发布时间 - 2026-01-11 01:22:39    点击率:

前言

大家越用RxJava,越觉得它好用,所以不知不觉地发现代码里到处都是RxJava的身影。然而,RxJava也不是银弹,其中仍然有很多问题需要解决。这里,我简单地总结一下自己遇到的一些“坑”,内容上可能会比较松散。

一、考虑主线程的切换

RxJava中一个常用的使用方法是——在其他线程中做处理,然后切换到UI线程中去更新页面。其中,线程切换就是使用了observeOn()。后台下载文件,前台显示下载进度就可以使用这种方式完成。然而,实践发现这其中有坑。如果文件比较大,而下载包的粒度又比较小,这将导致很多通知积压下来,最终导致错误。

这种错误其实也是可以理解的,毕竟MainLooper是根据Message来工作的,Message过多必然会导致一些问题。当然,这还是比较想当然的想法,最终还是需要到源码中一探究竟。ObserveOn的原理在前面关于RxJava的文章已经有过分析,这里还是简单列一下代码。其中的重点还是OperatorObserveOn的内部类——ObserveOnSubscriber。其重要代码片段如下:

 /** Observe through individual queue per observer. */
 static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
  final Subscriber<? super T> child;
  final Scheduler.Worker recursiveScheduler;
  final NotificationLite<T> on;
  final boolean delayError;
  final Queue<Object> queue;
  /** The emission threshold that should trigger a replenishing request. */
  final int limit;

  // the status of the current stream
  volatile boolean finished;

  final AtomicLong requested = new AtomicLong();

  final AtomicLong counter = new AtomicLong();

  /**
   * The single exception if not null, should be written before setting finished (release) and read after
   * reading finished (acquire).
   */
  Throwable error;

  /** Remembers how many elements have been emitted before the requests run out. */
  long emitted;

  // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
  // not prevent anything downstream from consuming, which will happen if the Subscription is chained
  public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
   this.child = child;
   this.recursiveScheduler = scheduler.createWorker();
   this.delayError = delayError;
   this.on = NotificationLite.instance();
   int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
   // this formula calculates the 75% of the bufferSize, rounded up to the next integer
   this.limit = calculatedSize - (calculatedSize >> 2);
   if (UnsafeAccess.isUnsafeAvailable()) {
    queue = new SpscArrayQueue<Object>(calculatedSize);
   } else {
    queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
   }
   // signal that this is an async operator capable of receiving this many
   request(calculatedSize);
  }

  void init() {
   // don't want this code in the constructor because `this` can escape through the
   // setProducer call
   Subscriber<? super T> localChild = child;

   localChild.setProducer(new Producer() {

    @Override
    public void request(long n) {
     if (n > 0L) {
      BackpressureUtils.getAndAddRequest(requested, n);
      schedule();
     }
    }

   });
   localChild.add(recursiveScheduler);
   localChild.add(this);
  }

  @Override
  public void onNext(final T t) {
   if (isUnsubscribed() || finished) {
    return;
   }
   if (!queue.offer(on.next(t))) {
    onError(new MissingBackpressureException());
    return;
   }
   schedule();
  }

  @Override
  public void onCompleted() {
   if (isUnsubscribed() || finished) {
    return;
   }
   finished = true;
   schedule();
  }

  @Override
  public void onError(final Throwable e) {
   if (isUnsubscribed() || finished) {
    RxJavaHooks.onError(e);
    return;
   }
   error = e;
   finished = true;
   schedule();
  }

  protected void schedule() {
   if (counter.getAndIncrement() == 0) {
    recursiveScheduler.schedule(this);
   }
  }
 }

关键点就在于这个queue成员,这个队列存放了需要进行发送给下行线程的消息。对于主线程来说,符合其实是比较重的,从消息的生产者和消费者的模式讲,过多过快的消息会导致消息阻塞。甚至,都到不了阻塞的情况,因为queue的大小会有上限,在onNext()方法中的queue.offer()可能会产生异常,这取决于queue的实现方式。但无论如何都不可能无限大,所以无法保证绝对不出异常。

解决这个问题的方法其实也很简单,可以在生产者降低消息的产生频率。也可以在消息处理的时候先不进行线程切换,而是通过判断,在必要的时候进行线程切换,比如使用runOnUIThread()

二、RxJava避免内存泄漏

RxJava的响应式机制本质上还是回调实现的,因此内存泄漏也是会出现的。倘若不对Subscription进行管理,内存泄漏会非常严重。对于Subscription,其实有几个比较广泛使用的方法,比如RxLifecycle,以及简单的CompositeSubscription。至于它们的使用方法,其实都非常简单,这里就不赘述了。

说到内存泄漏,就谈点题外话,动画也可能导致内存泄漏。其原因仍然是一些回调函数,这些回调函数实现的View变化的功能,但是在被撤销以后,回调函数没有取消掉,同时View可能持有Context信息,从而导致内存泄漏。最近才发现,LoadToastView这个开源库一直存在内存泄漏,其原因正如上文所说。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。


# android  # rxjava的使用  # rxjava使用  # rxjava的坑  # RxJava入门指南及其在Android开发中的使用示例  # android使用RxJava实现预加载  # android非RxJava环境下使用Handler实现预加载  # 回调  # 都是  # 会有  # 都不  # 不出  # 有很多  # 就不  # 说到  # 也很  # 有过  # 才发现  # 想当然  # 有几个  # 仍然是  # 这篇文章  # 中去  # 比较大  # 较小  # 谢谢大家  # 好用 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: C#如何调用原生C++ COM对象详解  android nfc常用标签读取总结  独立制作一个网站多少钱,建立网站需要花多少钱?  Laravel怎么判断请求类型_Laravel Request isMethod用法  Laravel如何使用Blade模板引擎?(完整语法和示例)  LinuxShell函数封装方法_脚本复用设计思路【教程】  智能起名网站制作软件有哪些,制作logo的软件?  如何快速上传自定义模板至建站之星?  三星网站视频制作教程下载,三星w23网页如何全屏?  如何安全更换建站之星模板并保留数据?  Laravel如何实现邮箱地址验证功能_Laravel邮件验证流程与配置  EditPlus中的正则表达式 实战(2)  宙斯浏览器文件分类查看教程 快速筛选视频文档与图片方法  如何用wdcp快速搭建高效网站?  三星、SK海力士获美批准:可向中国出口芯片制造设备  CSS3怎么给轮播图加过渡动画_transition加transform实现【技巧】  如何挑选优质建站一级代理提升网站排名?  WordPress 子目录安装中正确处理脚本路径的完整指南  Win11应用商店下载慢怎么办 Win11更改DNS提速下载【修复】  详解CentOS6.5 安装 MySQL5.1.71的方法  laravel怎么配置和使用PHP-FPM来优化性能_laravel PHP-FPM配置与性能优化方法  Laravel怎么实现一对多关联查询_Laravel Eloquent模型关系定义与预加载【实战】  如何快速生成高效建站系统源代码?  香港服务器部署网站为何提示未备案?  Laravel路由Route怎么设置_Laravel基础路由定义与参数传递规则【详解】  Python自动化办公教程_ExcelWordPDF批量处理案例  Laravel如何连接多个数据库_Laravel多数据库连接配置与切换教程  如何快速搭建FTP站点实现文件共享?  Windows11怎样设置电源计划_Windows11电源计划调整攻略【指南】  历史网站制作软件,华为如何找回被删除的网站?  如何利用DOS批处理实现定时关机操作详解  图册素材网站设计制作软件,图册的导出方式有几种?  制作无缝贴图网站有哪些,3dmax无缝贴图怎么调?  详解Android中Activity的四大启动模式实验简述  浅谈javascript alert和confirm的美化  Gemini手机端怎么发图片_Gemini手机端发图方法【步骤】  Laravel Eloquent模型如何创建_Laravel ORM基础之Model创建与使用教程  如何在IIS中新建站点并配置端口与IP地址?  详解Nginx + Tomcat 反向代理 如何在高效的在一台服务器部署多个站点  Laravel事件和监听器如何实现_Laravel Events & Listeners解耦应用的实战教程  教你用AI将一段旋律扩展成一首完整的曲子  浅谈redis在项目中的应用  如何快速搭建高效香港服务器网站?  如何用虚拟主机快速搭建网站?详细步骤解析  Python图片处理进阶教程_Pillow滤镜与图像增强  php json中文编码为null的解决办法  网站制作怎么样才能赚钱,用自己的电脑做服务器架设网站有什么利弊,能赚钱吗?  如何快速搭建高效简练网站?  Laravel如何使用API Resources格式化JSON响应_Laravel数据资源封装与格式化输出  香港服务器建站指南:免备案优势与SEO优化技巧全解析