I'm new to RxJava. I need to run several jobs asynchronously and get a callback when ALL of them are done. I've placed some Log.d calls in the callback methods, and I see that onComplete (and onNext as well) is executed for every completed job, which is not the behavior I want. Also, once I call the dispose method I'm unable to submit new jobs because the threads just don't start; I have to set the reference to my class containing the RxJava methods to null and create a new instance.
P.S. Please avoid lambda expressions.
This is my code:
public class Async2 {
    private final CompositeDisposable disposables = new CompositeDisposable();
    private ArrayList<FileRepresentation> fileRepresentationList = null;

    public Async2() {
        fileRepresentationList = new ArrayList<>();
    }

    public ArrayList<FileRepresentation> getFileRepresentationList() {
        return fileRepresentationList;
    }

    public void dispose(){
    }

    public Observable<FileRepresentation> calcObservable(Uri uri, Context context) {
        return Observable.defer(new Callable<ObservableSource<? extends FileRepresentation>>() {
            @Override
            public ObservableSource<? extends FileRepresentation> call() {
                FileUtils fu = new FileUtils();
                FileRepresentation fileRepresentation = FileUtils.calcolaChecksumFromUri(uri, context); //this is the long running job
                Log.d("test-0X", fileRepresentation.nome);
                Log.d("test-0X", fileRepresentation.hash);
                Log.d("Thread name: ", Thread.currentThread().getName());
                FileRepresentation finalFileRepresentation = fileRepresentation;
                //return Observable.defer(() -> Observable.just(finalFileRepresentation));
                return Observable.just(finalFileRepresentation);
            }
        });
    }

    public void addWorks(List<Uri> uriList, Context context, CommunicationInterface com){
        int nObservable = uriList.size();
        AtomicInteger remainings = new AtomicInteger(nObservable);
        Disposable[] disposableArr = new Disposable[nObservable];
        Log.d("addworks", "addWorks method (nObservable var): "+nObservable);
        Log.d("addworks", "addWorks method (disposable.size() ): "+disposables.size());
        for (int i= 0; i<nObservable; i++){
            Disposable disposable = calcObservable(uriList.get(i), context)
                    // Run on a background thread
                    // Be notified on the main thread
                    .subscribeWith(new DisposableObserver<FileRepresentation>() {
                        @Override
                        public void onComplete() {
                            if(remainings.decrementAndGet() == 0){
                                Log.d("Method onComplete called", "list elements: "+fileRepresentationList.size());
                                Log.d("Method onComplete called", "End!!");
                            }
                            Log.d("Method onComplete called", "End!!");
                        }

                        @Override
                        public void onError(Throwable e) {
                            if(remainings.decrementAndGet() == 0){
                                Log.d("Method onError", "list elements: "+fileRepresentationList.size());
                                Log.d("Method onError", "End!!");
                            }
                            Log.d("method onError", "method onError called");
                        }

                        @Override
                        public void onNext(FileRepresentation value) {
                        }
                    });
            disposableArr[i] = disposable;
        }
        Log.d("addworks", "addWorks method (disposable.size() ): "+disposables.size());
    }
}
Here is how I start the work:
ArrayList<FileRepresentation> li = async2.getFileRepresentationList();
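To make the behavior I'm after concrete, here is a minimal sketch of "one callback when ALL jobs are done". It deliberately uses only the JDK (an ExecutorService plus an AtomicInteger), not RxJava, so it isolates the same counting idea as the `remainings` counter in `addWorks`. The names `AllDoneSketch`, `AllDoneCallback`, and `runAll` are hypothetical, and the upper-casing step is only a stand-in for the real long-running job. No lambdas are used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

public class AllDoneSketch {

    /** Hypothetical callback, invoked exactly once after every job has finished. */
    public interface AllDoneCallback {
        void onAllDone(List<String> results);
    }

    /** Runs one job per input on the given pool and reports a single time at the end. */
    public static void runAll(List<String> inputs, ExecutorService pool,
                              final AllDoneCallback callback) {
        if (inputs.isEmpty()) {
            // Nothing to wait for, so report immediately.
            callback.onAllDone(new ArrayList<String>());
            return;
        }
        // Thread-safe list, because several workers add to it concurrently.
        final List<String> results =
                Collections.synchronizedList(new ArrayList<String>());
        final AtomicInteger remaining = new AtomicInteger(inputs.size());

        for (final String input : inputs) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Stand-in for the long-running job (assumption).
                        results.add(input.toUpperCase());
                    } finally {
                        // Each job signals its own completion; only the job that
                        // brings the counter to zero fires the shared callback.
                        if (remaining.decrementAndGet() == 0) {
                            callback.onAllDone(new ArrayList<String>(results));
                        }
                    }
                }
            });
        }
    }
}
```

In this sketch the callback runs on whichever worker thread happens to finish last, and the order of the results is not guaranteed. On Android the result would still have to be handed over to the main thread before touching the UI.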
Question from: https://stackoverflow.com/questions/65947191/rx-java-2-oncomplete-method-called-for-every-observable