微信扫一扫

028-83195727 , 15928970361
business@forhy.com

[置顶] RxJava详解,由浅入深。长篇巨作,超详细的用法。

RxJava,Android,异步,RxAndroid,观察者模式2016-11-24

RxJava:https://github.com/ReactiveX/RxJava

前言

使用了RxJava有一段时间了,深深感受到了其“牛逼”之处。下面,就从RxJava的基础开始,一步一步与大家分享一下这个强大的异步库的用法!

RxJava 概念初步理解

RxJava Repo上给的解释是:“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.” 大概就是说RxJava是Java VM上一个灵活的、使用可观测序列来组成的一个异步的、基于事件的库。

作用 - 异步

前面这段解释,重点就在于异步!刚接触RxJava的童鞋,可能会觉得特别难,无从下手,没事,相信通过这篇文章,大伙儿可以有一个比较深刻的理解!

RxJava可以浓缩为异步两个字,其核心的东西不外乎两个, Observables(被观察者)Subscribers(观察者)。Observables可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算、数据库操作、文件读取等等,事件执行结束后交给Subscribers的回调处理。

模式 - 观察者模式

观察者模式是对象的行为模式,也叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。 例如用过EventBus童鞋就知道,EventBus属于Publish/Subscribe模式。所以,使用RxJava也可以设计出一套事件总线的库,那就是RxBus。有兴趣的话可以在学完RxJava之后,去体验一下。这里就不细说了~

那么,RxJava也是一种扩展的观察者模式!

什么是观察者模式?举个栗子,Android中View的点击监听器的实现,View是被观察者,OnClickListener对象是观察者,Activity要如何知道View被点击了?那就是派一个OnClickListener对象,入驻View,与View达成一个订阅关系,一旦View被点击了,就通过OnClickListener对象的OnClick方法传达给Activity。采用观察者模式可以避免去轮询检查,节约有限的cpu资源。

结构 - 响应式编程

什么是响应式编程?举个栗子,a = b + c; 这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。

响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:

Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber

这个流程,可以简单的理解为:

  1. Observable发出一系列事件,他是事件的产生者;
  2. Subscriber负责处理事件,他是事件的消费者;
  3. Operator是对Observable发出的事件进行修改和变换;
  4. 若事件从产生到消费不需要其他处理,则可以省略掉中间的Operator,从而流程变为Obsevable -> Subscriber
  5. Subscriber通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的处理则交给Operator;

优势 - 逻辑简洁

Rx 优势可以概括为四个字,那就是 逻辑简洁。逻辑简洁并不意味着代码简洁,但是,由于链式结构,一条龙,你可以从头到尾,从上到下,很清楚的看到这个连式结构的执行顺序。对于开发人员来说,代码质量并不在于代码量,而在于逻辑清晰简洁,可维护性好,代码健壮!

另外,熟悉lambda的,还可以进一步提高代码的简洁性。举个简单栗子:

// 不使用lambda
Observable.just("Hello World!")
     .map(new Func1<String, String>() {
         @Override
         public String call(String s) {
             return s + "I am kyrie!";
         }
     })
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(new Action1<String>() {
         @Override
         public void call(String s) {
             Log.i(TAG, s);
         }
     });

// 使用lambda
Observable.just("Hello World!")
    .map(s -> s + "I am kyrie!")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(s -> {
        Log.i(TAG, s);
    });

RxJava 依赖

// Android 平台下须引入的一个依赖,主要用于线程控制
compile 'io.reactivex:rxandroid:1.1.0'
// RxJava
compile 'io.reactivex:rxjava:1.1.5'

这是我项目里面用的版本,可以到Maven/RxJava下获取最新版本。

RxJava 入门

事件产生

RxJava创建一个事件比较简单,由 Observable 通过 create 操作符来创建。举个栗子,还是经典的 HelloWorld~~

// 创建一个Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

    @Override
    public void call(Subscriber<? super String> subscriber) {
        // 发送一个 Hello World 事件
        subscriber.onNext("Hello World!");

        // 事件发送完成
        subscriber.onCompleted();
    }
});

这段代码可以理解为, Observable 发出了一个类型为 String ,值为 “Hello World!” 的事件,仅此而已。

对于 Subscriber 来说,通常onNext()可以多次调用,最后调用onCompleted()表示事件发送完成。

事件消费

有事件产生,自然也要有事件消费。RxJava 可以通过 subscribe 操作符,对上述事件进行消费。首先,先创建一个观察者。

// 创建一个Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "complete");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }
};

或者

// 创建一个Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "complete");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }
};

这两种方式里面, Observer 是观察者, Subscriber 也是观察者,Subscriber 是一个实现了Observer接口的抽象类,对 Observer 进行了部分扩展,在使用上基本没有区别,多了发送之前调用的 onStart() 和解除订阅关系的 unsubscribe() 方法。

并且,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以在这之后的示例代码,都使用 Subscriber 来作为观察者。

最后,我们可以调用 subscribe 操作符, 进行事件订阅。

// 订阅事件
observable.subscribe(subscriber);

在 Subscriber 实现的三个方法中,顾名思义,对应三种不同状态:
1. onComplete(): 事件全部消费完成后回调
2. onError(Throwable t): 事件处理异常回调
3. onNext(T t): 每发出一个事件,回调一次

不完整定义回调

对于事件消费来说,好像为了打印一个“Hello World!”要废好大的劲… 其实,RxJava 提供了不完整定义的回调。我们可以为 Subscriber 中的三种状态根据自身需要分别创建一个回调动作 Action

// onComplete()
Action0 onCompleteAction = new Action0() {
    @Override
    public void call() {
        Log.i(TAG, "complete");
    }
};

// onNext(T t)
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, s);
    }
};

// onError(Throwable t)
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {

    }
};

那么,RxJava 支持以下三种不完全定义的回调。

observable.subscribe(onNextAction);

observable.subscribe(onNextAction, onErrorAction);

observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

我们可以根据当前需要,传入对应的 Action, RxJava 会相应的自动创建 Subscriber。

  1. Action0 表示一个无回调参数的Action;
  2. Action1 表示一个含有一个回调参数的Action;
  3. 当然,还有Action2 ~ Action9,分别对应2~9个参数的Action;
  4. 每个Action,都有一个 call() 方法,通过泛型,来指定对应参数的类型;

入门示例

前面讲解了事件的产生到消费的过程,下面就举个完整的例子。从res/mipmap中取出一张图片,显示在界面上。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {

        Drawable drawable = ContextCompat.getDrawable(mContext, R.mipmap.ic_launcher);

        subscriber.onNext(drawable);

        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<Drawable>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, e.toString());
    }

    @Override
    public void onNext(Drawable drawable) {
        ivLogo.setImageDrawable(drawable);
    }
});

RxJava 进阶

Scheduler线程控制

默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程。

那么问题来了,假如事件产生的过程是耗时操作,比如网络请求,结果显示在UI中,这个时候在主线程执行对于网络请求就不合适了,而在子线程执行,显示结果需要进行UI操作,同样不合适~~

所以,RxJava 的第一个牛逼之处在于可以自由切换线程!那么,如何做?

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:
1. Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;
2. Schedulers.newThread(): 开启新线程操作;
3. Schedulers.immediate(): 默认指定的线程,也就是当前线程;
4. Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;
5. AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

我们可以通过 subscribeOn()observeOn() 这两个方法来进行线程调度。举个栗子:

依然还是显示一张图片,不同的是,这次是从网络上加载图片

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {

        try {
            Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg").openStream(), "src");
            subscriber.onNext(drawable);
        } catch (IOException e) {
            subscriber.onError(e);
        }
    }
})
        // 指定 subscribe() 所在的线程,也就是上面call()方法调用的线程
        .subscribeOn(Schedulers.io())
        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }

            @Override
            public void onNext(Drawable drawable) {
                ivLogo.setImageDrawable(drawable);
            }
        });

所以,这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

变换

RxJava的又一牛逼之处,在于 变换。啥意思呢? 就是将发送的事件或事件序列,加工后转换成不同的事件或事件序列。

map操作符

变换的概念不好理解吧?举个简单的栗子,我们对上述示例 进行改写。

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {

        subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    }
}).map(new Func1<String, Drawable>() {
    @Override
    public Drawable call(String url) {
        try {
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
            return drawable;
        } catch (IOException e) {

        }
        return null;
    }
})
        // 指定 subscribe() 所在的线程,也就是call()方法调用的线程
        .subscribeOn(Schedulers.io())
        // 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }

            @Override
            public void onNext(Drawable drawable) {
                if (drawable != null) {
                    ivLogo.setImageDrawable(drawable);
                }
            }
        });

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是:

Observable<String> --> map变换 --> Observable<Drawable>

那么,Func1 是什么呢?与 Action1 类似,不同的是 FuncX 有返回值,而 ActionX 没有。为什么需要返回值呢?目的就在于对象的变换,由String对象转换为Drawable对象。同样,也有Func0 ~ Func9,对应不同的参数个数。

当然了,RxJava 的变换,可不止于map这么简单,继续往下!

flatMap操作符

不难发现,上述的 map 操作符,是一对一的变换,并且返回的是变换后的对象。而 flatMap 操作符可以适应一对多,并且返回的是一个 Observable 。应用场景举例:例如一个员工负责多个任务,现在要打印所有员工的所有任务。

final List<Employee> list = new ArrayList<Employee>() {
    {
        add(new Employee("jackson", missions1));
        add(new Employee("sunny", missions2));
    }
};
Observable.from(list)
        .flatMap(new Func1<Employee, Observable<Employee.Mission>>() {
            @Override
            public Observable<Employee.Mission> call(Employee employee) {
                return Observable.from(employee.missions);
            }
        })
        .subscribe(new Subscriber<Employee.Mission>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Employee.Mission mission) {
                Log.i(TAG, mission.desc);
            }
        });

通过上面的代码可以看出,mapflatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

  1. flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
  2. flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
  3. flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
  4. map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
  5. 可以对一个Observable多次使用 mapflatMap

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

Github上的 README.md 文件,通常是 MarkDown 语法。我们要获取 README.md 内容并按 MarkDown 风格显示在UI上,就可以通过以下方式(Retrofit2 + RxJava,稍后会介绍):

new ReadmeContentClient()
    // 获取md语法的Readme内容, 返回的是一个Observable<String>对象
    .getReadme()
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String md) {
            // 由于Readme的内容是md语法,需要转成html字符串通过WebView显示到UI
            // 返回的也是Observable<String>对象
            return new MarkDownStyleClient(md)
                            .formatMarkStyle();
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "readme:" + e.toString());
        }

        @Override
        public void onNext(String html) {
            // html就是根据readme md格式内容,生成的html代码
            view.showReadme(html);
        }
    });

RxJava 其他常用操作符

  1. from
    接收一个集合作为输入,然后每次输出一个元素给subscriber。

    // Observable.from(T[] params)
    Observable.from(new Integer[]{1, 2, 3, 4, 5})
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });

    注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用create();

  2. just
    接收一个可变参数作为输入,最终也是生成数组,调用from(),然后每次输出一个元素给subscriber。

    // Observable.just(T... params),params的个数为1 ~ 10
    Observable.just(1, 2, 3, 4, 5)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });
  3. filter
    条件过滤,去除不符合某些条件的事件。举个栗子:

    Observable.from(new Integer[]{1, 2, 3, 4, 5})
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer number) {
                // 偶数返回true,则表示剔除奇数,留下偶数
                return number % 2 == 0;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.i(TAG, "number:" + number);
            }
        });
  4. take
    最多保留的事件数。

  5. doOnNext
    在处理下一个事件之前要做的事。

    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer number) {
                // 偶数返回true,则表示剔除奇数
                return number % 2 == 0;
            }
        })
        // 最多保留三个,也就是最后剩三个偶数
        .take(3)
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                // 在输出偶数之前输出它的hashCode
                Log.i(TAG, "hahcode = " + number.hashCode() + "");
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.i(TAG, "number = " + number);
            }
        });

    输出如下:

    hahcode = 2
    number = 2
    hahcode = 4
    number = 4
    hahcode = 6
    number = 6
  6. debounce
    通俗点讲,就是N个事件发生的时间间隔太近,就过滤掉前N-1个事件,保留最后一个事件。debounce可以指定这个时间间隔!可以用在SearchEditText请求关键词的地方,SearchEditText的内容变化太快,可以抵制频繁请求关键词,后面第15条15.Subject会介绍这个。为了演示效果,先举个简单栗子:

    Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                int i = 0;
                int[] times = new int[]{100, 1000};
                while (true) {
                    i++;
                    if (i >= Integer.MAX_VALUE - 1)
                        break;
                    subscriber.onNext(i);
                    try {
                        // 注意!!!!
                        // 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉
                        // 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
                        Thread.sleep(times[i % 2]);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onCompleted();
            }
        })
        // 间隔400ms以内的事件将被丢弃
        .debounce(400, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "complete");
            }
    
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            }
    
            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "integer = " + integer);
            }
        });
    

    输出结果:

    11-23 10:44:45.167 MainActivity: integer = 1
    11-23 10:44:46.270 MainActivity: integer = 3
    11-23 10:44:47.373 MainActivity: integer = 5
    11-23 10:44:48.470 MainActivity: integer = 7
    11-23 10:44:49.570 MainActivity: integer = 9
    11-23 10:44:50.671 MainActivity: integer = 11
    11-23 10:44:51.772 MainActivity: integer = 13
    11-23 10:44:52.872 MainActivity: integer = 15
    11-23 10:44:53.973 MainActivity: integer = 17
    ...

    我们设置过滤条件为400ms,可以发现,奇数正常输出,因为在它的下一个事件事件隔了1000ms,所以它不会被过滤掉;偶数被过滤掉,是因为它距离下一个事件(奇数)只隔了100ms。并且,输出的两个事件相隔大约为 100ms + 1000ms = 1100ms

  7. merge
    用于合并两个Observable为一个Observable。较为简单。

    Observable.merge(Observable1, Observable2)
        .subscribe(subscriber);
  8. concat
    顺序执行多个Observable,个数为1 ~ 9。例子稍后与first操作符一起~~

  9. compose
    flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。

    1. compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在 flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
    2. compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
    3. flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
    4. 建议使用 compose 代替 flatMap
    5. first
      只发送符合条件的第一个事件。可以与前面的contact操作符,做网络缓存。举个栗子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

      // 从缓存获取
      Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
          @Override
          public void call(Subscriber<? super BookList> subscriber) {
              BookList list = getFromDisk();
              if (list != null) {
                  subscriber.onNext(list);
              } else {
                  subscriber.onCompleted();
              }
          }
      });
      
      // 从网络获取
      Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
      
      Observable.concat(fromDisk, fromNetWork)
              // 如果缓存不为null,则不再进行网络请求。反之
              .first()
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Subscriber<BookList>() {
                  @Override
                  public void onCompleted() {
      
                  }
      
                  @Override
                  public void onError(Throwable e) {
      
                  }
      
                  @Override
                  public void onNext(BookList discussionList) {
      
                  }
              });

      网络缓存用法,具体可参见我的项目:https://github.com/JustWayward/BookReader

    6. timer
      可以做定时操作,换句话讲,就是延迟执行。事件间隔由timer控制。举个栗子:两秒后输出“Hello World!”

      Observable.timer(2, TimeUnit.SECONDS)
          .subscribe(new Subscriber<Long>() {
              @Override
              public void onCompleted() {
      
              }
      
              @Override
              public void onError(Throwable e) {
      
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.i(TAG, "Hello World!");
              }
          });
    7. interval
      定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制。举个栗子:每隔两秒输出“Hello World!”

      Observable.interval(2, TimeUnit.SECONDS)
          .subscribe(new Subscriber<Long>() {
              @Override
              public void onCompleted() {
      
              }
      
              @Override
              public void onError(Throwable e) {
      
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.i(TAG, "Hello World!");
              }
          });
    8. throttleFirst
      与debounce类似,也是时间间隔太短,就丢弃事件。可以用于防抖操作,比如防止双击。

      RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {
      
            }
      
            @Override
            public void onError(Throwable e) {
      
            }
      
            @Override
            public void onNext(Object o) {
                 Log.i(TAG, "do clicked!");
            }
        });

      上面这个RxView详见:https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定,JakeWharton大神的项目,厉害。

    9. Single
      Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

      Single.create(new Single.OnSubscribe<Object>() {
          @Override
          public void call(SingleSubscriber<? super Object> subscriber) {
              subscriber.onSuccess("Hello");
          }
      }).subscribe(new SingleSubscriber<Object>() {
          @Override
          public void onSuccess(Object value) {
              Log.i(TAG, value.toString());
          }
      
          @Override
          public void onError(Throwable error) {
      
          }
      });
    10. Subject
      Subject这个类,既是Observable又是Observer,啥意思呢?就是它自身既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出。举个栗子:PublishSubject

      Subject subject = PublishSubject.create();
      
      // 1.由于Subject是Observable,所以进行订阅
      subject.subscribe(new Subscriber<Object>() {
          @Override
          public void onCompleted() {
      
          }
      
          @Override
          public void onError(Throwable e) {
      
          }
      
          @Override
          public void onNext(Object o) {
              Log.i(TAG, o.toString());
          }
      });
      
      // 2.由于Subject同时也是Observer,所以可以调用onNext发送数据
      subject.onNext("world");

      这个好像有点厉害的样子,哈哈。可以配合debounce,避免SearchEditText频繁请求。

      
      Subject subject = PublishSubject.create();
      
      subject.debounce(400, TimeUnit.MILLISECONDS)
              .subscribe(new Subscriber<Object>() {
              @Override
              public void onCompleted() {
      
              }
      
              @Override
              public void onError(Throwable e) {
      
              }
      
              @Override
              public void onNext(Object o) {
                  // request
              }
          });
      
      edittext.addTextChangedListener(new TextWatcher() {
      
          @Override 
          public void beforeTextChanged(CharSequence s, int start, int count, int after) { }
      
          @Override 
          public void onTextChanged(CharSequence s, int start, int before, int count) {
              subject.onNext(s.toString());
          }
      
          @Override 
          public void afterTextChanged(Editable s) { } 
      });
    11. RxJava 应用

      RxJava+Retrofit 的网络请求方式

      Retrofit是一个非常适合RestAPI的网络请求库。没用过的童鞋,还是推荐学一学的。

      使用Callback的请求方式:

      // 1. 定义一个请求接口
      @GET("/match/stat")
      Call<String> getMatchStat(@Query("mid") String mid, @Query("tabType") String tabType);
      
      // 2. 创建Service对象
      Retrofit retrofit = new Retrofit.Builder()
                              .baseUrl(BuildConfig.TENCENT_SERVER)
                              .addConverterFactory(ScalarsConverterFactory.create())
                              .client(OkHttpHelper.getTecentClient()).build();
      
      TencentApi api = retrofit.create(TencentApi.class);
      
      // 3. 调用
      Call<String> call = api.getMatchStat(mid, tabType);
      call.enqueue(new Callback<String>() {
          @Override
          public void onResponse(Call<String> call, Response<String> response) {
                  // 成功
              } else {
                  // 无数据
              }
          }
      
          @Override
          public void onFailure(Call<String> call, Throwable t) {
              // 失败
          }
      });

      与 RxJava 结合的方式,则是

      // 1. 定义请求接口,返回的是Observable对象
      @GET("/user/followers")
      Observable<List<User>> followers();
      
      // 2. 同样是创建api对象
      ...
      // 3. 请求
      api.followers()
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<List<User>>() {
              @Override
              public void onCompleted() {
      
              }
      
              @Override
              public void onError(Throwable e) {
                  // 请求出错。可能发生网络异常、Json解析异常等等
              }
      
              @Override
              public void onNext(List<User> list) {
                  // 请求成功
                  view.showMyFollowers(list);
              }
          });

      若需嵌套请求,比如先获取Token再进行才能进行登录,可参考flatMap操作符最后的获取Readme内容显示在WebView上的例子。

      Retrofit2 + RxJava + Dagger2: 具体可参见我的项目,里面有比较详细的用法。
      https://github.com/JustWayward/BookReader

      不难发现,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 以及 OnCompleted() 或在请求失败后调用 onError()

      :RxJava形式的请求,并不能减少代码量,但是逻辑非常清晰。假如请求到数据之后需要对数据进行处理,并且是耗时操作,难道要再开一个线程,或者用AsyncTask再做一次异步?很显然,RxJava的变换很好的解决了这个问题,依然会使逻辑结构清晰。

      RxBus

      准确的来说,是一种基于RxJava实现事件总线的一种思想。可以替代EventBus/Otto,因为他们都依赖于观察者模式。可以参考https://github.com/AndroidKnife/RxBus这个库。

      RxBinding

      前面介绍过了,JakeWharton大神的项目,https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定。

      RxJava 的一些坑

      未取消订阅而引起的内存泄漏

      举个栗子,对于前面常用操作符12.interval做周期性操作的例子,并没有使之停下来的,没有去控制订阅的生命周期,这样,就有可能引发内存泄漏。所以,在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅。

      Subscription subscription = Observable.interval(2, TimeUnit.SECONDS)
          .subscribe(new Subscriber<Long>() {
              @Override
              public void onCompleted() {
      
              }
      
              @Override
              public void onError(Throwable e) {
      
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.i(TAG, "Hello World!");
              }
          });
      // 调用unsubscribe();方法进行取消订阅
      subscription.unsubscribe();

      但是,如果有很多个数据源,那岂不是要取消很多次?当然不是的,可以利用 CompositeSubscription, 相当于一个 Subscription 集合。

      CompositeSubscription list = new CompositeSubscription();
      list.add(subscription1);
      list.add(subscription2);
      list.add(subscription3);
      
      // 统一调用一次unsubscribe,就可以把所有的订阅都取消
      list.unsubscribe();

      总结

      相信到了这里,大家对RxJava应该有了一个比较清晰的理解。当然,实践出真知,还是要去尝试,才能更深层次的体会到其强大之处。

      最后,总结一下RxJava的基本使用过程。

      1. 首先是创建事件源源,也就是被观察者,可以用Observable的create/just/from等方法来创建;
      2. 通过filter/debounce等操作符,进行自定义事件过滤;
      3. 通过Schedules进行事件发送和订阅的线程控制,也就是subscribeOn()observeOn();
      4. 通过map/flatMap/compose等操作符,进行事件的变换;
      5. 调用subscribe进行事件订阅;
      6. 最后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用unsubscribe(),以免引发内存泄漏。

      感谢阅读!