RxJava操作符

RxJava常用操作符大全

Posted by Alee on July 14, 2018

本篇都是基于RxJava2.0的操作符说明,由于操作符太多,本篇只列举常用的操作符说明

创建操作符

  1. create

    通过实现ObservableOnSubscribe接口的subscribe来创建,该方法有一个事件发射器ObservableEmitter,通过它来向下游发射事件。

    onNext:发射事件

    onComplete:表示事件全部发射完成,后续再调用onNext不生效

    onError:表示事件发射异常,和onComplete相冲突,事件要么全部发射完成,要么异常终止。调用后再onNext发射事件不生效。

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) {
                    emitter.onNext("Android");
                    emitter.onNext("iOS");
                    emitter.onNext("Sailfish OS");
                    emitter.onComplete();
                    emitter.onNext("Meego");//不生效
                }
            });
    
  2. just

    把传入的参数依次发射出去

    Observable.just("Android","iOS","Sailfish OS")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.e("Alee",s);
                        }
                    });
    
  3. from

    1. fromArray

      把一个可变参数(数组)依次发射出去

      Observable.fromArray("Android","iOS","Sailfish OS")
                      .subscribe(new Consumer<String>() {
                          @Override
                          public void accept(String s) throws Exception {
                              Log.e("Alee",s);
                          }
                      });
      
    2. fromFuture

      Observable.fromFuture(Executors.newSingleThreadExecutor().submit(new Callable<String>() {
                public String call() {
                    Thread.sleep(3000);
                    return "Alee";
                }
      	}))
                      .subscribe(new Consumer<String>() {
                          @Override
                          public void accept(String s) {
                              Log.e("Alee",s);
                          }
                      });
      
    3. fromCallable

      Observable.fromCallable(new Callable<String>() {
            
                  @Override
                  public String call() {
                      return "萨拉黑";
                  }
              }).subscribe(new Consumer<String>() {
                  @Override
                  public void accept(String s) {
                      Log.e("Alee",s);
                  }
              });
      
    4. fromIterable

      List<String> list = new ArrayList<>();
              list.add("撒");
              list.add("啦");
              list.add("嘿");
              Observable.fromIterable(list).subscribe(new Consumer<String>() {
                  @Override
                  public void accept(String s) {
                      Log.e("Alee",s);
                  }
              });
      
    5. fromPublisher

      Observable.fromPublisher(new Publisher<String>() {
                  @Override
                  public void subscribe(Subscriber<? super String> s) {
                      s.onNext("Android");
                      s.onNext("iOS");
                      s.onNext("Sailfish OS");
                      s.onComplete();
                      s.onNext("Others");//不生效
                  }
              }).subscribe(new Consumer<String>() {
                  @Override
                  public void accept(String s) {
                      Log.e("Alee",s);
                  }
              });
      
  4. timer

    通过Timer操作符创建一个延迟发射的Observable,发射的是一个Long型的0,等同于Android的Handler中的postDelay()方法。

    Observable.timer(3, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) {
                    Log.e("Alee",String.valueOf(aLong)); //打印0
                }
            });
    
  5. interval

    通过interval操作符创建一个可延迟发射且按照固定时间间隔发射自增整数的Observable,一般用来进行倒计时,或者计时器等

    //1秒后每间隔一秒发射一个Long值出去 ,数值从0开始,每次加1,一共发射10次
    Observable.interval(1,1,TimeUnit.SECONDS)
        .take(10) //take 只取前10次发射的事件
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) {
                Log.e("Alee",String.valueOf(aLong));//打印 0 到 9
            }
        });
    
  6. repeat

    通过repeat操作符创建一个重复发射特定数据的Observable。

    Observable.just(1).repeat(3).subscribe(new Consumer<Integer>() {
                int count;
                @Override
                public void accept(Integer integer) {//integer 始终等于1
                    Log.e("Alee",String.valueOf(++count));//打印 1 ,2 ,3
                }
            });
    
  7. empty

    创建一个空的(什么事件都不发射的)直接通知完成的Observable

    Observable.empty().subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
       
                }
       
                @Override
                public void onNext(Object s) {
                    Log.e("Alee",s.toString());//不会走到这儿
                }
       
                @Override
                public void onError(Throwable e) {
       
                }
       
                @Override
                public void onComplete() {
                    Log.e("Alee","complete");//会走到这儿
                }
            });
    
  8. error

    同empty一样,创建一个什么事件都不发射只通知错误的Observable

    Observable.error(new Throwable("oops, you have no money")).subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
       
                }
       
                @Override
                public void onNext(Object s) {
                    Log.e("Alee",s.toString());
                }
       
                @Override
                public void onError(Throwable e) {
                    Log.e("Alee",e.getMessage());//会走到这儿
                }
       
                @Override
                public void onComplete() {
                    Log.e("Alee","complete");
                }
            });
    
  9. never

    创建一个不发射任何事件且没有任何通知完成或者错误的Observable

    Observable.never().subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("Alee","onSubscribe"); //只会走到这儿
                }
                @Override
                public void onNext(Object o) {
                    Log.e("Alee","onNext");
                }
                @Override
                public void onError(Throwable e) {
                    Log.e("Alee","onError");
                }
                @Override
                public void onComplete() {
                    Log.e("Alee","onComplete");
                }
            });
    
  10. range

    创建一个发射指定范围的整型事件的Observable

    Observable.range(1,9)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer)); //打印1-9
                        }
                    });
    
  11. defer

    只有当订阅者订阅后才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅的时候调用Callable的call()方法创建Observable。

    Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() {
                    return Observable.just("hello world");
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s);
                }
            });
    

合并操作符

  1. concat

    按照顺序连接多个同事件类型的Observable,比如发射Integer事件的Observable和发射String事件的Observable是不能concat的。另外a.concatWith(b)其内部也是调用Observable.concat(this,b)

    Observable<String> observable1 = Observable.just("撒","啦");
            Observable<String> observable2 = Observable.just("嘿","哟");
            Observable.concat(observable1,observable2).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s); //打印 撒 啦 嘿 哟
                }
            });
    
  2. startWith

    其内部也是调用concat连接两个Observable,只是把后面的Observable连接到前面而已

    Observable.just("爱","你","哟")
                    .startWith("Alee")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s);// 打印 Alee 爱 你 哟
                        }
                    });
    
  3. merge

    顾名思义,就是将多个Observable合并成一个。其与concat不同的地方在于merge不是按照事件顺序而是按照时间线来连接的。需要注意的是,采用merge合并如果遇到异常,则停止发射事件,并发送onError通知。另一个mergeDelayError可以将异常延迟到其他没有错误的Observable发射完成后才发送onError通知。

    Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    for (int i = 0; i < 3; i++) {
                        try {
                            Thread.sleep(1000);
                            emitter.onNext(i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            emitter.onError(new Throwable(e.getMessage()));
                        }
                    }
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.io());//订阅在io线程
            Observable observable2 = Observable.create(new 				ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    for (int i = 10; i <= 30; i+=10) {
                        try {
                            Thread.sleep(1000);
                            emitter.onNext(i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            emitter.onError(new Throwable(e.getMessage()));
                        }
                    }
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.io());//订阅在io线程
    //在Android主线程观察接收事件        
    Observable.merge(observable1,observable2).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee",String.valueOf(integer));
                }
            });
    

    打印结果:

    08-20 11:47:19.131 18326-18326/com.aleej.demo E/Alee: 10
    08-20 11:47:19.132 18326-18326/com.aleej.demo E/Alee: 0
    08-20 11:47:20.133 18326-18326/com.aleej.demo E/Alee: 20
    08-20 11:47:20.134 18326-18326/com.aleej.demo E/Alee: 1
    08-20 11:47:21.135 18326-18326/com.aleej.demo E/Alee: 2
    08-20 11:47:21.136 18326-18326/com.aleej.demo E/Alee: 30
    
  4. zip

    组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数量不一样,则以最少的Observable为标准进行压合。

    Observable<Integer> observable1 = Observable.just(1,2,3,4);
            Observable<String> observable2 = Observable.just("a","b","c");
            Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer integer, String s) {
                    return String.format(Locale.getDefault(),"%d->%s",integer,s);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s); //打印 1->a 2->b 3->c
                }
            });
    
  5. combineLatest

    当两个Observable中的任何一个发射了一个数据时,通过combineLatest组合每个Observable发射的最新数据(共两个数据),然后发射这个函数的结果。和zip类似,区别在于zip只在每个Observable都发射了数据才压合,combineLatest在任何一个Observable发射了数据都进行压合,每次与另一个Observable最近的数据进行压合。

    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws InterruptedException {
                    emitter.onNext(1);
                    Thread.sleep(1000);
                    emitter.onNext(2);
                    Thread.sleep(5000);
                    emitter.onNext(3);
                    Thread.sleep(500);
                    emitter.onNext(4);
                    Thread.sleep(1000);
                    emitter.onNext(5);
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.io());
            Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws InterruptedException {
                    Thread.sleep(500);
                    emitter.onNext("A");
                    Thread.sleep(1000);
                    emitter.onNext("B");
                    Thread.sleep(1000);
                    emitter.onNext("C");
                    Thread.sleep(100);
                    emitter.onNext("D");
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.io());
            Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer integer, String s) {
                    return String.format(Locale.getDefault(),"%d%s",integer,s);
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s);
                }
            });
    

    打印结果

    08-20 14:26:22.447 22751-22751/com.aleej.demo E/Alee: 1A
    08-20 14:26:22.946 22751-22751/com.aleej.demo E/Alee: 2A
    08-20 14:26:23.451 22751-22751/com.aleej.demo E/Alee: 2B
    08-20 14:26:24.457 22751-22751/com.aleej.demo E/Alee: 2C
    08-20 14:26:24.563 22751-22751/com.aleej.demo E/Alee: 2D
    08-20 14:26:27.953 22751-22751/com.aleej.demo E/Alee: 3D
    08-20 14:26:28.459 22751-22751/com.aleej.demo E/Alee: 4D
    08-20 14:26:29.466 22751-22751/com.aleej.demo E/Alee: 5D
    

过滤操作符

  1. filter

    通过指定条件过滤数据

    Observable.just(1,2,3,4,5)
                    .filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) {
                            return integer % 2 == 0; //只要能被2整除的值
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee",String.valueOf(integer)); //打印 2 4
                }
            });
    
  2. ofType

    通过指定类型过滤数据

    Observable.just("A",2,"c",4,5.0F,6L,7.77,0x8)
                    .ofType(String.class) //过滤出所有String 类型的值
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s); //打印 A c
                        }
                    });
    
  3. take

    只发射开始的前N项数据,或者一定时间内的数据。

    Observable.just(1,3,5,7,9)
                    .take(3)//取前三项数据
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));//打印 1 3 5
                        }
                    });
    
  4. takeLast

    只发射最后的N项数据,或者一定时间内的数据。

    Observable.just(1,3,5,7,9)
                    .takeLast(3)//取最后3项数据
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer)); //打印 5 7 9
                        }
                    });
    
  5. first

    只发射第一项数据。需要指定默认值

    Observable.just("A","B","C")
                    .first("Def")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s); //打印A
                        }
                    });
    
  6. last

    只发射最后一项数据。需要指定默认值

    Observable.just("A","B","C")
                    .last("Def")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s);//打印 C
                        }
                    });
    
  7. skip

    跳过开始的N项数据或者一定时间内的数据。

    Observable.just(1,2,3,4,5)
                    .skip(2)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer)); //打印 3 4 5
                        }
                    });
    
  8. skipLast

    跳过最后的N项数据或者一定时间内的数据。

    Observable.just("1","2","3","4")
                    .skipLast(2)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s);//打印 1 2
                        }
                    });
    
  9. elementAt

    发射某一项数据,如果超过了范围则可以指定默认值

    Observable.just(1,2,3,4,5)
                    .elementAt(3)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));//打印 4
                        }
                    });
    
  10. elementAtOrError

    发射某一项数据,如果超过了范围则发送一个NoSuchElementException的Error通知。

    Observable.just(1,2,3,4,5)
                   .elementAtOrError(6)
                   .subscribe(new BiConsumer<Integer, Throwable>() {
                       @Override
                       public void accept(Integer integer, Throwable throwable) {
                           //打印 java.util.NoSuchElementException
                           Log.e("Alee",String.valueOf(throwable.toString()));
                       }
                   });
    
  11. ignoreElements

    丢弃所有数据,只发射错误或者正常终于的通知。

  12. distinct

    过滤重复数据。

    Observable.just(1,2,3,2,4,2,3,4,5,9)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));// 打印 1 2 3 4 5 9
                        }
                    });
    
  13. distinctUntilChanged

    过滤连续重复的数据。

    Observable.just(1,2,2,2,4,2,3,4,5,9)
                    .distinctUntilChanged()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));// 打印 1 2 4 2 3 4 5 9
                        }
                    });
    
  14. throttleFirst

    一定时间内取第一次发射的事件。可用于防止按钮在一段时间内重复点击(防抖动)

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 100; i++) {
                        emitter.onNext(i);
                        Thread.sleep(10); //每发送一个事件,休眠10毫秒
                    }
                    emitter.onComplete();
                }
            }).throttleFirst(100,TimeUnit.MILLISECONDS) //取每100毫秒内第一次发射的事件
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));
                        }
                    });
    

    打印结果

    08-21 09:54:26.633 9038-9038/com.aleej.demo E/Alee: 0
    08-21 09:54:26.735 9038-9038/com.aleej.demo E/Alee: 10
    08-21 09:54:26.836 9038-9038/com.aleej.demo E/Alee: 20
    08-21 09:54:26.937 9038-9038/com.aleej.demo E/Alee: 30
    08-21 09:54:27.039 9038-9038/com.aleej.demo E/Alee: 40
    08-21 09:54:27.142 9038-9038/com.aleej.demo E/Alee: 50
    08-21 09:54:27.244 9038-9038/com.aleej.demo E/Alee: 60
    08-21 09:54:27.347 9038-9038/com.aleej.demo E/Alee: 70
    08-21 09:54:27.450 9038-9038/com.aleej.demo E/Alee: 80
    08-21 09:54:27.552 9038-9038/com.aleej.demo E/Alee: 90
    
  15. throttleWithTimeout/debounce

    这两个操作符都表示发射数据时,如果两次数据的发射间隔小于指定的时间,则会丢弃前一次的数据,直到指定时间段内都没有新数据发射时才进行发射。

    其throttleWithTimeout内部也是调用debounce操作符来实现

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 100; i++) {
                        emitter.onNext(i);
                        Thread.sleep(10); //每发送一个事件,休眠10毫秒
                    }
                    Thread.sleep(111);
                    emitter.onNext(100);
                    emitter.onComplete();
                }
            }).throttleWithTimeout(100,TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer)); //打印 99 100
                        }
                    });
    
  16. sample/throttleLast

    一定时间内取最后一次发射的事件。

    throttleLast内部也是采用sample操作符来实现

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 100; i++) {
                        emitter.onNext(i);
                        Thread.sleep(10); //每发送一个事件,休眠10毫秒
                    }
                    emitter.onComplete();
                }
            }).throttleLast(100,TimeUnit.MILLISECONDS) //取每100毫秒内最后一次发射的事件
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",String.valueOf(integer));
                        }
                    });
    

    打印结果

    08-21 10:24:52.545 11096-11249/com.aleej.demo E/Alee: 9
    08-21 10:24:52.645 11096-11249/com.aleej.demo E/Alee: 19
    08-21 10:24:52.745 11096-11249/com.aleej.demo E/Alee: 29
    08-21 10:24:52.845 11096-11249/com.aleej.demo E/Alee: 39
    08-21 10:24:52.945 11096-11249/com.aleej.demo E/Alee: 49
    08-21 10:24:53.045 11096-11249/com.aleej.demo E/Alee: 59
    08-21 10:24:53.145 11096-11249/com.aleej.demo E/Alee: 68
    08-21 10:24:53.245 11096-11249/com.aleej.demo E/Alee: 78
    08-21 10:24:53.345 11096-11249/com.aleej.demo E/Alee: 88
    08-21 10:24:53.445 11096-11249/com.aleej.demo E/Alee: 98
    
  17. timeout

    如果原始Observable过了指定的一段时长没有发射任何数据,则发送一个TimeoutException的异常通知或者使用备用的Observable发射数据。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Thread.sleep(120);
                    emitter.onNext(1);
                    emitter.onComplete();
                }
            }).timeout(100,TimeUnit.MILLISECONDS,Observable.just(9))
                    .subscribe(new Consumer<Integer>() {
                                   @Override
                                   public void accept(Integer integer) {
                                       Log.e("Alee", String.valueOf(integer));
                                   }
                               },
                            new Consumer<Throwable>() {
                                @Override
                                public void accept(Throwable throwable) throws Exception {
                                    Log.e("Alee", throwable.toString());
                                }
                            });
    

    打印结果

    08-21 10:38:40.452 12430-12464/com.aleej.demo E/Alee: 9
    

条件/布尔操作符

  1. all

    判断所有的数据项是否都满足指定的条件。都满足才发射true,否则发射false

    Observable.just(1,2,3,4,5)
                    .all(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) {
                            return integer > 0 && integer < 5;
                        }
                    })
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) {
                            Log.e("Alee",aBoolean.toString()); //打印false
                        }
                    });
    
  2. any

    判断是否存在某一项满足指定的条件。只要存在则发射true,否则发射false,如果原始Observable没发射任何事件,则默认发射false。

    Observable.just(1,2,3,4,5)
                    .any(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) {
                            return integer > 0 && integer < 5;
                        }
                    })
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) {
                            Log.e("Alee",aBoolean.toString()); //打印true
                        }
                    });
    
  3. contains

    判断在发射的所有数据项中是否包含指定的元素。内部调用的any操作符来实现。

    Observable.just(2,8,2,5,6)
                    .contains(6)
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) {
                            Log.e("Alee",aBoolean.toString()); //打印true
                        }
                    });
    
  4. sequenceEqual

    判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)

    Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3))
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) {
                            Log.e("Alee",aBoolean.toString()); //打印true
                        }
                    });
       
    Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(3,2,1))
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) {
                            Log.e("Alee",aBoolean.toString()); //打印false
                        }
                    });
    
  5. isEmpty

    用于判断原始Observable发射完毕时,有没有发射数据。有发射数据则发射false,如果只收到了onComplete通知则发射true。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    emitter.onComplete();
                }
            }).isEmpty().subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) {
                    Log.e("Alee",aBoolean.toString()); //打印true
                }
            });
    
  6. ambArray

    给定多个Observable,选择第一个发射数据的Observable进行处理,其他Observable则被抛弃。

    Observable.ambArray(Observable.timer(1,TimeUnit.SECONDS),Observable.just(7L,8L,9L))
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) {
                            Log.e("Alee",aLong.toString()); //打印7 8 9
                        }
                    });
    
  7. switchIfEmpty

    如果原始Observable正常终止后没有发射任何数据,就使用备用的Observable发射数据。

    Observable<Integer> observable = Observable.empty();
            observable.switchIfEmpty(Observable.just(1,2,3))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString()); //打印 1 2 3
                        }
                    });
    
  8. defaultIfEmpty

    如果原始Observable正常终止后没有发射任何数据,就发射一个默认值。其内部也是调用的switchIfEmpty。

    bservable<Integer> observable = Observable.empty();
            observable.defaultIfEmpty(9)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString()); //打印 9
                        }
                    });
    
  9. takeUntil

    当发射的数据满足指定条件时(包含该数据),或者指定的第二个Observable发射完毕后,终止第一个Observable发射数据。

    Observable.just(1,2,3,4)
                    .takeUntil(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) {
                            return integer % 2 == 0;
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee",integer.toString()); //打印 1 2
                }
            });
    
  10. takeWhile

    当发射的数据不满足指定条件时(不包含该数据),Observable终止发射数据。

    Observable.just(1,2,3,4,5)
                      .takeWhile(new Predicate<Integer>() {
                          @Override
                          public boolean test(Integer integer) {
                              return integer != 3;
                          }
                      }).subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(Integer integer) {
                      Log.e("Alee",String.valueOf(integer)); //打印 1 2
                  }
              });
    
  11. skipUntil

    接收一个Observable,直到该Observable发射事件之前,原始Observable所有已发射的事件全部被抛弃。

    //从0开始发射10次事件,初始发射延迟0秒,每次发射后间隔1秒再发射下一次事件
    Observable.intervalRange(0,10,0,1,TimeUnit.SECONDS)
                    .skipUntil(Observable.timer(3,TimeUnit.SECONDS))
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long integer) {
                            Log.e("Alee",integer.toString());
                        }
                    });
    

    打印结果

    08-21 14:31:21.986 25957-26044/com.aleej.demo E/Alee: 3 //有时候不会显示3这条数据
    08-21 14:31:22.987 25957-26044/com.aleej.demo E/Alee: 4
    08-21 14:31:23.987 25957-26044/com.aleej.demo E/Alee: 5
    08-21 14:31:24.987 25957-26044/com.aleej.demo E/Alee: 6
    08-21 14:31:25.986 25957-26044/com.aleej.demo E/Alee: 7
    08-21 14:31:26.987 25957-26044/com.aleej.demo E/Alee: 8
    08-21 14:31:27.987 25957-26044/com.aleej.demo E/Alee: 9
    
  12. skipWhile

    接受一个Predicate用来控制跳过开始一段数据

    //从0开始发射10次事件,初始发射延迟0秒,每次发射后间隔1秒再发射下一次事件
    Observable.intervalRange(0,10,0,1,TimeUnit.SECONDS)
        .skipWhile(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) {
                return aLong != 5; //注意:是前面不等于5的一段数据将被抛弃
            }
        })
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long integer) {
                Log.e("Alee",integer.toString());
            }
        });
    

    打印结果

    08-21 14:41:28.255 26738-26795/com.aleej.demo E/Alee: 5
    08-21 14:41:29.255 26738-26795/com.aleej.demo E/Alee: 6
    08-21 14:41:30.255 26738-26795/com.aleej.demo E/Alee: 7
    08-21 14:41:31.255 26738-26795/com.aleej.demo E/Alee: 8
    08-21 14:41:32.255 26738-26795/com.aleej.demo E/Alee: 9
    

聚合操作符

  1. reduce

    把Observable中所有元素都聚合成单一的元素,例如把所有元素都加起来

    Observable.just(1,2,3,4,5)
                    .reduce(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer1, Integer integer2) {
                            return integer1+integer2; //求和
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString()); // 打印 15
                        }
                    });
            Observable.just("a","b","c","d","e")
                    .reduce(new BiFunction<String, String, String>() {
                        @Override
                        public String apply(String s1, String s2) {
                            return String.format("%s->%s",s1,s2);
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            Log.e("Alee",s); //打印  a->b->c->d->e
                        }
                    });
    
  2. count

    计算发射的数量。

    Observable.just(1,2,3)
                    .count()
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) {
                            Log.e("Alee",aLong.toString()); //打印 3
                        }
                    });
    
  3. collect

    和reduce操作类似,但是collect需要自己定义收集的容器和收集逻辑。

    Observable.just(7,8,9)
                    .collect(new Callable<List<Integer>>() {
                                 @Override
                                 public List<Integer> call() {
                                     return new ArrayList<>();//创建数据结构用于收集
                                 }
                             },
                            new BiConsumer<List<Integer>, Integer>() {
                                @Override
                                public void accept(List<Integer> list, Integer integer) {
                                    list.add(integer); //收集数据
                                }
                            }).subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> list) {
                    Log.e("Alee",list.toString()); // 打印 [7, 8, 9]
                }
            });
    

转换操作符

  1. toList

    收集原始Observable发射的所有数据到一个List,然后发射这个List事件。

    Observable.just(1,3,7)
                    .toList()
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> list) {
                            Log.e("Alee",list.toString());// 打印  [1, 3, 7]
                        }
                    });
    
  2. toSortedList

    收集原始Observable发射的所有数据到一个有序列表,可以自定义排序规则,然后发射这个列表事件。

    Observable.just(1,4,7,3,5,2,6)
                    .toSortedList()
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> list) {
                            Log.e("Alee",list.toString()); //打印  [1, 2, 3, 4, 5, 6, 7]
                        }
                    });
    
  3. toMap

    收集原始Observable发射的所有数据到一个Map集合,根据toMap的两个参数自定义key和value,然后发射这个Map事件。

    Observable.just(0,8,1,8)
                    .toMap(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return String.format(Locale.getDefault(),"key:%d",integer);
                        }
                    }, new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return String.format(Locale.getDefault(),"value->%d",integer);
                        }
                    }).subscribe(new Consumer<Map<String, String>>() {
                @Override
                public void accept(Map<String, String> stringStringMap) {
                    //打印 {key:1=value->1, key:0=value->0, key:8=value->8}
                    Log.e("Alee",stringStringMap.toString());
                }
            });
    
  4. toMultiMap

    和toMap类似,不同的是map的value是一个集合。

    Observable.just(0,8,1,8)
                    .toMultimap(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return String.format(Locale.getDefault(),"key:%d",integer);
                        }
                    }, new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return String.format(Locale.getDefault(),"value->%d",integer);
                        }
                    }).subscribe(new BiConsumer<Map<String, Collection<String>>, Throwable>() {
                @Override
                public void accept(Map<String, Collection<String>> stringCollectionMap, Throwable throwable) {
                    //打印 {key:1=[value->1], key:0=[value->0], key:8=[value->8, value->8]}
                    Log.e("Alee",stringCollectionMap.toString());
                }
            });
    

变换操作符

  1. map

    对Observable发射的每一项元素都应用一个函数来变换成新的元素。

    Observable.just(1,2,3)
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return String.format(Locale.getDefault(),"我是%d",integer);
                        }
                    }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s); // 打印 我是1 我是2 我是3
                }
            });
    
  2. cast

    强制转换每一个元素的类型,内部调用map操作符来进行转换。

    Observable.just(1,2,3)
                    .cast(Number.class)
                    .subscribe(new Consumer<Number>() {
                        @Override
                        public void accept(Number number) {
                            Log.e("Alee",number.toString()); //打印 1 2 3
                        }
                    });
    
  3. flatMap

    把原始Observable中的每一个元素转换成新的Observable,每个新转换成的Observable发射的元素将会合并成一个单独的Observable,然后按顺序发射事件。

    Observable.just(1,2,3)
                    .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                        @Override
                        public ObservableSource<Integer> apply(Integer integer) {
                            return Observable.just(integer*10,integer*100);
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            //打印 10 100 20 200 30 300
                            Log.e("Alee",integer.toString());
                        }
                    });
    
  4. flatMapIterable

    和flatMap一样,只不过是把每个元素转换成了Iterable。

    Observable.just(1,2,3)
                    .flatMapIterable(new Function<Integer, Iterable<Integer>>() {
                        @Override
                        public Iterable<Integer> apply(Integer integer) {
                            return Arrays.asList(integer*10,integer*100);
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            //打印 10 100 20 200 30 300
                            Log.e("Alee",integer.toString());
                        }
                    });
    
  5. concatMap

    类似于flatMap

    flatMap操作符是使用merge合并元素,concatMap操作符则是通过concat合并元素,前者可能会出现元素交错问题,后者严格按照顺序发射。

    Observable.just(1,2,3)
                    .concatMap(new Function<Integer, ObservableSource<Integer>>() {
                        @Override
                        public ObservableSource<Integer> apply(Integer integer) {
                            return Observable.just(integer*10,integer*100);
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            //打印 10 100 20 200 30 300
                            Log.e("Alee",integer.toString());
                        }
                    });
    
  6. switchMap

    和flatMap类似,但是转换出来的每一个新的Observable会取代掉前一个Observable。

    Observable.just(1,2,3)
                    .switchMap(new Function<Integer, ObservableSource<Integer>>() {
                        @Override
                        public ObservableSource<Integer> apply(Integer integer) {
                            return Observable.timer(1,TimeUnit.SECONDS).map(new Function<Long, Integer>() {
                                @Override
                                public Integer apply(Long aLong) {
                                    return integer;
                                }
                            });
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee",integer.toString()); // 打印 3
                }
            });
    
  7. scan

    扫描每一个元素,第一个元素将忽略扫描,从第二个元素开始(可以获得上一个元素的值)进行扫描变换后发射

    Observable.just(1,2,3)
                    .scan(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer last, Integer item) {
                            /**
                            会走2次  第一次 last=1 item=2
                            		第二次 last=3 item=3
                            */
                            return last+item;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString()); //打印 1 3 6
                        }
                    });
    
  8. groupBy

    通过Function接收每个数据的分组key,然后发射GroupedObservable,使用者可以再订阅这个GroupedObservable进行数据输出。

    Observable.just(1,2,3,4,5)
                    .groupBy(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) {
                            return integer % 2 == 0 ? "偶数" : "奇数";
                        }
                    }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                @Override
                public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) {
                    stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",stringIntegerGroupedObservable.getKey()+":"+integer.toString());
                        }
                    });
       
                }
            });
    

    打印结果

    08-21 17:39:34.102 7971-7971/? E/Alee: 奇数:1
        偶数:2
        奇数:3
        偶数:4
        奇数:5
    
  9. buffer

    把全部元素按照指定大小打包成元素集合,然后发射这些集合。

    Observable.just(1,2,3,4,5)
                    .buffer(3)
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> list) {
                            // 打印 [1, 2, 3] [4, 5]
                            Log.e("Alee",list.toString());
                        }
                    });
    
  10. window

    把全部元素按照指定个数拆分成若干个Observable发射出去。

    Observable.just(1,2,3,4,5)
                    .window(3)
                    .subscribe(new Consumer<Observable<Integer>>() {
                        @Override
                        public void accept(Observable<Integer> integerObservable) {
                            integerObservable.subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) {
                                    //打印 1 2 3 4 5
                                    Log.e("Alee",integer.toString());
                                }
                            });
                        }
                    });
    

错误处理/重试机制

  1. onErrorResumeNext

    当原始Observable在遇到错误时,使用备用Observable发射数据。

    Observable.just(1,"2",3)
                    .cast(Integer.class)
                    .onErrorResumeNext(Observable.just(3,2,1))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString());// 打印 1 3 2 1
                        }
                    });
    
  2. onExceptionResumeNext

    当原始Observable在遇到异常时,使用备用Observable发射数据。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,而onExceptionResumeNext只能处理异常。

    Observable.just(1,2,"3")
                    .cast(Integer.class)
                    .onExceptionResumeNext(Observable.just(3,2,1))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString());// 打印 1 2 3 2 1
                        }
                    });
    
  3. onErrorReturn/onErrorReturnItem

    当原始Observable在遇到错误的时候发射一个特定的数据。

    Observable.just(1,2,"3")
                    .cast(Integer.class)
    //                .onErrorReturnItem(4)
                    .onErrorReturn(new Function<Throwable, Integer>() {
                        @Override
                        public Integer apply(Throwable throwable) {
                            return 4;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee",integer.toString()); // 打印 1 2 4
                }
            });
    
  4. retry

    当原始Observable遇到错误时进行重试。

    Observable.just(1,2,"3")
                    .cast(Integer.class)
                    .retry(3)//当遇到错误时,再重试3次
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee", integer.toString()); 
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) {
                            Log.e("Alee",throwable.toString());
                        }
                    });
    

    打印结果

    08-22 09:55:20.689 15859-15859/? E/Alee: 1
        2
        1
        2
        1
        2
        1
        2
        java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Integer
    
  5. retryWhen

    当原始Observable在遇到错误时,将错误传递给另一个Observable来决定是否要重新订阅原始Observable。

    Observable.just(1,2,"3")
                    .cast(Integer.class)
                    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) {
                            return Observable.timer(1,TimeUnit.SECONDS).map(new Function<Long, Integer>() {
                                @Override
                                public Integer apply(Long aLong) {
                                    return 7;
                                }
                            });
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee", integer.toString());
                        }
                    });
    

    打印结果

    08-22 10:08:38.321 16579-16579/com.aleej.demo E/Alee: 1
        2
    08-22 10:08:39.321 16579-16632/com.aleej.demo E/Alee: 1
        2
       
    

ConnectableObservable

可连接的Observable。在被订阅时并不开始发射数据,只有在它的connect()被调用的时候才开始发射数据。可以等所有潜在的订阅者都订阅了这个Observable之后才开始发射数据。

  1. connect

    指示一个可连接的Observable开始发射数据。

  2. Observable.publish()

    将一个Observable转换为一个可连接的Observable。

  3. Observable.replay()

    返回确保所有的订阅者看到相同的数据序列的ConnectableObservable,即使它们在Observable开始发射数据之后才订阅。

  4. refCount

    让一个可连接的Observable表现得像一个普通的Observable。

    ConnectableObservable<Integer> observable = Observable.just(1,2,3).publish();
            observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee", String.format("No1. %d",integer));
                }
            });
            observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) {
                    Log.e("Alee", String.format("No2. %d",integer));
                }
            });
            observable.connect();//调用此方法发射数据
    

    打印结果

    08-22 10:44:03.248 18881-18881/? E/Alee: No1. 1
        No2. 1
    08-22 10:44:03.249 18881-18881/? E/Alee: No1. 2
        No2. 2
        No1. 3
        No2. 3
    

阻塞操作符

在Rxjava1中的BlockingObservable已经在Rxjava2中去掉了,在Rxjava2中已经集成到了Observable中。

官方说明:https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0

  1. blockingForEach

    对Observable发射的每一项数据调用一个方法,会阻塞直到Observable完成

    Observable.just(1,2).blockingForEach(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws InterruptedException {
                    Log.e("Alee", String.format("%d %s",integer,Thread.currentThread().getName()));
                    Thread.sleep(500);
                }
            });
            Log.e("Alee",Thread.currentThread().getName());
    

    打印结果

    08-22 11:15:32.397 20699-20699/com.aleej.demo E/Alee: 1 main
    08-22 11:15:32.898 20699-20699/com.aleej.demo E/Alee: 2 main
    08-22 11:15:33.400 20699-20699/com.aleej.demo E/Alee: main
    
  2. blockingFirst

    阻塞直到Observable发射了一个数据,然后返回第一项数据

  3. blockingMostRecent

    返回一个总是返回Observable最近发射的数据的iterable

  4. blockingLatest

    返回一个iterable,会阻塞直到或者除非Observable发射了一个iterable没有返回的值,然后返回这个值

  5. blockingNext

    返回一个iterable,阻塞直到Observable发射了另外一个值

  6. blockingLast

    阻塞直到Observable终止,然后返回最后一项数据

  7. blockingIterable

    将Observable转换返回一个iterable.

  8. blockingSingle

    如果Observable终止时只发射了一个值,返回那个值,否则抛出异常

  9. blockingSubscribe

    在当前线程订阅,和forEach类似

工具集

  1. materialize

    将Observable转换成一个通知列表。

    Observable.just(1,2,3)
                    .materialize()
                    .subscribe(new Consumer<Notification<Integer>>() {
                        @Override
                        public void accept(Notification<Integer> integerNotification) {
                            Log.e("Alee",integerNotification.toString());
                        }
                    });
    

    打印结果

    08-22 11:40:45.995 21309-21309/com.aleej.demo E/Alee: OnNextNotification[1]
        OnNextNotification[2]
        OnNextNotification[3]
        OnCompleteNotification
    
  2. dematerialize

    将通知逆转回一个Observable。

    Observable.just(1,2,3)
                    .materialize()
                    .dematerialize()
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object integer) {
                            Log.e("Alee",integer.toString()); //打印 1 2 3
                        }
                    });
    
  3. timestamp

    给Observable发射的每个数据项添加一个时间戳。

    Observable.just(1,2,3)
                    .timestamp()
                    .subscribe(new Consumer<Timed<Integer>>() {
                        @Override
                        public void accept(Timed<Integer> integerTimed) {
                            Log.e("Alee",integerTimed.toString());
                        }
                    });
    

    打印结果

    Timed[time=1534909593930, unit=MILLISECONDS, value=1]
    Timed[time=1534909593930, unit=MILLISECONDS, value=2]
    Timed[time=1534909593930, unit=MILLISECONDS, value=3]
    
  4. timeInterval

    给Observable发射的两个数据项间添加一个时间差。

    Observable.intervalRange(1L,5L,0L,1L,TimeUnit.SECONDS)
                    .timeInterval()
                    .subscribe(new Consumer<Timed<Long>>() {
                        @Override
                        public void accept(Timed<Long> integerTimed) {
                            Log.e("Alee",integerTimed.toString());
                        }
                    });
    

    打印结果

    08-22 13:52:29.032 25153-25198/com.aleej.demo E/Alee: Timed[time=7, unit=MILLISECONDS, value=1]
    08-22 13:52:30.027 25153-25198/com.aleej.demo E/Alee: Timed[time=995, unit=MILLISECONDS, value=2]
    08-22 13:52:31.027 25153-25198/com.aleej.demo E/Alee: Timed[time=1000, unit=MILLISECONDS, value=3]
    08-22 13:52:32.027 25153-25198/com.aleej.demo E/Alee: Timed[time=1000, unit=MILLISECONDS, value=4]
    08-22 13:52:33.025 25153-25198/com.aleej.demo E/Alee: Timed[time=999, unit=MILLISECONDS, value=5]
    
  5. serialize

    强制一个Observable连续调用并保证行为正确。

    一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompletedonError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的

    RxJava中的实现是serialize,它默认不在任何特定的调度器上执行

  6. cache

    缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者。

  7. observeOn

    指定观察者观察Observable的调度器。

  8. subscribeOn

    指定Observable执行任务的调度器。

  9. doOnEach

    注册一个动作,对Observable发射的每个数据项使用。

  10. doOnCompleted

    注册一个动作,对正常完成的Observable使用。

  11. doOnError

    注册一个动作,对发生错误的Observable使用。

  12. doOnTerminate

    注册一个动作,对完成的Observable使用,无论是否发生错误。

  13. doOnSubscribe

    注册一个动作,在观察者订阅时使用。

  14. doOnDispose

    注册一个动作,在观察者取消订阅时使用。

  15. doAfterTerminate

    注册一个动作,在Observable完成时使用。

  16. delay

    延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。

  17. delaySubscription

    延时处理订阅请求。

  18. using

    创建一个只在Observable生命周期存在的资源,当Observable终止时这个资源会被自动释放

    Observable.using(new Callable<File>() {
                @Override
                public File call() throws Exception {
                    //创建Observable生命周期内需要的资源
                    File file = new File(getCacheDir(),"alee.txt");
                    if (!file.exists()) {
                        boolean result = file.createNewFile();
                        Log.e("Alee","create");
                    }
                    return file;
                }
            }, new Function<File, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(File file) {
                    return Observable.just(file.exists() ? "exist" : "not exist");
                }
            }, new Consumer<File>() {
                @Override
                public void accept(File file) {
                    if (file.exists()) {
                        file.delete();
                        Log.e("Alee","delete");
                    }
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) {
                    Log.e("Alee",s);
                }
            });
        }
    

    打印结果

    08-22 15:06:57.747 27294-27294/com.aleej.demo E/Alee: create
    08-22 15:06:57.748 27294-27294/com.aleej.demo E/Alee: exist
        delete
    
  19. single

    如果原始Observable只发射了一个数据,则下游接收到该数据;如果一个数据都没发射,则下游接收到默认数据;如果发射了多个数据,否则抛出异常(IllegalArgumentException)。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    emitter.onComplete();
                }
            })
                    .single(9)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) {
                            Log.e("Alee",integer.toString()); //打印 9
                        }
                    });
    

调度器 Scheduler

如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。

  • Schedulers.computation

    用于计算任务,如事件循环或和回调处理。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • Schedulers.newThread

    开启一个新的线程。

  • Schedulers.io

    主要用于一些耗时操作,比如读写文件,数据库存取,网络交互等。 这个调度器根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.io()中的线程池数量是无限制大的,大量的I/O操作将创建许多线程,我们需要在性能和线程数量中做出取舍。

  • Schedulers.from(executor)

    使用指定的Executor作为调度器。

  • Schedulers.immediate

    在当前线程立即开始执行任务。

  • Schedulers.trampoline

    当其它排队的任务完成后,在当前线程排队开始执行。

  • AndroidSchedulers.mainThread

    Android专用,指定的操作在Android的主线程(UI线程中)运行。

    需要依赖RxAndroid库。

参考资料