Разрабатываю в tinkoff.ru/business
Преподаю в Tinkoff Fintech School
Не пишу в твиттер aalexeev_ru
Observable – наблюдаемая сущность, доставляет изменения подписчикам.
const stream$ = Rx.Observable.create((observer) => {observer.next(1);setTimeout(() => { observer.next(2);}, 1000);setTimeout(() => { observer.next(3);}, 2000);setTimeout(() => { observer.complete();}, 3000);});const subscribtion = stream$.subscribe({next: (v) => logger.textContent += ' ' + v,complete: () => logger.textContent += ' complete!'});
Observable – наблюдаемая сущность, доставляет изменения подписчикам.
const stream$ = Rx.Observable.create((observer) => {observer.next(1);setTimeout(() => { observer.next(2);}, 1000);setTimeout(() => { observer.next(3);}, 2000);setTimeout(() => { observer.complete();}, 3000);});const subscribtion = stream$.subscribe({next: (v) => logger.textContent += ' ' + v,complete: () => logger.textContent += ' complete!'});
Функция создания определяет, наблюдатели (подписчики) будет получать значения.
const stream$ = Rx.Observable.create((observer) => {observer.next(1);setTimeout(() => { observer.next(2);}, 1000);setTimeout(() => { observer.next(3);}, 2000);setTimeout(() => { observer.complete();}, 3000);});const subscribtion = stream$.subscribe({next: (v) => logger.textContent += ' ' + v,complete: () => logger.textContent += ' complete!'});
Observer – сам наблюдатель, состоящий из функций-обработчиков next/error/complete
Их можно перечислить через запятую или задать как объект.
const stream$ = Rx.Observable.create((observer) => {observer.next(1);setTimeout(() => { observer.next(2);}, 1000);setTimeout(() => { observer.next(3);}, 2000);setTimeout(() => { observer.complete();}, 3000);});const subscribtion = stream$.subscribe({next: (v) => logger.textContent += ' ' + v,complete: () => logger.textContent += ' complete!'});
Subscribtion – подписка. Обычно используется для вызова unsubscribe .
const stream$ = Rx.Observable.create((observer) => {observer.next(1);setTimeout(() => { observer.next(2);}, 1000);setTimeout(() => { observer.next(3);}, 2000);setTimeout(() => { observer.complete();}, 3000);});const subscribtion = stream$.subscribe({next: (v) => logger.textContent += ' ' + v,complete: () => logger.textContent += ' complete!'});subscribtion.unsubscribe();
На каждую подписку создается отдельный поток значений
const stream$ = Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)); // 0.12345 => 123stream$.subscribe(/* ... */);stream$.subscribe(/* ... */);setTimeout(() => {stream$.subscribe(/* ... */);}, 800);
Однако есть Observable, распределяюшие события по всем подписчикам
const buttonElement = document.getElementById('click-button');const START = new Date();const stream$ = Rx.Observable.fromEvent(buttonElement, 'click').map(() => ((new Date() - START) / 1000).toFixed(2));stream$.subscribe(/* ... */);setTimeout(() => {stream$.subscribe(/* ... */);}, 1000);setTimeout(() => {stream$.subscribe(/* ... */);}, 2000);
Эти 2 типа Observable называются холодными и горячими.
Холодные создают независимые потоки под каждую подписку.
Горячие разделяют поток друг с другом
Ну и что?
Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:
const request$ = this.http.get('https://api.github.com/search/repositories?q=rxjs'); request$.map(data => data.items.map(repo => repo.name)).subscribe(v => console.log(v));request$.map(data => data.items.filter(repo => repo.name === 'rxjs')[0]).map(repo => repo.score).subscribe(v => console.log(v));
Ну и что?
Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:
const request$ = this.http.get('https://api.github.com/search/repositories?q=rxjs'); request$.map(data => data.items.map(repo => repo.name)).subscribe(v => console.log(v));request$.map(data => data.items.filter(repo => repo.name === 'rxjs')[0]).map(repo => repo.score).subscribe(v => console.log(v));
Объединение Observable и Observer. Передает значение всем подписчикам
const subject = new Rx.Subject();setSubjectTick(subject);subject.subscribe();setTimeout(() => subject.subscribe(/* ... */), 1000);setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {subject.next(0);setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Объединение Observable и Observer. Передает значение всем подписчикам
const subject = new Rx.Subject();setSubjectTick(subject);subject.subscribe();setTimeout(() => subject.subscribe(/* ... */), 1000);setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {subject.next(0);setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Объединение Observable и Observer . Передает значение всем подписчикам
const subject = new Rx.Subject();setSubjectTick(subject);subject.subscribe();setTimeout(() => subject.subscribe(/* ... */), 1000);setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {subject.next(0);setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Пример
const aalexeev = new Subject();for (let i = 0; i < meetupListeners; i++) {meetupListeners[i] = aalexeev.subscribe(/* ... */);}aalexeev.next('Я рассказываю этот доклад');aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
const aalexeev = new Subject();for (let i = 0; i < meetupListeners; i++) {meetupListeners[i] = aalexeev.subscribe(/* ... */);}aalexeev.next('Я рассказываю этот доклад');aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
const aalexeev = new Subject();for (let i = 0; i < meetupListeners; i++) {meetupListeners[i] = aalexeev.subscribe(/* ... */);}aalexeev.next('Я рассказываю этот доклад');aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
aalexeev.next('Если кто-то опоздал и пришел только сейчас');aalexeev.next('он не получит все прошлые значения');const newListener = aalexeev.subscribe(/* ... */);meetupListeners.push(newListener);aalexeev.next('но будет получать все будущие');
Пример
const leaving = Math.floor(Math.random() * meetupListeners); // кто-тоmeetupListeners[leaving].unsubscribe();meetupListeners.splice(leaving, 1);aalexeev.next('Кто только что ушел, не услышит самого главного');
Пример
aalexeev.error('Внимание! В одном из помещений обнаружено задымление. Просьба покинуть помещение!')aalexeev.next('Как же так!'); // это никто не услышит/* __________________ */aalexeev.next('Спасибо всем, кто пришел');aalexeev.complete();aalexeev.next('Кстати, а вот еще...'); // это никто не услышит
Subject не завершает генерацию значений.
const subject = new Rx.Subject();const subscribtion = subject.subscribe(/* ... */);setSubjectTick(subject)setTimeout(() => { subscribtion.unsubscribe(); }, 1000);setTimeout(() => { subject.subscribe(/* ... */); }, 1500);
function setSubjectTick(subject) {subject.next(0);setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Всегда хранит текущее значение. При инициализации требует начальное значение.
При подписке наблюдатель незамедлительно получит текущее значение.
const subject = new Rx.BehaviorSubject('abc');const subscribtion = subject.subscribe(/* ... */);setSubjectTick(subject);setTimeout(() => {subscribtion.unsubscribe();}, 1000);setTimeout(() => {subject.subscribe(/* ... */);}, 1200);
function setSubjectTick(subject) {setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Cохраняет буфер произошедших событий.
При подписке наблюдатель незамедлительно получит текущий буфер.
Нет начального значения, как в Behaviour.
const subject = new Rx.ReplaySubject(2);setSubjectTick(subject);subject.subscribe();setTimeout(() => {subject.subscribe(/* ... */);}, 1200);
function setSubjectTick(subject) {subject.next(1)setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Передает только последнее значение и только после завершения.
Похож на Promise.
const subject = new Rx.AsyncSubject();setSubjectTick(subject);subject.subscribe(/* ... */);setTimeout(() => { subject.subscribe(/* ... */); }, 1000);setTimeout(() => { subject.subscribe(/* ... */); }, 4000);
function setSubjectTick(subject) {setTimeout(() => subject.next(1), 500);setTimeout(() => subject.next(2), 1000);setTimeout(() => subject.next(3), 1500);setTimeout(() => subject.next(4), 2000);setTimeout(() => subject.next(5), 2500);setTimeout(() => subject.complete(), 3000);}
Вернемся к задаче разделения значений между несколькими подписчиками.
С помощью Subject-ов это можно решить вот так:
const subject = new Rx.Subject();Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)).subscribe(subject);subject.subscribe(/* ... */);subject.subscribe(/* ... */);setTimeout(() => {subject.subscribe(/* ... */);}, 800);
Оператор multicast позволяет не выделять промежуточный Subject, а создать сразу “правильный” Observable.
const connectable$ = Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)).multicast(new Rx.Subject());setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);connectable$.connect();
multicast(new Rx.Subject()) -> publish()
multicast(new Rx.BehaviorSubject(value)) -> publishBehavior(value)
multicast(new Rx.ReplaySubject(buffer)) -> publishReplay(buffer)
multicast(new Rx.AsyncSubject()) -> publishLast()
Запустить получивщийся Observable нужно вызовом connect.
const connectable$ = Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)).publish();setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);connectable$.connect();
Запустить получивщийся Observable нужно вызовом connect.
const connectable$ = Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)).publish();setTimeout(() => { connectable$.connect(); }, 800);setTimeout(() => { connectable$.subscribe(/* ... */); }, 500);setTimeout(() => { connectable$.subscribe(/* ... */); }, 1000);setTimeout(() => { connectable$.subscribe(/* ... */); }, 1500);
Что не так с этим кодом?
const connectable$ = Rx.Observable.interval(500).take(5).map(() => Math.random().toString().substr(2, 3)).publish();setTimeout(() => { connectable$.connect(); }, 1000);
Используйте refCount для автоматического обновления подписки.
0 -> 1 — старт генерации значений
1 -> 0 — отмена
const stream$ = Rx.Observable.interval(TICK_MS).take(5).publish().refCount();
Используйте refCount для автоматического обновления подписки.
const stream$ = Rx.Observable.interval(TICK_MS).take(5).publish().refCount();const subscription1 = stream$.subscribe();let subscription2;setTimeout(() => {subscription2 = stream$.subscribe(); }, 1100);setTimeout(() => {subscription1.unsubscribe()}, 1100);setTimeout(() => {subscription2.unsubscribe()}, 2100);setTimeout(() => {stream$.subscribe()}, 2100);
publish().refCount() -> share()
const stream$ = Rx.Observable.interval(TICK_MS).take(5)// .publish()// .refCount();.share();