A1 = 1; B1 = 3;
C1 = A1 + B1;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Push-стратегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Push-стратегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Pull-статегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 4;
from([{x: 11, y: 99}, {x: 91, y: 151}, {x: 152, y: 106}]).pipe(
map((value) => { ... }),
filter((value) => { ... }),
reduce((acc, curr) => { ... })
).subscribe(result => {
console.log(result);
});
class Subject {
constructor() {
this.observers = [];
}
add(observer) {...}
remove(observer) {...}
notify(value) {
this.observers.forEach(observer => {
observer.update(value)
});
}
}
Субъект изменяет значения.
Также называется Observable, наблюдаемый объект.
Наблюдатель (Observer) реагирует на изменения.
class Iterator {
next() {...}
hasNext() {...}
}
const stream$ = Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscription = stream$.subscribe({
next: (value) => renderNext(value),
error: (error) => renderError(error),
complete: () => renderComplete()
});
Observable – наблюдаемый объект, доставляет изменения в подписку.
const stream$ = Observable.create((observer) => {...});
const subscription = stream$.subscribe(...);
Observer
– объект-наблюдатель,
опциональные обработчики next/error/complete
const subscription = stream$.subscribe(
{
next: (value) => renderNext(value),
error: (error) => renderError(error),
complete: () => renderComplete()
}
);
stream$.subscribe((value) => renderNext(value));
Функция создания
определяет,
как наблюдатель
будет получать значения.
const stream$ = Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
stream$.subscribe(renderNext);
const stream$ = Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const promise = new Promise((resolve, reject) => {
resolve(1);
});
Subscribtion
– подписка.
Старт получения значений.
const stream$ = Observable.create((observer) => {...});
const subscription = stream$.subscribe(...);
subscription.unsubscribe();
unsubscribe – завершение получения значений.
const stream$ = Observable.create((observer) => {
const interval = setInterval(() => observer.next(1), 1000);
return function unsubscribe() {
clearInterval(interval);
}
});
const subscription = stream$.subscribe(...);
subscription.unsubscribe();
Нет подписки — нет потока
const stream$ = Observable.create((observer) => {...});
const subscriptionA = stream$.subscribe(observerA);
Каждый observer создаст свой поток значений
Подписки не зависят друг от друга
const stream$ = Observable.create((observer) => {...});
const subscriptionA = stream$.subscribe(observerA);
const subscriptionB = stream$.subscribe(observerBC);
const subscriptionC = stream$.subscribe(observerBC);
const stream$ = Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 1000);
setTimeout(() => { observer.next(3);}, 2000);
setTimeout(() => { observer.complete();}, 3000);
});
const subscription = stream$.subscribe({
next: (value) => renderNext(value),
error: (error) => renderError(error),
complete: () => renderComplete()
});
import {of} from 'rxjs';
of('foo');
// ---foo-|
import {from} from 'rxjs';
from(['foo', 'bar', 'baz']);
// ---foo---bar---baz-|
import {timer} from 'rxjs';
timer(100, 500);
// -0---1---2---3--...
import {from} from 'rxjs';
const promise = new Promise((resolve) => { resolve('foo')});
from(promise);
// ----------------foo-|
import {fromEvent} from 'rxjs';
const btnElement = document.getElementById('btn');
fromEvent(btnElement, 'click');
// --evt-evt------evt----evt--...
import {take, filter} from 'rxjs/operators';
timer(0, 500).pipe(
take(12)
).subscribe(renderNext);
timer(0, 500).pipe(
take(12),
filter(() => Math.random() < 0.3)
).subscribe(renderNext);
.pipe(
).subscribe(...)
---2--30--22--5--60--1-----|
filter(x => x > 10)
------30--22-----60--------|
timer(0, 250).pipe(
take(40),
filter(() => Math.random() < 0.3),
debounceTime(500)
).subscribe(renderNext);
const streamA$ = timer(0, 1500).pipe(take(4));
const streamB$ = timer(800, 1000).pipe(
take(6),
map((i) => 'abcdef'[i])
);
merge(streamA$, streamB$)
.subscribe(renderNext);
const streamA$ = timer(0, 1500).pipe(take(4));
const streamB$ = timer(800, 1000).pipe(
take(6),
map((i) => 'abcdef'[i])
);
combineLatest(streamA$, streamB$).pipe(
map(([a, b]) => a + b)
).subscribe(renderNext);
timer(0, 1500).pipe(
take(3),
map((i) => 'abcd'[i]),
mergeMap(letter => {
return timer(0, 625).pipe(
take(6),
map(digit => letter + digit));
})
).subscribe(renderNext);
timer(0, 1500).pipe(
take(3),
map((i) => 'abcd'[i]),
switchMap(letter => {
return timer(0, 625).pipe(
take(6),
map(digit => letter + digit));
})
).subscribe(renderNext);
mergeMap
switchMap
const getPromise = (value) => new Promise((resolve) => {
setTimeout(() => {
resolve(value);
}, 1500);
});
from(getPromise('⏰'))
.subscribe(renderNext);
const streamA$ = Observable.create((observer) => {
observer.next(1);
setTimeout(() => { observer.next(2);}, 2000);
setTimeout(() => { observer.next(3);}, 3000);
setTimeout(() => { observer.complete()}, 4000);
});
const streamB$ = streamA$.pipe(
switchMap((value) => getPromise(value))
).subscribe(renderNext);
SwitchMap автоматически отписывает внутренний поток — вызывает subscription.unsubscribe()
Можем написать отменяемые запросы:
const request$ = Observable.create((observer) => {
const controller = new AbortController();
const signal = controller.signal;
fetch(url, {signal}).then(res => res.json())
.then(data => {
observer.next(data);
observer.complete();
});
return function unsubscribe() {
controller.abort();
};
});
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value)
)
.subscribe(logToOutput)
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value),
debounceTime(500)
)
.subscribe(logToOutput)
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value),
debounceTime(500),
distinctUntilChanged()
)
.subscribe(logToOutput)
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value),
debounceTime(500),
distinctUntilChanged(),
switchMap(query => fetchData(query))
)
.subscribe(logToOutput)
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value),
debounceTime(500),
distinctUntilChanged(),
tap(() => setLoading(true)),
switchMap(query => fetchData(query)),
tap(() => setLoading(false))
)
.subscribe(logToOutput)
fromEvent(inputElement, 'input')
.pipe(
map(event => event.target.value),
debounceTime(500),
distinctUntilChanged(),
tap(() => setLoading(true)),
switchMap(query => fetchData(query)),
tap(() => setLoading(false))
)
.subscribe(logToOutput)
Работаем с потоками,
а не с промежуточными состояниями
const stream$ = timer(0, 500).pipe(
take(5),
map(() => random()));
stream$.subscribe(/* ... */);
stream$.subscribe(/* ... */);
setTimeout(() => {
stream$.subscribe(/* ... */);
}, 800);
Объединение Observable и Observer.
Передает значение всем подписчикам
const subject = new Subject();
subject.next(random());
setTimeout(() => subject.next(random()), 1000);
setTimeout(() => subject.next(random()), 2000);
setTimeout(() => subject.next(random()), 3000);
setTimeout(() => subject.next(random()), 4000);
setTimeout(() => subject.complete(), 5000);
const subject = new Subject();
setSubjectTick(subject);
subject.subscribe();
setTimeout(() => subject.subscribe(/* ... */), 1500);
setTimeout(() => subject.subscribe(/* ... */), 2500);
const subscription = streamA$.pipe(
map(...),
switchMap(...)
).subscribe(...);
...
onDestroy() {
subscription.unsubscribe();
subscription2.unsubscribe();
subscription3.unsubscribe();
subscription4.unsubscribe();
...
}
const destroy$ = new Subject();
streamA$.pipe(
map(...),
switchMap(...),
takeUntil(destroy$)
).subscribe()
...
onDestroy() {
destroy$.next();
}
class StateService {
private authSubject = new BehaviorSubject<boolean>(false);
setAuthState(state: boolean) {
this.authSubject.next(state);
}
}
class StateService {
private authSubject = new BehaviorSubject<boolean>(false);
setAuthState(state: boolean) {...}
getStateChange(): Observable<boolean> {
return this.authSubject.asObservable();
}
}
class StateService {
private authSubject = new BehaviorSubject<boolean>(false);
setAuthState(state: boolean) {...}
getStateChange(): Observable<boolean> {...}
getCurrentState(): boolean {
return this.authSubject.getValue();
}
}