Разрабатываю в tinkoff.ru/business
Преподаю в Tinkoff Fintech School
Не пишу в твиттер aalexeev_ru
Observable – наблюдаемая сущность, доставляет изменения подписчикам.
const stream$ = Rx.Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscribtion = stream$.subscribe({
next: (v) => logger.textContent += ' ' + v,
complete: () => logger.textContent += ' complete!'
});
Observable – наблюдаемая сущность, доставляет изменения подписчикам.
const stream$ = Rx.Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscribtion = stream$.subscribe({
next: (v) => logger.textContent += ' ' + v,
complete: () => logger.textContent += ' complete!'
});
Функция создания определяет, наблюдатели (подписчики) будет получать значения.
const stream$ = Rx.Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscribtion = stream$.subscribe({
next: (v) => logger.textContent += ' ' + v,
complete: () => logger.textContent += ' complete!'
});
Observer – сам наблюдатель, состоящий из функций-обработчиков next/error/complete
Их можно перечислить через запятую или задать как объект.
const stream$ = Rx.Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscribtion = stream$.subscribe({
next: (v) => logger.textContent += ' ' + v,
complete: () => logger.textContent += ' complete!'
});
Subscribtion – подписка. Обычно используется для вызова unsubscribe .
const stream$ = Rx.Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscribtion = stream$.subscribe({
next: (v) => logger.textContent += ' ' + v,
complete: () => logger.textContent += ' complete!'
});
subscribtion.unsubscribe();
На каждую подписку создается отдельный поток значений
const stream$ = Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3)); // 0.12345 => 123
stream$.subscribe(/* ... */);
stream$.subscribe(/* ... */);
setTimeout(() => {
stream$.subscribe(/* ... */);
}, 800);
Однако есть Observable, распределяюшие события по всем подписчикам
const buttonElement = document.getElementById('click-button');
const START = new Date();
const stream$ = Rx.Observable.fromEvent(buttonElement, 'click')
.map(() => ((new Date() - START) / 1000).toFixed(2));
stream$.subscribe(/* ... */);
setTimeout(() => {stream$.subscribe(/* ... */);}, 1000);
setTimeout(() => {stream$.subscribe(/* ... */);}, 2000);
Эти 2 типа Observable называются холодными и горячими.
Холодные создают независимые потоки под каждую подписку.
Горячие разделяют поток друг с другом
Ну и что?
Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:
const request$ = this.http.get
('https://api.github.com/search/repositories?q=rxjs');
request$
.map(data => data.items.map(repo => repo.name))
.subscribe(v => console.log(v));
request$
.map(data => data.items.filter(repo => repo.name === 'rxjs')[0])
.map(repo => repo.score)
.subscribe(v => console.log(v));
Ну и что?
Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:
const request$ = this.http.get
('https://api.github.com/search/repositories?q=rxjs');
request$
.map(data => data.items.map(repo => repo.name))
.subscribe(v => console.log(v));
request$
.map(data => data.items.filter(repo => repo.name === 'rxjs')[0])
.map(repo => repo.score)
.subscribe(v => console.log(v));
Объединение Observable и Observer. Передает значение всем подписчикам
const subject = new Rx.Subject();
setSubjectTick(subject);
subject.subscribe();
setTimeout(() => subject.subscribe(/* ... */), 1000);
setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {
subject.next(0);
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Объединение Observable и Observer. Передает значение всем подписчикам
const subject = new Rx.Subject();
setSubjectTick(subject);
subject.subscribe();
setTimeout(() => subject.subscribe(/* ... */), 1000);
setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {
subject.next(0);
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Объединение Observable и Observer . Передает значение всем подписчикам
const subject = new Rx.Subject();
setSubjectTick(subject);
subject.subscribe();
setTimeout(() => subject.subscribe(/* ... */), 1000);
setTimeout(() => subject.subscribe(/* ... */), 2000);
function setSubjectTick(subject) {
subject.next(0);
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Пример
const aalexeev = new Subject();
for (let i = 0; i < meetupListeners; i++) {
meetupListeners[i] = aalexeev.subscribe(/* ... */);
}
aalexeev.next('Я рассказываю этот доклад');
aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
const aalexeev = new Subject();
for (let i = 0; i < meetupListeners; i++) {
meetupListeners[i] = aalexeev.subscribe(/* ... */);
}
aalexeev.next('Я рассказываю этот доклад');
aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
const aalexeev = new Subject();
for (let i = 0; i < meetupListeners; i++) {
meetupListeners[i] = aalexeev.subscribe(/* ... */);
}
aalexeev.next('Я рассказываю этот доклад');
aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
Пример
aalexeev.next('Если кто-то опоздал и пришел только сейчас');
aalexeev.next('он не получит все прошлые значения');
const newListener = aalexeev.subscribe(/* ... */);
meetupListeners.push(newListener);
aalexeev.next('но будет получать все будущие');
Пример
const leaving = Math.floor(Math.random() * meetupListeners); // кто-то
meetupListeners[leaving].unsubscribe();
meetupListeners.splice(leaving, 1);
aalexeev.next('Кто только что ушел, не услышит самого главного');
Пример
aalexeev.error('Внимание! В одном из помещений обнаружено задымление. Просьба покинуть помещение!')
aalexeev.next('Как же так!'); // это никто не услышит
/* __________________ */
aalexeev.next('Спасибо всем, кто пришел');
aalexeev.complete();
aalexeev.next('Кстати, а вот еще...'); // это никто не услышит
Subject не завершает генерацию значений.
const subject = new Rx.Subject();
const subscribtion = subject.subscribe(/* ... */);
setSubjectTick(subject)
setTimeout(() => { subscribtion.unsubscribe(); }, 1000);
setTimeout(() => { subject.subscribe(/* ... */); }, 1500);
function setSubjectTick(subject) {
subject.next(0);
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Всегда хранит текущее значение. При инициализации требует начальное значение.
При подписке наблюдатель незамедлительно получит текущее значение.
const subject = new Rx.BehaviorSubject('abc');
const subscribtion = subject.subscribe(/* ... */);
setSubjectTick(subject);
setTimeout(() => {subscribtion.unsubscribe();}, 1000);
setTimeout(() => {subject.subscribe(/* ... */);}, 1200);
function setSubjectTick(subject) {
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Cохраняет буфер произошедших событий.
При подписке наблюдатель незамедлительно получит текущий буфер.
Нет начального значения, как в Behaviour.
const subject = new Rx.ReplaySubject(2);
setSubjectTick(subject);
subject.subscribe();
setTimeout(() => {
subject.subscribe(/* ... */);
}, 1200);
function setSubjectTick(subject) {
subject.next(1)
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Передает только последнее значение и только после завершения.
Похож на Promise.
const subject = new Rx.AsyncSubject();
setSubjectTick(subject);
subject.subscribe(/* ... */);
setTimeout(() => { subject.subscribe(/* ... */); }, 1000);
setTimeout(() => { subject.subscribe(/* ... */); }, 4000);
function setSubjectTick(subject) {
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.next(4), 2000);
setTimeout(() => subject.next(5), 2500);
setTimeout(() => subject.complete(), 3000);
}
Вернемся к задаче разделения значений между несколькими подписчиками.
С помощью Subject-ов это можно решить вот так:
const subject = new Rx.Subject();
Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3))
.subscribe(subject);
subject.subscribe(/* ... */);
subject.subscribe(/* ... */);
setTimeout(() => {subject.subscribe(/* ... */);}, 800);
Оператор multicast позволяет не выделять промежуточный Subject, а создать сразу “правильный” Observable.
const connectable$ = Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3))
.multicast(new Rx.Subject());
setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);
setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);
setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);
connectable$.connect();
multicast(new Rx.Subject()) -> publish()
multicast(new Rx.BehaviorSubject(value)) -> publishBehavior(value)
multicast(new Rx.ReplaySubject(buffer)) -> publishReplay(buffer)
multicast(new Rx.AsyncSubject()) -> publishLast()
Запустить получивщийся Observable нужно вызовом connect.
const connectable$ = Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3))
.publish();
setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);
setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);
setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);
connectable$.connect();
Запустить получивщийся Observable нужно вызовом connect.
const connectable$ = Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3))
.publish();
setTimeout(() => { connectable$.connect(); }, 800);
setTimeout(() => { connectable$.subscribe(/* ... */); }, 500);
setTimeout(() => { connectable$.subscribe(/* ... */); }, 1000);
setTimeout(() => { connectable$.subscribe(/* ... */); }, 1500);
Что не так с этим кодом?
const connectable$ = Rx.Observable.interval(500)
.take(5)
.map(() => Math.random().toString().substr(2, 3))
.publish();
setTimeout(() => { connectable$.connect(); }, 1000);
Используйте refCount для автоматического обновления подписки.
0 -> 1 — старт генерации значений
1 -> 0 — отмена
const stream$ = Rx.Observable.interval(TICK_MS)
.take(5)
.publish()
.refCount();
Используйте refCount для автоматического обновления подписки.
const stream$ = Rx.Observable.interval(TICK_MS)
.take(5)
.publish()
.refCount();
const subscription1 = stream$.subscribe();
let subscription2;
setTimeout(() => {subscription2 = stream$.subscribe(); }, 1100);
setTimeout(() => {subscription1.unsubscribe()}, 1100);
setTimeout(() => {subscription2.unsubscribe()}, 2100);
setTimeout(() => {stream$.subscribe()}, 2100);
publish().refCount() -> share()
const stream$ = Rx.Observable.interval(TICK_MS)
.take(5)
// .publish()
// .refCount();
.share();