Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I wrote some code to download a file from server meanwhile updating progress bar. Downloading code was running in Schedulers.io thread and updating ui code was running in AndroidSchedulers.mainThread. My program terminated after download began. Here is my code:

    Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                Response response = getResponse(url);
                if (response != null && response.isSuccessful()) {
                    InputStream is = response.body().byteStream();
                    subscriber.onNext(response.body().contentLength()); // init progress
                    File storedFile = Utils.getStoredFile(context, filePath);
                    OutputStream os = new FileOutputStream(storedFile);

                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = is.read(buffer)) != -1) {
                        // write data
                        os.write(buffer, 0, len);

                        count += len;
                        subscriber.onNext(count); // update progress
                    }

                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }

                    os.close();
                    is.close();
                    response.body().close();

            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    })
    .subscribeOn(Schedulers.io()) // io and network operation  
    .observeOn(AndroidSchedulers.mainThread()) // UI view update operation  
    .subscribe(new Observer<Long>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long progress) {
            Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
            Log.d(TAG, "onNext progress -> " + progress);
            // here update view in ui thread
        }
    }
    }

And here is error text:

java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:145)
    at okio.Okio$2.read(Okio.java:136)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
    at okio.RealBufferedSource.read(RealBufferedSource.java:50)
    at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
    at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
    at java.io.InputStream.read(InputStream.java:163)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
    at rx.Observable.unsafeSubscribe(Observable.java:8098)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executor    at java.util.concurrent.FutureTask.run(FutureTask.java:23    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    at java.lang.Thread.run(Thread.java:841)
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
243 views
Welcome To Ask or Share your Answers For Others

1 Answer

The observerOn is apply to the Observable.create but internaly you're creating a new observable in another thread. So your pipeline never give the monitor to the main thread. I think your code it's too much complex for what you want to achieve.

Just in case that help you out to understand the concepts of Scheduler

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...