Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I have an user interaction scenario I'd like to handle with Rx.

The scenario is similar to the canonical "when user stops typing, do some work" (usually, search for what the user has typed so far) (1) - but I also need to :

  • (2) only get the latest of the results of "do some work" units (see below)
  • (3) when a new unit of work starts, cancel any work in progress (in my case it's CPU intensive)

For (1) I use an IObservable for the user events, throttled with .Throttle() to only trigger on pauses between events ("user stops typing").

From that, i .Select(_ => CreateMyTask(...).ToObservable()).

This gives me an IObservable<IObservable<T>> where each of the inner observables wraps a single task.

To get (2) I finally apply .Switch() to only get the results from the newest unit of work.

What about (3) - cancel pending tasks ?

If I understand correctly, whenever there's a new inner IObservable<T>, the .Switch() method subscribes to it and unsubscribes from the previous one(s), causing them to Dispose().
Maybe that can be somehow wired to trigger the task to cancel?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
376 views
Welcome To Ask or Share your Answers For Others

1 Answer

You can just use Observable.FromAsync which will generate tokens that are cancelled when the observer unsubcribes:

input.Throttle(...)
     .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
     .Switch()
     .Subscribe(...);

This will generate a new token for each unit of work and cancel it every time Switch switches to the new one.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...