File: Connectable-Observable-Operators.md

package info (click to toggle)
rx-java 3.0.7%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 20,744 kB
  • sloc: java: 310,776; xml: 235; makefile: 8
file content (70 lines) | stat: -rw-r--r-- 3,574 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
This section explains the [`ConnectableObservable`](http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html) subclass and its operators:

* [**`ConnectableObservable.connect( )`**](http://reactivex.io/documentation/operators/connect.html) — instructs a Connectable Observable to begin emitting items
* [**`Observable.publish( )`**](http://reactivex.io/documentation/operators/publish.html) — represents an Observable as a Connectable Observable
* [**`Observable.replay( )`**](http://reactivex.io/documentation/operators/replay.html) — ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items
* [**`ConnectableObservable.refCount( )`**](http://reactivex.io/documentation/operators/refcount.html) — makes a Connectable Observable behave like an ordinary Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its `connect()` method is called. In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

<img src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.v3.png" width="640" height="510" />

The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output:

**Example #1:**
```java
Observable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
		throwable -> System.out.println("Error: " + throwable), // onError
		() -> System.out.println("Sequence #1 complete") // onComplete
	);
firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
        throwable -> System.out.println("Error: " + throwable), // onError
		() -> System.out.println("Sequence #2 complete") // onComplete
    );
```
```
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete
```
**Example #2:**
```java
ConnectableObservable firstMillion = Observable.range(1, 1000000).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(next -> System.out.println("Subscriber #1: " + next), // onNext
		throwable -> System.out.println("Error: " + throwable), // onError
		() -> System.out.println("Sequence #1 complete") // onComplete
	);

firstMillion.subscribe(next -> System.out.println("Subscriber #2: " + next), // onNext
		throwable -> System.out.println("Error: " + throwable), // onError
		() -> System.out.println("Sequence #2 complete") // onComplete
	);

firstMillion.connect();
```
```
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete
```

#### see also:
* javadoc: <a href="http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html">`ConnectableObservable`</a>
* <a href="http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect">Introduction to Rx: Publish and Connect</a>