Higher Order Observable
Observable высшего порядка
Observable, значениями которого являются другие Observable
/**
 * Builds the first three powers of a number.
 *
 * @param {number} value - The base.
 * @returns {number[]} `[value, value², value³]`.
 */
const mapping = (value) => {
  const square = value ** 2;
  const cube = value ** 3;
  return [value, square, cube];
};

// Synchronous analogue of a higher-order stream: every element is replaced
// by an array of its powers, which yields an array of arrays.
const mapped = [1, 2, 3].map((item) => mapping(item));
[[1,1,1],[2,4,8],[3,9,27]]
1, 1, 1, 2, 4, 8, 3, 9, 27
1, 2, 3, 1, 4, 9, 1, 8, 27
// Mapping every value to an Observable produces a higher-order Observable:
// the subscriber is handed the inner Observables themselves, not their values.
of(1, 2, 3)
  .pipe(
    map((item) => {
      const powers = mapping(item);
      return from(powers);
    }),
  )
  .subscribe({ next: (inner$) => console.log(inner$) });
<Observable>
<Observable>
<Observable>
// To reach the actual values, each inner Observable has to be subscribed to
// by hand inside the outer `next` handler.
of(1, 2, 3)
  .pipe(map((item) => from(mapping(item))))
  .subscribe((inner$) => {
    inner$.subscribe({ next: (power) => console.log(power) });
  });
/**
 * Builds the first three powers of a number.
 *
 * @param {number} value - The base.
 * @returns {number[]} `[value, value², value³]`.
 */
const mapping = value => [value, value ** 2, value ** 3];

/**
 * Asynchronous counterpart of `mapping`: emits the same three powers,
 * one per 100 ms tick, and then completes.
 *
 * @param {number} value - The base.
 * @returns {Observable<number>} Stream of `value ** 1`, `value ** 2`, `value ** 3`.
 */
const asyncMapping = value => {
  return interval(100)
    .pipe(
      // `interval` counts from 0, hence the `+ 1` to start at the first power.
      map(i => value ** (i + 1)),
      take(3)
    );
}; // the closing of the function body was missing in the original snippet
// A single asynchronous inner stream: the powers of 2, one per 100 ms tick.
asyncMapping(2).subscribe({ next: (power) => console.log(power) });

// The same manual "subscribe inside subscribe" approach, now with
// asynchronous inner streams.
of(1, 2, 3)
  .pipe(map((item) => asyncMapping(item)))
  .subscribe((inner$) => {
    inner$.subscribe({ next: (power) => console.log(power) });
  });
// The inner subscriptions are created by hand and their handles are dropped,
// so `subscription` only refers to the outer stream: unsubscribing from it
// has no way to reach the inner streams.
const subscription = of(1, 2, 3)
  .pipe(map((item) => asyncMapping(item)))
  .subscribe((inner$) => {
    inner$.subscribe({ next: (power) => console.log(power) });
  });

setTimeout(function cancel() {
  subscription.unsubscribe();
  console.log('unsubscribe!');
}, 150);
// mergeAll() subscribes to the inner Observables on our behalf and forwards
// their values, so the single outer subscription also owns the inner ones and
// unsubscribing tears all of them down.
const subscription = of(1, 2, 3)
  .pipe(
    map((item) => asyncMapping(item)),
    mergeAll(),
  )
  .subscribe({ next: (power) => console.log(power) });

setTimeout(function cancel() {
  subscription.unsubscribe();
  console.log('unsubscribe!');
}, 150);
// mergeMap(fn) is the shorthand for map(fn) followed by mergeAll().
const subscription = of(1, 2, 3)
  .pipe(mergeMap((item) => asyncMapping(item)))
  .subscribe({ next: (power) => console.log(power) });

setTimeout(function cancel() {
  subscription.unsubscribe();
  console.log('unsubscribe!');
}, 150);
// Outer stream: one letter every 1500 ms (a → b → c).
// Every letter starts its own inner timer; mergeMap keeps earlier inner
// timers running when a new letter arrives.
timer(0, 1500)
  .pipe(
    take(3),
    map((index) => 'abc'[index]),
    mergeMap((letter) =>
      timer(0, 450).pipe(
        take(6),
        map((tick) => letter + tick), // x0 → x1 → x2 → ...
      ),
    ),
  )
  .subscribe(renderValue);
// mergeMap with a result selector: the second callback receives the outer
// value, the inner value and both indices, and its return value is what gets
// emitted downstream.
// NOTE(review): the result-selector overload is deprecated in recent RxJS
// versions — confirm the RxJS version these slides target.
of(1, 2, 3)
  .pipe(
    mergeMap(
      value => asyncMapping(value),
      (outValue, inValue, outIndex, inIndex) => {
        console.log(`${outValue} → ${inValue} | ${outIndex} --- ${inIndex}`);
        return inValue;
      }
    )
  ) // this parenthesis closing pipe() was missing in the original snippet
  .subscribe(value => console.log(value));
// Each snippet collects all emissions into a single array with toArray(), so
// the ordering produced by each concurrency setting can be compared.
// Fixed in all four: the comma between the operators and the parenthesis
// closing pipe() were missing.

// Default concurrency: all inner streams run at once.
of(1, 2, 3)
  .pipe(
    mergeMap(value => asyncMapping(value)),
    toArray()
  )
  .subscribe(value => console.log(value));

// Same pipeline again (repeated slide).
of(1, 2, 3)
  .pipe(
    mergeMap(value => asyncMapping(value)),
    toArray()
  )
  .subscribe(value => console.log(value));

// At most two inner streams are subscribed at the same time.
of(1, 2, 3)
  .pipe(
    mergeMap(value => asyncMapping(value), 2),
    toArray()
  )
  .subscribe(value => console.log(value));

// Concurrency 1: inner streams run strictly one after another.
of(1, 2, 3)
  .pipe(
    mergeMap(value => asyncMapping(value), 1),
    toArray()
  )
  .subscribe(value => console.log(value));
// Fixed in both snippets: the parenthesis closing pipe() was missing.

// concatMap: the next inner stream starts only after the previous one completes.
of(1, 2, 3)
  .pipe(
    concatMap(value => asyncMapping(value))
  )
  .subscribe(value => console.log(value));

// switchMap: a new outer value cancels the inner stream that is still running.
of(1, 2, 3)
  .pipe(
    switchMap(value => asyncMapping(value))
  )
  .subscribe(value => console.log(value));
// Outer stream: one letter every 1500 ms (a → b → c).
// With switchMap a new letter replaces the inner timer of the previous one,
// so only the most recent letter keeps emitting.
timer(0, 1500)
  .pipe(
    take(3),
    map((index) => 'abc'[index]),
    switchMap((letter) =>
      timer(0, 450).pipe(
        take(6),
        map((tick) => letter + tick), // x0 → x1 → x2 → ...
      ),
    ),
  )
  .subscribe(renderValue);
switchMap автоматически отписывается от предыдущего внутреннего Observable — вызывает subscription.unsubscribe()
Можем написать отменяемые запросы:
/**
 * Cancellable HTTP request wrapped in an Observable.
 *
 * Each subscription starts its own `fetch(url)`; on success the parsed JSON is
 * emitted once and the stream completes. Unsubscribing aborts the request via
 * AbortController, which is what lets operators such as switchMap cancel
 * requests that are no longer needed.
 *
 * `url` is expected to be defined in the surrounding scope.
 */
const request$ = new Observable((observer) => {
  const controller = new AbortController();
  const { signal } = controller;

  fetch(url, { signal })
    .then((res) => res.json())
    .then((data) => {
      observer.next(data);
      observer.complete();
    })
    // Forward network/parsing failures to the subscriber instead of leaving
    // an unhandled promise rejection and a stream that never terminates.
    // After unsubscribe the observer is already closed, so the rejection
    // caused by abort() is silently dropped here.
    .catch((error) => observer.error(error));

  // Teardown logic: runs when the subscriber unsubscribes.
  return function unsubscribe() {
    controller.abort();
  };
});
// exhaustMap: while an inner stream is still running, new outer values are ignored.
// Fixed: the parenthesis closing pipe() was missing.
of(1, 2, 3)
  .pipe(
    exhaustMap(value => asyncMapping(value))
  )
  .subscribe(value => console.log(value));
/**
 * Simulates a network request with a random latency.
 *
 * @param {*} value - Request payload; echoed back in the response.
 * @returns {Observable<string>} Emits `"<value>_<latency>"` once, after a
 *   random latency of 100–1099 ms.
 */
const request = (value) => {
  const latencyMs = Math.floor(Math.random() * 1000) + 100;
  const response$ = of(`${value}_${latencyMs}`);
  return response$.pipe(delay(latencyMs));
};
// A single simulated request.
request(1)
  .subscribe(value => console.log(value));

// The four flattening strategies applied to the same three requests.
// Fixed in all four: the parenthesis closing pipe() was missing.

// mergeMap: all requests run in parallel; responses arrive in completion order.
of(1, 2, 3)
  .pipe(
    mergeMap(value => request(value))
  )
  .subscribe(value => console.log(value));

// concatMap: requests run one after another, in source order.
of(1, 2, 3)
  .pipe(
    concatMap(value => request(value))
  )
  .subscribe(value => console.log(value));

// switchMap: every new value cancels the request that is still pending.
of(1, 2, 3)
  .pipe(
    switchMap(value => request(value))
  )
  .subscribe(value => console.log(value));

// exhaustMap: values arriving while a request is pending are dropped.
of(1, 2, 3)
  .pipe(
    exhaustMap(value => request(value))
  )
  .subscribe(value => console.log(value));

// Chaining dependent requests: the response of the first request becomes the
// input of the second one.
of('цепочка')
  .pipe(
    switchMap(value => request(value)),
    switchMap(anotherValue => request(anotherValue))
  )
  .subscribe(value => console.log(value));
/**
 * Like `request`, but fails for even inputs.
 *
 * @param {number} value - Request payload.
 * @returns {Observable<string>} An Observable that errors with '💩' when
 *   `value` is even; otherwise the result of `request(value)`.
 */
const requestWithError = (value) => {
  const isEven = value % 2 === 0;
  // A successful request settles in <= 1100 ms.
  return isEven ? throwError('💩') : request(value);
};
// Emits 1, 2, 3 — one value every 1200 ms.
const source$ = interval(1200).pipe(
  take(3),
  map((tick) => tick + 1),
);

// 1) No catchError in the pipeline: the error raised for the even value
//    reaches the subscriber's error handler.
source$
  .pipe(switchMap((id) => requestWithError(id)))
  .subscribe({
    next: (response) => console.log(response),
    error: (reason) => console.log(reason),
  });

// 2) catchError on the OUTER pipeline: the error is caught, but the whole
//    stream is replaced by EMPTY, so later source values are not processed.
source$
  .pipe(
    switchMap((id) => requestWithError(id)),
    catchError((reason) => {
      console.log(`catch ${reason}`);
      return EMPTY;
    }),
  )
  .subscribe({
    next: (response) => console.log(response),
    error: (reason) => console.log(reason),
  });

// 3) catchError on the INNER Observable: only the failed request is replaced
//    by EMPTY; the outer stream stays alive for the following values.
source$
  .pipe(
    switchMap((id) =>
      requestWithError(id).pipe(
        catchError((reason) => {
          console.log(`catch ${reason}`);
          return EMPTY;
        }),
      ),
    ),
  )
  .subscribe({
    next: (response) => console.log(response),
    error: (reason) => console.log(reason),
  });
// takeUntil placed BEFORE switchMap: it only guards the source `of(1)`.
// NOTE(review): the inner request started by switchMap sits downstream of
// takeUntil, so destroy$ is not expected to cancel it — this is the leak the
// slide is meant to show; confirm against the slide's expected output.
const destroy$ = new Subject();

of(1)
  .pipe(
    takeUntil(destroy$),
    switchMap((id) => request(id)),
  )
  .subscribe({
    next(response) {
      console.log(response);
    },
    complete() {
      console.log('complete');
    },
  });

console.log('destroy');
destroy$.next();
// takeUntil placed LAST: it guards the whole pipeline, including the inner
// request created by switchMap, so destroy$ completes the stream before the
// pending response can be delivered.
const destroy$ = new Subject();

of(1)
  .pipe(
    switchMap((id) => request(id)),
    takeUntil(destroy$),
  )
  .subscribe({
    next(response) {
      console.log(response);
    },
    complete() {
      console.log('complete');
    },
  });

console.log('destroy');
destroy$.next();
// Conditional requests: switchMap returns EMPTY (completes immediately with
// no values) whenever `flag` is off, so nothing is requested for that tick.
let flag = true;

// Fixed: a stray `;` after the first pipe() terminated the statement, which
// left the following `.pipe(` dangling (syntax error). The operators now live
// in a single pipe() call.
interval(1200)
  .pipe(
    take(3),
    map(v => v + 1),
    switchMap(value => {
      console.log(`value: ${value}, flag: ${flag}`);
      return flag ? request(value) : EMPTY;
    })
  )
  .subscribe(value => console.log(value));

// Toggle the flag off and back on while the source is ticking.
setTimeout(() => { flag = false; }, 1200);
setTimeout(() => { flag = true; }, 2400);
Механизм противодавления (backpressure)
// Fast producer: 20 values, one every 250 ms.
const sourceInterval$ = interval(250).pipe(take(20));
// Window boundary: a new window opens every 1000 ms.
const windowInterval$ = interval(1000);

// Lossy backpressure: split the source into windows and keep only the first
// two values of every window.
// Fixed: the commas separating the operators inside pipe() were missing.
sourceInterval$
  .pipe(
    window(windowInterval$),
    map(win => win.pipe(take(2))),
    mergeAll()
  )
  .subscribe(value => console.log(value));
const people = [
  {name: 'Sue', age: 25},
  {name: 'Joe', age: 30},
  {name: 'Sarah', age: 35},
  {name: 'Frank', age: 25},
];

// Emits one person per second, groups them by age and logs the list of names
// of every group once that group completes.
// Fixed: `people(x)` called the array as a function (must index with
// `people[x]`), the parenthesis closing pipe() was missing, a stray `;` broke
// the chain before the second `.pipe(`, and the comma after groupBy() was
// missing. The operators now live in a single pipe() call.
interval(1000)
  .pipe(
    take(4),
    map(x => people[x]),
    groupBy(person => person.age),
    mergeMap(group => group.pipe(
      map(({name}) => name),
      toArray()
    ))
  )
  .subscribe(value => console.log(value));