RxJS: введение

RxJS

RxJS
введение

Привет


Андрей Алексеев

👨‍💻 Tinkoff business

👨‍🏫 Tinkoff Fintech School

О чем поговорим

Реактивный подход

A1 = 1; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;

Реактивный подход

A1 = 1; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

Реактивный подход

A1 = 2; B1 = 3; C1 = A1 + B1 = 4

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;

Реактивный подход

A1 = 2; B1 = 3; C1 = A1 + B1 = 5

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Реактивный подход

Push-стратегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Императивный подход

Push-стратегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 5;

Pull-статегия

A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;

A1 = 2;
C1 ~ 4;

Push / Pull

Push value

Единственное значение

Promise — асинхронное значение

Асинхронное значение — Promise

Множественное значение

Множественное значение

Iterable

Множественное значение - Iterable

И множественное, и асинхронное?

Асинхронное и множественное значение

Observable

Observable

Поток событий

Поток событий

Поток кликов

Поток событий

Поток кликов

Поток событий

Поток кликов

Поток событий

Поток кликов

Поток событий

Поток кликов

Поток событий

            from([{x: 11, y: 99}, {x: 91, y: 151}, {x: 152, y: 106}]).pipe(
                map((value) => { ... }),
                filter((value) => { ... }),
                reduce((acc, curr) => { ... })
            
        

Поток событий
~
массив, распределенный во времени

Observable
Observer pattern + Iterator pattern

Observer pattern

                class Subject {
                
                    add(observer) {...}
                    remove(observer) {...}
                
            

Observer pattern

Observer pattern

Observer pattern

Субъект изменяет значения.
Также называется Observable, наблюдаемый объект.

Наблюдатель (Observer) реагирует на изменения.

Iterator pattern

                class Iterator {
                
                
            

Iterator pattern

Iterator pattern

Observable

Observable

            const stream$ = Observable.create((observer) => {
                    observer.next(1);
                    setTimeout(() => { observer.next(2);},    1000);
                    setTimeout(() => { observer.next(3);},    2000);
                    setTimeout(() => { observer.complete();}, 3000);
                });
                const subscription = stream$.subscribe({
                    next: (value) => renderNext(value),
                    error: (error) => renderError(error),
                    complete: () => renderComplete()
                });
        

Observable

Observable – наблюдаемый объект, доставляет изменения в подписку.

            const stream$ = Observable.create((observer) => {...});
             
            const subscription = stream$.subscribe(...);
        

Observable

Observer – объект-наблюдатель,
опциональные обработчики next/error/complete

            const subscription = stream$.subscribe(
                {
                    next: (value) => renderNext(value),
                    error: (error) => renderError(error),
                    complete: () => renderComplete()
                }
            );
            
        

Observable

Функция создания определяет,
как наблюдатель будет получать значения.

            const stream$ = Observable.create((observer) => {
                observer.next(1);
                setTimeout(() => { observer.next(2);},    1000);
                setTimeout(() => { observer.next(3);},    2000);
            
            
stream$.subscribe(renderNext);

Observable

            const stream$ = Observable.create((observer) => {
                observer.next(1);
                setTimeout(() => { observer.next(2);},    1000);
                setTimeout(() => { observer.next(3);},    2000);
                setTimeout(() => { observer.complete();}, 3000);
            });
            
const promise = new Promise((resolve, reject) => { resolve(1); });

Observable

Subscribtion – подписка.
Старт получения значений.

            const stream$ = Observable.create((observer) => {...});
             
            const subscription = stream$.subscribe(...);
            subscription.unsubscribe();
        

Observable

unsubscribe – завершение получения значений.

            const stream$ = Observable.create((observer) => {
                const interval = setInterval(() => observer.next(1), 1000);
            
            });
             
            const subscription = stream$.subscribe(...);
            subscription.unsubscribe();
        

Observable

Нет подписки — нет потока

            const stream$ = Observable.create((observer) => {...});
            
        

Observable

Каждый observer создаст свой поток значений
Подписки не зависят друг от друга

            const stream$ = Observable.create((observer) => {...});
            
            const subscriptionB = stream$.subscribe(observerBC);
            const subscriptionC = stream$.subscribe(observerBC);
        

Observable

            
            
        

Ок, что дальше?

Дальше — 120+ операторов. 120, Карл!

Cоздание Observable

            import {of} from 'rxjs';
            of('foo');
            // ---foo-|
            
            
        

Cоздание Observable

            import {from} from 'rxjs';
            const promise = new Promise((resolve) => { resolve('foo')});
            from(promise);
            // ----------------foo-|
            
        

Операторы преобразования

            import {take, filter} from 'rxjs/operators';
            timer(0, 500).pipe(
                take(12)
            ).subscribe(renderNext);
            
        

Pipe

            .pipe(
        
pipe в реальном мире
            ).subscribe(...)
        

Rx Marble Diagram

Rx Marble Diagram: filter
            ---2--30--22--5--60--1-----|
            filter(x => x > 10)         
            ------30--22-----60--------|
        

Операторы преобразования

            timer(0, 250).pipe(
                take(40),
                filter(() => Math.random() < 0.3),
                debounceTime(500)
            ).subscribe(renderNext);
        

Комбинирование: merge

            const streamA$ = timer(0, 1500).pipe(take(4));
            const streamB$ = timer(800, 1000).pipe(
                take(6),
                map((i) => 'abcdef'[i])
            );
            merge(streamA$, streamB$)
                .subscribe(renderNext);
        

merge


Пример merge

merge


Пример merge

Комбинирование: combineLatest

            const streamA$ = timer(0, 1500).pipe(take(4));
            const streamB$ = timer(800, 1000).pipe(
                take(6),
                map((i) => 'abcdef'[i])
            );
            combineLatest(streamA$, streamB$).pipe(
                map(([a, b]) => a + b)
            ).subscribe(renderNext);
        

combineLatest


Пример combineLatest

Операторы высшего порядка: mergeMap

            timer(0, 1500).pipe(
                take(3),
                map((i) => 'abcd'[i]),
                mergeMap(letter => {
                    return timer(0, 625).pipe(
                        take(6),
                        map(digit => letter + digit));
                })
            ).subscribe(renderNext);
        

Операторы высшего порядка: switchMap

            timer(0, 1500).pipe(
                take(3),
                map((i) => 'abcd'[i]),
                switchMap(letter => {
                    return timer(0, 625).pipe(
                        take(6),
                        map(digit => letter + digit));
                })
            ).subscribe(renderNext);
        

mergeMap vs switchMap

a
b
c
0
1
2
3
4
5

mergeMap

a0
a1
a2
b0
a3
b1
a4
b2
c0
a5
b3
c1
b4
c2
b5
c3
c4

switchMap

a0
a1
a2
b0
b1
b2
c0
c1
c2
c3
c4

SwitchMap

Promise <--> Observable

            const getPromise = (value) => new Promise((resolve) => {
                setTimeout(() => {
                    resolve(value);
                }, 1500);
            });
             
            from(getPromise('⏰'))
                .subscribe(renderNext);
        

SwitchMap + Promise

            const streamA$ = Observable.create((observer) => {
                observer.next(1);
                setTimeout(() => { observer.next(2);}, 2000);
                setTimeout(() => { observer.next(3);}, 3000);
                setTimeout(() => { observer.complete()}, 4000);
            });
            
        

SwitchMap и отписки

SwitchMap автоматически отписывает внутренний поток — вызывает subscription.unsubscribe()

Можем написать отменяемые запросы:

Пример: отменяемый fetch

            const request$ = Observable.create((observer) => {
            
            
            
            
        

SwitchMap и запросы


Отмена fetch-запросов

ДЕМО

Демо — поиск

демо

Демо: поток ввода

Демо: дебаунс

Демо: фильтрация совпадений

Демо: запрос данных

Демо: лоадер

Демо: результаты

Демо: результаты

Работаем с потоками,
а не с промежуточными состояниями

SUBJECTS

1 подписка - 1 поток значений

            const stream$ = timer(0, 500).pipe(
                    take(5),
                    map(() => random()));
                stream$.subscribe(/* ... */);
                stream$.subscribe(/* ... */);
                
        

Subject

Объединение Observable и Observer.
Передает значение всем подписчикам

                const subject = new Subject();
                 
                subject.next(random());
                setTimeout(() => subject.next(random()), 1000);
                setTimeout(() => subject.next(random()), 2000);
                setTimeout(() => subject.next(random()), 3000);
                setTimeout(() => subject.next(random()), 4000);
                setTimeout(() => subject.complete(), 5000);
            

Subject

                const subject = new Subject();
                setSubjectTick(subject);
                 
                subject.subscribe();
                setTimeout(() => subject.subscribe(/* ... */), 1500);
                setTimeout(() => subject.subscribe(/* ... */), 2500);
        

Пример - автоотписка

            const subscription = streamA$.pipe(
                map(...),
                switchMap(...)
            ).subscribe(...);
            
        

Пример - автоотписка

            const destroy$ = new Subject();
            
            
        

Пример - авторизация

            class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
            
        

Пример - авторизация

            class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
              setAuthState(state: boolean) {...}
            
        

Пример - авторизация

            class StateService {
              private authSubject = new BehaviorSubject<boolean>(false);
              setAuthState(state: boolean) {...}
              getStateChange(): Observable<boolean> {...}
             
            
        

ИТОГИ

Итоги

Спасибо!


https://github.com/aalexeev239/rxjs-intro

github.com/aalexeev239/rxjs-intro