Combining Observables
Trong bài viết này mình sẽ cùng đi qua các toán tử kết hợp trong Rx thường được sử dụng để xem cách chúng hoạt động nhé.
startWith
Toán tử startWith phát ra item được chỉ định trước khi bắt đầu phát các item từ Observable.
Hoạt động với :
Flowable
,Observable
1
2
3
4
Observable<String> names = Observable.just("Spock", "McCoy");
names.startWith("Kirk").subscribe(item -> System.out.println(item));
// Output : Kirk, Spock, McCoy
merge
Toán tử merge Kết hợp các Observable lại với nhau thành một bằng cách hợp các emit của chúng lại.
Hoạt động với :
Flowable
,Observable
,Maybe
,Single
,Completable
Nếu bất kì source nào trong chuỗi xảy ra lỗi,
onError()
sẽ được bắn ra ngay và observable sẽ bị terminate.
1
2
3
4
5
Observable.just(1, 2, 3)
.mergeWith(Observable.just(4, 5, 6))
.subscribe(item -> System.out.println(item));
// Output : 1, 2, 3, 4, 5, 6
mergeDelayError
- Tương tự với merge, mergeDelayError cũng kết hợp các Observable lại với nhau thành một bằng cách hợp các emit của chúng lại.
- Nếu bất kì source nào trong chuỗi xảy ra lỗi,
onError()
sẽ được giữ lại cho đến khi quá trình merge hoàn thành. Đây chính là điểm khác biệt so với toán tử merge. - Hoạt động với :
Flowable
,Observable
,Maybe
,Single
,Completable
1
2
3
4
5
6
7
Observable<String> observable1 = Observable.error(new IllegalArgumentException(""));
Observable<String> observable2 = Observable.just("Four", "Five", "Six");
Observable.mergeDelayError(observable1, observable2)
.subscribe(item -> System.out.println(item));
// emits 4, 5, 6 and then the IllegalArgumentException (in this specific
// example, this throws an `OnErrorNotImplementedException`).
zip
Toán tử zip kết hợp tập các item được phát ra bởi 2 hay nhiều Observable lại thông qua một function chỉ định và phát ra các item dựa trên kết quả của function này.
- zip phát lượng item bằng lượng item được phát ra bởi source phát ít item nhất.
- Hoạt động với :
Flowable
,Observable
,Maybe
,Single
1
2
3
4
5
6
Observable<String> firstNames = Observable.just("A", "B", "C");
Observable<String> lastNames = Observable.just("1", "2", "3");
firstNames.zipWith(lastNames, (first, last) -> first + " " + last)
.subscribe(item -> System.out.println(item));
// Output: A 1, B 2, C 3
combineLatest
Khi một item được phát ra bởi một trong 2 source, nó sẽ kết hợp với item cuối cùng được phát bởi source còn lại thông qua function chỉ định và sẽ phát ra item dựa trên kết quả của function này.
Hoạt động với :
Flowable
,Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable<Long> newsRefreshes = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Long> weatherRefreshes = Observable.interval(50, TimeUnit.MILLISECONDS);
Observable.combineLatest(newsRefreshes, weatherRefreshes,
(newsRefreshTimes, weatherRefreshTimes) ->
"Refreshed news " + newsRefreshTimes + " times and weather " + weatherRefreshTimes)
.subscribe(item -> System.out.println(item));
// prints:
// Refreshed news 0 times and weather 0
// Refreshed news 0 times and weather 1
// Refreshed news 0 times and weather 2
// Refreshed news 1 times and weather 2
// Refreshed news 1 times and weather 3
// Refreshed news 1 times and weather 4
// Refreshed news 2 times and weather 4
// Refreshed news 2 times and weather 5
// ...
Conclusion
Chúc ae coding vui vẻ :trollface: :trollface: :trollface: