File: How-To-Use-RxJava.md

package info (click to toggle)
rx-java 3.0.7%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 20,744 kB
  • sloc: java: 310,776; xml: 235; makefile: 8
file content (405 lines) | stat: -rw-r--r-- 16,134 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# Hello World!

The following sample implementations of “Hello World” in Java, Groovy, Clojure, and Scala create an Observable from a list of Strings, and then subscribe to this Observable with a method that prints “Hello _String_!” for each string emitted by the Observable.

You can find additional code examples in the `/src/examples` folders of each [language adaptor](https://github.com/ReactiveX/):

* [RxGroovy examples](https://github.com/ReactiveX/RxGroovy/tree/1.x/src/examples/groovy/rx/lang/groovy/examples)
* [RxClojure examples](https://github.com/ReactiveX/RxClojure/tree/0.x/src/examples/clojure/rx/lang/clojure/examples)
* [RxScala examples](https://github.com/ReactiveX/RxScala/tree/0.x/examples/src/main/scala)

### Java

```java
public static void hello(String... args) {
  Flowable.fromArray(args).subscribe(s -> System.out.println("Hello " + s + "!"));
}
```

If your platform doesn't support Java 8 lambdas (yet), you have to create an inner class of ```Consumer``` manually:
```java
public static void hello(String... args) {
  Flowable.fromArray(args).subscribe(new Consumer<String>() {
      @Override
      public void accept(String s) {
          System.out.println("Hello " + s + "!");
      }
  });
}
```

```java
hello("Ben", "George");
Hello Ben!
Hello George!
```

### Groovy

```groovy
def hello(String[] names) {
    Observable.from(names).subscribe { println "Hello ${it}!" }
}
```

```groovy
hello("Ben", "George")
Hello Ben!
Hello George!
```

### Clojure

```clojure
(defn hello
  [&rest]
  (-> (Observable/from &rest)
    (.subscribe #(println (str "Hello " % "!")))))
```

```
(hello ["Ben" "George"])
Hello Ben!
Hello George!
```
### Scala

```scala
import rx.lang.scala.Observable

def hello(names: String*) {
  Observable.from(names) subscribe { n =>
    println(s"Hello $n!")
  }
}
```

```scala
hello("Ben", "George")
Hello Ben!
Hello George!
```

# How to Design Using RxJava

To use RxJava you create Observables (which emit data items), transform those Observables in various ways to get the precise data items that interest you (by using Observable operators), and then observe and react to these sequences of interesting items (by implementing Observers or Subscribers and then subscribing them to the resulting transformed Observables).

## Creating Observables

To create an Observable, you can either implement the Observable's behavior manually by passing a function to [`create( )`](http://reactivex.io/documentation/operators/create.html) that exhibits Observable behavior, or you can convert an existing data structure into an Observable by using [some of the Observable operators that are designed for this purpose](Creating-Observables).

### Creating an Observable from an Existing Data Structure

You use the Observable [`just( )`](http://reactivex.io/documentation/operators/just.html) and [`from( )`](http://reactivex.io/documentation/operators/from.html) methods to convert objects, lists, or arrays of objects into Observables that emit those objects:

```groovy
Observable<String> o = Observable.from("a", "b", "c");

def list = [5, 6, 7, 8]
Observable<Integer> o2 = Observable.from(list);

Observable<String> o3 = Observable.just("one object");
```

These converted Observables will synchronously invoke the [`onNext( )`](Observable#onnext-oncompleted-and-onerror) method of any subscriber that subscribes to them, for each item to be emitted by the Observable, and will then invoke the subscriber’s [`onCompleted( )`](Observable#onnext-oncompleted-and-onerror) method.

### Creating an Observable via the `create( )` method

You can implement asynchronous i/o, computational operations, or even “infinite” streams of data by designing your own Observable and implementing it with the [`create( )`](http://reactivex.io/documentation/operators/create.html) method.

#### Synchronous Observable Example

```groovy
/**
 * This example shows a custom Observable that blocks 
 * when subscribed to (does not spawn an extra thread).
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i ->
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

// To see output:
customObservableBlocking().subscribe { println(it) }
```

#### Asynchronous Observable Example

The following example uses Groovy to create an Observable that emits 75 strings.

It is written verbosely, with static typing and implementation of the `Func1` anonymous inner class, to make the example more clear:

```groovy
/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..<75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

// To see output:
customObservableNonBlocking().subscribe { println(it) }
```

Here is the same code in Clojure that uses a Future (instead of raw thread) and is implemented more consisely:

```clojure
(defn customObservableNonBlocking []
  "This example shows a custom Observable that does not block 
   when subscribed to as it spawns a separate thread.
   
  returns Observable<String>"
  (Observable/create 
    (fn [subscriber]
      (let [f (future 
                (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                ; after sending all values we complete the sequence
                (-> subscriber .onCompleted))
        ))
      ))
```

```clojure
; To see output
(.subscribe (customObservableNonBlocking) #(println %))
```

Here is an example that fetches articles from Wikipedia and invokes onNext with each one:

```clojure
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.
  
   return Observable<String> of HTML"
  (Observable/create 
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))
        ))))
```

```clojure
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
  (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
```

Back to Groovy, the same Wikipedia functionality but using closures instead of anonymous inner classes:

```groovy
/*
 * Fetch a list of Wikipedia articles asynchronously.
 */
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
    return Observable.create { subscriber ->
        Thread.start {
            for (articleName in wikipediaArticleNames) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
            }
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
        return subscriber
    }
}

fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
    .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
```

Results:

```text
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...
```

Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.

More information can be found on the [[Observable]] and [[Creating Observables|Creating-Observables]] pages.

## Transforming Observables with Operators

RxJava allows you to chain _operators_ together to transform and compose Observables.

The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these ([`skip(10)`](http://reactivex.io/documentation/operators/skip.html)), then takes the next 5 ([`take(5)`](http://reactivex.io/documentation/operators/take.html)), and transforms them ([`map(...)`](http://reactivex.io/documentation/operators/map.html)) before subscribing and printing the items:

```groovy
/**
 * Asynchronously calls 'customObservableNonBlocking' and defines
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
    customObservableNonBlocking().skip(10).take(5)
        .map({ stringValue -> return stringValue + "_xform"})
        .subscribe({ println "onNext => " + it})
}
```

This results in:

```text
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform
```

Here is a marble diagram that illustrates this transformation:

<img src="/Netflix/RxJava/wiki/images/rx-operators/Composition.1.v3.png" width="640" height="536" />

This next example, in Clojure, consumes three asynchronous Observables, including a dependency from one to another, and emits a single response item by combining the items emitted by each of the three Observables with the [`zip`](http://reactivex.io/documentation/operators/zip.html) operator and then transforming the result with [`map`](http://reactivex.io/documentation/operators/map.html):

```clojure
(defn getVideoForUser [userId videoId]
  "Get video metadata for a given userId
   - video metadata
   - video bookmark position
   - user data
  return Observable<Map>"
    (let [user-observable (-> (getUser userId)
              (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
          bookmark-observable (-> (getVideoBookmark userId videoId)
              (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
          ; getVideoMetadata requires :language from user-observable so nest inside map function
          video-metadata-observable (-> user-observable 
              (.mapMany
                ; fetch metadata after a response from user-observable is received
                (fn [user-map] 
                  (getVideoMetadata videoId (:language user-map)))))]
          ; now combine 3 observables using zip
          (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                (fn [bookmark-map metadata-map user-map]
                  {:bookmark-map bookmark-map 
                  :metadata-map metadata-map
                  :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                  {:video-id videoId
                   :video-metadata (:metadata-map data)
                   :user-id userId
                   :language (:language (:user-map data))
                   :bookmark (:viewed-position (:bookmark-map data))
                  })))))
```

The response looks like this:

```clojure
{:video-id 78965, 
 :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                  :director David Fincher, :duration 3365}, 
 :user-id 12345, :language es-us, :bookmark 0}
```

And here is a marble diagram that illustrates how that code produces that response:

<img src="/Netflix/RxJava/wiki/images/rx-operators/Composition.2.v3.png" width="640" height="742" />

The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:

```groovy
public Observable getVideoSummary(APIVideo video) {
   def seed = [id:video.id, title:video.getTitle()];
   def bookmarkObservable = getBookmark(video);
   def artworkObservable = getArtworkImageUrl(video);
   return( Observable.merge(bookmarkObservable, artworkObservable)
      .reduce(seed, { aggregate, current -> aggregate << current })
      .map({ [(video.id.toString() : it] }))
}
```

And here is a marble diagram that illustrates how that code uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to bring the results from multiple Observables together in one structure:

<img src="/Netflix/RxJava/wiki/images/rx-operators/Composition.3.v3.png" width="640" height="640" />

## Error Handling

Here is a version of the Wikipedia example from above revised to include error handling:

```groovy
/*
 * Fetch a list of Wikipedia articles asynchronously, with error handling.
 */
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
    return Observable.create({ subscriber ->
        Thread.start {
            try {
                for (articleName in wikipediaArticleNames) {
                    if (true == subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                }
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch(Throwable t) {
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }
            return (subscriber);
        }
    });
}
```

Notice how it now invokes [`onError(Throwable t)`](Observable#onnext-oncompleted-and-onerror) if an error occurs and note that the following code passes [`subscribe()`](http://reactivex.io/documentation/operators/subscribe.html) a second method that handles `onError`:

```groovy
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
    .subscribe(
        { println "--- Article ---\n" + it.substring(0, 125) }, 
        { println "--- Error ---\n" + it.getMessage() })
```

See the [Error-Handling-Operators](Error-Handling-Operators) page for more information on specialized error handling techniques in RxJava, including methods like [`onErrorResumeNext()`](http://reactivex.io/documentation/operators/catch.html) and [`onErrorReturn()`](http://reactivex.io/documentation/operators/catch.html) that allow Observables to continue with fallbacks in the event that they encounter errors.

Here is an example of how you can use such a method to pass along custom information about any exceptions you encounter. Imagine you have an Observable or cascade of Observables — `myObservable` — and you want to intercept any exceptions that would normally pass through to an Subscriber’s `onError` method, replacing these with a customized Throwable of your own design. You could do this by modifying `myObservable` with the [`onErrorResumeNext()`](http://reactivex.io/documentation/operators/catch.html) method, and passing into that method an Observable that calls `onError` with your customized Throwable (a utility method called [`error()`](http://reactivex.io/documentation/operators/empty-never-throw.html) will generate such an Observable for you):

```groovy
myModifiedObservable = myObservable.onErrorResumeNext({ t ->
   Throwable myThrowable = myCustomizedThrowableCreator(t);
   return (Observable.error(myThrowable));
});
```