简单谈谈RxJava和多线程并发
发布时间 - 2026-01-10 23:24:15 点击率:次前言

相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的。
RxJava与并发
首先让我们来看一段RxJava协议的原文:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
如上所述,RxJava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个Observables从多个线程中发射数据,必须要满足happens-before原则。
下面来看一个简单的例子:
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
unSafeCount = unSafeCount + integer;
Log.d("TAG", "onNext: " + unSafeCount);
}
});
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
final int unit = 1;
for(int i = 0;i < 10;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
subject.onNext(unit);
}
}
}).start();
}
}
});
这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了RxJava,但是这样的使用对于并发是没有意义的,因为RxJava并没有去处理并发带来的问题。我们可以看下subject的onNext方法的源码,里面很简单,就是调用了对应observer的onNext方法而已。不止是这样,绝大多数的Subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的RxBus),如果从多个线程中发射数据,那你就要小心了。
对于这样的问题,有两种解决方案:
第一种就是简单的使用传统的解决方法,比如用AtomicInteger代替int。
第二种则是使用RxJava的解决方案,在这里就是用SerializedSubject去代替Subject:
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
unSafeCount = unSafeCount + integer;
count.addAndGet(integer);
Log.d("TAG", "onNext: " + count);
}
});
final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
final int unit = 1;
for(int i = 0;i < 10;i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0;j < 1000;j++){
ser.onNext(unit);
}
}
}).start();
}
}
});
可以看一下SerializedSubject的onNext方法做了什么:
@Override
public void onNext(T t) {
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
if (emitting) {
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
list.add(nl.next(t));
return;
}
emitting = true;
}
try {
actual.onNext(t);
} catch (Throwable e) {
terminated = true;
Exceptions.throwOrReport(e, actual, t);
return;
}
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
if (list == null) {
emitting = false;
return;
}
queue = null;
}
for (Object o : list.array) {
if (o == null) {
break;
}
try {
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}
处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onNext,onComplete和onError这些方法。
但是这样操作显然是会造成性能的影响的,所以RxJava并不会把所有的操作都打上线程安全的标签。
在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。
RxJava中的一些操作符
RxJava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。
对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。
Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
subscriber.onNext(1);
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(2);
subscriber.onCompleted();
}
});
Observable.merge(o1,o2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
Log.d("TAG", "onNext: " + i);
}
});
对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。
究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
可以看到在经过lift操作之后,对应的中间人MergeSubscriber的onNext,没有什么多余的代码,所以在多个Observable从多线程中发射数据的时候,顺序当然不能得到保证。
一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatMap也会有同样的问题。
对于这样的场景,我们可以使用concat操作符来完成:
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。
if (wip.getAndIncrement() != 0) {
return;
}
final int delayErrorMode = this.delayErrorMode;
for (;;) {
if (actual.isUnsubscribed()) {
return;
}
if (!active) {
if (delayErrorMode == BOUNDARY) {
if (error.get() != null) {
Throwable ex = ExceptionsUtils.terminate(error);
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
}
boolean mainDone = done;
Object v = queue.poll();
boolean empty = v == null;
if (mainDone && empty) {
Throwable ex = ExceptionsUtils.terminate(error);
if (ex == null) {
actual.onCompleted();
} else
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
if (!empty) {
Observable<? extends R> source;
try {
source = mapper.call(NotificationLite.<T>instance().getValue(v));
} catch (Throwable mapperError) {
Exceptions.throwIfFatal(mapperError);
drainError(mapperError);
return;
}
if (source == null) {
drainError(new NullPointerException("The source returned by the mapper was null"));
return;
}
if (source != Observable.empty()) {
if (source instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
active = true;
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
} else {
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
inner.set(innerSubscriber);
if (!innerSubscriber.isUnsubscribed()) {
active = true;
source.unsafeSubscribe(innerSubscriber);
} else {
return;
}
}
request(1);
} else {
request(1);
continue;
}
}
}
if (wip.decrementAndGet() == 0) {
break;
}
}
通过源码我们可以知道,active字段就保证了如果上一个数据源还没有发射完数据,就会一直在for循环中等待,直到上一个数据源发射完了数据重置了active字段。
对于concat,其实还存在一个问题,那就是多个Observable变成了串行,会大大的增加整个RxJava事件流的处理时间,对于这个场景,我们可以使用concatEager来解决。concatEager的源码就不带大家分析了,有兴趣的同学可以自行查看。
总结
这篇文章比较短,讲的东西也比较浅显,其实就是讨论了一下RxJava中多线程并发的几个问题。最后我想说,RxJava并不是什么高大上的东西,在你的项目引入之前,要考虑一下是否真的有必要这么做。就算真的有场景需要RxJava,也请不要一口气把项目中所有的操作都换成RxJava,一些简单的操作不一定需要使用RxJava的操作符的实现,用了反而降低了代码的可读性,切勿为了使用Rx而使用Rx。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。
# rxjava
# 多线程并发
# 多线程
# 并发
# Java多线程并发执行demo代码实例
# Java常见面试题之多线程和高并发详解
# Java多线程并发编程 并发三大要素
# JAVA多线程并发下的单例模式应用
# 使用java的HttpClient实现多线程并发
# Java多线程和并发基础面试题(问答形式)
# 深入探究Java多线程并发编程的要点
# java多线程并发executorservice(任务调度)类
# java多线程和并发包入门示例
# 详解Java多线程与并发
# 多个
# 就会
# 在这里
# 我们可以
# 上一
# 很简单
# 可以使用
# 一个问题
# 这篇文章
# 要小心
# 都是
# 是一个
# 那就是
# 下次
# 他们的
# 使用了
# 也不
# 让我
# 还没有
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
如何用IIS7快速搭建并优化网站站点?
如何快速选择适合个人网站的云服务器配置?
PHP的CURL方法curl_setopt()函数案例介绍(抓取网页,POST数据)
Laravel如何使用Blade模板引擎?(完整语法和示例)
Laravel怎么清理缓存_Laravel optimize clear命令详解
Laravel如何监控和管理失败的队列任务_Laravel失败任务处理与监控
Laravel如何实现本地化和多语言支持_Laravel多语言配置与翻译文件管理
google浏览器怎么清理缓存_谷歌浏览器清除缓存加速详细步骤
Windows驱动无法加载错误解决方法_驱动签名验证失败处理步骤
浅谈Javascript中的Label语句
教你用AI将一段旋律扩展成一首完整的曲子
java中使用zxing批量生成二维码立牌
Laravel怎么实现观察者模式Observer_Laravel模型事件监听与解耦开发【指南】
JS去除重复并统计数量的实现方法
为什么要用作用域操作符_php中访问类常量与静态属性的优势【解答】
详解Huffman编码算法之Java实现
详解ASP.NET 生成二维码实例(采用ThoughtWorks.QRCode和QrCode.Net两种方式)
如何安全更换建站之星模板并保留数据?
javascript基于原型链的继承及call和apply函数用法分析
微信小程序 scroll-view组件实现列表页实例代码
微信h5制作网站有哪些,免费微信H5页面制作工具?
Python并发异常传播_错误处理解析【教程】
html5怎么画眼睛_HT5用Canvas或SVG画眼球瞳孔加JS控制动态【绘制】
如何在香港服务器上快速搭建免备案网站?
Laravel观察者模式如何使用_Laravel Model Observer配置
Laravel distinct去重查询_Laravel Eloquent去重方法
新三国志曹操传主线渭水交兵攻略
nginx修改上传文件大小限制的方法
Laravel如何自定义错误页面(404, 500)?(代码示例)
Laravel中的Facade(门面)到底是什么原理
Laravel怎么在Controller之外的地方验证数据
消息称 OpenAI 正研发的神秘硬件设备或为智能笔,富士康代工
想要更高端的建设网站,这些原则一定要坚持!
Laravel Seeder填充数据教程_Laravel模型工厂Factory使用
如何用虚拟主机快速搭建网站?详细步骤解析
Laravel怎么自定义错误页面_Laravel修改404和500页面模板
极客网站有哪些,DoNews、36氪、爱范儿、虎嗅、雷锋网、极客公园这些互联网媒体网站有什么差异?
Android滚轮选择时间控件使用详解
Internet Explorer官网直接进入 IE浏览器在线体验版网址
弹幕视频网站制作教程下载,弹幕视频网站是什么意思?
Win11应用商店下载慢怎么办 Win11更改DNS提速下载【修复】
如何生成腾讯云建站专用兑换码?
html5的keygen标签为什么废弃_替代方案说明【解答】
Laravel如何理解并使用服务容器(Service Container)_Laravel依赖注入与容器绑定说明
Laravel Debugbar怎么安装_Laravel调试工具栏配置指南
深圳防火门网站制作公司,深圳中天明防火门怎么编码?
浅析上传头像示例及其注意事项
如何彻底删除建站之星生成的Banner?
西安市网站制作公司,哪个相亲网站比较好?西安比较好的相亲网站?
Laravel API资源类怎么用_Laravel API Resource数据转换

