A1 = 1; B1 = 3;
C1 = A1 + B1;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Push-стратегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Push-стратегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 5;
Pull-стратегия
A1 = 1; B1 = 3;
C1 = A1 + B1;
C1 ~ 4;
A1 = 2;
C1 ~ 4;
// Pipeline over a fixed list of {x, y} points: each point goes through map, then filter,
// the survivors are folded by reduce, and the subscriber logs what comes out.
// The operator callback bodies are elided in this snippet.
Rx.Observable
.from([{x: 11, y: 99}, {x: 91, y: 151}, {x: 152, y: 106}, {x: 274, y: 84}])
.map((value) => { ... })
.filter((value) => { ... })
.reduce((acc, curr) => { ... })
// Nothing above runs until subscribe() is called.
.subscribe(result => {
console.log(result);
});
/**
 * Hand-written Observer-pattern subject: holds a list of observers and pushes
 * values to each of them.
 */
class Subject {
constructor() {
// Registered observers; the list starts empty.
this.observers = [];
}
// Bodies of add/remove are elided in this snippet; presumably they insert into and
// delete from `this.observers` — confirm against the full source.
add(observer) {...}
remove(observer) {...}
/** Calls `update(value)` on every observer currently held in `this.observers`. */
notify(value) {
this.observers.forEach(observer => {
observer.update(value)
});
}
}
Субъект изменяет значения.
Также называется Observable, наблюдаемый объект.
Наблюдатель (Observer) реагирует на изменения.
/**
 * Hand-written Iterator-pattern cursor over a list.
 */
class Iterator {
constructor(list) {
// The collection being walked.
this.list = list;
// Current position; starts at the first element.
this.cursor = 0;
}
// Bodies are elided in this snippet; presumably next() returns the element at the cursor
// and advances it, and hasNext() reports whether any elements remain — confirm.
next() {...}
hasNext() {...}
}
// Producer: pushes 1 synchronously when subscribed, 2 after one second, 3 after two,
// and signals completion after three.
const stream$ = Rx.Observable.create((observer) => {
  observer.next(1);
  setTimeout(() => observer.next(2), 1000);
  setTimeout(() => observer.next(3), 2000);
  setTimeout(() => observer.complete(), 3000);
});

// Consumer: a full observer object with one handler per notification type.
const subscribtion = stream$.subscribe({
  next(value) {
    renderNext(value);
  },
  error(error) {
    renderError(error);
  },
  complete() {
    renderComplete();
  },
});
Observable – сам наблюдаемый объект, доставляет изменения в подписку.
// The observable is built from a producer function (body elided here)...
const stream$ = Rx.Observable.create((observer) => {...});
// ...and subscribe() (arguments elided) hands back the subscription object.
const subscribtion = stream$.subscribe(...);
Observer
– объект-наблюдатель,
опциональные обработчики next/error/complete
// Three ways of passing an observer to subscribe().
// 1) Observer object with a handler for each notification type.
const subscribtion = stream$.subscribe(
{
next: (value) => renderNext(value),
error: (error) => renderError(error),
complete: () => renderComplete()
}
);
// 2) A single callback, used as the `next` handler.
stream$.subscribe((value) => renderNext(value));
// 3) The same, passing the function reference directly.
stream$.subscribe(renderNext);
Функция создания определяет, как наблюдатель будет получать значения.
// The producer function decides what an observer receives and when:
// 1 at once, 2 and 3 at one-second steps, completion at the three-second mark.
const stream$ = Rx.Observable.create((observer) => {
  observer.next(1);
  setTimeout(() => observer.next(2), 1000);
  setTimeout(() => observer.next(3), 2000);
  setTimeout(() => observer.complete(), 3000);
});

// Only values are rendered; no error or completion handler is supplied.
stream$.subscribe(renderNext);
Subscription
– подписка. Старт получения значений.
unsubscribe – завершение получения значений.
// Emits 1 one second after subscription. The function returned from the producer is the
// teardown: it cancels the pending timer so nothing is emitted after unsubscribing.
const stream$ = Rx.Observable.create((observer) => {
  const pendingTimer = setTimeout(() => {
    observer.next(1);
  }, 1000);

  return () => clearTimeout(pendingTimer);
});
// Producer body and subscribe() arguments are elided in this snippet.
const stream$ = Rx.Observable.create((observer) => {...});
const subscribtion = stream$.subscribe(...);
// Stop receiving values through this subscription.
subscribtion.unsubscribe();
Подписки не зависят друг от друга.
// Each subscribe() call returns its own subscription object (producer body elided).
const stream$ = Rx.Observable.create((observer) => {...});
const subscribtionA = stream$.subscribe(observerA);
// The same observer object is subscribed twice, giving two separate subscriptions.
const subscribtionB = stream$.subscribe(observerBC);
const subscribtionC = stream$.subscribe(observerBC);
// Producer: 1 immediately, then 2 and 3 at one-second intervals, completion at 3 s.
const stream$ = Rx.Observable.create((observer) => {
  // Schedules a single delayed emission.
  const emitAfter = (delayMs, value) => setTimeout(() => observer.next(value), delayMs);

  observer.next(1);
  emitAfter(1000, 2);
  emitAfter(2000, 3);
  setTimeout(() => observer.complete(), 3000);
});

// Consumer with all three handlers wired to the render helpers.
const subscribtion = stream$.subscribe({
  next: (value) => {
    renderNext(value);
  },
  error: (error) => {
    renderError(error);
  },
  complete: () => {
    renderComplete();
  },
});
Дальше — 127+ операторов. 127, Карл!
// Creation helpers. The marble diagram under each call sketches its output over time.

// One value.
Rx.Observable.of('foo');
// ---foo---|

// The elements of an array, in order.
Rx.Observable.from(['foo', 'bar', 'baz']);
// ---foo---bar---baz---|

// A running counter driven by a timer.
Rx.Observable.timer(100, 500);
// -0---1---2---3--...

// The value a promise is fulfilled with.
const promise = Promise.resolve('foo');
Rx.Observable.fromPromise(promise);
// ----------------foo---|

// DOM events of one type from one element.
const btnElement = document.querySelector('#btn');
Rx.Observable.fromEvent(btnElement, 'click');
// --evt-evt------evt----evt--...
// Twelve ticks, half a second apart, starting immediately.
const twelveTicks$ = Rx.Observable.timer(0, 500).take(12);
twelveTicks$.subscribe(renderNext);

// The same twelve ticks, but each one is let through only with probability 0.3.
const sampledTicks$ = Rx.Observable.timer(0, 500)
  .take(12)
  .filter(() => Math.random() < 0.3);
sampledTicks$.subscribe(renderNext);
---2--30--22--5--60--1-----|
filter(x => x > 10)
------30--22-----60--------|
// Forty ticks at 250 ms, randomly thinned to roughly a third...
const sparseTicks$ = Rx.Observable.timer(0, 250)
  .take(40)
  .filter(() => Math.random() < 0.3);

// ...then debounced with a 500 ms window before rendering.
sparseTicks$.debounceTime(500).subscribe(renderNext);
// A: numbers 0..3, one every 1.5 s starting immediately.
const streamA$ = Rx.Observable.timer(0, 1500).take(4);

// B: letters a..f, first after 0.8 s and then one per second.
const streamB$ = Rx.Observable.timer(800, 1000)
  .take(6)
  .map((value) => 'abcdef'[value]);

// Both streams merged into a single rendered stream.
const merged$ = Rx.Observable.merge(streamA$, streamB$);
merged$.subscribe(renderNext);
// A: numbers 0..3, one every 1.5 s starting immediately.
const streamA$ = Rx.Observable.timer(0, 1500).take(4);

// B: letters a..f, first after 0.8 s and then one per second.
const streamB$ = Rx.Observable.timer(800, 1000)
  .take(6)
  .map((value) => 'abcdef'[value]);

// Each [number, letter] pair produced by combineLatest is joined with `+` and rendered.
const combined$ = Rx.Observable.combineLatest(streamA$, streamB$);
combined$.map((pair) => pair[0] + pair[1]).subscribe(renderNext);
// Outer stream: the letters a, b, c, one every 1.5 s.
// Every letter starts an inner series of six ticks (450 ms apart), each prefixed with
// that letter; mergeMap flattens all inner series into the rendered output.
Rx.Observable.timer(0, 1500)
  .take(3)
  .map((value) => 'abcd'[value])
  .mergeMap((letter) =>
    Rx.Observable.timer(0, 450)
      .take(6)
      .map((tick) => letter + tick),
  )
  .subscribe(renderNext);
// Outer stream: the letters a, b, c, one every 1.5 s.
// Every letter starts an inner series of six ticks (450 ms apart), each prefixed with
// that letter; here the inner series are flattened with switchMap.
Rx.Observable.timer(0, 1500)
  .take(3)
  .map((value) => 'abcd'[value])
  .switchMap((letter) =>
    Rx.Observable.timer(0, 450)
      .take(6)
      .map((tick) => letter + tick),
  )
  .subscribe(renderNext);
/**
 * Builds a promise that is fulfilled with `value` once `time` milliseconds have passed.
 * The promise is never rejected.
 */
const getPromise = (value, time) =>
  new Promise((resolve) => {
    // setTimeout hands its trailing arguments to the callback, so this runs resolve(value).
    setTimeout(resolve, time, value);
  });
// Wrap a promise that is fulfilled with the clock emoji after 1.5 s and render its value.
const alarm$ = Rx.Observable.fromPromise(getPromise('⏰', 1500));
alarm$.subscribe(renderNext);
// Source: 1 immediately, 2 at the 2 s mark, 3 at the 3 s mark, completion at 4 s.
const streamA$ = Rx.Observable.create((observer) => {
  observer.next(1);
  setTimeout(() => observer.next(2), 2000);
  setTimeout(() => observer.next(3), 3000);
  setTimeout(() => observer.complete(), 4000);
});

// Every source value is mapped to a promise that takes 1.5 s to deliver it; switchMap
// flattens those promises into the resulting stream.
const streamB$ = streamA$.switchMap((value) => getPromise(value, 1500));
switchMap автоматически отписывается от предыдущего внутреннего потока — вызывает subscription.unsubscribe().
Можем написать отменяемые запросы:
/**
 * Cancellable HTTP request as an Observable.
 *
 * Each subscription starts its own `fetch(url)`, emits the parsed JSON body once and then
 * completes. Unsubscribing aborts the in-flight request through an AbortController.
 * A failed request or an unparsable body is delivered to the observer as an error.
 */
const request$ = Rx.Observable.create((observer) => {
  const controller = new AbortController();

  fetch(url, { signal: controller.signal })
    .then((res) => res.json())
    .then(
      (data) => {
        observer.next(data);
        observer.complete();
      },
      // Forward failures of fetch()/json() to the observer. Without this handler they
      // surfaced only as unhandled promise rejections and subscribers waited forever.
      // An abort triggered by the teardown below also lands here; by then the subscriber
      // has unsubscribed, so it is expected to ignore the notification.
      (error) => {
        observer.error(error);
      },
    );

  // Teardown, run on unsubscribe: cancel the request if it is still in flight.
  return function unsubscribe() {
    controller.abort();
  };
});
// Builds a fresh stream of the field's current text, one emission per 'input' event.
// Called once per example so that every pipeline below owns its own event source.
const typedValues = () =>
  Rx.Observable.fromEvent(inputElement, 'input').map((event) => event.target.value);

// Step 1: log every value as it is typed.
typedValues().subscribe(logToOutput);

// Step 2: add a 500 ms debounce.
typedValues().debounceTime(500).subscribe(logToOutput);

// Step 3: additionally drop a value equal to the previous one.
typedValues().debounceTime(500).distinctUntilChanged().subscribe(logToOutput);

// Step 4: turn each query into a request and log what fetchData delivers.
typedValues()
  .debounceTime(500)
  .distinctUntilChanged()
  .switchMap((query) => fetchData(query))
  .subscribe(logToOutput);

// Step 5: raise a loading flag before the request and lower it when a result arrives.
// NOTE(review): setLoading(false) sits in a `do` after switchMap, so it only runs when a
// value comes through; if fetchData fails the flag is left raised.
typedValues()
  .debounceTime(500)
  .distinctUntilChanged()
  .do(() => setLoading(true))
  .switchMap((value) => fetchData(value))
  .do(() => setLoading(false))
  .subscribe(logToOutput);

// Step 5 again (the source repeats this pipeline verbatim).
typedValues()
  .debounceTime(500)
  .distinctUntilChanged()
  .do(() => setLoading(true))
  .switchMap((value) => fetchData(value))
  .do(() => setLoading(false))
  .subscribe(logToOutput);
// Five values, one every 500 ms; each value comes from a call to random().
const stream$ = Rx.Observable.interval(500)
.take(5)
.map(() => random());
// Two subscriptions made right away (observer arguments are elided in this snippet).
stream$.subscribe(/* ... */);
stream$.subscribe(/* ... */);
// A third subscription made 800 ms later.
setTimeout(() => {
stream$.subscribe(/* ... */);
}, 800);
Subject — объединение Observable и Observer. Передаёт значение всем подписчикам.
// A subject driven by hand: 0 is pushed immediately, 1 through 4 follow at one-second
// steps, and the subject completes at the five-second mark.
const subject = new Rx.Subject();
subject.next(0);
for (const tick of [1, 2, 3, 4]) {
  setTimeout(() => subject.next(tick), tick * 1000);
}
setTimeout(() => subject.complete(), 5000);
const subject = new Rx.Subject();
// Helper defined outside this snippet; presumably it schedules the subject's emissions — confirm.
setSubjectTick(subject);
// One subscriber attached immediately (no handlers passed)...
subject.subscribe();
// ...and two more attached after 1.5 s and 2.5 s (observer arguments are elided).
setTimeout(() => subject.subscribe(/* ... */), 1500);
setTimeout(() => subject.subscribe(/* ... */), 2500);
// Manual cleanup: keep the subscription returned by subscribe() (operator arguments are
// elided in this snippet)...
const subscription = streamA$
.map(...)
.switchMap(...)
.subscribe(...);
...
// ...and release it in the destroy hook.
onDestroy() {
subscription.unsubscribe();
}
// Declarative cleanup: a dedicated subject acts as the stop signal.
// NOTE(review): `Subject` is unqualified here, while other snippets in this file use
// `new Rx.Subject()` — confirm which import style applies.
const destroyStream = new Subject();
streamA$
.map(...)
.switchMap(...)
// The pipeline is tied to destroyStream, so no subscription reference has to be kept.
.takeUntil(destroyStream)
.subscribe()
...
// Emitting on destroyStream in the destroy hook is what ends the pipeline above.
onDestroy() {
destroyStream.next();
}
/**
 * Injectable holder of the authentication flag.
 * The flag lives in a BehaviorSubject created with `false` as its initial value.
 */
@Injectable()
export class StateService {
  /** Backing subject for the flag; never reassigned, hence `readonly`. */
  private readonly authSubject = new BehaviorSubject<boolean>(false);

  /**
   * Publishes a new value of the flag.
   * @param state the value to push into the subject
   */
  setAuthState(state: boolean): void {
    this.authSubject.next(state);
  }
}
/**
 * Injectable holder of the authentication flag.
 * The flag lives in a BehaviorSubject created with `false` as its initial value.
 */
@Injectable()
export class StateService {
  /** Backing subject for the flag; never reassigned, hence `readonly`. */
  private readonly authSubject = new BehaviorSubject<boolean>(false);

  // Body elided in this snippet.
  setAuthState(state: boolean) {...}

  /**
   * Exposes the flag as an observable so consumers can subscribe without getting access
   * to the subject's `next`.
   * @returns an observable of the boolean flag; typed `Observable<boolean>` to match the
   *          `BehaviorSubject<boolean>` it is derived from
   */
  getStateChange(): Observable<boolean> {
    return this.authSubject.asObservable();
  }
}
/**
 * Injectable holder of the authentication flag.
 * The flag lives in a BehaviorSubject created with `false` as its initial value.
 */
@Injectable()
export class StateService {
  /** Backing subject for the flag; never reassigned, hence `readonly`. */
  private readonly authSubject = new BehaviorSubject<boolean>(false);

  // Bodies elided in this snippet. The return type is `Observable<boolean>` so that it
  // agrees with the `BehaviorSubject<boolean>` above.
  setAuthState(state: boolean) {...}
  getStateChange(): Observable<boolean> {...}

  /**
   * Synchronous read of the flag.
   * @returns the value currently held by the subject
   */
  getCurrentState(): boolean {
    return this.authSubject.getValue();
  }
}
let cachingSubject;
getResource() {
if (!cachingSubject) {
cachingSubject = new ReplaySubject(1);
loadData().subscribe((result) => {
this.cachingSubject.next(result);
});
}
return this.cachingSubject.asObservable();
}