The final steps now left to take are to convert the preceding code into something that can be used in RxJava Observable flows.
We will do this using Observable.create() and by utilizing the Emitter interface.
Let's start by creating the given method:
Observable<Status> observeTwitterStream(Configuration configuration, FilterQuery filterQuery)
This will be responsible for the creation of such an Observable. Inside the methods, we will have a call to the Observable.create() call:
Observable<Status> observeTwitterStream(Configuration configuration, FilterQuery filterQuery) {
return Observable.create(emitter -> {
});
}
This will provide us with a convenient interface where we will be able to specify connection settings (Configuration argument) and the query (FilterQuery argument).
Inside the Observable.create() call, we will initialize the TwitterStream Object and attach it to the listener:
return Observable.create(emitter -> {
final TwitterStream twitterStream
= new TwitterStreamFactory(configuration).getInstance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
}
@Override
public void onDeletionNotice(StatusDeletionNotice
statusDeletionNotice) {
}
@Override
public void onTrackLimitationNotice(int
numberOfLimitedStatuses) {
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
}
@Override
public void onStallWarning(StallWarning warning) {
}
@Override
public void onException(Exception ex) {
}
};
twitterStream.addListener(listener);
twitterStream.filter(filterQuery);
});
The next step is to plug in the onStatus(Status status) to receive status updates into Observable by adding this:
public void onStatus(Status status) {
emitter.onNext(status);
}
Also, we will add onException(Exception ex) to handle exceptions:
public void onException(Exception ex) {
emitter.onError(ex);
}
Finally, we need to ensure that the TwitterStream is properly terminated when the Observable is being disposed of. That can be done by adding a callback to the emitter with .setCancellable():
emitter.setCancellable(() -> twitterStream.shutdown());
In the end, the whole method to create such an Observable will look like this:
Observable<Status> observeTwitterStream(Configuration configuration, FilterQuery filterQuery) {
return Observable.create(emitter -> {
final TwitterStream twitterStream
= new TwitterStreamFactory(configuration).getInstance();
emitter.setCancellable(() -> twitterStream.shutdown());
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
emitter.onNext(status);
}
[...]
@Override
public void onException(Exception ex) {
emitter.onError(ex);
}
};
twitterStream.addListener(listener);
twitterStream.filter(filterQuery);
});
}
Here, some of the overridden methods were skipped for the sake of compactness.
observeTwitterStream() will provide us with a very easy way to plug in to the Twitter data stream as we will see that next.