multicast

函数签名: multicast(selector: Function): Observable

使用提供 的 Subject 来共享源 observable

示例

示例 1: 使用标准的 Subject 进行 multicast

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { Subject, interval } from 'rxjs';
import { take, tap, multicast, mapTo } from 'rxjs/operators';

// 每2秒发出值并只取前5个
const source = interval(2000).pipe(take(5));

const example = source.pipe(
  // 因为我们在下面进行了多播,所以副作用只会调用一次
  tap(() => console.log('Side Effect #1')),
  mapTo('Result!')
);


// 使用 subject 订阅 source 需要调用 connect() 方法
const multi = example.pipe(multicast(() => new Subject()));
/*
  多个订阅者会共享 source 
  输出:
  "Side Effect #1"
  "Result!"
  "Result!"
  ...
*/
const subscriberOne = multi.subscribe(val => console.log(val));
const subscriberTwo = multi.subscribe(val => console.log(val));
// 使用 subject 订阅 source
multi.connect();
示例 2: 使用 ReplaySubject 进行 multicast

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { interval, ReplaySubject } from 'rxjs';
import { take, multicast, tap, mapTo } from 'rxjs/operators';

// 每2秒发出值并只取前5个
const source = interval(2000).pipe(take(5));

// 使用 ReplaySubject 的示例
const example = source.pipe(
  // 因为我们在下面进行了多播,所以副作用只会调用一次
  tap(_ => console.log('Side Effect #2')),
  mapTo('Result Two!')
);
// 可以使用任何类型的 subject
const multi = example.pipe(multicast(() => new ReplaySubject(5)));
// 使用 subject 订阅 source
multi.connect();

setTimeout(() => {
  /*
   因为使用的是 ReplaySubject,订阅者会接收到 subscription 中的之前所有值。
   */
  const subscriber = multi.subscribe(val => console.group(val));
}, 5000);

其他资源


:file_folder: 源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/multicast.ts

results matching ""

    No results matching ""