RxJava解析

从源码角度分析该库成员及其作用

Posted by CH on May 12, 2020

前言

本篇文章基于Rxjava1.3.8版本源码解析

说起RxJava 在Android开发者眼里 他是一个基于事件流的异步库 相比于其他的异步库例如AsyncTask等 优势在于对逻辑的整理 将功能作为事件流发送 事件流具有可转换,过滤,整合的特性 并且可被观测 保证最终接收到的为处理完毕的数据

这里准备后续用几篇文章 来阐述自身对RxJava这个库的理解 主要介绍涉及的类和其作用 并尝试解决下面的几个问题

1.RxJava如何组织数据的流向 如何从发送者到接收者 这个过程中 如何保证数据流能被中间层处理并继续转发

2.RxJava操作符的原理 操作符是如何操作数据流的 转换 过滤 整合 Transformer转换器 如何做到线程切换 如何保证线程安全

3.Rxjava在实际项目中的应用 处理复杂逻辑的案例(文件下载上传 mediaId,acl,cwk,加解密)

4.RxJava的隐患 哪些问题会阻碍业务流程运行 (背压,onError后不再执行)

5.将自己的理解 输出一份类图或者流程图 方便理解记忆

正文

RxJava是基于观察者模式的扩展

传统的观察者模式 可以用下面的简洁代码来描述

    class Observer{
        void onObservableChanged(){
            
        }
    }
    
    class Observable{
        List<Observer> observers;
        
        void registerObserver(Observer observer){
            observers.add(observer);
        }
        
        void onDataChanged(){
           for(item in observers){
               item.onObservableChanged();
           }
        }
    }

被观察者内部维护了观察者的队列 队列的增删对应着观察者的注册和注销 当被观察者发生变化时 循环遍历观察者并调用其监听函数

Rxjava与传统的观察者不同在于其对监听事件的扩展 RxJava 的事件回调方法 除了普通事件 onNext() (相当于 onClick() / onEvent())之外, 还定义了两个特殊的事件:onCompleted() 和 onError()

两者互斥

onCompleted()表示当前事件发送完毕的

onError()用于抛出程序执行期间 异常报错的事件 (同时队列自动终止,不允许再有事件发出。(对于某些业务来说不符合要求))

接着我们来看下Rxjava库的成员组成

成员解析

Observer & Subscriber 观察者(事件的消费者)

Observer是一个接口 他定义了作为一个观察者具备的行为 通常这部分需要我们自己实现接收最终的数据

public interface Observer<T> {
     void onCompleted(); 
     void onError(Throwable e);
     void onNext(T t);
}

在Rxjava中观察者和被观察者是一种订阅的关系(subscribe) 由被观察订阅观察者,通过Observable.subscribe(Observer)建立订阅关系

Rxjava观察者的另一种形式Subscriber

它是一种抽象类 实现 Observer和Subscription接口,是一种具备订阅功能的Observer

其扩展的onStart方法 在事件发送之前执行(发生在subscribe线程),和unsubscribe()方法 这个方法用来取消订阅 取消订阅之后将不再收到事件

为了合理的使用资源 在合适的时机调用此方法 可以避免内存泄漏 比如在Activity.onDestroy()处调用,像Retrofit和OkHttp网络框架 会代理生命周期 自动处理

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    // subscriptions内部存储了订阅事件的集合 可以允许将一套业务的订阅统一进行解绑
    // private List<Subscription> subscriptions;
    private final SubscriptionList subscriptions;
    
    public void onStart() { }
}

public interface Subscription {
    //取消订阅 释放引用
    void unsubscribe();
    //是否被释放
    boolean isUnsubscribed();
}
Observable 被观察者 (事件的生产者)
//T为事件数据的类型
public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    //当构建Observable对象时 会将OnSubscribe对象作为全局变量 方便后续数据处理和发送事件
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    //订阅观察者 之后我们会详细解析这部分的源码 这里可以理解为被观察者已持有观察者的引用
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    } 
    //从字面理解即 订阅发生在 这里用来切换函数执行的线程 这里线程根据业务定制
    public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }    
    //从字面理解即 监听发生在 这里用来切换函数执行的线程 通常为主线程
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }
    //作为事件的生产者 也承担着操作数据流的功能 这样的函数有很多
    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        return merge(map(func));
    }    
}
Subject

Subject内即有Observer又有Observable 作为两者之间的中间层既可以作为可观察对象 接收第一手的发送数据 并进行代理, 作为观察者它也可以接收数据 并释放新的数据

这样就出现了不同功能的Subject来处理不同的业务场景 它的存在正是为了解决定制不同的数据流接收的问题 比如ReplaySubject,该Subject会接收数据,当被订阅时,无论何时将所有接收到的数据全部发送给订阅者。 可以用来做粘性消息 ReplaySubject 还有其他的Subject也具备不同的功能 有兴趣可以深入研究

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    public abstract boolean hasObservers();
}
Schedulers 线程调度器 (用于获取不同的线程池)

线程调度器可以理解为对ThreadExecutorService线程池的封装

根据业务需求的不同封装为不同的线程调度器 方便数据流在不同的线程执行

其本质最终是会调用下列方法 让其在线程中执行

ScheduledExecutorService.execute(runnable)

类解析

public final class Schedulers {
    
    static final Scheduler SINGLE; //单线程单任务 可复用线程
    
    static final Scheduler COMPUTATION; //计算型
    
    static final Scheduler IO; //读写型
    
    static final Scheduler TRAMPOLINE; //排队执行任务
    
    static final Scheduler NEW_THREAD;//为每个任务新建线程执行
    
    //类加载阶段就已经创建对应的Scheduler
    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask()); //IOTask本质为IoScheduler对象 

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }  
    
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
}

这里拿IO的Scheduler举例 其继承于Scheduler抽象类

作为调度器具备的函数 内部都是对Worker实际工作者的调度

调度器->工作线程池->工作线程

public abstract class Scheduler {
    //创建对应的worker 具体线程执行函数发生在其中
    public abstract Worker createWorker();
    //直接调度
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    //周期调度
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        
    }
}

IoScheduler继承Scheduler抽象类 实现其具体个性

public final class IoScheduler extends Scheduler {
    final ThreadFactory threadFactory; //线程工厂
    final AtomicReference<CachedWorkerPool> pool; //缓存的工作池 内部有队列 ConcurrentLinkedQueue 用于存储和获取Worker
    ...
}

接着我们看实际的工作类Worker 静态抽象类 实现Disposable 表示任务是可以弃用的

public abstract static class Worker implements Disposable {
    //功能比较重要的即调度
     public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
}

Worker的实现为NewThreadWorker 调度函数会调用scheduleActual

private final ScheduledExecutorService executor;

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //1.封装runnable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    //2.如果不需要延迟 执行用线程池允许 需要延迟 调度执行
     Future<?> f;
     if (delayTime <= 0) {
         f = executor.submit((Callable<Object>)sr);
     } else {
         f = executor.schedule((Callable<Object>)sr, delayTime, unit);
     }     
}

从上面的代码 我们知道传入的runnable对象已经由线程池执行 实现了子线程的切换 至于具体执行Runnable是什么 之后在线程切换部分再讲

Publisher & Flowable

Publisher是数据的提供者, 将数据发布给订阅者 Flowable用来解决背压问题 即上游发送事件 下游处理不及时导致的阻塞问题
Flowable对应Observable FlowableEmitter对应对应ObservableEmitter Subscriber对应Observer Subscription对于Disposable 一次性的操作类

Flowable的整体框架和Observable没有区别,仅仅是换了不同的类,但是实现的功能大体一致。

具体我们在背压一节讲解

public interface Publisher<T> {
    //这是一个工厂方法,每次调用的时候启动一个新的Subscription,绑定的Subscriber会消耗来自Publisher的信号。
    public void subscribe(Subscriber<? super T> s);
}

//Flowable实现Publisher 
public abstract class Flowable<T> implements Publisher<T> {
    
}

结语

许久未更 回顾了之前写的文章 感慨只是Android开发的冰山一角 这两年工作下来 忙于业务对自身的沉淀 颇有懈怠 准备重拾blog继续沉淀 不仅限于代码解析 也会分享一些 这些年工作过程当中遇到的问题的通用解决方法 沉淀出一套方案 在此记录 此篇RxJava既是自己一直以来对其不理解的解惑 也是对自己积累知识的输出 愿君共勉