Reactive Extensions (Rx) are a set of methods and interfaces that provide a rapid, easy to maintain and understand ways to solve Java developers problems. You got it right, Rx is a set of tools to help you write elegant yet simpler code.
In case with Java, adding Rx library allows you to orchestrate multiple actions that happen due to certain events in the system. Besides, it takes away the pain of using callbacks and horrific global state management.
When you only start working with RxJava, you might think that it is way to difficult to understand and this set of new tools won’t help much. But with the time you’ll learn multiple advantager RX offers:
Intuitivity — description of actions in Rx use the same style as in functional programming, for example, Java Streams. If you deal with Rx, it gives you a possibility to use functional transformations over streams of events.
Expandability — RxJava can be extended by user operators.
Declarativity — functional transformations are announced declaratively.
Composability — operators in RxJava are easily assembled to conduct difficult operations.
Convertibility — operators in RxJava can transform data types by filtering, processing and expanding data streams.
Rx is built on the Observer pattern. The key elements are the Observer and the Subscriber. Observable is the base type. This class contains the main part of the implementation of Rx and includes all basic operators.
So, why RxJava?
1. Asynchronous streams
2. Functional approach
3. Caching is easy
4. Operators with Schedulers
5. Using of Subjects
1. Asynchronous streams
Let’s take an example:
You need to send a request to the database and then after it is accomplished you have to immediately start picking both messages and settings. After the work is completed you need to display a welcome message.
If we want to do the same in Java SE and Android, we have to follow the next steps:
- Run 3-4 different AsyncTasks;
- Create a semaphore that waits for both requests to complete (settings and messages);
- Create fields at the Objects level for storing results.
We can minimize all of these operations by working with RxJava. In this case the code looks like a thread which is located in one place and built on the basis of a functional paradigm.
Alternately, we can use optional binding:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Observable.fromCallable(createNewUser()) .subscribeOn(Schedulers.io()) .flatMap(new Func1<User, Observable<Pair<Settings, List>>>() { // works with settings }) .doOnNext(new Action1<Pair<Settings, List>>() { @Override public void call(Pair<Settings, List> pair) { System.out.println("Received settings" + pair.first); } }) .flatMap(new Func1<Pair<Settings, List>, Observable>() { //works with messages }) .subscribe(new Action1() { @Override public void call(Message message) { System.out.println("New message " + message); } }); |
2. Functional approach
If you are familiar with the functional programming such as concepts of map and zip, working with RxJava will be much easier for you besides. We can say that functional programming is the active use of functions as parameters and results in other functions. For example, map is a function of higher order, used in many programming languages. It applies this function to each element in the list, returning a list of results. Here’s how it looks:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
Observable.from(jsonFile) .map(new Func1<File, String>() { @Override public String call(File file) { try { return new Gson().toJson(new FileReader(file), Object.class); } catch (FileNotFoundException e) { // this exception is a part of rx-java throw OnErrorThrowable.addValueAsLastCause(e, file); } } }); |
3. Caching is easy
The next piece of a code uses the caching method in such a way that the only one copy saves the result after it was successful for the first time.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Single < List < Todo >> todosSingle = Single.create(emitter - > { Thread thread = new Thread(() - > { try { List < Todo > todosFromWeb = // query a webservice System.out.println("I am only called once!"); emitter.onSuccess(todosFromWeb); } catch (Exception e) { emitter.onError(e); } }); thread.start(); }); // cache the result of the single, so that the web query is only done once Single < List < Todo >> cachedSingle = todosSingle.cache(); |
4. Operators with Schedulers
There is a variety of operators which require to denote Scheduler to operate. At the same time, they have their overloaded methods that use computation(), delay () as a Scheduler Here’s the example:
1 2 3 4 5 6 |
TestSubscriber subscriber = new TestSubscriber<>(); Observable.just(1).delay(1, TimeUnit.SECONDS).subscribe(subscriber); subscriber.awaitTerminalEvent(); Logger.d("LastSeenThread: " + subscriber.getLastSeenThread().getName()); |
We can see the next result in spite of the fact that we didn’t indicate any Scheduler.
Despite the fact we didn’t indicate any Scheduler, the result will be as follows
1 2 3 |
LastSeenThread: RxComputationThreadPool-1 |
If you want to avoid using computation scheduler, you should indicate the required schedulers as the 3rd argument.
1 2 3 |
.delay(1, TimeUnit.SECONDS, Schedulers.immediate()) |
Apart from delay(), there are lots of other operators who can change Scheduler: interval (), timer (), overloads of buffer (), debounce (), skip (), take (), timeout () etc.
5. Using of Subjects
Working with Objects, you should take into account, that by default the sequence of changes in data, sent to onNext subject would be executed(implemented) in the same thread, that was used to call onNext() method, until observeOn() operator wouldn’t appear in this sequence.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
BehaviorSubject<object> subject = BehaviorSubject.create();subject.doOnNext(obj -> Logger.logThread("doOnNext")) .subscribeOn(Schedulers.io()) .observeOn(Schedulers .newThread()) .subscribe(new Subscriber<object>() { @Overridepublic void onNext(Object o) { Logger.logThread("onNext"); } }); subject.onNext("str"); Handler handler = new Handler(); handler.postDelayed(() -> subject.onNext("str"), 1000); |
Both observeOn and subscribeOn are mentioned here, but the result will be the next:
1 2 3 |
doOnNext: RxCachedThreadScheduler-1onNext: RxNewThreadScheduler-1doOnNext: mainonNext: RxNewThreadScheduler-1 |
This means, that when we subscribe to subject it returns the value immediately and then it is processed in a thread of Shedulers.io (). And then, when the following message arrives to subject, we use the thread where onNext() was called.
Conclusion
Using of Rx helps us to solve following problems:
- Solve the problem with caching in an easy way (without creating caching classes;
- Get rid of standard AsyncTask, combine requests and process results during receiving them;
- Memory leak was decreased on 90%;
- Reduce the amount of code lines and to increase an application response. There is no need to write many different AsyncTask,RxJava makes it possible to combine methods without problems.
As you can see, RxJava is a new and really sharp tool to work with network requests and threads, to process responses and to prevent memory leak.
About Redwerk
Since 2005, Redwerk provides outsource programming services for a wide spectrum of industries such as E-commerce, Business Automation, E-health, Media & Entertainment, E-government, Game Development, and of course, for Startups & Innovation. Our dedicated offshore developers team with their abundant experience always find the best technical solutions for all sized projects. Redwerk is engaged into iOS and custom Android application development from the scratch. We respect the time of our clients, thus, we always meet deadlines.