RxJS: Higher Order Observables

RxJS

RxJS
Higher Order Observables

Привет


Андрей Алексеев

👨‍💻 Tinkoff business

👨‍🏫 Tinkoff Fintech School

def

Higher Order Observable

Observable высшего порядка

Observable, значениями которого являются другие Observable

Аналогия массивов

                const mapping = value => [value, value ** 2, value ** 3];
                const mapped = [1, 2, 3].map(item => mapping(item));
                
[[1,1,1],[2,4,8],[3,9,27]] 1, 1, 1, 2, 4, 8, 3, 9, 27 1, 2, 3, 1, 4, 9, 1, 8, 27

Observable из Observable

                of(1, 2, 3)
                    .pipe(
                        map(value => from(mapping(value)))
                    )
                    .subscribe(value => console.log(value))
                
            

Подписка в подписке

                of(1, 2, 3)
                    .pipe(
                        map(value => from(mapping(value)))
                    )
                    .subscribe(observable$ => {
                         observable$
                            .subscribe(value => console.log(value))
                    });
            

Асинхронный вариант

                const mapping = value => [value, value ** 2, value ** 3];
                
const asyncMapping = value => { return interval(100) .pipe( map(i => value ** (i + 1)), take(3) );
asyncMapping(2) .subscribe(value => console.log(value))

Подписка в подписке асинхронно

                of(1, 2, 3)
                    .pipe(
                        map(value => asyncMapping(value))
                    )
                    .subscribe(observable$ => {
                         observable$
                            .subscribe(value => console.log(value))
                    });
            

Подписка в подписке асинхронно

                const subscription = of(1, 2, 3)
                    .pipe(
                        map(value => asyncMapping(value))
                    )
                    .subscribe(observable$ => {
                         observable$
                            .subscribe(value => console.log(value))
                    });
                
setTimeout(() => { subscription.unsubscribe(); console.log('unsubscribe!'); }, 150);

mergeAll

                const subscription = of(1, 2, 3)
                    .pipe(
                        map(value => asyncMapping(value)),
                        mergeAll()
                    )
                    .subscribe(value => console.log(value))
                
setTimeout(() => { subscription.unsubscribe(); console.log('unsubscribe!'); }, 150);

map + mergeAll = mergeMap (aka flatMap)

                const subscription = of(1, 2, 3)
                    .pipe(
                        mergeMap(value => asyncMapping(value))
                    )
                    .subscribe(value => console.log(value))
                
setTimeout(() => { subscription.unsubscribe(); console.log('unsubscribe!'); }, 150);

mergeMap: визуализация

            timer(0, 1500).pipe(
                take(3),
                map((value) => 'abc'[value]), // a → b → c
                mergeMap(value => {
                    return timer(0, 450).pipe(
                        take(6),
                            map(innerValue => value + innerValue)); // x0 → x1 → x2 → ...
                })
            ).subscribe(renderValue);
        

mergeMap: selector

                of(1, 2, 3)
                    .pipe(
                        mergeMap(
                            value => asyncMapping(value),
                            (outValue, inValue, outIndex, inIndex) => {
                                console.log(`${outValue} → ${inValue}` +
                                ` | ${outIndex} --- ${inIndex}`);
                                return inValue;
                            }
                    )
                    .subscribe(value => console.log(value))
            

mergeMap: counter

                of(1, 2, 3)
                    .pipe(
                        mergeMap(value => asyncMapping(value))
                        toArray()
                    .subscribe(value => console.log(value))
            

mergeMap: counter

                of(1, 2, 3)
                    .pipe(
                        mergeMap(value => asyncMapping(value))
                        toArray()
                    .subscribe(value => console.log(value))
                
of(1, 2, 3) .pipe( mergeMap(value => asyncMapping(value), 2) toArray() .subscribe(value => console.log(value))

mergeMap: counter

                of(1, 2, 3)
                    .pipe(
                        mergeMap(value => asyncMapping(value), 1)
                        toArray()
                    .subscribe(value => console.log(value))
            

concatMap

                of(1, 2, 3)
                    .pipe(
                        concatMap(value => asyncMapping(value))
                    .subscribe(value => console.log(value))
            

switchMap

                of(1, 2, 3)
                    .pipe(
                        switchMap(value => asyncMapping(value))
                    .subscribe(value => console.log(value))
            

SwitchMap marble

Rx Marble Diagram: switchMap

switchMap: визуализация

            timer(0, 1500).pipe(
                take(3),
                map((value) => 'abc'[value]), // a → b → c
                switchMap(value => {
                    return timer(0, 450).pipe(
                        take(6),
                            map(innerValue => value + innerValue)); // x0 → x1 → x2 → ...
                })
            ).subscribe(renderValue);
        

SwitchMap и отписки

SwitchMap автоматически отписывается во внутренних Observable — вызывает subscription.unsubscribe()

Можем написать отменяемые запросы:

Пример: отменяемый fetch

            const request$ = Observable.create((observer) => {
            
            
            
            
            });
        

SwitchMap и запросы


Отмена fetch-запросов

exhaustMap

                of(1, 2, 3)
                    .pipe(
                        exhaustMap(value => asyncMapping(value))
                    .subscribe(value => console.log(value))
            

mergeMap
concatMap
switchMap
exhaustMap

Фейковый запрос

                const request = value => {
                    const randomTimeout = 100 + Math.floor(Math.random() * 1000);
                    const randomResponse = `${value}_${randomTimeout}`;
                
                    return of(randomResponse).pipe(delay(randomTimeout));
                };
                
request(1) .subscribe(value => console.log(value))

mergeMap

                of(1, 2, 3)
                    .pipe(
                        mergeMap(value => request(value))
                    .subscribe(value => console.log(value))
            

concatMap

                of(1, 2, 3)
                    .pipe(
                        concatMap(value => request(value))
                    .subscribe(value => console.log(value))
            

switchMap

                of(1, 2, 3)
                    .pipe(
                        switchMap(value => request(value))
                    .subscribe(value => console.log(value))
            

exhaustMap

                of(1, 2, 3)
                    .pipe(
                        exhaustMap(value => request(value))
                    .subscribe(value => console.log(value))
            

Итого

mergeMap
данные нельзя потерять, порядок не важен
concatMap 
данные нельзя потерять, порядок важен
switchMap 
устаревшие данные потеряются.
Удобен из-за автоотписки. Опасен из-за автоотписки
(например, create / update / delete операции)
exhaustMap
надо игнорировать внешний поток до наступления какого-то события (например, авторизации)

Рецепты

Цепочки запросов

                of('цепочка')
                    .pipe(
                        switchMap(value => request(value)),
                        switchMap(anotherValue => request(anotherValue))
                    )
                    .subscribe(value => console.log(value));
            

Обработка ошибок

                const requestWithError = value => {
                    if (value % 2 === 0) {
                        return throwError('💩');
                    }
                 
                    return request(value); // <= 1100 ms
                };
                
const source$ = interval(1200).pipe( take(3), map(v => v + 1) );

Обработка ошибок

                source$
                    .pipe(
                        switchMap(value => requestWithError(value))
                    )
                    .subscribe(
                        value => console.log(value),
                        error => console.log(error)
                    );
            

Обработка ошибок

                source$
                    .pipe(
                        switchMap(value => requestWithError(value)),
                        catchError(error => {
                            console.log(`catch ${error}`);
                            return EMPTY;
                        })
                    )
                    .subscribe(
                        value => console.log(value),
                        error => console.log(error)
                    );
            

Обработка ошибок

                source$
                    .pipe(
                        switchMap(value => {
                            return requestWithError(value)
                                .pipe(catchError(error => {
                                    console.log(`catch ${error}`);
                                    return EMPTY;
                                }));
                        })
                    )
                    .subscribe(
                        value => console.log(value),
                        error => console.log(error)
                    );
            

Отписка

                const destroy$ = new Subject();
                of(1)
                    .pipe(
                        takeUntil(destroy$),
                        switchMap(value => request(value))
                    )
                    .subscribe({
                        next: value => console.log(value),
                        complete: () => console.log('complete')
                    });
                console.log('destroy');
                destroy$.next();
            

Отписка

                const destroy$ = new Subject();
                of(1)
                    .pipe(
                        switchMap(value => request(value)),
                        takeUntil(destroy$)
                    )
                    .subscribe({
                        next: value => console.log(value),
                        complete: () => console.log('complete')
                    });
                console.log('destroy');
                destroy$.next();
            

Условные ветвления

                let flag = true;
                
interval(1200).pipe(take(3),map(v => v + 1)); .pipe( switchMap(value => { console.log(`value: ${value}, flag: ${flag}`); return flag ? request(value) : EMPTY }) ) .subscribe(value => console.log(value));
setTimeout(() => { flag = false; }, 1200); setTimeout(() => { flag = true; }, 2400);

Итоги

Спасибо!

github.com/aalexeev239/rxjs-hoo

https://github.com/aalexeev239/rxjs-hoo

Бонус: window*

Механизм противодавления (backpressure)

                const sourceInterval$ = interval(250).pipe(take(20));
                const windowInterval$ = interval(1000);
                sourceInterval$
                    .pipe(
                        window(windowInterval$)
                        map(win => win.pipe(take(2)))
                        mergeAll()
                    )
                    .subscribe(value => console.log(value));
            

Бонус: groupBy

                const people = [
                    {name: 'Sue', age: 25},
                    {name: 'Joe', age: 30},
                    {name: 'Sarah', age: 35},
                    {name: 'Frank', age: 25},
                ];
                
interval(1000).pipe(take(4), map(x => people(x)); .pipe( groupBy(person => person.age) mergeMap(group => group .pipe(map(({name}) => name), toArray())) ) .subscribe(value => console.log(value));