本篇都是基于RxJava2.0的操作符说明,由于操作符太多,本篇只列举常用的操作符说明
创建操作符
-
create
通过实现ObservableOnSubscribe接口的subscribe来创建,该方法有一个事件发射器ObservableEmitter,通过它来向下游发射事件。
onNext:发射事件
onComplete:表示事件全部发射完成,后续再调用onNext不生效
onError:表示事件发射异常,和onComplete相冲突,事件要么全部发射完成,要么异常终止。调用后再onNext发射事件不生效。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) { emitter.onNext("Android"); emitter.onNext("iOS"); emitter.onNext("Sailfish OS"); emitter.onComplete(); emitter.onNext("Meego");//不生效 } });
-
just
把传入的参数依次发射出去
Observable.just("Android","iOS","Sailfish OS") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("Alee",s); } });
-
from
-
fromArray
把一个可变参数(数组)依次发射出去
Observable.fromArray("Android","iOS","Sailfish OS") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("Alee",s); } });
-
fromFuture
Observable.fromFuture(Executors.newSingleThreadExecutor().submit(new Callable<String>() { public String call() { Thread.sleep(3000); return "Alee"; } })) .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
-
fromCallable
Observable.fromCallable(new Callable<String>() { @Override public String call() { return "萨拉黑"; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
-
fromIterable
List<String> list = new ArrayList<>(); list.add("撒"); list.add("啦"); list.add("嘿"); Observable.fromIterable(list).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
-
fromPublisher
Observable.fromPublisher(new Publisher<String>() { @Override public void subscribe(Subscriber<? super String> s) { s.onNext("Android"); s.onNext("iOS"); s.onNext("Sailfish OS"); s.onComplete(); s.onNext("Others");//不生效 } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
-
-
timer
通过Timer操作符创建一个延迟发射的Observable,发射的是一个Long型的0,等同于Android的Handler中的postDelay()方法。
Observable.timer(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { Log.e("Alee",String.valueOf(aLong)); //打印0 } });
-
interval
通过interval操作符创建一个可延迟发射且按照固定时间间隔发射自增整数的Observable,一般用来进行倒计时,或者计时器等
//1秒后每间隔一秒发射一个Long值出去 ,数值从0开始,每次加1,一共发射10次 Observable.interval(1,1,TimeUnit.SECONDS) .take(10) //take 只取前10次发射的事件 .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { Log.e("Alee",String.valueOf(aLong));//打印 0 到 9 } });
-
repeat
通过repeat操作符创建一个重复发射特定数据的Observable。
Observable.just(1).repeat(3).subscribe(new Consumer<Integer>() { int count; @Override public void accept(Integer integer) {//integer 始终等于1 Log.e("Alee",String.valueOf(++count));//打印 1 ,2 ,3 } });
-
empty
创建一个空的(什么事件都不发射的)直接通知完成的Observable
Observable.empty().subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object s) { Log.e("Alee",s.toString());//不会走到这儿 } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.e("Alee","complete");//会走到这儿 } });
-
error
同empty一样,创建一个什么事件都不发射只通知错误的Observable
Observable.error(new Throwable("oops, you have no money")).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object s) { Log.e("Alee",s.toString()); } @Override public void onError(Throwable e) { Log.e("Alee",e.getMessage());//会走到这儿 } @Override public void onComplete() { Log.e("Alee","complete"); } });
-
never
创建一个不发射任何事件且没有任何通知完成或者错误的Observable
Observable.never().subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.e("Alee","onSubscribe"); //只会走到这儿 } @Override public void onNext(Object o) { Log.e("Alee","onNext"); } @Override public void onError(Throwable e) { Log.e("Alee","onError"); } @Override public void onComplete() { Log.e("Alee","onComplete"); } });
-
range
创建一个发射指定范围的整型事件的Observable
Observable.range(1,9) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印1-9 } });
-
defer
只有当订阅者订阅后才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅的时候调用Callable的call()方法创建Observable。
Observable.defer(new Callable<ObservableSource<String>>() { @Override public ObservableSource<String> call() { return Observable.just("hello world"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
合并操作符
-
concat
按照顺序连接多个同事件类型的Observable,比如发射Integer事件的Observable和发射String事件的Observable是不能concat的。另外
a.concatWith(b)
其内部也是调用Observable.concat(this,b)
。Observable<String> observable1 = Observable.just("撒","啦"); Observable<String> observable2 = Observable.just("嘿","哟"); Observable.concat(observable1,observable2).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); //打印 撒 啦 嘿 哟 } });
-
startWith
其内部也是调用concat连接两个Observable,只是把后面的Observable连接到前面而已
Observable.just("爱","你","哟") .startWith("Alee") .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s);// 打印 Alee 爱 你 哟 } });
-
merge
顾名思义,就是将多个Observable合并成一个。其与concat不同的地方在于merge不是按照事件顺序而是按照时间线来连接的。需要注意的是,采用merge合并如果遇到异常,则停止发射事件,并发送onError通知。另一个mergeDelayError可以将异常延迟到其他没有错误的Observable发射完成后才发送onError通知。
Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { for (int i = 0; i < 3; i++) { try { Thread.sleep(1000); emitter.onNext(i); } catch (InterruptedException e) { e.printStackTrace(); emitter.onError(new Throwable(e.getMessage())); } } emitter.onComplete(); } }).subscribeOn(Schedulers.io());//订阅在io线程 Observable observable2 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { for (int i = 10; i <= 30; i+=10) { try { Thread.sleep(1000); emitter.onNext(i); } catch (InterruptedException e) { e.printStackTrace(); emitter.onError(new Throwable(e.getMessage())); } } emitter.onComplete(); } }).subscribeOn(Schedulers.io());//订阅在io线程 //在Android主线程观察接收事件 Observable.merge(observable1,observable2).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); } });
打印结果:
08-20 11:47:19.131 18326-18326/com.aleej.demo E/Alee: 10 08-20 11:47:19.132 18326-18326/com.aleej.demo E/Alee: 0 08-20 11:47:20.133 18326-18326/com.aleej.demo E/Alee: 20 08-20 11:47:20.134 18326-18326/com.aleej.demo E/Alee: 1 08-20 11:47:21.135 18326-18326/com.aleej.demo E/Alee: 2 08-20 11:47:21.136 18326-18326/com.aleej.demo E/Alee: 30
-
zip
组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数量不一样,则以最少的Observable为标准进行压合。
Observable<Integer> observable1 = Observable.just(1,2,3,4); Observable<String> observable2 = Observable.just("a","b","c"); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) { return String.format(Locale.getDefault(),"%d->%s",integer,s); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); //打印 1->a 2->b 3->c } });
-
combineLatest
当两个Observable中的任何一个发射了一个数据时,通过combineLatest组合每个Observable发射的最新数据(共两个数据),然后发射这个函数的结果。和zip类似,区别在于zip只在每个Observable都发射了数据才压合,combineLatest在任何一个Observable发射了数据都进行压合,每次与另一个Observable最近的数据进行压合。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException { emitter.onNext(1); Thread.sleep(1000); emitter.onNext(2); Thread.sleep(5000); emitter.onNext(3); Thread.sleep(500); emitter.onNext(4); Thread.sleep(1000); emitter.onNext(5); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws InterruptedException { Thread.sleep(500); emitter.onNext("A"); Thread.sleep(1000); emitter.onNext("B"); Thread.sleep(1000); emitter.onNext("C"); Thread.sleep(100); emitter.onNext("D"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) { return String.format(Locale.getDefault(),"%d%s",integer,s); } }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } });
打印结果
08-20 14:26:22.447 22751-22751/com.aleej.demo E/Alee: 1A 08-20 14:26:22.946 22751-22751/com.aleej.demo E/Alee: 2A 08-20 14:26:23.451 22751-22751/com.aleej.demo E/Alee: 2B 08-20 14:26:24.457 22751-22751/com.aleej.demo E/Alee: 2C 08-20 14:26:24.563 22751-22751/com.aleej.demo E/Alee: 2D 08-20 14:26:27.953 22751-22751/com.aleej.demo E/Alee: 3D 08-20 14:26:28.459 22751-22751/com.aleej.demo E/Alee: 4D 08-20 14:26:29.466 22751-22751/com.aleej.demo E/Alee: 5D
过滤操作符
-
filter
通过指定条件过滤数据
Observable.just(1,2,3,4,5) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) { return integer % 2 == 0; //只要能被2整除的值 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印 2 4 } });
-
ofType
通过指定类型过滤数据
Observable.just("A",2,"c",4,5.0F,6L,7.77,0x8) .ofType(String.class) //过滤出所有String 类型的值 .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); //打印 A c } });
-
take
只发射开始的前N项数据,或者一定时间内的数据。
Observable.just(1,3,5,7,9) .take(3)//取前三项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer));//打印 1 3 5 } });
-
takeLast
只发射最后的N项数据,或者一定时间内的数据。
Observable.just(1,3,5,7,9) .takeLast(3)//取最后3项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印 5 7 9 } });
-
first
只发射第一项数据。需要指定默认值
Observable.just("A","B","C") .first("Def") .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); //打印A } });
-
last
只发射最后一项数据。需要指定默认值
Observable.just("A","B","C") .last("Def") .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s);//打印 C } });
-
skip
跳过开始的N项数据或者一定时间内的数据。
Observable.just(1,2,3,4,5) .skip(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印 3 4 5 } });
-
skipLast
跳过最后的N项数据或者一定时间内的数据。
Observable.just("1","2","3","4") .skipLast(2) .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s);//打印 1 2 } });
-
elementAt
发射某一项数据,如果超过了范围则可以指定默认值
Observable.just(1,2,3,4,5) .elementAt(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer));//打印 4 } });
-
elementAtOrError
发射某一项数据,如果超过了范围则发送一个NoSuchElementException的Error通知。
Observable.just(1,2,3,4,5) .elementAtOrError(6) .subscribe(new BiConsumer<Integer, Throwable>() { @Override public void accept(Integer integer, Throwable throwable) { //打印 java.util.NoSuchElementException Log.e("Alee",String.valueOf(throwable.toString())); } });
-
ignoreElements
丢弃所有数据,只发射错误或者正常终于的通知。
-
distinct
过滤重复数据。
Observable.just(1,2,3,2,4,2,3,4,5,9) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer));// 打印 1 2 3 4 5 9 } });
-
distinctUntilChanged
过滤连续重复的数据。
Observable.just(1,2,2,2,4,2,3,4,5,9) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer));// 打印 1 2 4 2 3 4 5 9 } });
-
throttleFirst
一定时间内取第一次发射的事件。可用于防止按钮在一段时间内重复点击(防抖动)
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 100; i++) { emitter.onNext(i); Thread.sleep(10); //每发送一个事件,休眠10毫秒 } emitter.onComplete(); } }).throttleFirst(100,TimeUnit.MILLISECONDS) //取每100毫秒内第一次发射的事件 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); } });
打印结果
08-21 09:54:26.633 9038-9038/com.aleej.demo E/Alee: 0 08-21 09:54:26.735 9038-9038/com.aleej.demo E/Alee: 10 08-21 09:54:26.836 9038-9038/com.aleej.demo E/Alee: 20 08-21 09:54:26.937 9038-9038/com.aleej.demo E/Alee: 30 08-21 09:54:27.039 9038-9038/com.aleej.demo E/Alee: 40 08-21 09:54:27.142 9038-9038/com.aleej.demo E/Alee: 50 08-21 09:54:27.244 9038-9038/com.aleej.demo E/Alee: 60 08-21 09:54:27.347 9038-9038/com.aleej.demo E/Alee: 70 08-21 09:54:27.450 9038-9038/com.aleej.demo E/Alee: 80 08-21 09:54:27.552 9038-9038/com.aleej.demo E/Alee: 90
-
throttleWithTimeout/debounce
这两个操作符都表示发射数据时,如果两次数据的发射间隔小于指定的时间,则会丢弃前一次的数据,直到指定时间段内都没有新数据发射时才进行发射。
其throttleWithTimeout内部也是调用debounce操作符来实现
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 100; i++) { emitter.onNext(i); Thread.sleep(10); //每发送一个事件,休眠10毫秒 } Thread.sleep(111); emitter.onNext(100); emitter.onComplete(); } }).throttleWithTimeout(100,TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印 99 100 } });
-
sample/throttleLast
一定时间内取最后一次发射的事件。
throttleLast内部也是采用sample操作符来实现
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 100; i++) { emitter.onNext(i); Thread.sleep(10); //每发送一个事件,休眠10毫秒 } emitter.onComplete(); } }).throttleLast(100,TimeUnit.MILLISECONDS) //取每100毫秒内最后一次发射的事件 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); } });
打印结果
08-21 10:24:52.545 11096-11249/com.aleej.demo E/Alee: 9 08-21 10:24:52.645 11096-11249/com.aleej.demo E/Alee: 19 08-21 10:24:52.745 11096-11249/com.aleej.demo E/Alee: 29 08-21 10:24:52.845 11096-11249/com.aleej.demo E/Alee: 39 08-21 10:24:52.945 11096-11249/com.aleej.demo E/Alee: 49 08-21 10:24:53.045 11096-11249/com.aleej.demo E/Alee: 59 08-21 10:24:53.145 11096-11249/com.aleej.demo E/Alee: 68 08-21 10:24:53.245 11096-11249/com.aleej.demo E/Alee: 78 08-21 10:24:53.345 11096-11249/com.aleej.demo E/Alee: 88 08-21 10:24:53.445 11096-11249/com.aleej.demo E/Alee: 98
-
timeout
如果原始Observable过了指定的一段时长没有发射任何数据,则发送一个TimeoutException的异常通知或者使用备用的Observable发射数据。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Thread.sleep(120); emitter.onNext(1); emitter.onComplete(); } }).timeout(100,TimeUnit.MILLISECONDS,Observable.just(9)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee", String.valueOf(integer)); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.e("Alee", throwable.toString()); } });
打印结果
08-21 10:38:40.452 12430-12464/com.aleej.demo E/Alee: 9
条件/布尔操作符
-
all
判断所有的数据项是否都满足指定的条件。都满足才发射true,否则发射false
Observable.just(1,2,3,4,5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) { return integer > 0 && integer < 5; } }) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印false } });
-
any
判断是否存在某一项满足指定的条件。只要存在则发射true,否则发射false,如果原始Observable没发射任何事件,则默认发射false。
Observable.just(1,2,3,4,5) .any(new Predicate<Integer>() { @Override public boolean test(Integer integer) { return integer > 0 && integer < 5; } }) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印true } });
-
contains
判断在发射的所有数据项中是否包含指定的元素。内部调用的any操作符来实现。
Observable.just(2,8,2,5,6) .contains(6) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印true } });
-
sequenceEqual
判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印true } }); Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(3,2,1)) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印false } });
-
isEmpty
用于判断原始Observable发射完毕时,有没有发射数据。有发射数据则发射false,如果只收到了onComplete通知则发射true。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onComplete(); } }).isEmpty().subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) { Log.e("Alee",aBoolean.toString()); //打印true } });
-
ambArray
给定多个Observable,选择第一个发射数据的Observable进行处理,其他Observable则被抛弃。
Observable.ambArray(Observable.timer(1,TimeUnit.SECONDS),Observable.just(7L,8L,9L)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { Log.e("Alee",aLong.toString()); //打印7 8 9 } });
-
switchIfEmpty
如果原始Observable正常终止后没有发射任何数据,就使用备用的Observable发射数据。
Observable<Integer> observable = Observable.empty(); observable.switchIfEmpty(Observable.just(1,2,3)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); //打印 1 2 3 } });
-
defaultIfEmpty
如果原始Observable正常终止后没有发射任何数据,就发射一个默认值。其内部也是调用的switchIfEmpty。
bservable<Integer> observable = Observable.empty(); observable.defaultIfEmpty(9) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); //打印 9 } });
-
takeUntil
当发射的数据满足指定条件时(包含该数据),或者指定的第二个Observable发射完毕后,终止第一个Observable发射数据。
Observable.just(1,2,3,4) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) { return integer % 2 == 0; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); //打印 1 2 } });
-
takeWhile
当发射的数据不满足指定条件时(不包含该数据),Observable终止发射数据。
Observable.just(1,2,3,4,5) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) { return integer != 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",String.valueOf(integer)); //打印 1 2 } });
-
skipUntil
接收一个Observable,直到该Observable发射事件之前,原始Observable所有已发射的事件全部被抛弃。
//从0开始发射10次事件,初始发射延迟0秒,每次发射后间隔1秒再发射下一次事件 Observable.intervalRange(0,10,0,1,TimeUnit.SECONDS) .skipUntil(Observable.timer(3,TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long integer) { Log.e("Alee",integer.toString()); } });
打印结果
08-21 14:31:21.986 25957-26044/com.aleej.demo E/Alee: 3 //有时候不会显示3这条数据 08-21 14:31:22.987 25957-26044/com.aleej.demo E/Alee: 4 08-21 14:31:23.987 25957-26044/com.aleej.demo E/Alee: 5 08-21 14:31:24.987 25957-26044/com.aleej.demo E/Alee: 6 08-21 14:31:25.986 25957-26044/com.aleej.demo E/Alee: 7 08-21 14:31:26.987 25957-26044/com.aleej.demo E/Alee: 8 08-21 14:31:27.987 25957-26044/com.aleej.demo E/Alee: 9
-
skipWhile
接受一个Predicate用来控制跳过开始一段数据。
//从0开始发射10次事件,初始发射延迟0秒,每次发射后间隔1秒再发射下一次事件 Observable.intervalRange(0,10,0,1,TimeUnit.SECONDS) .skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) { return aLong != 5; //注意:是前面不等于5的一段数据将被抛弃 } }) .subscribe(new Consumer<Long>() { @Override public void accept(Long integer) { Log.e("Alee",integer.toString()); } });
打印结果
08-21 14:41:28.255 26738-26795/com.aleej.demo E/Alee: 5 08-21 14:41:29.255 26738-26795/com.aleej.demo E/Alee: 6 08-21 14:41:30.255 26738-26795/com.aleej.demo E/Alee: 7 08-21 14:41:31.255 26738-26795/com.aleej.demo E/Alee: 8 08-21 14:41:32.255 26738-26795/com.aleej.demo E/Alee: 9
聚合操作符
-
reduce
把Observable中所有元素都聚合成单一的元素,例如把所有元素都加起来
Observable.just(1,2,3,4,5) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer1, Integer integer2) { return integer1+integer2; //求和 } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); // 打印 15 } }); Observable.just("a","b","c","d","e") .reduce(new BiFunction<String, String, String>() { @Override public String apply(String s1, String s2) { return String.format("%s->%s",s1,s2); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); //打印 a->b->c->d->e } });
-
count
计算发射的数量。
Observable.just(1,2,3) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) { Log.e("Alee",aLong.toString()); //打印 3 } });
-
collect
和reduce操作类似,但是collect需要自己定义收集的容器和收集逻辑。
Observable.just(7,8,9) .collect(new Callable<List<Integer>>() { @Override public List<Integer> call() { return new ArrayList<>();//创建数据结构用于收集 } }, new BiConsumer<List<Integer>, Integer>() { @Override public void accept(List<Integer> list, Integer integer) { list.add(integer); //收集数据 } }).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> list) { Log.e("Alee",list.toString()); // 打印 [7, 8, 9] } });
转换操作符
-
toList
收集原始Observable发射的所有数据到一个List,然后发射这个List事件。
Observable.just(1,3,7) .toList() .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> list) { Log.e("Alee",list.toString());// 打印 [1, 3, 7] } });
-
toSortedList
收集原始Observable发射的所有数据到一个有序列表,可以自定义排序规则,然后发射这个列表事件。
Observable.just(1,4,7,3,5,2,6) .toSortedList() .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> list) { Log.e("Alee",list.toString()); //打印 [1, 2, 3, 4, 5, 6, 7] } });
-
toMap
收集原始Observable发射的所有数据到一个Map集合,根据toMap的两个参数自定义key和value,然后发射这个Map事件。
Observable.just(0,8,1,8) .toMap(new Function<Integer, String>() { @Override public String apply(Integer integer) { return String.format(Locale.getDefault(),"key:%d",integer); } }, new Function<Integer, String>() { @Override public String apply(Integer integer) { return String.format(Locale.getDefault(),"value->%d",integer); } }).subscribe(new Consumer<Map<String, String>>() { @Override public void accept(Map<String, String> stringStringMap) { //打印 {key:1=value->1, key:0=value->0, key:8=value->8} Log.e("Alee",stringStringMap.toString()); } });
-
toMultiMap
和toMap类似,不同的是map的value是一个集合。
Observable.just(0,8,1,8) .toMultimap(new Function<Integer, String>() { @Override public String apply(Integer integer) { return String.format(Locale.getDefault(),"key:%d",integer); } }, new Function<Integer, String>() { @Override public String apply(Integer integer) { return String.format(Locale.getDefault(),"value->%d",integer); } }).subscribe(new BiConsumer<Map<String, Collection<String>>, Throwable>() { @Override public void accept(Map<String, Collection<String>> stringCollectionMap, Throwable throwable) { //打印 {key:1=[value->1], key:0=[value->0], key:8=[value->8, value->8]} Log.e("Alee",stringCollectionMap.toString()); } });
变换操作符
-
map
对Observable发射的每一项元素都应用一个函数来变换成新的元素。
Observable.just(1,2,3) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) { return String.format(Locale.getDefault(),"我是%d",integer); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); // 打印 我是1 我是2 我是3 } });
-
cast
强制转换每一个元素的类型,内部调用map操作符来进行转换。
Observable.just(1,2,3) .cast(Number.class) .subscribe(new Consumer<Number>() { @Override public void accept(Number number) { Log.e("Alee",number.toString()); //打印 1 2 3 } });
-
flatMap
把原始Observable中的每一个元素转换成新的Observable,每个新转换成的Observable发射的元素将会合并成一个单独的Observable,然后按顺序发射事件。
Observable.just(1,2,3) .flatMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply(Integer integer) { return Observable.just(integer*10,integer*100); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { //打印 10 100 20 200 30 300 Log.e("Alee",integer.toString()); } });
-
flatMapIterable
和flatMap一样,只不过是把每个元素转换成了Iterable。
Observable.just(1,2,3) .flatMapIterable(new Function<Integer, Iterable<Integer>>() { @Override public Iterable<Integer> apply(Integer integer) { return Arrays.asList(integer*10,integer*100); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { //打印 10 100 20 200 30 300 Log.e("Alee",integer.toString()); } });
-
concatMap
类似于flatMap
flatMap操作符是使用merge合并元素,concatMap操作符则是通过concat合并元素,前者可能会出现元素交错问题,后者严格按照顺序发射。
Observable.just(1,2,3) .concatMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply(Integer integer) { return Observable.just(integer*10,integer*100); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { //打印 10 100 20 200 30 300 Log.e("Alee",integer.toString()); } });
-
switchMap
和flatMap类似,但是转换出来的每一个新的Observable会取代掉前一个Observable。
Observable.just(1,2,3) .switchMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply(Integer integer) { return Observable.timer(1,TimeUnit.SECONDS).map(new Function<Long, Integer>() { @Override public Integer apply(Long aLong) { return integer; } }); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); // 打印 3 } });
-
scan
扫描每一个元素,第一个元素将忽略扫描,从第二个元素开始(可以获得上一个元素的值)进行扫描变换后发射
Observable.just(1,2,3) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer last, Integer item) { /** 会走2次 第一次 last=1 item=2 第二次 last=3 item=3 */ return last+item; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); //打印 1 3 6 } });
-
groupBy
通过Function接收每个数据的分组key,然后发射GroupedObservable,使用者可以再订阅这个GroupedObservable进行数据输出。
Observable.just(1,2,3,4,5) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) { return integer % 2 == 0 ? "偶数" : "奇数"; } }).subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) { stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",stringIntegerGroupedObservable.getKey()+":"+integer.toString()); } }); } });
打印结果
08-21 17:39:34.102 7971-7971/? E/Alee: 奇数:1 偶数:2 奇数:3 偶数:4 奇数:5
-
buffer
把全部元素按照指定大小打包成元素集合,然后发射这些集合。
Observable.just(1,2,3,4,5) .buffer(3) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> list) { // 打印 [1, 2, 3] [4, 5] Log.e("Alee",list.toString()); } });
-
window
把全部元素按照指定个数拆分成若干个Observable发射出去。
Observable.just(1,2,3,4,5) .window(3) .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> integerObservable) { integerObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { //打印 1 2 3 4 5 Log.e("Alee",integer.toString()); } }); } });
错误处理/重试机制
-
onErrorResumeNext
当原始Observable在遇到错误时,使用备用Observable发射数据。
Observable.just(1,"2",3) .cast(Integer.class) .onErrorResumeNext(Observable.just(3,2,1)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString());// 打印 1 3 2 1 } });
-
onExceptionResumeNext
当原始Observable在遇到异常时,使用备用Observable发射数据。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,而onExceptionResumeNext只能处理异常。
Observable.just(1,2,"3") .cast(Integer.class) .onExceptionResumeNext(Observable.just(3,2,1)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString());// 打印 1 2 3 2 1 } });
-
onErrorReturn/onErrorReturnItem
当原始Observable在遇到错误的时候发射一个特定的数据。
Observable.just(1,2,"3") .cast(Integer.class) // .onErrorReturnItem(4) .onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) { return 4; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); // 打印 1 2 4 } });
-
retry
当原始Observable遇到错误时进行重试。
Observable.just(1,2,"3") .cast(Integer.class) .retry(3)//当遇到错误时,再重试3次 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee", integer.toString()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) { Log.e("Alee",throwable.toString()); } });
打印结果
08-22 09:55:20.689 15859-15859/? E/Alee: 1 2 1 2 1 2 1 2 java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Integer
-
retryWhen
当原始Observable在遇到错误时,将错误传递给另一个Observable来决定是否要重新订阅原始Observable。
Observable.just(1,2,"3") .cast(Integer.class) .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Throwable> throwableObservable) { return Observable.timer(1,TimeUnit.SECONDS).map(new Function<Long, Integer>() { @Override public Integer apply(Long aLong) { return 7; } }); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee", integer.toString()); } });
打印结果
08-22 10:08:38.321 16579-16579/com.aleej.demo E/Alee: 1 2 08-22 10:08:39.321 16579-16632/com.aleej.demo E/Alee: 1 2
ConnectableObservable
可连接的Observable。在被订阅时并不开始发射数据,只有在它的connect()被调用的时候才开始发射数据。可以等所有潜在的订阅者都订阅了这个Observable之后才开始发射数据。
-
connect
指示一个可连接的Observable开始发射数据。
-
Observable.publish()
将一个Observable转换为一个可连接的Observable。
-
Observable.replay()
返回确保所有的订阅者看到相同的数据序列的
ConnectableObservable
,即使它们在Observable开始发射数据之后才订阅。 -
refCount
让一个可连接的Observable表现得像一个普通的Observable。
ConnectableObservable<Integer> observable = Observable.just(1,2,3).publish(); observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee", String.format("No1. %d",integer)); } }); observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee", String.format("No2. %d",integer)); } }); observable.connect();//调用此方法发射数据
打印结果
08-22 10:44:03.248 18881-18881/? E/Alee: No1. 1 No2. 1 08-22 10:44:03.249 18881-18881/? E/Alee: No1. 2 No2. 2 No1. 3 No2. 3
阻塞操作符
在Rxjava1中的BlockingObservable已经在Rxjava2中去掉了,在Rxjava2中已经集成到了Observable中。
官方说明:https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
-
blockingForEach
对Observable发射的每一项数据调用一个方法,会阻塞直到Observable完成
Observable.just(1,2).blockingForEach(new Consumer<Integer>() { @Override public void accept(Integer integer) throws InterruptedException { Log.e("Alee", String.format("%d %s",integer,Thread.currentThread().getName())); Thread.sleep(500); } }); Log.e("Alee",Thread.currentThread().getName());
打印结果
08-22 11:15:32.397 20699-20699/com.aleej.demo E/Alee: 1 main 08-22 11:15:32.898 20699-20699/com.aleej.demo E/Alee: 2 main 08-22 11:15:33.400 20699-20699/com.aleej.demo E/Alee: main
-
blockingFirst
阻塞直到Observable发射了一个数据,然后返回第一项数据
-
blockingMostRecent
返回一个总是返回Observable最近发射的数据的iterable
-
blockingLatest
返回一个iterable,会阻塞直到或者除非Observable发射了一个iterable没有返回的值,然后返回这个值
-
blockingNext
返回一个iterable,阻塞直到Observable发射了另外一个值
-
blockingLast
阻塞直到Observable终止,然后返回最后一项数据
-
blockingIterable
将Observable转换返回一个iterable.
-
blockingSingle
如果Observable终止时只发射了一个值,返回那个值,否则抛出异常
-
blockingSubscribe
在当前线程订阅,和forEach类似
工具集
-
materialize
将Observable转换成一个通知列表。
Observable.just(1,2,3) .materialize() .subscribe(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) { Log.e("Alee",integerNotification.toString()); } });
打印结果
08-22 11:40:45.995 21309-21309/com.aleej.demo E/Alee: OnNextNotification[1] OnNextNotification[2] OnNextNotification[3] OnCompleteNotification
-
dematerialize
将通知逆转回一个Observable。
Observable.just(1,2,3) .materialize() .dematerialize() .subscribe(new Consumer<Object>() { @Override public void accept(Object integer) { Log.e("Alee",integer.toString()); //打印 1 2 3 } });
-
timestamp
给Observable发射的每个数据项添加一个时间戳。
Observable.just(1,2,3) .timestamp() .subscribe(new Consumer<Timed<Integer>>() { @Override public void accept(Timed<Integer> integerTimed) { Log.e("Alee",integerTimed.toString()); } });
打印结果
Timed[time=1534909593930, unit=MILLISECONDS, value=1] Timed[time=1534909593930, unit=MILLISECONDS, value=2] Timed[time=1534909593930, unit=MILLISECONDS, value=3]
-
timeInterval
给Observable发射的两个数据项间添加一个时间差。
Observable.intervalRange(1L,5L,0L,1L,TimeUnit.SECONDS) .timeInterval() .subscribe(new Consumer<Timed<Long>>() { @Override public void accept(Timed<Long> integerTimed) { Log.e("Alee",integerTimed.toString()); } });
打印结果
08-22 13:52:29.032 25153-25198/com.aleej.demo E/Alee: Timed[time=7, unit=MILLISECONDS, value=1] 08-22 13:52:30.027 25153-25198/com.aleej.demo E/Alee: Timed[time=995, unit=MILLISECONDS, value=2] 08-22 13:52:31.027 25153-25198/com.aleej.demo E/Alee: Timed[time=1000, unit=MILLISECONDS, value=3] 08-22 13:52:32.027 25153-25198/com.aleej.demo E/Alee: Timed[time=1000, unit=MILLISECONDS, value=4] 08-22 13:52:33.025 25153-25198/com.aleej.demo E/Alee: Timed[time=999, unit=MILLISECONDS, value=5]
-
serialize
强制一个Observable连续调用并保证行为正确。
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个
onNext
调用之前尝试调用onCompleted
或onError
方法,或者从两个不同的线程同时调用onNext
方法。使用Serialize
操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的RxJava中的实现是
serialize
,它默认不在任何特定的调度器上执行 -
cache
缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者。
-
observeOn
指定观察者观察Observable的调度器。
-
subscribeOn
指定Observable执行任务的调度器。
-
doOnEach
注册一个动作,对Observable发射的每个数据项使用。
-
doOnCompleted
注册一个动作,对正常完成的Observable使用。
-
doOnError
注册一个动作,对发生错误的Observable使用。
-
doOnTerminate
注册一个动作,对完成的Observable使用,无论是否发生错误。
-
doOnSubscribe
注册一个动作,在观察者订阅时使用。
-
doOnDispose
注册一个动作,在观察者取消订阅时使用。
-
doAfterTerminate
注册一个动作,在Observable完成时使用。
-
delay
延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。
-
delaySubscription
延时处理订阅请求。
-
using
创建一个只在Observable生命周期存在的资源,当Observable终止时这个资源会被自动释放
Observable.using(new Callable<File>() { @Override public File call() throws Exception { //创建Observable生命周期内需要的资源 File file = new File(getCacheDir(),"alee.txt"); if (!file.exists()) { boolean result = file.createNewFile(); Log.e("Alee","create"); } return file; } }, new Function<File, ObservableSource<String>>() { @Override public ObservableSource<String> apply(File file) { return Observable.just(file.exists() ? "exist" : "not exist"); } }, new Consumer<File>() { @Override public void accept(File file) { if (file.exists()) { file.delete(); Log.e("Alee","delete"); } } }).subscribe(new Consumer<String>() { @Override public void accept(String s) { Log.e("Alee",s); } }); }
打印结果
08-22 15:06:57.747 27294-27294/com.aleej.demo E/Alee: create 08-22 15:06:57.748 27294-27294/com.aleej.demo E/Alee: exist delete
-
single
如果原始Observable只发射了一个数据,则下游接收到该数据;如果一个数据都没发射,则下游接收到默认数据;如果发射了多个数据,否则抛出异常(IllegalArgumentException)。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) { emitter.onComplete(); } }) .single(9) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { Log.e("Alee",integer.toString()); //打印 9 } });
调度器 Scheduler
如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。
-
Schedulers.computation
用于计算任务,如事件循环或和回调处理。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
-
Schedulers.newThread
开启一个新的线程。
-
Schedulers.io
主要用于一些耗时操作,比如读写文件,数据库存取,网络交互等。 这个调度器根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.io()中的线程池数量是无限制大的,大量的I/O操作将创建许多线程,我们需要在性能和线程数量中做出取舍。
-
Schedulers.from(executor)
使用指定的Executor作为调度器。
-
Schedulers.immediate
在当前线程立即开始执行任务。
-
Schedulers.trampoline
当其它排队的任务完成后,在当前线程排队开始执行。
-
AndroidSchedulers.mainThread
Android专用,指定的操作在Android的主线程(UI线程中)运行。
需要依赖RxAndroid库。