Let's take the error-displaying code that we examined before:
.observeOn(AndroidSchedulers.mainThread())
.doOnError(this::showToastErrorNotificationMethod)
.observeOn(Schedulers.io())
Since this snippet applies three transformations to the original Observable, we can extract a method that applies them to whatever Observable it is given, as shown:
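private Observable<StockUpdate> addUiErrorHandling(Observable<StockUpdate> observable) {
    // Switch to the main thread, show the error toast there,
    // then move the rest of the chain back to the I/O scheduler.
    return observable.observeOn(AndroidSchedulers.mainThread())
            .doOnError(this::showToastErrorNotificationMethod)
            .observeOn(Schedulers.io());
}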
The modified flow then becomes the following:
addUiErrorHandling(
        Observable.merge(
                createFinancialStockUpdateObservable(yahooService, query, env),
                createTweetStockUpdateObservable(configuration, trackingKeywords, filterQuery)
        )
                .compose(bindToLifecycle())
                .subscribeOn(Schedulers.io())
                .doOnError(ErrorHandler.get())
)
        .doOnNext(this::saveStockUpdate)
        .onExceptionResumeNext(StorIOFactory
                .createLocalDbStockUpdateRetrievalObservable(this))
        .doOnNext(update -> log(update))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(stockUpdate -> {
            Log.d("APP", "New update " + stockUpdate.getStockSymbol());
            noDataAvailableView.setVisibility(View.GONE);
            stockDataAdapter.add(stockUpdate);
            recyclerView.smoothScrollToPosition(0);
        }, error -> {
            if (stockDataAdapter.getItemCount() == 0) {
                noDataAvailableView.setVisibility(View.VISIBLE);
            }
        });
It is important to note that this method has to wrap the entire Observable chain that comes before it. This is because the Observable is constructed in a fluent, Builder-like style in which each operator returns a new Observable instead of modifying the existing one. Consider the following block:
Observable.merge(
        createFinancialStockUpdateObservable(yahooService, query, env),
        createTweetStockUpdateObservable(configuration, trackingKeywords, filterQuery)
)
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .doOnError(ErrorHandler.get())
This whole expression evaluates to a single Observable, and that Observable is what has to be passed to the following method:
Observable<StockUpdate> addUiErrorHandling(Observable<StockUpdate> observable)
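To see why the helper has to receive the whole chain and why its return value must be used, the idea can be sketched in plain Java without RxJava. The Chain class below is a hypothetical stand-in for Observable and is not part of RxJava: it only records operator names, and its then() method returns a new Chain instead of changing the existing one, which is the property the text relies on. The step names are illustrative labels, not real operator calls.

```java
import java.util.ArrayList;
import java.util.List;

class ChainWrappingSketch {

    // Hypothetical stand-in for an Observable: an immutable list of step names.
    static final class Chain {
        private final List<String> steps;

        private Chain(List<String> steps) {
            this.steps = steps;
        }

        static Chain source(String name) {
            List<String> steps = new ArrayList<>();
            steps.add(name);
            return new Chain(steps);
        }

        // Like an Rx operator: returns a NEW Chain and leaves this one untouched.
        Chain then(String step) {
            List<String> copy = new ArrayList<>(steps);
            copy.add(step);
            return new Chain(copy);
        }

        // Defensive copy so callers cannot modify the recorded steps.
        List<String> steps() {
            return new ArrayList<>(steps);
        }
    }

    // Mirrors addUiErrorHandling: appends three steps to whatever chain it receives.
    static Chain addUiErrorHandling(Chain chain) {
        return chain.then("observeOn(main)")
                .then("doOnError(toast)")
                .then("observeOn(io)");
    }

    public static void main(String[] args) {
        Chain upstream = Chain.source("merge").then("subscribeOn(io)");

        // Result discarded: the upstream chain itself is not modified.
        addUiErrorHandling(upstream);
        System.out.println(upstream.steps());
        // prints [merge, subscribeOn(io)]

        // Result used: later steps are chained onto the returned object.
        Chain wrapped = addUiErrorHandling(upstream).then("doOnNext(save)");
        System.out.println(wrapped.steps());
        // prints [merge, subscribeOn(io), observeOn(main), doOnError(toast), observeOn(io), doOnNext(save)]
    }
}
```

In this sketch, the error-handling steps only appear in the chain that is built from the helper's return value, which is why the call in the real flow wraps everything before it and the remaining operators are attached to its result.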
An equivalent way to pass the chain to addUiErrorHandling() is to extract it into a variable first, as illustrated:
final Observable<StockUpdate> observable = Observable.merge(
        createFinancialStockUpdateObservable(yahooService, query, env),
        createTweetStockUpdateObservable(configuration, trackingKeywords, filterQuery)
)
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .doOnError(ErrorHandler.get());
addUiErrorHandling(observable)
        .doOnNext(this::saveStockUpdate)