We can start by extracting a Factory Method for the Observable that is responsible for the retrieval of financial stock quotes from Yahoo API using this:
Observable.interval(0, 5, TimeUnit.SECONDS)
.flatMapSingle(i -> yahooService.yqlQuery(query, env))
.map(r -> r.getQuery().getResults().getQuote())
.flatMap(Observable::fromIterable)
.map(StockUpdate::create)
.groupBy(stockUpdate -> stockUpdate.getStockSymbol())
.flatMap(groupObservable -> groupObservable.distinctUntilChanged())
This code can be extracted into a method:
createFinancialStockUpdateObservable()
This consists of the following:
private Observable<StockUpdate> createFinancialStockUpdateObservable(YahooService yahooService, String query, String env) {
return Observable.interval(0, 5, TimeUnit.SECONDS)
.flatMapSingle(i -> yahooService.yqlQuery(query, env))
.map(r -> r.getQuery().getResults().getQuote())
.flatMap(Observable::fromIterable)
.map(StockUpdate::create)
.groupBy(stockUpdate -> stockUpdate.getStockSymbol())
.flatMap(groupObservable ->
groupObservable.distinctUntilChanged());
}
It is a simple change, but now the Observable.merge() block looks as follows:
Observable.merge(
createFinancialStockUpdateObservable(yahooService, query, env),
observeTwitterStream(configuration, filterQuery)
.sample(2700, TimeUnit.MILLISECONDS)
.map(StockUpdate::create)
.filter(containsAnyOfKeywords(trackingKeywords))
.flatMapMaybe(skipTweetsThatDoNotContainKeywords
(trackingKeywords))
)
The extracted Factory Method can even be moved to some other class, but there is no need for that yet and there is a clear destination class for that, so we will leave it in the MainActivity class for now.