介绍
ReactiveX
主要是响应式编程(Reactive Program),它集合了观察者模式,迭代器模式,函数式编程的优点,是一种面向集合数据流和变化传播的编程范式,是基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programmingIt is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.    
Java 8 新特性
Java 8 中引入了很多针对响应式的新特性,如 Lambda 表达式,函数式接口,MAP,Filter,Stream 流式操作等等。其中 JDK8 的 Stream 是一个受到函数式编程和多核时代影响而产生的东西。java.util.stream 包,实现了集合的流式操作,流式操作包括集合的过滤,排序,映射等功能。根据流的操作性,又可以分为串行流和并行流。根据操作返回的结果不同,流式操作又分为中间操作和最终操作。大大方便了我们对于集合的操作:     
- 最终操作:返回一特定类型的结果
 - 中间操作:返回流本身
 
流式操作最大的特点是以疏代堵,流处理方式可以处理无限的数据,只需要消耗时间。因此流式是对数据量开放的,适用于处理大数据
RxJava
RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,是 JVM 的响应式扩展。最大的特点是异步,异步是通过一种扩展的观察者模式来实现的。使用了 Java 8 的新特性来实现 Reactive ExtendsionsRxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.RxJava 2.x will target Reactive Streams APIs directly for Java 8+    
仓库地址: https://github.com/ReactiveX/RxJava    
依赖
app/build.gradle 中增加依赖:   
1  | compile 'io.reactivex.rxjava2:rxjava:2.*.*'  | 
* 换成具体的版本号Sync Project 同步整个工程,自动下载 RxJava 相关依赖包      
基本概念
Observable
涉及到三个概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)。Observable.subscribe(Observer) 实现订阅关系, Observable 提供数据流,Observer 响应事件并处理数据    
- 数据流
Observable表示一个数据流集合,数据是通过Push的方式发出,示例图: 

You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.”
- 数据发射
如果已经拿到Observable数据流,RxJava会自动发射。如果新建Observable可以实现ObservableOnSubscribe通过代码控制手动发射 
Observer
Observer 提供接收数据流的处理机制,包含如下几个回调:     
void onSubscribe(@NonNull Disposable d);
表示订阅成功后,能拿到Disposable实例,通过它可以解除订阅关系void onNext(@NonNull T t);
表示接收数据,所有发射过来的数据都在这接收并处理void onError(@NonNull Throwable e);
表示事件队列异常,在事件处理过程中出异常时,该方法会被触发,同时队列自动终止,不允许再有事件发出void onComplete();
表示事件队列完结,RxJava规定:当不会再有新的事件发出时,需要触发该方法作为结束标志
注意:在一个正确运行的事件序列中,onCompleted 和 onError 有且只有一个,也就是说二者必须唯一并且互斥,在响应的队列中只能调用一个      
subscribe
理清了上面的概念后,将 Observable 和 Observer 订阅在一起,就实现了数据流的连通    
1  | // 第一步:新建 Observable  | 
Scheduler 线程调度器
在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的,也就是在哪个线程调用 subscribe,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的,Scheduler 就是来完成异步的    
subscribeOn
指定的就是发射事件的线程,即生产事件的线程
多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用subscribeOn()只有第一次的有效,其余的会被忽略
RxJava中的所有的数据准备,发射的回调都是在这个线程中执行的
observeOn
指定的就是订阅者接收事件的线程,即事件消费的线程
但多次指定订阅者接收线程是可以的,也就是说每调用一次observerOn(),下游的线程就会切换一次
小结:如果不指定 subscribeOn 和 observeOn,事件就是同步执行的,也就是发射一个数据后同步消费一个数据,这样并没有体现 RxJava 的特点,所以实际中需要调用这两个接口指定具体的事件产生和消费的线程。示例 Log :   
1  | // 数据发射和消费都在同一个线程 3633  | 
系统自带的几个常用线程:
Schedulers.computation()
专为计算所使用的线程,计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,用于计算任务,例如图形的计算,事件循环等。使用的是固定的线程池,大小为CPU核数,即默认线程数等于处理器的数量。不要把I/O操作放该线程中,否则I/O操作的等待时间会浪费CPUSchedulers.newThread()
总是启用新线程,并在新线程执行操作Schedulers.io()I/O操作:读写文件、读写数据库、网络信息交互等所使用的线程, 行为模式和newThread差不多,区别在于io的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下更有效率。不要把计算工作放在io中,可以避免创建不必要的线程Schedulers.single()
先开线程,同时后台线程都在同一个线程中Schedulers.from(Executor)
使用指定的Executor作为调度器Schedulers.trampoline()
在当前线程排队执行,如果队列中已经存在其他任务AndroidSchedulers.mainThread()RxAndroid提供的线程, 在Android主线程中运行,用于事件消费时更新UI
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
小结:只有 trampoline() 是在当前线程中执行,其他都是新开线程,可以通过如下代码调试是在哪个线程中:Log.d(TAG,  "thread.name = " + Thread.currentThread().getName())   
如下示例适用于多数的 『后台线程取数据,主线程显示』的程序策略:
1  | Observable.just(1, 2, 3, 4)  | 
Backpressure 背压
背压是指在异步场景中,生产者发送事件速度远快于消费者的处理速度带来的问题。如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件,所以在这种情况下,根本不存在背压问题。即同步情况下,Backpressure 问题不存在。异步场景中解决背压问题就是流速控制的一种策略,要求生产者降低事件发射速度的策略      
1  | public enum BackpressureStrategy {  | 
BUFFER 模式默认队列大小为 128   
Flowable/Subscriber
RxJava 2.* 中,Observeable 用于订阅 Observer,是不支持背压的;而新引入的 Flowable 用于订阅 Subscriber,支持背压。差别图:  

Flowable/Subscriber 订阅示例,Flowable 需要指定背压策略:   
1  | Flowable.create(new FlowableOnSubscribe<Integer>() {  | 
在选择 Observable 和 Flowable 时,官方给出了一些建议,在数据量不是很大的时候(小于 1000)选择 Observable  
其他模式
除了上面的 Observable/Observer 和 Flowable/Subscriber 模式外, RxJava 还提供如下几种模式:  
- Single/SingleObserver
数据流只包含一个数据,只发射一个数据 - Completable/CompletableObserver
不发射数据(也就没有数据消费),只会回调onComplet - Maybe/MaybeObserver
上面两个的综合体,提供了以上两种功能供选择,但是只有一个功能能生效,也就是二选一。如果发射数据,发射一个数据;如果不发射数据,只能响应onComplete 
常见操作符
map
变换操作符,它的作用将类型装换,将上游类型转换为下游需要的类型
.map()
是一对一的类型转换:输入一个事件变量A,输出一个事件变量BflatMap
是一对多的类型转换(也就是事件拆分):将一个发送事件的上游Observable变换为多个发送事件的Observables。比如输入是一个事件List<T>,转换后输出是list.count个T的事件流。flatMap并不保证事件转换后的顺序,如果需要保证顺序则需要使用concatMap
zip
组合操作符,将多个 Observable 发送的事件组合到一起,然后发送这些组合事件,它按照严格的顺序应用这个函数。但只发射与发射数据项最少的那个Observable 一样多的数据,也就是事件数量和最少的那个保持一致    

filter
过滤操作符,把只有满足条件的事件才能发送给消费者
concat
连接操作符,将多个 Observable 发送的事件连接到一起,顺序发送所有事件。即将并行数据串行发出     

zip 是取最小那一组组合发送, concat 是将所有数据顺序连接发送   
示例及分析
总结
常见操作及转换
1  | // 定义一个 T 的集合  | 
问题
事件分解和组合
RxJava 有没有提供将一个事件分解成多个事件?有将多个事件过滤,减少,或者将多个事件组合等
思路:通过 flatmap 分解成数据流,使用 filter 过滤该事件流,distinct 去重,toList 重新组合   
参考文档
- https://github.com/ReactiveX/RxJava/wiki
 - http://reactivex.io
 - http://reactivex.io/documentation/observable.html
 - https://yq.aliyun.com/articles/63660
 - http://gank.io/post/560e15be2dca930e00da1083
 - http://www.jianshu.com/p/464fa025229e
 - https://github.com/lzyzsd/Awesome-RxJava
 - http://blog.csdn.net/maplejaw_/article/details/52381395
 - https://zhuanlan.zhihu.com/p/24482660?refer=dreawer
 - RxJava合集