Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I've got a WPF app using ReactiveUI, and it works by periodically fetching state from an external process.

Given a fetch observable as follows:

var latestState =
    Observable.Interval(TimeSpan.FromSeconds(.5))
    .SelectMany(async _ =>
    {
        try
        {
            var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
            return state;
        }
        catch (Exception)
        {
            return null;
        }
    })
    .Publish();

I need to be able interrupt the fetching of data, if it fails.

What I want to be able to do, is something like:

var latestState =
    Observable.Interval(TimeSpan.FromSeconds(.5))
    .SelectMany(async _ =>
    {
        try
        {
            var state = await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
            return state;
        }
        catch (Exception)
        {
            // Show and await the dialog dismissal
            // instructions for starting the external process provided etc etc
            await dialogs.ShowErrorMessageAsync("Failed to fetch info", "Failed to get the latest state");

            /* MISSING: 
             * Some magical gubbins that will produce the state on a steady interval, but also still support
             * displaying the dialog and halting
             */
            return null;
        }
    })
    .Publish();

Obviously that's not feasible, because you end up with a chicken and egg problem.

Every way I've tried to slice this (e.g. using a Subject<bool> to track success / failure) has ultimately resulted in the fact that the failure case still needs to be able to emit an observable that fetches on the interval, and respects the failure handling - but that's not possible from inside the handler.

I'm almost certain this is an issue with conceptualising the way to signal the error / retrieve the data / resume the interval.


Partial solution / implementation based on comment feedback:

var stateTimer = Observable.Interval(TimeSpan.FromSeconds(10));

var stateFetcher =
    Observable.FromAsync(async () => 
        await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

IObservable<GetRobotsStateReply> DisplayStateError(Exception causingException)
    => Observable.FromAsync(async () =>
    {
        await dialogs.ShowErrorMessageAsync(
            "Failed to get robot info",
            "Something went wrong");
        return new GetRobotsStateReply { };
    });

var stateStream =
    stateTimer
    .SelectMany(stateFetcher)
    .Catch((Exception ex) => DisplayStateError(ex))
    .Publish();

stateStream.Connect();

This implementation gets me the behaviour I need, and has the benefit of not triggering the timer when displaying the error dialog; however, it doesn't then subsequently trigger after dismissing the dialog (I believe because the stream has been terminated) - I'm going to use suggestion in the comments to fix this and then add an answer.


Working solution (can be added as an answer if reopened).

var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(5));
var stateFetcher = Observable.FromAsync(async () =>
    await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

var timerFetch = Observable.SelectMany(fetchTimer, stateFetcher);

IObservable<GetRobotsStateReply> GetErrorHandler(Exception ex) =>
    Observable.FromAsync(async () =>
    {
        await dialogs.ShowErrorMessageAsync(
            "TEST",
            "TEST");
        return (GetRobotsStateReply)null;
    });

IObservable<GetRobotsStateReply> GetStateFetchCycleObservable(
    IObservable<GetRobotsStateReply> source) =>
        source
        .Catch((Exception ex) => GetErrorHandler(ex))
        .SelectMany(state =>
            state != null
            ? GetStateFetchCycleObservable(timerFetch)
            : GetStateFetchCycleObservable(stateFetcher));

var latestState =
    GetStateFetchCycleObservable(timerFetch)
    .Publish();

Thanks to Theodor's suggestions, I've been able to hit on a solution.

I'd made the mistake of not thinking in terms of hot/cold observables and not making proper use of the built-in error handling mechanisms.

I was initially using Observable.Interval but this had the undesired consequence of firing and initiating a new remote request while the previous one was still in-flight (I suppose I could have throttled).

This solution works by using Observable.Timer to set up an initial delay, then make the remote request; this stream is then observed, on error it displays the dialog, and then binds back to the delay + fetch stream.

As the delay + fetch stream is cold, the delay works again as intended, and everything flows back around in a nice loop.

This has been further worked on, as there were issues with double firings of the timer (when using Retry), or the second time around not doing anything after the dialog dismissal.

I realised that was down to the inner observable not having the outer observable's projection back to a value-producing observable.

The new solution manages this, and even solves the problem of immediately re-fetching state if the user dismisses the dialog, or padding with a time interval in the case of a successful result.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
140 views
Welcome To Ask or Share your Answers For Others

1 Answer

Here is my suggestion:

var observable = Observable
    .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
    .Select(x => Observable.FromAsync(async () =>
    {
        return await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest());
    }))
    .Concat()
    .Catch((Exception ex) => Observable.FromAsync<GetRobotsStateReply>(async () =>
    {
        await dialogs.ShowErrorMessageAsync("Failed to fetch info",
            "Failed to get the latest state");
        throw ex;
    }))
    .Retry();

The Timer+Select+Concat operators ensure that the GetRobotsStateAsync will be executed without overlapping. In case of an exception the timer will be discarded, the Catch operator will kick in, and the original error will be rethrown after closing the dialog, in order to trigger the Retry operator. Then everything will be repeated again, with a brand new timer. The loop will keep spinning until the subscription to the observable is disposed.

This solution makes the assumption that the execution of the GetRobotsStateAsync will not exceed the timer's 500 msec interval in a regular basis. Otherwise the ticks produced by the timer will start stacking up (inside the Concat's internal queue), putting the system under memory pressure. For a more sophisticated (but also more complex) periodic mechanism that avoids this problem look at this answer.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...