RxJava代码组织

从源码角度分析其成员的关系

Posted by CH on May 14, 2020

前言

本篇文章基于Rxjava1.3.8版本源码解析

上篇文章 我们介绍了RxJava的成员类和各个类承担的作用

本篇文章 我们主要来阐述类之间的联系 是如何组合在一起 形成整个事件流的生产和消费

通过理清这些能让我们更好的理解这个库所带的思想-也就是基于事件的调度方式

正文

在阐述开始前让我们看下下列代码

  class RxFunction{
      public void func(){ 
       Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onStart();
                subscriber.onNext(1);
                if(error){
                    subscriber.onError(new Throwable());
                }else{
                    subscriber.onCompleted();
                }
            }
        }).subscribe(new Subscriber<Integer>() {

            @Override
            public void onStart() {
                Log.d(TAG,"onStart");
            }

            @Override
            public void onCompleted() {
                Log.d(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"onError,"+e.getMessage());
            }

            @Override
            public void onNext(Integer o) {
                Log.d(TAG,"onNext,"+o);
            }
        });
      }
  }

这部分代码是RxJava调用的一种形式

首先Observable的创建和订阅 其中涉及到两个匿名内部类 Subscriber和Observable.OnSubscribe

函数的效果是创建被观察者对象并注册订阅者 当订阅关系生成时 会调用call函数 触发订阅者的回调函数 形成通知关系

它的目的主要是形成自上而下的数据传递关系 更符合我们的逻辑顺序

这也是为什么让被观察者订阅观察者而不是我们现实逻辑中的观众订阅频道这种形式

observable.subscribe(observer) 而不是observer.subscribe(observable)

流程解析

Observable的创建

Observable作为可观察对象 必有数据变化后的通知函数 在RxJava中这些通知函数为一次事件分发

每个执行函数onStart() onNext()都可以当作一个事件

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onStart();
                subscriber.onNext(1);
                if(error){
                    subscriber.onError(new Throwable());
                }else{
                    subscriber.onCompleted();
                }
            }
        })

上述代码创建了Observable.OnSubscribe对象 从字面上理解 他是Observable发生订阅关系时创建的类 内部肯定是订阅关系发生时做的事情

我们接着从源码上跟进

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
public interface Action1<T> extends Action {
    void call(T t);
}    
public interface Action extends Function {

}
public interface Function {

}

我们可以发现 一整个继承链 OnSubscribe继承自接口Action1 那Action1是什么呢 它继承自Action和Function 这些又是什么?

RxJava在源码中有一个文件夹Functions 存放这些类 这些类主要是对函数的抽象 有点函数式编程的味道 像lambda就是将方法转换为对象持有 方便我们以对象.函数的形式调用

Function为所有函数的抽象 而Action是其上的实现 代表无返回值函数 参数个数从0-9分别对应Action0~Action9

而ActionN的参数则为可变数组,同理Func0~Func9分别对应参数个数0~9的有返回值的函数

而OnSubscribe继承Action1表示其为一个含有call函数的对象且函数为单参数 且参数类型为发送事件的类型 也就是我们在泛型T处传入的类型

也就是我们发送事件携带的数据类型。

接着我们看下Observable.create方法会将这个对象如何处理

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
    
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }    
    
static volatile Func1<Observable.OnSubscribe, Observable.OnSubscribe> onObservableCreate;

onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };
   
   public abstract class RxJavaObservableExecutionHook {
        public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
            return f;
        }
   }

这个方法会创建新的Observable对象 构造方法传入Observable.OnSubscribe

RxJavaHook 流程代理

我们先来看下RxJavaHooks.onCreate(f)这个方法 这个类基本上在rxjava操作的每个地方都有

从类名看它是RxJava的代理 那么它是如何对流程进行Hook呢 这里以创建Observable为例

onObservableCreate为单参单返回的函数对象 从代理的角度可以理解后续他的操作无非是 传入A->B(A)->B.A.call()->返回A

这样的一套代理行为

而这里的代码也是如此 因为默认的实现并不会影响我们的解析流程 我们可以简单的理解为传入什么即返回什么

Observable构造

接着我们来看Observable的构造

    final OnSubscribe<T> onSubscribe;
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

他将OnSubscribe对象作为全局变量建立引用关系 作为其触发方法

除了这个部分 Observable还提供了其他类型数据订阅的封装

1.Observable.from 可以接收数组类型 迭代类型 和Future对象(可以理解为可取消的runnable) 2.Observable.just 原理是传输Object[]数组 调用from ….

我们可以直接传入数据类型直接subscribe就可以接收到数据

那其是如何将数据直接按照对应顺序发送事件的 难道不需要进行onNext等事件的调用吗

这里我们来看from方法的源码

public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }
    
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
     final T[] array;
     public OnSubscribeFromArray(T[] array) {
        this.array = array;
     }
    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
}

可以看到 会创建一个封装的Subscriber对象 将列表数据传入其中 它的call方法 会为subscriber设置事件的生产者

生产者 Producer

producer的出现是为了解决背压的问题 即上下游处理数据速率不同出现的数据堆积问题 比如 observable 发得太快,subscriber 处理不过来 要怎么做

让 subscriber 向 observable 主动请求数据 subscriber 不请求,observable 就不发出数据。 它俩相互协调,避免出现过多的数据,而协调的桥梁,就是 producer 所以我们会在一些rxjava代码中看到 observer观察者中调用request方法

public interface Producer { //生产者-负责生产事件
    void request(long n);
}

request方法即在合适的时机主动去发数据 背压 request参数 为Long.MAX_VALUE 表示不需要背压 request参数是可叠加的 如果数量超出Long.MAX_VALUE 则参数会被忽略

我们再来看设置生产者setProducer这个方法

  protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }

        // if producer is set then we will request from it
        // otherwise we increase the requested count by n
        Producer producerToRequestFrom;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }
    
private void addToRequested(long n) {
        if (requested == NOT_SET) {
            requested = n;
        } else {
            final long total = requested + n;
            // check if overflow occurred
            if (total < 0) {
                requested = Long.MAX_VALUE;
            } else {
                requested = total;
            }
        }
    }
public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) { //对象级别锁 避免这个对象在多个线程运行造成数据错乱 同步设置producer和请求序号
            toRequest = requested; //requested表示当前的请求序号 如果请求过request就会变化
            producer = p; 
            if (subscriber != null) {
                // 当请求发生过 下边代码忽略
                if (toRequest == NOT_SET) { 
                    //如果设置了当前的producer并且没有请求过 则忽略让下个producer处理
                    passToSubscriber = true;
                }
            }
        }
        // 当subscriber锁被释放 producer被正确设置变量 直接设置producer
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else { //无论是否有请求过直接request
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE); //非背压请求
            } else {
                producer.request(toRequest); //背压请求
            }
        }
    }

这里采用对象锁 不同线程 相同subscriber对象 执行这段代码块会按顺序执行 如果这个对象 没有被请求过 会一直循环请求 直到request add 如果为第一次请求以非背压式执行 否的话通过背压执行

再来看下request方法 当调用过request之后才会requested序列才会改变 方可让producer执行request方法

static final class FromArrayProducer<T>  extends AtomicLong implements Producer {
    
     final Subscriber<? super T> child;
     final T[] array;
    int index;

        public FromArrayProducer(Subscriber<? super T> child, T[] array) {
            this.child = child;
            this.array = array;
        }
    
          @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) { 
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
        
         void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) { //遍历循环 执行onNext传输数组数据
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted(); //执行onCompleted
        }
}

上述代码为数组producer的实现 request方法 判断如果不需要背压则执行fastPath 如果需要执行slowPath

fastPath是依次发送所有的数据

而slowPath是发送指定n个数据 内部会进行循环 直到数据发送完毕或者 传入的序号为0 循环才会结束

经过以上这些 我们才是看完了一个Observable的数据封装和传递

Observable.subscribe建立订阅关系,接收生产者事件
  public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this); //
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
         subscriber.onStart();
         try {
             RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
             return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
              if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
        }
    }
    
    

核心代码在于 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

RxJavaHooks代理传入返回的是 observable.onSubscribe 即我们创建observer构造函数时传的参数 onSubscribe这里执行了call函数

也就是最开始那段代码 发送事件数据的函数

并且返回 RxJavaHooks.onObservableReturn(subscriber); 也就是当前的subscriber对象 方便链式调用

并且在Try catch异常中 我们可以看到这样的一段代码

subscriber.onError(RxJavaHooks.onObservableError(e)); 会发送onError事件 捕获异常

而我们也可以通过Rxjava提供的统一拦截器处理共性

到此为止 我们已分析完毕 被观察者的创建 数据事件的生产 订阅的建立 和事件的分发 对Rxjava的理解 也迈入了基础的一步

基于这些理解目前我们可以产出这样一份类的关系图 帮助我们回忆巩固

Rxjava类图

结语

这是RxJava的第二篇 本篇主要将上篇的成员建立联系来形成Rxjava逻辑上的串联 借此来理解函数式编程的一角 接着下篇 我们来看RxJava如何对原始流进行操作 来满足我们的业务的定制 又是如何做到线程的切换 普遍运用与我们的网络请求