前言
本篇文章基于Rxjava1.3.8版本源码解析
上篇文章 我们介绍了RxJava的成员类之间是如何组合串联整个数据流的
本篇文章 我们主要来阐述Rxjava的重要功能操作符转换器和线程切换
通过理清这些能让我们更好的操作数据达成我们业务的需要
正文
操作符
所谓操作符 即对数据流处理的函数 因为事件数据的处理自然是针对生产者故而操作符的方法 大部分封装在Observable类中
操作符可以按功能类型区分为 转换 过滤 整合
过滤
filter 过滤
ofType 过滤指定类型
take 只发射开始的N项数据或者一定时间内的数据
takeLast 只发射最后的N项数据或者一定时间内的数据
first 只发射第一项(或者满足某个条件的第一项)数据
skip 跳过开始的N项数据或者一定时间内的数据
elementAt 发射某一项数据
distinct 过滤重复数据
distinctUntilChanged 过滤掉连续重复的数据
throttleFirst 定期发射Observable发射的第一项数据
debounce
发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射
这个可以用来做抖动事件处理 避免重复请求和交互触发
timeout 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。
takeUntil 当发射的数据满足某个条件后(包含该数据),或者第二个Observable发送完毕,终止第一个Observable发送数据。
takeWhile 当发射的数据满足某个条件时(不包含该数据),Observable终止发送数据。
整合
reduce 对序列使用reduce()函数并发射最终的结果
collect 使用collect收集数据到一个可变的数据结构。 比如将数据转为数组
merge
合并observable 不是按照顺序添加而是按照时间线连接,一遇到异常将停止发射数据,发送onError通知
而mergeDelayError 将异常延迟到其它没有错误的Observable发送完毕后才发射
merge在实际开发中可以运用到界面渲染 比如当前页面的数据加载逻辑为 先加载本地数据(数据库or缓存)如果有网络请求则覆盖
可以将本地数据作为Observable和网络数据的Observable merge会按照时间顺序发射数据 渲染页面
接着我们来看下源码
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return merge(new Observable[] { t1, t2 }); //将需要合并的封装为数组
}
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences)); //将多个observable通过from封装为一个
}
public static <T> Observable<T> from(T[] array) {
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(false));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
public static <T> OperatorMerge<T> instance(boolean delayErrors) { //是否需要将errorDelay
if (delayErrors) {
return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
}
return (OperatorMerge<T>)HolderNoDelay.INSTANCE; //无延迟合并的操作符单例
}
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
//操作符接口继承Func1单参数单返回 参数和返回为不同类型的Subscriber
}
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>>{
@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber); //添加订阅项
child.setProducer(producer); //setProducer即发送request的意思
return subscriber;
}
}
static final class MergeProducer<T> extends AtomicLong implements Producer {
final MergeSubscriber<T> subscriber;
@Override
public void request(long n) {
if (n > 0) {
if (get() == Long.MAX_VALUE) {
return;
}
BackpressureUtils.getAndAddRequest(this, n);
subscriber.emit();
}
}
public long produced(int n) { //生产完毕 序列号置空
return addAndGet(-n);
}
}
static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
//当前类也是一个封装的subscriber他会接收到合并的observable事件调用onNext
final Subscriber<? super T> child;
volatile Queue<Object> queue;//数据队列保存着合并数据 入队和出队
void emit() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
}
emitLoop();
}
@Override
public void onNext(T t) { //正是Observable.from自动发送的数据 所以正好是按发送时间发送事件进入队列遍历处理
parent.tryEmit(this, t);
}
@Override
public void onError(Throwable e) {
// Need to queue the error first before setting done, so that after emitLoop() removes the subscriber,
// it is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0,
// and at the same time the error queue was empty.
parent.getOrCreateErrorQueue().offer(e);
done = true;
parent.emit();
}
//用来发送多个事件 循环发送
void emitLoop() {
T v = NotificationLite.getValue(o);
// if child throws, report bounce it back immediately
try {
child.onNext(v); //执行我们subscribe的方法
} catch (Throwable t) {
if (!delayErrors) {
Exceptions.throwIfFatal(t);
skipFinal = true;
unsubscribe();
child.onError(t);
return;
}
getOrCreateErrorQueue().offer(t);
}
}
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent; //我们设置的onSubscriber对象
final Operator<? extends R, ? super T> operator;
@Override
public void call(Subscriber<? super R> o) {
try {
//调用operator.call返回订阅者
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);//调用问设置的subscriber 传入订阅者 MergeSubscriber
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
这里涉及到基础操作符 lift 大部分的操作符内部都会用到 (核心为操作符对应的Subscriber和Producer)
我们阅读源码 当调用subscribe时会调用OnSubscribeLift.call 会调用operator.call MergeSubscriber合并的监听最终会分发到child 传入我们的subscriber,MergeProducer生产合并事件
整理一下 最终创建了Observable(OnSubscribeLift(onSubscribe,OperatorMerge))
当发生subscribe关系时会调用OnSubscribeLift的call方法
不同的操作符的实现原理与这个差不多 也是创建observable的OnSubscribeLift 传入onSubscribe和各自的operator接口
然后调用operator的call 会根据对应的操作 而设置对应的producer开始发送事件 也会设置subscriber封装用来接收数据
例如本例子的merge subscriber就将数据保存为队列通过非阻塞式循环的方式获取消息并分发到真正的subscriber
接下来的merge操作符就不一一分析源码了 只要知道这些操作符的具体功能和应用场景即可
concat 按顺序连接多个Observables 与merge的不同在于它是顺序的 等observable1发送完再发送2的
如果merge的业务场景是异步的场景则concat则为同步的场景 适合连续的流程处理
startWith 在数据序列的开头增加一项数据。startWith的内部也是调用了concat
zip 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合
这个操作符也可以做接口数据的聚合 等待接口数据回来将其统一作为list聚合展示
combineLatest
….
操作符讲解完毕之后 有一个感觉就是rxjava非常强大 如果我们整个应用的数据都用observable包装 那么能非常简便的做业务 有点类型kotlin这样的高级语言 将复杂的逻辑整合提供给开发者使用 并且内部操作符方法可以复用其他的 提高了代码的复用性
转换
toList 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
toSortedList 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
toMap 将序列数据转换为一个Map。我们可以根据数据项生成key和生成value
map 对Observable发射的每一项数据都应用一个函数来变换。
cast 在发射之前强制将Observable发射的所有数据转换为指定类型
flatMap 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并
可以理解为平铺
concatMap 类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射
groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
buffer: 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window: 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。
转换这部分我们主要分析flatMap源码
我们这边map返回的时一次的observable内部有列表 flatmap返回多个有列表的observable并执行数组subscriber的onnext会实现平铺的效果
Observable.from(getCollection())
.flatMap(new Func1<HomeHotCourse, Observable<HomeCourse>>() {
@Override
public Observable<HomeCourse> call(HomeHotCourse homeHotCourse) {
return Observable.from(homeHotCourse.getCourses());
}
})
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func)); //这里的erge
}
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
//merge了map的操作
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(false));
}
//根据我们之前的分析直接看OperatorMerge的subscriber和producer
//封装onSubscribe方法
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer; //转换方法也就是我们提供的
@Override
//当发生订阅关系时调用
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); //熟悉的调用方法
}
//封装subscriber
static final class MapSubscriber<T, R> extends Subscriber<T> {
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t); //通过转换行数
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
错误处理/重试机制
onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable。。
onExceptionResumeNext 与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。
onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据。
retry: 当原始Observable在遇到错误时进行重试。
retryWhen: 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry。
其他
serialize: 强制Observable按次序发射数据并且要求功能是完好的
finallyDo/doAfterTerminate: 注册一个动作,在Observable完成时使用
delay: 延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。
doOnUnsubscribe: 注册一个动作,在观察者取消订阅时使用。内部由OperatorDoOnUnsubscribe实现,在call中加入一个解绑动作。
doOnSubscribe 注册一个动作,在观察者订阅时使用。内部由OperatorDoOnSubscribe实现
cache: 缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者
doOnEach: 注册一个动作,对Observable发射的每个数据项使用
转换器Transformer
线程切换原理
在实际的业务开发中 我们常用rxjava来进行接口异步请求 通常为下列代码调用
observable.subscribeOn(Schedulers.io()) //subscriber.call发生在子线程
.observeOn(AndroidSchedulers.mainThread()) //subscribe.xxx发生在主线程
.subscribe
.....
subscribeOn和observeOn方法指定了call发生的线程和subscribe发生的线程 那么它内部是如何做到线程切换呢 我们深入源码
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
//HOOK onSubscribe
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
@Override
public void call(final Subscriber<? super T> subscriber) {
//当订阅关系场景 会根据schedule创建对应的worker
final Worker inner = scheduler.createWorker();
//根据第一篇文章我们对schedule的分析 这里的worker即
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
//分发线程相关subscriber数据
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> actual;
final boolean requestOn;
final Worker worker;
Observable<T> source;
Thread t;
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
}
}
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable{
final Runnable actual;
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
}
从上面的代码我们可以看到
rxjava会根据传入的Schedule生成对应的ThreadWorker并将Subscriber作为runnable传入 线程任务处理器 f = executor.submit((Callable
分析完了子线程执行 那么是如何切换到主线程呢 在Rxjava中没有主线程区分 这个概念只存在在android所以提供了RxAndroid 来保证android相关的特色功能 包括主线程切换
.observeOn(AndroidSchedulers.mainThread()) 接着我们分析这行代码
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
//通过队列接收处理数据 等待主线程循环获取
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
public void call() {
for (;;) {
Object v = q.poll();
localChild.onNext(NotificationLite.<T>getValue(v));
}
}
}
public final class AndroidSchedulers {
private final Scheduler mainThreadScheduler;
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
}
class LooperScheduler extends Scheduler {
private final Handler handler;
LooperScheduler(Looper looper) {
handler = new Handler(looper);
}
LooperScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
}
}
当开启HandlerWorker 会将子线程发送过来的数据 封装为message 并发送给handler处理 我们知道handler消息机制 会将传来的runnable处理运行 这里的runnable即ObserveOnSubscriber 收到消息封装到queue 并执行schedule recursiveScheduler.schedule(this); this为自身 即调用自身operation.call
final Subscriber<? super T> localChild = this.child; 即我们传入的observer
for (;;) { 主线程 循环 发送事件 用到了管道机制 类似looper.loop
localChild.onNext(NotificationLite.
AndroidSchedulers.mainThread() 切换主线程后 用handler发送subscriber的消息 并从handler拿出来
ObserveOnSubscriber接收到onNext会将消息放到队列中 继续会调用主线程的worker去schedule去 即切换线程执行call方法 然后通过looper循环队列取出消息执行onNext等
结语
这是RxJava的第三篇 本篇主要将Rxjava提高的操作符整理并解析部分原理 知道内部是如何实现的
他是如何提高给我们这么强大的功能
接着下篇 我们来看RxJava在实际项目中的应用 这边我会用项目中的一个文件预览这个功能来做一个示范
这个过程中我们可以看到许多流程 文件下载-文件解密-文件预览 如何在这个过程中保证解密这个流程是可插拔的