Posts [RxJava] Combining Observables
Post
Cancel

[RxJava] Combining Observables

Combining Observables

Trong bài viết này mình sẽ cùng đi qua các toán tử kết hợp trong Rx thường được sử dụng để xem cách chúng hoạt động nhé.

startWith

alt text

  • Toán tử startWith phát ra item được chỉ định trước khi bắt đầu phát các item từ Observable.

  • Hoạt động với : Flowable, Observable

1
2
3
4
Observable<String> names = Observable.just("Spock", "McCoy");
names.startWith("Kirk").subscribe(item -> System.out.println(item));

// Output : Kirk, Spock, McCoy

merge

alt text

  • Toán tử merge Kết hợp các Observable lại với nhau thành một bằng cách hợp các emit của chúng lại.

  • Hoạt động với : Flowable, Observable, Maybe, Single, Completable

  • Nếu bất kì source nào trong chuỗi xảy ra lỗi, onError() sẽ được bắn ra ngay và observable sẽ bị terminate.

1
2
3
4
5
Observable.just(1, 2, 3)
    .mergeWith(Observable.just(4, 5, 6))
    .subscribe(item -> System.out.println(item));

// Output : 1, 2, 3, 4, 5, 6

mergeDelayError

alt text

  • Tương tự với merge, mergeDelayError cũng kết hợp các Observable lại với nhau thành một bằng cách hợp các emit của chúng lại.
  • Nếu bất kì source nào trong chuỗi xảy ra lỗi, onError() sẽ được giữ lại cho đến khi quá trình merge hoàn thành. Đây chính là điểm khác biệt so với toán tử merge.
  • Hoạt động với : Flowable, Observable, Maybe, Single, Completable
1
2
3
4
5
6
7
Observable<String> observable1 = Observable.error(new IllegalArgumentException(""));
Observable<String> observable2 = Observable.just("Four", "Five", "Six");
Observable.mergeDelayError(observable1, observable2)
        .subscribe(item -> System.out.println(item));

// emits 4, 5, 6 and then the IllegalArgumentException (in this specific
// example, this throws an `OnErrorNotImplementedException`).

zip

alt text

  • Toán tử zip kết hợp tập các item được phát ra bởi 2 hay nhiều Observable lại thông qua một function chỉ định và phát ra các item dựa trên kết quả của function này.

  • zip phát lượng item bằng lượng item được phát ra bởi source phát ít item nhất.
  • Hoạt động với : Flowable, Observable, Maybe, Single
1
2
3
4
5
6
Observable<String> firstNames = Observable.just("A", "B", "C");
Observable<String> lastNames = Observable.just("1", "2", "3");
firstNames.zipWith(lastNames, (first, last) -> first + " " + last)
    .subscribe(item -> System.out.println(item));

// Output: A 1, B 2, C 3

combineLatest

alt text

  • Khi một item được phát ra bởi một trong 2 source, nó sẽ kết hợp với item cuối cùng được phát bởi source còn lại thông qua function chỉ định và sẽ phát ra item dựa trên kết quả của function này.

  • Hoạt động với : Flowable, Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<Long> newsRefreshes = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Long> weatherRefreshes = Observable.interval(50, TimeUnit.MILLISECONDS);
Observable.combineLatest(newsRefreshes, weatherRefreshes,
    (newsRefreshTimes, weatherRefreshTimes) ->
        "Refreshed news " + newsRefreshTimes + " times and weather " + weatherRefreshTimes)
    .subscribe(item -> System.out.println(item));

// prints:
// Refreshed news 0 times and weather 0
// Refreshed news 0 times and weather 1
// Refreshed news 0 times and weather 2
// Refreshed news 1 times and weather 2
// Refreshed news 1 times and weather 3
// Refreshed news 1 times and weather 4
// Refreshed news 2 times and weather 4
// Refreshed news 2 times and weather 5
// ...

Conclusion

Chúc ae coding vui vẻ :trollface: :trollface: :trollface: