After all the modifications we have made, the flow has become much easier to understand:
Observable.merge(
        createFinancialStockUpdateObservable(yahooService, query, env),
        createTweetStockUpdateObservable(configuration,
                trackingKeywords, filterQuery)
)
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .doOnError(ErrorHandler.get())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnError(this::showToastErrorNotificationMethod)
        .observeOn(Schedulers.io())
        .doOnNext(this::saveStockUpdate)
        .onExceptionResumeNext(StorIOFactory
                .createLocalDbStockUpdateRetrievalObservable(this))
        .doOnNext(update -> log(update))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(stockUpdate -> {
            Log.d("APP", "New update " + stockUpdate.getStockSymbol());
            noDataAvailableView.setVisibility(View.GONE);
            stockDataAdapter.add(stockUpdate);
            recyclerView.smoothScrollToPosition(0);
        }, error -> {
            if (stockDataAdapter.getItemCount() == 0) {
                noDataAvailableView.setVisibility(View.VISIBLE);
            }
        });
It still has some unwieldy .doOnNext() and .observeOn() calls but, as we will see in the next section, some of them can be tidied up using Observable Transformations.
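To give a rough feel for the idea before getting there, the following is a minimal sketch in plain Java rather than RxJava. It treats a transformation as a function that takes a whole pipeline and returns a modified pipeline, so that several chained steps can be packaged under one name. The class TransformationSketch and the helper saveAndLog are hypothetical names invented for this illustration, and Stream.peek() only stands in for .doOnNext(); none of this is the application's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformationSketch {

    // Hypothetical helper: bundles two side-effect steps into one reusable
    // pipeline-to-pipeline function, in the spirit of replacing
    // .doOnNext(save).doOnNext(log) with a single named step.
    static UnaryOperator<Stream<String>> saveAndLog(Consumer<String> save,
                                                    Consumer<String> log) {
        // peek() is used here as a plain-Java stand-in for doOnNext()
        return upstream -> upstream.peek(save).peek(log);
    }

    public static void main(String[] args) {
        List<String> saved = new ArrayList<>();
        List<String> logged = new ArrayList<>();

        // Build the reusable step once...
        UnaryOperator<Stream<String>> persist =
                saveAndLog(saved::add, symbol -> logged.add("log: " + symbol));

        // ...and apply it to a pipeline as a single unit.
        List<String> delivered = persist
                .apply(Stream.of("GOOG", "AAPL"))
                .collect(Collectors.toList());

        System.out.println(delivered);
        System.out.println(saved);
        System.out.println(logged);
    }
}
```

The point of the sketch is only the shape: the caller sees one step where there used to be two, and the same step can be reused on any other pipeline of the same element type.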