multicast
函数签名: multicast(selector: Function): Observable
使用提供 的 Subject 来共享源 observable
示例
示例 1: 使用标准的 Subject 进行 multicast
( StackBlitz | jsBin | jsFiddle )
// RxJS v6+
import { Subject, interval } from 'rxjs';
import { take, tap, multicast, mapTo } from 'rxjs/operators';
// 每2秒发出值并只取前5个
const source = interval(2000).pipe(take(5));
const example = source.pipe(
// 因为我们在下面进行了多播,所以副作用只会调用一次
tap(() => console.log('Side Effect #1')),
mapTo('Result!')
);
// 使用 subject 订阅 source 需要调用 connect() 方法
const multi = example.pipe(multicast(() => new Subject()));
/*
多个订阅者会共享 source
输出:
"Side Effect #1"
"Result!"
"Result!"
...
*/
const subscriberOne = multi.subscribe(val => console.log(val));
const subscriberTwo = multi.subscribe(val => console.log(val));
// 使用 subject 订阅 source
multi.connect();
示例 2: 使用 ReplaySubject 进行 multicast
( StackBlitz | jsBin | jsFiddle )
// RxJS v6+
import { interval, ReplaySubject } from 'rxjs';
import { take, multicast, tap, mapTo } from 'rxjs/operators';
// 每2秒发出值并只取前5个
const source = interval(2000).pipe(take(5));
// 使用 ReplaySubject 的示例
const example = source.pipe(
// 因为我们在下面进行了多播,所以副作用只会调用一次
tap(_ => console.log('Side Effect #2')),
mapTo('Result Two!')
);
// 可以使用任何类型的 subject
const multi = example.pipe(multicast(() => new ReplaySubject(5)));
// 使用 subject 订阅 source
multi.connect();
setTimeout(() => {
/*
因为使用的是 ReplaySubject,订阅者会接收到 subscription 中的之前所有值。
*/
const subscriber = multi.subscribe(val => console.group(val));
}, 5000);
其他资源
- multicast - 官方文档
源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/multicast.ts