Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

I have a filterable 'activity log' that's currently implemented using a ReplaySubject (since a few components use it and they might subscribe at different times).

When the user changes the filter settings, a new request is made, but the results are appended to the ReplaySubject rather than replacing its contents.

Is there any way to update the ReplaySubject so that it only sends through the new items, using something like a switchMap?

Otherwise, I might need to either use a BehaviorSubject that emits an array of all the activity entries, or recreate the ReplaySubject and notify its consumers (probably by using another observable) to unsubscribe and resubscribe to the new observable.
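
For reference, the BehaviorSubject option would look roughly like the sketch below. ActivityEntry, replaceEntries and appendEntry are placeholder names made up for illustration; the real entry shape and request code are not shown here.

```typescript
import { BehaviorSubject } from "rxjs";

// Hypothetical entry shape (assumption); the real activity log's fields may differ.
interface ActivityEntry {
  id: number;
  message: string;
}

// Holds the full, current list of entries; late subscribers get the latest array.
const activity$ = new BehaviorSubject<ActivityEntry[]>([]);

// Called when a filtered request returns: replaces the list instead of appending.
function replaceEntries(entries: ActivityEntry[]): void {
  activity$.next(entries);
}

// Called when a single new entry arrives under the current filter.
function appendEntry(entry: ActivityEntry): void {
  activity$.next([...activity$.getValue(), entry]);
}
```

The downside is that every change re-emits the whole array, so subscribers cannot tell a single new entry from a full replacement.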

1 Answer

If you want to be able to reset a subject without having its subscribers explicitly unsubscribe and resubscribe, you could do something like this:

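import { Observable, Subject } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";

function resettable<T>(factory: () => Subject<T>): {
  observable: Observable<T>,
  reset(): void,
  subject: Subject<T>
} {
  // Subject<void> so that next() can be called without an argument.
  const resetter = new Subject<void>();
  // Stable subject that callers push values into; it survives resets.
  const source = new Subject<T>();
  // Current inner subject (e.g. a ReplaySubject) that buffers and replays values.
  let destination = factory();
  let subscription = source.subscribe(destination);
  return {
    // Subscribers start on the current destination and switch to each new one on reset.
    observable: resetter.asObservable().pipe(
      startWith(null),
      switchMap(() => destination)
    ),
    reset: () => {
      // Detach the old inner subject, create a fresh one and re-attach the source.
      subscription.unsubscribe();
      destination = factory();
      subscription = source.subscribe(destination);
      // Tell existing subscribers to switch to the new inner subject.
      resetter.next();
    },
    subject: source
  };
}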

resettable will return an object containing:

  • an observable to which consumers of the resettable subject should subscribe;
  • a subject upon which you'd call next, error or complete; and
  • a reset function that will reset the (inner) subject.

You'd use it like this:

import { ReplaySubject } from "rxjs";
const { observable, reset, subject } = resettable(() => new ReplaySubject<number>(3));
observable.subscribe(value => console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
// Subscribes late: the ReplaySubject(3) replays the last three values first.
observable.subscribe(value => console.log(`b${value}`)); // b2, b3, b4, b5, b6
// Swap in a fresh ReplaySubject; the old buffer (2, 3, 4) is discarded.
reset();
// Subscribes after the reset: nothing is replayed.
observable.subscribe(value => console.log(`c${value}`)); // c5, c6
subject.next(5);
subject.next(6);
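
Applied to the activity log from the question, you could call reset() whenever a filtered request returns and then push the new entries. The following is only a sketch: ActivityEntry and onFilterResults are hypothetical names, and the helper is repeated (with a void resetter) so that the snippet runs on its own.

```typescript
import { Observable, ReplaySubject, Subject } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";

// The helper from the answer, repeated so that this snippet is self-contained.
function resettable<T>(factory: () => Subject<T>): {
  observable: Observable<T>;
  reset(): void;
  subject: Subject<T>;
} {
  const resetter = new Subject<void>();
  const source = new Subject<T>();
  let destination = factory();
  let subscription = source.subscribe(destination);
  return {
    observable: resetter.asObservable().pipe(
      startWith(null),
      switchMap(() => destination)
    ),
    reset: () => {
      subscription.unsubscribe();
      destination = factory();
      subscription = source.subscribe(destination);
      resetter.next();
    },
    subject: source
  };
}

// Hypothetical entry shape (assumption); the real fields are not shown in the question.
interface ActivityEntry {
  id: number;
  type: string;
}

const log = resettable(() => new ReplaySubject<ActivityEntry>());

// Hypothetical handler: call it with the entries returned for the new filter.
function onFilterResults(entries: ActivityEntry[]): void {
  log.reset(); // drop the entries buffered for the previous filter
  entries.forEach(entry => log.subject.next(entry));
}

// A component that subscribed before any filter change.
const received: number[] = [];
log.observable.subscribe(entry => received.push(entry.id));

onFilterResults([{ id: 1, type: "login" }, { id: 2, type: "edit" }]);
onFilterResults([{ id: 3, type: "login" }]);

// A component that subscribes after the second filter change.
const late: number[] = [];
log.observable.subscribe(entry => late.push(entry.id));

console.log(received); // [1, 2, 3]
console.log(late); // [3]
```

Note that a subscriber that was already listening is not told that a reset happened; it simply stops receiving old entries. A component that accumulates entries itself would still need its own signal to clear them.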
