RxJS Subjects

RxJS

RxJS Subjects

Об авторе

Андрей Алексеев

Разрабатываю в tinkoff.ru/business

Преподаю в Tinkoff Fintech School

Не пишу в твиттер aalexeev_ru

Observable

Observable – наблюдаемая сущность, доставляет изменения подписчикам.

				const stream$ = Rx.Observable.create((observer) => {
				    observer.next(1);
				    setTimeout(() => { observer.next(2);},    1000);
				    setTimeout(() => { observer.next(3);},    2000);
				    setTimeout(() => { observer.complete();}, 3000);
				});
				 
				const subscribtion = stream$.subscribe({
				    next: (v) => logger.textContent += ' ' + v,
				    complete: () => logger.textContent += ' complete!'
				});
		

Observable

Observable – наблюдаемая сущность, доставляет изменения подписчикам.

				const stream$ = Rx.Observable.create((observer) => {
				    observer.next(1);
				    setTimeout(() => { observer.next(2);},    1000);
				    setTimeout(() => { observer.next(3);},    2000);
				    setTimeout(() => { observer.complete();}, 3000);
				});
				 
				const subscribtion = stream$.subscribe({
				    next: (v) => logger.textContent += ' ' + v,
				    complete: () => logger.textContent += ' complete!'
				});
		

Observable

Функция создания определяет, наблюдатели (подписчики) будет получать значения.

			const stream$ = Rx.Observable.create((observer) => {
			    observer.next(1);
			    setTimeout(() => { observer.next(2);},    1000);
			    setTimeout(() => { observer.next(3);},    2000);
			    setTimeout(() => { observer.complete();}, 3000);
				});
				 
				const subscribtion = stream$.subscribe({
			    next: (v) => logger.textContent += ' ' + v,
			    complete: () => logger.textContent += ' complete!'
				});
		

Observable

Observer – сам наблюдатель, состоящий из функций-обработчиков next/error/complete

Их можно перечислить через запятую или задать как объект.

			const stream$ = Rx.Observable.create((observer) => {
				    observer.next(1);
				    setTimeout(() => { observer.next(2);},    1000);
				    setTimeout(() => { observer.next(3);},    2000);
				    setTimeout(() => { observer.complete();}, 3000);
				});
				 
				const subscribtion = stream$.subscribe({
			    next: (v) => logger.textContent += ' ' + v,
			    complete: () => logger.textContent += ' complete!'
				});
		

Observable

Subscribtion – подписка. Обычно используется для вызова unsubscribe .

				const stream$ = Rx.Observable.create((observer) => {
				    observer.next(1);
				    setTimeout(() => { observer.next(2);},    1000);
				    setTimeout(() => { observer.next(3);},    2000);
				    setTimeout(() => { observer.complete();}, 3000);
				});
				 
			const subscribtion = stream$.subscribe({
				    next: (v) => logger.textContent += ' ' + v,
				    complete: () => logger.textContent += ' complete!'
				});
			subscribtion.unsubscribe();
		

Observable

На каждую подписку создается отдельный поток значений

				const stream$ = Rx.Observable.interval(500)
				    .take(5)
				    .map(() => Math.random().toString().substr(2, 3)); // 0.12345 => 123
				 
				stream$.subscribe(/* ... */);
				stream$.subscribe(/* ... */);
				setTimeout(() => {
				    stream$.subscribe(/* ... */);
				}, 800);
		

Observable

Однако есть Observable, распределяюшие события по всем подписчикам

				const buttonElement = document.getElementById('click-button');
				const START = new Date();
				const stream$ = Rx.Observable.fromEvent(buttonElement, 'click')
				    .map(() => ((new Date() - START) / 1000).toFixed(2));
				 
				stream$.subscribe(/* ... */);
				setTimeout(() => {stream$.subscribe(/* ... */);}, 1000);
				setTimeout(() => {stream$.subscribe(/* ... */);}, 2000);
		

Observable

Эти 2 типа Observable называются холодными и горячими.


Холодные создают независимые потоки под каждую подписку.


Горячие разделяют поток друг с другом

Observable

Ну и что?

Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:


				const request$ = this.http.get('https://api.github.com/search/repositories?q=rxjs');
				 
				request$
				 .map(data => data.items.map(repo => repo.name))
				 .subscribe(v => console.log(v));
				 
				request$
				 .map(data => data.items.filter(repo => repo.name === 'rxjs')[0])
				 .map(repo => repo.score)
				 .subscribe(v => console.log(v));

Observable

Ну и что?

Иногда надо подогреть холодный Observable. Например, чтобы не делать 2 запроса:


				const request$ = this.http.get('https://api.github.com/search/repositories?q=rxjs');
				 
				request$
				 .map(data => data.items.map(repo => repo.name))
				 .subscribe(v => console.log(v));
				 
				request$
				 .map(data => data.items.filter(repo => repo.name === 'rxjs')[0])
				 .map(repo => repo.score)
				 .subscribe(v => console.log(v));
		
2 requests

SUBJECT

Subject

Объединение Observable и Observer. Передает значение всем подписчикам

				const subject = new Rx.Subject();
				setSubjectTick(subject);
				 
				subject.subscribe();
				setTimeout(() => subject.subscribe(/* ... */), 1000);
				setTimeout(() => subject.subscribe(/* ... */), 2000);
		
					function setSubjectTick(subject) {
					    subject.next(0);
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}

			

Subject

Объединение Observable и Observer. Передает значение всем подписчикам

				const subject = new Rx.Subject();
				setSubjectTick(subject);
				 
			subject.subscribe();
			setTimeout(() => subject.subscribe(/* ... */), 1000);
			setTimeout(() => subject.subscribe(/* ... */), 2000);
		
					function setSubjectTick(subject) {
					    subject.next(0);
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}

			

Subject

Объединение Observable и Observer . Передает значение всем подписчикам

				const subject = new Rx.Subject();
				setSubjectTick(subject);
				 
			subject.subscribe();
			setTimeout(() => subject.subscribe(/* ... */), 1000);
			setTimeout(() => subject.subscribe(/* ... */), 2000);
		
					function setSubjectTick(subject) {
				    subject.next(0);
				    setTimeout(() => subject.next(1), 500);
				    setTimeout(() => subject.next(2), 1000);
				    setTimeout(() => subject.next(3), 1500);
				    setTimeout(() => subject.next(4), 2000);
				    setTimeout(() => subject.next(5), 2500);
				    setTimeout(() => subject.complete(), 3000);
					}

			

Subject

Пример

				const aalexeev = new Subject();
				 
				for (let i = 0; i < meetupListeners; i++) {
				   meetupListeners[i] = aalexeev.subscribe(/* ... */);
				}
				 
				aalexeev.next('Я рассказываю этот доклад');
				aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
		

Subject

Пример

				const aalexeev = new Subject();
				 
			for (let i = 0; i < meetupListeners; i++) {
			   meetupListeners[i] = aalexeev.subscribe(/* ... */);
			}
				 
				aalexeev.next('Я рассказываю этот доклад');
				aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
		

Subject

Пример

				const aalexeev = new Subject();
				 
				for (let i = 0; i < meetupListeners; i++) {
				   meetupListeners[i] = aalexeev.subscribe(/* ... */);
				}
				 
			aalexeev.next('Я рассказываю этот доклад');
			aalexeev.next('Произношу какую-то мысль — передаю значение через .next');
		

Subject

Пример

				aalexeev.next('Если кто-то опоздал и пришел только сейчас');
				aalexeev.next('он не получит все прошлые значения');
				 
			const newListener = aalexeev.subscribe(/* ... */);
				meetupListeners.push(newListener);
				 
				aalexeev.next('но будет получать все будущие');
		

Subject

Пример

				const leaving = Math.floor(Math.random() * meetupListeners); // кто-то
				meetupListeners[leaving].unsubscribe();
				meetupListeners.splice(leaving, 1);
				 
				aalexeev.next('Кто только что ушел, не услышит самого главного');

		

Subject

Пример

				aalexeev.error('Внимание! В одном из помещений обнаружено задымление. Просьба покинуть помещение!')
				 
				aalexeev.next('Как же так!'); // это никто не услышит
				/* __________________ */
				aalexeev.next('Спасибо всем, кто пришел');
				aalexeev.complete();
				 
				aalexeev.next('Кстати, а вот еще...'); // это никто не услышит
		

Subject

Subject не завершает генерацию значений.

				const subject = new Rx.Subject();
				const subscribtion = subject.subscribe(/* ... */);
				 
				setSubjectTick(subject)
				setTimeout(() => { subscribtion.unsubscribe(); }, 1000);
				setTimeout(() => { subject.subscribe(/* ... */); }, 1500);
		
					function setSubjectTick(subject) {
					    subject.next(0);
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}

			

РАЗНОВИДНОСТИ SUBJECT

Behaviour Subject

Всегда хранит текущее значение. При инициализации требует начальное значение.

При подписке наблюдатель незамедлительно получит текущее значение.

				const subject = new Rx.BehaviorSubject('abc');
				const subscribtion = subject.subscribe(/* ... */);
				setSubjectTick(subject);
				 
				setTimeout(() => {subscribtion.unsubscribe();}, 1000);
				setTimeout(() => {subject.subscribe(/* ... */);}, 1200);
		
					function setSubjectTick(subject) {
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}
			

Replay Subject

Cохраняет буфер произошедших событий.

При подписке наблюдатель незамедлительно получит текущий буфер.

Нет начального значения, как в Behaviour.

				const subject = new Rx.ReplaySubject(2);
				setSubjectTick(subject);
				 
				subject.subscribe();
				 
				setTimeout(() => {
				   subject.subscribe(/* ... */);
				}, 1200);

		
					function setSubjectTick(subject) {
					    subject.next(1)
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}
			

Async Subject

Передает только последнее значение и только после завершения.

Похож на Promise.

				const subject = new Rx.AsyncSubject();
				 
				setSubjectTick(subject);
				 
				subject.subscribe(/* ... */);
				setTimeout(() => { subject.subscribe(/* ... */); }, 1000);
				setTimeout(() => { subject.subscribe(/* ... */); }, 4000);

		
					function setSubjectTick(subject) {
					    setTimeout(() => subject.next(1), 500);
					    setTimeout(() => subject.next(2), 1000);
					    setTimeout(() => subject.next(3), 1500);
					    setTimeout(() => subject.next(4), 2000);
					    setTimeout(() => subject.next(5), 2500);
					    setTimeout(() => subject.complete(), 3000);
					}
			

Итого

  1. Subject

Subject

Вернемся к задаче разделения значений между несколькими подписчиками.

С помощью Subject-ов это можно решить вот так:

			const subject = new Rx.Subject();
				Rx.Observable.interval(500)
				   .take(5)
				   .map(() => Math.random().toString().substr(2, 3))
			   .subscribe(subject);
				 
			subject.subscribe(/* ... */);
			subject.subscribe(/* ... */);
			setTimeout(() => {subject.subscribe(/* ... */);}, 800);
		

Multicasting

Оператор multicast позволяет не выделять промежуточный Subject, а создать сразу “правильный” Observable.

				const connectable$ = Rx.Observable.interval(500)
				   .take(5)
				   .map(() => Math.random().toString().substr(2, 3))
				   .multicast(new Rx.Subject());
				 
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);
				connectable$.connect();

		

Multicasting

multicast(new Rx.Subject()) -> publish()

multicast(new Rx.BehaviorSubject(value)) -> publishBehavior(value)

multicast(new Rx.ReplaySubject(buffer)) -> publishReplay(buffer)

multicast(new Rx.AsyncSubject()) -> publishLast()

Multicasting

Запустить получивщийся Observable нужно вызовом connect.

				const connectable$ = Rx.Observable.interval(500)
				   .take(5)
				   .map(() => Math.random().toString().substr(2, 3))
				   .publish();
				 
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 500);
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 1000);
				setTimeout(() => {connectable$.subscribe(/* ... */);}, 1500);
				connectable$.connect();

		

Multicasting

Запустить получивщийся Observable нужно вызовом connect.

				const connectable$ = Rx.Observable.interval(500)
				   .take(5)
				   .map(() => Math.random().toString().substr(2, 3))
				   .publish();
				 
				setTimeout(() => { connectable$.connect(); }, 800);
				 
				setTimeout(() => { connectable$.subscribe(/* ... */); }, 500);
				setTimeout(() => { connectable$.subscribe(/* ... */); }, 1000);
				setTimeout(() => { connectable$.subscribe(/* ... */); }, 1500);
		

Multicasting

Что не так с этим кодом?

				const connectable$ = Rx.Observable.interval(500)
				   .take(5)
				   .map(() => Math.random().toString().substr(2, 3))
				   .publish();
				 
				setTimeout(() => { connectable$.connect(); }, 1000);
		

Multicasting

Используйте refCount для автоматического обновления подписки.

0 -> 1 — старт генерации значений

1 -> 0 — отмена

				const stream$ = Rx.Observable.interval(TICK_MS)
				   .take(5)
				   .publish()
				   .refCount();
		

Multicasting

Используйте refCount для автоматического обновления подписки.

				const stream$ = Rx.Observable.interval(TICK_MS)
				   .take(5)
				   .publish()
				   .refCount();
				const subscription1 = stream$.subscribe();
				let subscription2;
				 
				setTimeout(() => {subscription2 = stream$.subscribe(); }, 1100);
				setTimeout(() => {subscription1.unsubscribe()}, 1100);
				setTimeout(() => {subscription2.unsubscribe()}, 2100);
				setTimeout(() => {stream$.subscribe()}, 2100);

		

Multicasting

publish().refCount() -> share()

				const stream$ = Rx.Observable.interval(TICK_MS)
				   .take(5)
				   // .publish()
				   // .refCount();
				   .share();
		

Примеры

Примеры

rxjs-subjects-example

  1. unsubscribe

Спасибо!

github.com/aalexeev239/rxjs-subjects

goo.gl/3J55dE

https://github.com/aalexeev239/rxjs-subjects