Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

My application terminates when an error is thrown in OnNext by an observer when I use ObserveOn(Scheduler.ThreadPool). The only way I have found to deal with this is by using a custom extension method below (apart from making sure OnNext never throws an exception). And then making sure that each ObserveOn is followed by an ExceptionToError.

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

However, this does not feel right. Is there a better way to deal with this?

Example

This program crashes because of uncaught exception.

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
302 views
Welcome To Ask or Share your Answers For Others

1 Answer

We're addressing this issue in Rx v2.0, starting with the RC release. You can read all about it on our blog at http://blogs.msdn.com/rxteam. It basically boils down to more disciplined error handling in the pipeline itself, combined with a SubscribeSafe extension method (to redirect errors during subscription into the OnError channel), and a Catch extension method on IScheduler (to wrap a scheduler with exception handling logic around scheduled actions).

Concerning the ExceptionToError method proposed here, it has one flaw. The IDisposable subscription object can still be null when the callbacks run; there's a fundamental race condition. To work around this, you'd have to use a SingleAssignmentDisposable.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...