RxJS — доступно, в деталях и на практике

RxJS

RxJS
доступно, в деталях и на практике

Привет


Андрей Алексеев

👨‍💻 Tinkoff business

👨‍🏫 Tinkoff Fintech School

О чем поговорим

Реактивный подход

A1 = 1; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;

Реактивный подход

A1 = 1; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

Реактивный подход

A1 = 2; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;

Реактивный подход

A1 = 2; B1 = 3; C1 = A1 + B1 = 5

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Реактивный подход

Push-стратегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Императивный подход

Push-стратегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Pull-статегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 4;

Push / Pull

Push value

Единственное значение

Promise — асинхронное значение

Асинхронное значение — Promise

Множественное значение

Множественное значение

Iterable

Множественное значение - Iterable

И множественное, и асинхронное?

Асинхронное и множественное значение

Observable

Observable

Поток событий

Поток событий

Поток кликов

2 часа созерцания

Поток кликов

3 часа созерцания

Поток кликов

4 часа созерцания

Поток кликов

4 часа и 10 секунд созерцания

Поток кликов

Поток событий

            Rx.Observable
                .from([{x: 11, y: 99}, {x: 91, y: 151}, {x: 152, y: 106}, {x: 274, y: 84}])
                .map((value) => { ... })
                .filter((value) => { ... })
                .reduce((acc, curr) => { ... })
            
        

Поток событий
~
массив, распределенный во времени

Observable
Observer pattern + Iterator pattern

Observer pattern

                class Subject {
                
                    add(observer) {...}
                    remove(observer) {...}
                
                }
            

Observer pattern


Observer

Observer pattern

Субъект изменяет значения.
Также называется Observable, наблюдаемый объект.

Наблюдатель (Observer) реагирует на изменения.

Iterator pattern

                class Iterator {
                    constructor(list) {
                        this.list = list;
                        this.cursor = 0;
                    }
                 
                    next() {...}
                    hasNext() {...}
                }
            

Iterator pattern

Observer

Observable

Observable

                const stream$ = Rx.Observable.create((observer) => {
                    observer.next(1);
                    setTimeout(() => { observer.next(2);},    1000);
                    setTimeout(() => { observer.next(3);},    2000);
                    setTimeout(() => { observer.complete();}, 3000);
                });
                const subscribtion = stream$.subscribe({
                    next: (value) => renderNext(value),
                    error: (error) => renderError(error),
                    complete: () => renderComplete()
                });
        

Observable

Observable – сам наблюдаемый объект, доставляет изменения в подписку.

            const stream$ = Rx.Observable.create((observer) => {...});
             
            const subscribtion = stream$.subscribe(...);
        

Observable

Observer – объект-наблюдатель,
опциональные обработчики next/error/complete

            const subscribtion = stream$.subscribe(
                {
                    next: (value) => renderNext(value),
                    error: (error) => renderError(error),
                    complete: () => renderComplete()
                }
            );
            stream$.subscribe((value) => renderNext(value));
            stream$.subscribe(renderNext);
        

Observable

Функция создания определяет, как наблюдатель будет получать значения.

            const stream$ = Rx.Observable.create((observer) => {
                observer.next(1);
                setTimeout(() => { observer.next(2);},    1000);
                setTimeout(() => { observer.next(3);},    2000);
                setTimeout(() => { observer.complete();}, 3000);
            });
            stream$.subscribe(renderNext);
        

Observable

Subscribtion – подписка. Старт получения значений.
unsubscribe – завершение получения значений.

            
            const stream$ = Rx.Observable.create((observer) => {...});
             
            const subscribtion = stream$.subscribe(...);
            subscribtion.unsubscribe();
        

Observable

Подписки не зависят друг от друга.

            const stream$ = Rx.Observable.create((observer) => {...});
             
            const subscribtionA = stream$.subscribe(observerA);
            const subscribtionB = stream$.subscribe(observerBC);
            const subscribtionC = stream$.subscribe(observerBC);
        

Observable

1
2
3
                const stream$ = Rx.Observable.create((observer) => {
                    observer.next(1);
                    setTimeout(() => { observer.next(2);},    1000);
                    setTimeout(() => { observer.next(3);},    2000);
                    setTimeout(() => { observer.complete();}, 3000);
                });
                const subscribtion = stream$.subscribe({
                    next: (value) => renderNext(value),
                    error: (error) => renderError(error),
                    complete: () => renderComplete()
                });
        

Ок, что дальше?

Дальше — 127+ операторов. 127, Карл!

Cоздание Observable

            Rx.Observable.of('foo');
            // ---foo---|
            
            
        

Cоздание Observable

            const promise = new Promise((resolve) => { resolve('foo')});
            Rx.Observable.fromPromise(promise);
            // ----------------foo---|
            
        

Операторы преобразования

            Rx.Observable.timer(0, 500)
                .take(12)
                .subscribe(renderNext);
            
        

Rx Marble Diagram

Rx Marble Diagram: filter
            ---2--30--22--5--60--1-----|
            filter(x => x > 10)         
            ------30--22-----60--------|
        

Операторы преобразования

            Rx.Observable.timer(0, 250)
                .take(40)
                .filter(() => Math.random() < 0.3)
                .debounceTime(500)
                .subscribe(renderNext);
        

Комбинирование: merge

            const streamA$ = Rx.Observable.timer(0, 1500).take(4);
            const streamB$ = Rx.Observable.timer(800, 1000).take(6)
                .map((value) => 'abcdef'[value]);
             
            Rx.Observable.merge(streamA$, streamB$)
                .subscribe(renderNext);
        

Комбинирование: combineLatest

            const streamA$ = Rx.Observable.timer(0, 1500).take(4);
            const streamB$ = Rx.Observable.timer(800, 1000).take(6)
                .map((value) => 'abcdef'[value]);
             
            Rx.Observable.combineLatest(streamA$, streamB$)
                .map(([a, b]) => a + b)
                .subscribe(renderNext);
        

Операторы высшего порядка: mergeMap

            Rx.Observable.timer(0, 1500)
                .take(3)
                .map((value) => 'abcd'[value])
            
        

Операторы высшего порядка: switchMap

            Rx.Observable.timer(0, 1500)
                .take(3)
                .map((value) => 'abcd'[value])
                .switchMap(value => {
                    return Rx.Observable.timer(0, 450).take(6)
                        .map(innerValue => value + innerValue);
                })
                .subscribe(renderNext);
        

Rx Marble Diagram

Rx Marble Diagram: switchMap

SwitchMap
is the new black

Promise <--> Observable

            const getPromise = (value, time) => new Promise((resolve) => {
                setTimeout(() => {
                    resolve(value);
                }, time);
            });
             
            
        

SwitchMap + Promise

            const streamA$ = Rx.Observable.create((observer) => {
                observer.next(1);
                setTimeout(() => { observer.next(2);}, 2000);
                setTimeout(() => { observer.next(3);}, 3000);
                setTimeout(() => { observer.complete()}, 4000);
            })
            const streamB$ = streamA$
                .switchMap((value) => getPromise(value, 1500));
        

SwitchMap и отписки

SwitchMap автоматически отписывает внутренний поток — вызывает subscription.unsubscribe()

Можем написать отменяемые запросы:

Пример: отменяемый fetch

            const request$ = Rx.Observable.create((observer) => {
            
            
            
            
            });
        

SwitchMap и запросы


Отмена fetch-запросов

ДЕМО

Демо — поиск

демо

Демо: поток ввода

Демо: дебаунс

Демо: фильтрация совпадений

Демо: запрос данных

Демо: лоадер

Демо: результаты

Демо: результаты

SUBJECTS

1 подписка - 1 поток значений

                const stream$ = Rx.Observable.interval(500)
                    .take(5)
                    .map(() => random());
                stream$.subscribe(/* ... */);
                stream$.subscribe(/* ... */);
                
        

Subject

Объединение Observable и Observer. Передает значение всем подписчикам

                const subject = new Rx.Subject();
                 
                subject.next(0);
                setTimeout(() => subject.next(1), 1000);
                setTimeout(() => subject.next(2), 2000);
                setTimeout(() => subject.next(3), 3000);
                setTimeout(() => subject.next(4), 4000);
                setTimeout(() => subject.complete(), 5000);
            

Subject

                const subject = new Rx.Subject();
                setSubjectTick(subject);
                 
                subject.subscribe();
                setTimeout(() => subject.subscribe(/* ... */), 1500);
                setTimeout(() => subject.subscribe(/* ... */), 2500);
        

Разновидности Subject

  1. Subject

Пример - автоотписка

            const subscription = streamA$
                .map(...)
                .switchMap(...)
                .subscribe(...);
            
        

Пример - автоотписка

            const destroyStream = new Subject();
            
            
        

Пример - авторизация

            @Injectable()
            export class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
            
            }
        

Пример - авторизация

            @Injectable()
            export class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
              setAuthState(state: boolean) {...}
            
            }
        

Пример - авторизация

            @Injectable()
            export class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
              setAuthState(state: boolean) {...}
              getStateChange(): Observable<AuthState> {...}
             
            
            }
        

Пример - кеширование

            let cachingSubject;
             
            getResource() {
            
            
            }
        

Итоги

Спасибо!

github.com/aalexeev239/rxjs-stachka-2018

goo.gl/RWjkDe

https://github.com/aalexeev239/rxjs-stachka-2018