File: Writing-operators-for-2.0.md

package info (click to toggle)
rx-java 3.0.7%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 20,744 kB
  • sloc: java: 310,776; xml: 235; makefile: 8
file content (1746 lines) | stat: -rw-r--r-- 72,128 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
##### Table of contents

  - [Introduction](#introduction)
    - [Warning on internal components](#warning-on-internal-components)
  - [Atomics, serialization, deferred actions](#atomics-serialization-deferred-actions)
    - [Field updaters and Android](#field-updaters-and-android)
    - [Request accounting](#request-accounting)
    - [Once](#once)
    - [Serialization](#serialization)
    - [Queues](#queues)
    - [Deferred actions](#deferred-actions)
      - [Deferred cancellation](#deferred-cancellation)
      - [Deferred requesting](#deferred-requesting)
    - [Atomic error management](#atomic-error-management)
    - [Half-serialization](#half-serialization)
    - [Fast-path queue-drain](#fast-path-queue-drain)
    - [FlowableSubscriber](#flowablesubscriber)
  - [Backpressure and cancellation](#backpressure-and-cancellation)
    - [Replenishing](#replenishing)
    - [Stable prefetching](#stable-prefetching)
    - [Single-valued results](#single-valued-results)
    - [Single-element post-complete](#single-element-post-complete)
    - [Multi-element post-complete](#multi-element-post-complete)
  - [Creating operator classes](#creating-operator-classes)
    - [Operator by extending a base reactive class](#operator-by-extending-a-base-reactive-class)
    - [Operator targeting lift()](#operator-targeting-lift)
  - [Operator fusion](#operator-fusion)
    - [Generations](#generations)
    - [Components](#components)
      - [Callable and ScalarCallable](#callable-and-scalarcallable)
      - [ConditionalSubscriber](#conditionalsubscriber)
      - [QueueSubscription and QueueDisposable](#queuesubscription-and-queuedisposable)
  - [Example implementations](#example-implementations)
    - [Map-filter hybrid](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#map--filter-hybrid)
    - [Ordered merge](#ordered-merge)

# Introduction

Writing operators, source-like (`fromEmitter`) or intermediate-like (`flatMap`) **has always been a hard task to do in RxJava**. There are many rules to obey, many cases to consider but at the same time, many (legal) shortcuts to take to build a well performing code. Now writing an operator specifically for 2.x is 10 times harder than for 1.x. If you want to exploit all the advanced, 4th generation features, that's even 2-3 times harder on top (so 30 times harder in total).

*(If you have been following [my blog](http://akarnokd.blogspot.hu/) about RxJava internals, writing operators is maybe only 2 times harder than 1.x; some things have moved around, some tools popped up while others have been dropped but there is a relatively straight mapping from 1.x concepts and approaches to 2.x concepts and approaches.)*

In this article, I'll describe the how-to's from the perspective of a developer who skipped the 1.x knowledge base and basically wants to write operators that conforms the Reactive-Streams specification as well as RxJava 2.x's own extensions and additional expectations/requirements.

Since **Reactor 3** has the same architecture as **RxJava 2** (no accident, I architected and contributed 80% of **Reactor 3** as well) the same principles outlined in this page applies to writing operators for **Reactor 3**. Note however that they chose different naming and locations for their utility and support classes so you may have to search for the equivalent components.

## Warning on internal components

RxJava has several hundred public classes implementing various operators and helper facilities. Since there is no way to hide these in Java 6-8, the general contract is that anything below `io.reactivex.internal` is considered private and subject to change without warnings. It is not recommended to reference these in your code (unless you contribute to RxJava itself) and must be prepared that even a patch change may shuffle/rename things around in them. That being said, they usually contain valuable tools for operator builders and as such are quite attractive to use them in your custom code.

# Atomics, serialization, deferred actions

As RxJava itself has building blocks for creating reactive dataflows, its components have building blocks as well in the form of concurrency primitives and algorithms. Many refer to the book [Concurrency in Practice](https://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601) for learning the fundamentals needed. Unfortunately, other than some explanation of the Java Memory Model, the book lacks the techniques required for developing operators for RxJava 1.x and 2.x.

## Field updaters and Android

If you looked at the source code of RxJava and then at **Reactor 3**, you might have noticed that RxJava doesn't use the
`AtomicXFieldUpdater` classes. The reason for this is that on certain Android devices, the runtime "messes up" the field
names and the reflection logic behind the field updaters fails to locate those fields in the operators. To avoid this we decided to only use the full `AtomicX` classes (as fields or extending them).

If you target the RxJava library with your custom operator (or Android), you are encouraged to do the same. If you plan have operators running on desktop Java, feel free to use the field updaters instead.

## Request accounting

When dealing with backpressure in `Flowable` operators, one needs a way to account the downstream requests and emissions in response to those requests. For this we use a single `AtomicLong`. Accounting must be atomic because requesting more and emitting items to fulfill an earlier request may happen at the same time.

The naive approach for accounting would be to simply call `AtomicLong.getAndAdd()` with new requests and `AtomicLong.addAndGet()` for decrementing based on how many elements were emitted.

The problem with this is that the Reactive-Streams specification declares `Long.MAX_VALUE` as the upper bound for outstanding requests (interprets it as the unbounded mode) but adding two large longs may overflow the `long` into a negative value. In addition, if for some reason, there are more values emitted than were requested, the subtraction may yield a negative current request value, causing crashes or hangs.

Therefore, both addition and subtraction have to be capped at `Long.MAX_VALUE` and `0` respectively. Since there is no dedicated `AtomicLong` method for it, we have to use a Compare-And-Set loop. (Usually, requesting happens relatively rarely compared to emission amounts thus the lack of dedicated machine code instruction is not a performance bottleneck.)

```java
public static long add(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current + n;
        if (update < 0L) {
            update = Long.MAX_VALUE;
        }
        if (requested.compareAndSet(current, update)) {
            return current;
        }
    }
}

public static long produced(AtomicLong requested, long n) {
for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            update = 0;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}
```

In fact, these are so common in RxJava's operators that these algorithms are available as utility methods on the **internal** `BackpressureHelper` class under the same name.

Sometimes, instead of having a separate `AtomicLong` field, your operator can extend `AtomicLong` saving on the indirection and class size. The practice in RxJava 2.x operators is that unless there is another atomic counter needed by the operator, (such as work-in-progress counter, see the later subsection) and otherwise doesn't need a base class, they extend `AtomicLong` directly.

The `BackpressureHelper` class features special versions of `add` and `produced` which treat `Long.MIN_VALUE` as a cancellation indication and won't change the `AtomicLong`s value if they see it.

## Once

RxJava 2 expanded the single-event reactive types to include `Maybe` (called as the reactive `Optional` by some). The common property of `Single`, `Completable` and `Maybe` is that they can only call one of the 3 kinds of methods on their consumers: `(onSuccess | onError | onComplete)`. Since they also participate in concurrent scenarios, an operator needs a way to ensure that only one of them is called even though the input sources may call multiple of them at once.

To ensure this, operators may use the `AtomicBoolean.compareAndSet` to atomically chose the event to relay (and thus the other events to drop).

```java
final AtomicBoolean once = new AtomicBoolean();

final MaybeObserver<? super T> child = ...; 

void emitSuccess(T value) {
   if (once.compareAndSet(false, true)) {
       child.onSuccess(value);
   }
}

void emitFailure(Throwable e) {
    if (once.compareAndSet(false, true)) {
        child.onError(e);
    } else {
        RxJavaPlugins.onError(e);
    }
}
```

Note that the same sequential requirement applies to these 0-1 reactive sources as to `Flowable`/`Observable`, therefore, if your operator doesn't have to deal with events from multiple sources (and pick one of them), you don't need this construct.

## Serialization

With more complicated sources, it may happen that multiple things happen that may trigger emission towards the downstream, such as upstream becoming available while the downstream requests for more data while the sequence gets cancelled by a timeout.

Instead of working out the often very complicated state transitions via atomics, perhaps the easiest way is to serialize the events, actions or tasks and have one thread perform the necessary steps after that. This is what I call **queue-drain** approach (or trampolining by some). 

(The other approach, **emitter-loop** is no longer recommended with 2.x due to its potential blocking `synchronized` constructs that looks performant in single-threaded case but destroys it in true concurrent case.)


The concept is relatively simple: have a concurrent queue and a work-in-progress atomic counter, enqueue the item, increment the counter and if the counter transitioned from 0 to 1, keep draining the queue, work with the element and decrement the counter until it reaches zero again:

```java
final ConcurrentLinkedQueue<Runnable> queue = ...;
final AtomicInteger wip = ...;

void execute(Runnable r) {
    queue.offer(r);
    if (wip.getAndIncrement() == 0) {
        do {
            queue.poll().run();
        } while (wip.decrementAndGet() != 0);
    }
}
```

The same pattern applies when one has to emit onNext values to a downstream consumer:

```java
final ConcurrentLinkedQueue<T> queue = ...;
final AtomicInteger wip = ...;
final Subscriber<? super T> child = ...;

void emit(T r) {
    queue.offer(r);
    if (wip.getAndIncrement() == 0) {
        do {
            child.onNext(queue.poll());
        } while (wip.decrementAndGet() != 0);
    }
}
```

### Queues

Using `ConcurrentLinkedQueue` is a reliable although mostly an overkill for such situations because it allocates on each call to `offer()` and is unbounded. It can be replaced with more optimized queues (see [JCTools](https://github.com/JCTools/JCTools/)) and RxJava itself also has some customized queues available (internal!):

  - `SpscArrayQueue` used when the queue is known to be fed by a single thread but the serialization has to look at other things (request, cancellation, termination) that can be read from other fields. Example: `observeOn` has a fixed request pattern which fits into this type of queue and extra fields for passing an error, completion or downstream requests into the drain logic.
  - `SpscLinkedArrayQueue` used when the queue is known to be fed by a single thread but there is no bound on the element count. Example: `UnicastProcessor`, almost all buffering `Observable` operator. Some operators use it with multiple event sources by synchronizing on the `offer` side - a tradeoff between allocation and potential blocking:

```java
SpscLinkedArrayQueue<T> q = ...
synchronized(q) {
    q.offer(value);
}
```

  - `MpscLinkedQueue` where there could be many feeders and unknown number of elements. Example: `buffer` with reactive boundary.

The RxJava 2.x implementations of these types of queues have different class hierarchy than the JDK/JCTools versions. Our classes don't implement the `java.util.Queue` interface but rather a custom, simplified interface:

```java
interface SimpleQueue<T> {
    boolean offer(T t);

    boolean offer(T t1, T t2);

    T poll() throws Exception;

    boolean isEmpty();

    void clear();
}

interface SimplePlainQueue<T> extends SimpleQueue<T> {
    @Override
    T poll();
}

public final class SpscArrayQueue<T> implements SimplePlainQueue<T> {
    // ...
}
```

This simplified queue API gets rid of the unused parts (iterator, collections API remnants) and adds a bi-offer method (only implemented atomically in `SpscLinkedArrayQueue` currently). The second interface, `SimplePlainQueue` is defined to suppress the `throws Exception` on poll on queue types that won't throw that exception and there is no need for try-catch around them.

## Deferred actions

The Reactive-Streams has a strict requirement that calling `onSubscribe()` must happen before any calls to the rest of the `onXXX` methods and by nature, any calls to `Subscription.request()` and `Subscription.cancel()`. The same logic applies to the design of `Observable`, `Single`, `Completable` and `Maybe` with their connection type of `Disposable`.

Often though, such call to `onSubscribe` may happen later than the respective `cancel()` needs to happen. For example, the user may want to call `cancel()` before the respective `Subscription` actually becomes available in `subscribeOn`. Other operators may need to call `onSubscribe` before they connect to other sources but at that time, there is no direct way for relaying a `cancel` call to an unavailable upstream `Subscription`.

The solution is **deferred cancellation** and **deferred requesting** in general.

### Deferred cancellation

This approach affects all 5 reactive types and works the same way for everyone. First, have an `AtomicReference` that will hold the respective connection type (or any other type whose method call has to happen later). Two methods are needed handling the `AtomicReference` class, one that sets the actual instance and one that calls the `cancel`/`dispose` method on it.

```java
static final Disposable DISPOSED;
static {
    DISPOSED = Disposables.empty();
    DISPOSED.dispose();
}

static boolean set(AtomicReference<Disposable> target, Disposable value) {
    for (;;) {
        Disposable current = target.get();
        if (current == DISPOSED) {
            if (value != null) {
                value.dispose();
            }
            return false;
        }
        if (target.compareAndSet(current, value)) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
}

static boolean dispose(AtomicReference<Disposable> target) {
    Disposable current = target.getAndSet(DISPOSED);
    if (current != DISPOSED) {
        if (current != null) {
            current.dispose();
        }
        return true;
    }
    return false;
}
```

The approach uses an unique sentinel value `DISPOSED` - that should not appear elsewhere in your code - to indicate once a late actual `Disposable` arrives, it should be disposed immediately. Both methods return true if the operation succeeded or false if the target was already disposed.

Sometimes, only one call to `set` is permitted (i.e., `setOnce`) and other times, the previous non-null value needs no call to `dispose` because it is known to be disposed already (i.e., `replace`).

As with the request management, there are utility classes and methods for these operations:

   - (internal) `SequentialDisposable` that uses `update`, `replace` and `dispose` but leaks the API of `AtomicReference`
   - `SerialDisposable` that has safe API with `set`, `replace` and `dispose` among other things
   - (internal) `DisposableHelper` that features the methods shown above and the global disposed sentinel used by RxJava. It may come handy when one uses `AtomicReference<Disposable>` as a base class.

The same pattern applies to `Subscription` with its `cancel()` method and with helper (internal) class `SubscriptionHelper` (but no `SequentialSubscription` or `SerialSubscription`, see next subsection).

### Deferred requesting

With `Flowable`s (and Reactive-Streams `Publisher`s) the `request()` calls need to be deferred as well. In one form (the simpler one), the respective late `Subscription` will eventually arrive and we need to relay all previous and all subsequent request amount to its `request()` method.

In 1.x, this behavior was implicitly provided by `rx.Subscriber` but at a high cost that had to be payed by all instances whether or not they needed this feature.

The solution works by having the `AtomicReference` for the `Subscription` and an `AtomicLong` to store and accumulate the requests until the actual `Subscription` arrives, then atomically request all deferred value once.

```java
static boolean deferredSetOnce(AtomicReference<Subscription> subscription, 
        AtomicLong requested, Subscription newSubscription) {
    if (subscription.compareAndSet(null, newSubscription) {
        long r = requested.getAndSet(0L);
        if (r != 0) {
            newSubscription.request(r);
        }
        return true;
    }
    newSubscription.cancel();
    if (subscription.get() != SubscriptionHelper.CANCELLED) {
        RxJavaPlugins.onError(new IllegalStateException("Subscription already set!"));
    }
    return false;
}

static void deferredRequest(AtomicReference<Subscription> subscription, 
        AtomicLong requested, long n) {
    Subscription current = subscription.get();
    if (current != null) {
        current.request(n);
    } else {
        BackpressureHelper.add(requested, n);
        current = subscription.get();
        if (current != null) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                current.request(r);
            }
        }
    }
}
```

In `deferredSetOnce`, if the CAS from null to the `newSubscription` succeeds, we atomically exchange the request amount to 0L and if the original value was nonzero, we request from `newSubscription`. In `deferredRequest`, if there is a `Subscription` we simply request from it directly. Otherwise, we accumulate the requests via the helper method then check again if the `Subscription` arrived or not. If it arrived in the meantime, we atomically exchange the accumulated request value and if nonzero, request it from the newly retrieved `Subscription`. This non-blocking logic makes sure that in case of concurrent invocations of the two methods, no accumulated request is left behind.

This complex logic and methods, along with other safeguards are available in the (internal) `SubscriptionHelper` utility class and can be used like this:

```java
final class Operator<T> implements Subscriber<T>, Subscription {
    final Subscriber<? super T> child;

    final AtomicReference<Subscription> ref = new AtomicReference<>();
    final AtomicLong requested = new AtomicLong();

    public Operator(Subscriber<? super T> child) {
        this.child = child;
    }

    @Override
    public void onSubscribe(Subscription s) {
        SubscriptionHelper.deferredSetOnce(ref, requested, s);
    }

    @Override
    public void onNext(T t) { ... }

    @Override
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() { ... }

    @Override
    public void cancel() {
        SubscriptionHelper.cancel(ref);
    }

    @Override
    public void request(long n) {
        SubscriptionHelper.deferredRequested(ref, requested, n);
    }
}

Operator<T> parent = new Operator<T>(child);

child.onSubscribe(parent);

source.subscribe(parent);
```

The second form is when multiple `Subscription`s replace each other and we not only need to hold onto request amounts when there is none of them but make sure a newer `Subscription` is requested only that much the previous `Subscription`'s upstream didn't deliver. This is called **Subscription arbitration** and the relevant algorithms are quite verbose and will be omitted here. There is, however, an utility class that manages this: (internal) `SubscriptionArbiter`.

You can extend it (to save on object headers) or have it as a field. Its main use is to send it to the downstream via `onSubscribe` and update its current `Subscription` in the current operator. Note that even though its methods are thread-safe, it is intended for swapping `Subscription`s when the current one finished emitting events. This makes sure that any newer `Subscription` is requested the right amount and not more due to production/switch race.

```java
final SubscriptionArbiter arbiter = ...

// ...

child.onSubscribe(arbiter);

// ...

long produced;

@Override
public void onSubscribe(Subscription s) {
    arbiter.setSubscription(s);
}

@Override
public void onNext(T value) {
   produced++;
   child.onNext(value);
}

@Override
public void onComplete() {
    long p = produced;
    if (p != 0L) {
        arbiter.produced(p);
    }
    subscribeNext();
}
```

For better performance, most operators can count the produced element amount and issue a single `SubscriptionArbiter.produced()` call just before switching to the next `Subscription`.


## Atomic error management

In some cases, multiple sources may signal a `Throwable` at the same time but the contract forbids calling `onError` multiple times. Once can, of course use the **once** approach with `AtomicReference<Throwable>` and throw out but the first one to set the `Throwable` on it.

The alternative is to collect these `Throwable`s into a `CompositeException` as long as possible and at one point lock out the others. This works by doing a copy-on-write scheme or by linking `CompositeException`s atomically and having a terminal sentinel to indicate all further errors should be dropped.

```java
static final Throwable TERMINATED = new Throwable();

static boolean addThrowable(AtomicReference<Throwable> ref, Throwable e) {
    for (;;) {
        Throwable current = ref.get();
        if (current == TERMINATED) {
            return false;
        }
        Throwable next;
        if (current == null) {
            next = e;
        } else {
            next = new CompositeException(current, e);
        }
        if (ref.compareAndSet(current, next)) {
            return true;
        }
    }
}

static Throwable terminate(AtomicReference<Throwable> ref) {
    return ref.getAndSet(TERMINATED);
}
```

as with most common logic, this is supported by the (internal) `ExceptionHelper` utility class and the (internal) `AtomicThrowable` class.

The usage pattern looks as follows:

```java

final AtomicThrowable errors = ...;

@Override
public void onError(Throwable e) {
    if (errors.addThrowable(e)) {
        drain();
    } else {
        RxJavaPlugins.onError(e);
    }
}

void drain() {
   // ...
   if (errors.get() != null) {
       child.onError(errors.terminate());
       return;
   }
   // ...
}
```

## Half-serialization

Sometimes having the queue-drain, `SerializedSubscriber` or `SerializedObserver` is a bit of an overkill. Such cases include when there is only one thread calling `onNext` but other threads may call `onError` or `onComplete` concurrently. Example operators include `takeUntil` where the other source may "interrupt" the main sequence and inject an `onComplete` into it before the main source itself would complete some time later. This is what I call **half-serialization**.

The approach uses the concepts of the deferred actions and atomic error management discussed above and has 3 methods for the `onNext`, `onError` and `onComplete` management:

```java
public static <T> void onNext(Subscriber<? super T> subscriber, T value,
        AtomicInteger wip, AtomicThrowable error) {
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
        subscriber.onNext(value);
        if (wip.decrementAndGet() != 0) {
            Throwable ex = error.terminate();
            if (ex != null) {
                subscriber.onError(ex);
            } else {
                subscriber.onComplete();
            }
        }
    }
}

public static void onError(Subscriber<?> subscriber, Throwable ex,
        AtomicInteger wip, AtomicThrowable error) {
    if (error.addThrowable(ex)) {
        if (wip.getAndIncrement() == 0) {
            subscriber.onError(error.terminate());
        }
    } else {
        RxJavaPlugins.onError(ex);
    }
}

public static void onComplete(Subscriber<?> subscriber,
        AtomicInteger wip, AtomicThrowable error) {
    if (wip.getAndIncrement() == 0) {
        Throwable ex = error.terminate();
        if (ex != null) {
            subscriber.onError(ex);
        } else {
            subscriber.onComplete();
        }
    }
}
```

Here, the `wip` counter indicates there is an active emission happening and if found non-zero when trying to leave the `onNext`, it is taken as indication there was a concurrent `onError` or `onComplete()` call and the child must be notified. All subsequent calls to any of these methods are ignored. In this case, the `wip` is never decremented back to zero.

RxJava 2.x, again, supports these with the (internal) utility class `HalfSerializer` and allows targeting `Subscriber`s and `Observer`s with it.

## Fast-path queue-drain

In some operators, it is unlikely concurrent threads try to enter into the drain loop at the same time and having to play the full enqueue-increment-dequeue adds unnecessary overhead.

Luckily, such situations can be detected by a simple compare-and-set attempt on the work-in-progress counter, trying to change the amount from 0 to 1. If it fails, there is a concurrent drain in progress and we revert back to the classical queue-drain logic. If succeeds, we don't enqueue anything but emit the value / perform the action right there and we try to leave the serialized section.

```java
public void onNext(T v) {
   if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
       child.onNext(v);
       if (wip.decrementAndGet() == 0) {
           break;
       }
   } else {
       queue.offer(v);
       if (wip.getAndIncrement() != 0) {
           break;
       }
   }
   drainLoop();
}

void drain() {
   if (getAndIncrement() == 0) {
       drainLoop();
   }
}

void drainLoop() {
   // the usual drain loop part after the classical getAndIncrement()
}
```

In this pattern, the classical `drain` is spit into `drain` and `drainLoop`. The new `drain` does the increment-check and calls `drainLoop` and `drainLoop` contains the remaining logic with the loop, emission and wip management as usual.

On the fast path, when we try to leave it, it is possible a concurrent call to `onNext` or `drain` incremented the `wip` counter further and the decrement didn't return it to zero. This is an indication for further work and we call `drainLoop` to process it.

## FlowableSubscriber

Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Subscriber` from Reactive-Streams. It has the same methods with the same parameter types but different textual rules attached to it, a set of relaxations to the Reactive-Streams specification to enable better performing RxJava internals while still honoring the specification to the letter for non-RxJava consumers of `Flowable`s.

The rule relaxations are as follows:

- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.

When a `Flowable` gets subscribed by a `Subscriber`, an `instanceof` check will detect `FlowableSubscriber` and not apply the `StrictSubscriber` wrapper that makes sure the relaxations don't happen. In practice, ensuring rule §3.9 has the most overhead because a bad request may happen concurrently with an emission of any normal event and thus has to be serialized with one of the methods described in previous sections. 

**In fact, 2.x was always implemented in this relaxed manner thus looking at existing code and style is the way to go.**

Therefore, it is strongly recommended one implements custom intermediate and end operators via `FlowableSubscriber`.

From a source operator's perspective, extending the `Flowable` class and implementing `subscribeActual` has no need for
dispatching over the type of the `Subscriber`; the backing infrastructure already applies wrapping if necessary thus one can be sure in `subscribeActual(Subscriber<? super T> s)` the parameter `s` is a `FlowableSubscriber`. (The signature couldn't be changed for compatibility reasons.) Since the two interfaces on the Java level are the same, no real preferential treating is necessary within sources (i.e., don't cast `s` into `FlowableSubscriber`.

The other base reactive consumers, `Observer`, `SingleObserver`, `MaybeObserver` and `CompletableObserver` don't need such relaxation, because

- they are only defined and used in RxJava (i.e., no other library implemented with them),
- they were conceptionally always derived from the relaxed `Subscriber` RxJava had,
- they don't have backpressure thus no `request()` call that would introduce another concurrency to think about,
- there is no way to trigger emission from upstream before `onSubscribe(Disposable)` returns in standard operators (again, no `request()` method).

# Backpressure and cancellation

Backpressure (or flow control) in Reactive-Streams is the means to tell the upstream how many elements to produce or to tell it to stop producing elements altogether. Unlike the name suggest, there is no physical pressure preventing the upstream from calling `onNext` but the protocol to honor the request amount.

## Replenishing

When dealing with basic transformations in a flow, there are often cases when the number of items the upstream sends should be different what the downstream receives. Some operators may want to filter out certain items, others would batch up items before sending one item to the downstream.

However, when an item is not forwarded by an operator, the downstream has no way of knowing its `request(1)` triggered an item generation that got dropped/buffered. Therefore, it can't know to (nor should it) repeat `request(1)` to "nudge" the source somewhere more upstream to try producing another item which now hopefully will result in an item being received by the downstream. Unlike, say the ACK-NACK based protocols, the requesting specified by the Reactive Streams are to be treated as cumulative. In the previous example, an impatient downstream would have 2 outstanding requests.

Therefore, if an operator is not guaranteed to relay an upstream item to downstream, and thus keeping a 1:1 ratio, it has the duty to keep requesting items from the upstream until said operator ends up in a position to supply an item to the downstream.

This may sound a bit complicated, but perhaps a demonstration of a `filter` operator can help:

```java
final class FilterOddSubscriber implements FlowableSubscriber<Integer>, Subscription {
    
    final Subscriber<? super Integer> downstream;
    
    Subscription upstream;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        if (upstream != null) {
            s.cancel();
        } else {
            upstream = s;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(Integer item) {
        if (item % 2 != 0) {
           downstream.onNext(item);
        } else {
           upstream.request(1);
        }
    }
     
    @Override
    public void request(long n) {
        upstream.request(n);
    }

    // the rest omitted for brevity
}
```

In such operators, thee downstream's `request` calls are forwarded to the upstream as is, and for `n` times (at most, unless completed) the `onNext` will be invoked. In this operator, we look for the odd numbers of the flow. If we find one, the downstream will be notified. If the incoming item is even, we won't forward it to the downstream. However, the downstream is still expecting at least 1 item, but since the upstream and downstream practically talk to each other directly, the upstream considers its duty to generate items fulfilled. This misalignment is then resolved by requesting 1 more item from the upstream for the previous ignored item. If more items arrive that get ignored, more will be requested as replenishment.

Given that backpressure involves some overhead in the form of one or more atomic operations, requesting one by one could add a lot of overhead if the number of items filtered out is significantly more than those that passed through. If necessary, this situation can be either solved by [decoupling the upstream and downstream's request management](#stable-prefetching) or using an RxJava-specific type and protocol extension in the form of [ConditionalSubscriber](#conditionalsubscriber)s.

## Stable prefetching

In a previous section, we saw primitives to deal with request accounting and delayed `Subscriptions`, but often, operators have to react to request amount changes as well. This comes up when the operator has to decouple the downstream request amount from the amount it requests from upstream, such as `observeOn`.

Such logic can get quite complicated in operators but one of the simplest manifestation can be the `rebatchRequest` operator that combines request management with serialization to ensure that upstream is requested with a predictable pattern no matter how the downstream requested (less, more or even unbounded):

```java
final class RebatchRequests<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

    final Subscriber<? super T> child;

    final AtomicLong requested;

    final SpscArrayQueue<T> queue;

    final int batchSize;

    final int limit;

    Subscription s;

    volatile boolean done;
    Throwable error;

    volatile boolean cancelled;

    long emitted;

    public RebatchRequests(Subscriber<? super T> child, int batchSize) {
        this.child = child;
        this.batchSize = batchSize;
        this.limit = batchSize - (batchSize >> 2); // 75% of batchSize
        this.requested = new AtomicLong();
        this.queue = new SpscArrayQueue<T>(batchSize);
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
        s.request(batchSize);
    }

    @Override
    public void onNext(T t) {
        queue.offer(t);
        drain();
    }

    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        drain();
    }

    @Override
    public void onComplete() {
        done = true;
        drain();
    }

    @Override
    public void request(long n) {
        BackpressureHelper.add(requested, n);
        drain();
    }

    @Override
    public void cancel() {
        cancelled = true;
        s.cancel();
    }

    void drain() {
        // see next code example
    }
}
```

Here we extend `AtomicInteger` since the work-in-progress counting happens more often and is worth avoiding the extra indirection. The class extends `Subscription` and it hands itself to the `child` `Subscriber` to capture its `request()`  (and `cancel()`) calls and route it to the main `drain` logic. Some operators need only this, some other operators (such as `observeOn` not only routes the downstream request but also does extra cancellations (cancels the asynchrony providing `Worker` as well) in its `cancel()` method.

**Important**: when implementing operators for `Flowable` and `Observable` in RxJava 2.x, you are not allowed to pass along an upstream `Subscription` or `Disposable` to the child `Subscriber`/`Observer` when the operator logic itself doesn't require hooking the `request`/`cancel`/`dispose` calls. The reason for this is how operator-fusion is implemented on top of `Subscription` and `Disposable` passing through `onSubscribe` in RxJava 2.x (and in **Reactor 3**). See the next section about operator-fusion. There is no fusion in `Single`, `Completable` or `Maybe` (because there is no requesting or unbounded buffering with them) and their operators can pass the upstream `Disposable` along as is.

Next comes the `drain` method whose pattern appears in many operators (with slight variations on how and what the emission does).

```java
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {
        long r = requested.get();
        long e = 0L;
        long f = emitted;
        
        while (e != r) {
            if (cancelled) {
                return;
            }
            boolean d = done;

            if (d) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                child.onComplete();
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);
            
            e++;
            if (++f == limit) {
               s.request(f);
               f = 0L;
            }
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
                if (queue.isEmpty()) {
                    child.onComplete();
                    return;
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
        }

        emitted = f;
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
```

This particular pattern is called the **stable-request queue-drain**. Another variation doesn't care about request amount stability towards upstream and simply requests the amount it delivered to the child:

```java
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {
        long r = requested.get();
        long e = 0L;
        
        while (e != r) {
            if (cancelled) {
                return;
            }
            boolean d = done;

            if (d) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                child.onComplete();
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);
            
            e++;
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
                if (queue.isEmpty()) {
                    child.onComplete();
                    return;
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
```

The third variation allows delaying a potential error until the upstream has terminated and all normal elements have been delivered to the child:

```java
final boolean delayError;

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {
        long r = requested.get();
        long e = 0L;
        
        while (e != r) {
            if (cancelled) {
                return;
            }
            boolean d = done;

            if (d && !delayError) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                } else {
                    child.onComplete();
                }
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);
            
            e++;
        }

        if (e == r) {
            if (cancelled) {
                return;
            }

            if (done) {
                if (delayError) {
                    if (queue.isEmpty()) {
                        Throwable ex = error;
                        if (ex != null) {
                            child.onError(ex);
                        } else {
                            child.onComplete();
                        }
                        return;
                    }
                } else {
                    Throwable ex = error;
                    if (ex != null) {
                        child.onError(ex);
                        return;
                    }
                    if (queue.isEmpty()) {
                        child.onComplete();
                        return;
                    }
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
```

If the downstream cancels the operator, the `queue` may still hold elements which may get referenced longer than expected if the operator chain itself is referenced in some way. On the user level, applying `onTerminateDetach` will forget all references going upstream and downstream and can help with this situation. On the operator level, RxJava 2.x usually calls `clear()` on the `queue` when the sequence is cancelled or ends before the queue is drained naturally. This requires some slight change to the drain loop:

```java
final boolean delayError;

@Override
public void cancel() {
    cancelled = true;
    s.cancel();
    if (getAndIncrement() == 0) {
        queue.clear();    // <----------------------------
    }
}

void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    for (;;) {
        long r = requested.get();
        long e = 0L;
        
        while (e != r) {
            if (cancelled) {
                queue.clear();    // <----------------------------
                return;
            }
            boolean d = done;

            if (d && !delayError) {
                Throwable ex = error;
                if (ex != null) {
                    queue.clear();    // <----------------------------
                    child.onError(ex);
                    return;
                }
            }

            T v = queue.poll();
            boolean empty = v == null;

            if (d && empty) {
                Throwable ex = error;
                if (ex != null) {
                    child.onError(ex);
                } else {
                    child.onComplete();
                }
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);
            
            e++;
        }

        if (e == r) {
            if (cancelled) {
                queue.clear();    // <----------------------------
                return;
            }

            if (done) {
                if (delayError) {
                    if (queue.isEmpty()) {
                        Throwable ex = error;
                        if (ex != null) {
                            child.onError(ex);
                        } else {
                            child.onComplete();
                        }
                        return;
                    }
                } else {
                    Throwable ex = error;
                    if (ex != null) {
                        queue.clear();    // <----------------------------
                        child.onError(ex);
                        return;
                    }
                    if (queue.isEmpty()) {
                        child.onComplete();
                        return;
                    }
                }
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
            s.request(e);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
```

Since the queue is single-producer-single-consumer, its `clear()` must be called from a single thread - which is provided by the serialization loop and is enabled by the `getAndIncrement() == 0` "half-loop" inside `cancel()`.

An important note on the order of calls to `done` and the `queue`'s state:

```java
boolean d = done;
T v = queue.poll();
```

and

```java
boolean d = done;
boolean empty = queue.isEmpty();
```

These must happen in the order specified. If they were swapped, it is possible when the drain runs asynchronously to an `onNext`/`onComplete()`, the queue may appear empty at first, then it gets elements followed by `done = true` and a late `done` check in the drain loop may complete the sequence thinking it delivered all values there was.

## Single valued results

Sometimes an operator only emits one single value at some point instead of emitting more or all of its sources. Such operators include `fromCallable`, `reduce`, `any`, `all`, `first`, etc.

The classical queue-drain works here but is a bit of an overkill to allocate objects to store the work-in-progress counter, request accounting and the queue itself. These elements can be reduced to a single state-machine with one state counter object - often inlinded by extending AtomicInteger - and a plain field for storing the single value to be emitted.

The state machine handing the possible concurrent downstream requests and normal completion path is a bit complicated to show here and is quite easy to get wrong.

RxJava 2.x supports this kind of behavior through the (internal) `DeferredScalarSubscription` for operators without an upstream source (`fromCallable`) and the (internal) `DeferredScalarSubscriber` for reduce-like operators with an upstream source.

Using the `DeferredScalarSubscription` is straightforward, one creates it, sends it to the downstream via `onSubscribe` and later on calls `complete(T)` to signal the end with a single value:

```java
DeferredScalarSubscription<Integer> dss = new DeferredScalarSubscription<>(child);
child.onSubscribe(dss);

dss.complete(1);
```

Using the `DeferredScalarSubscriber` requires more coding and extending the class itself:

```java
final class Counter extends DeferredScalarSubscriber<Object, Integer> {
   public Counter(Subscriber<? super Integer> child) {
       super(child);
       value = 0;
       hasValue = true;
   }

   @Override
   public void onNext(Object t) {
       value++;
   }
}
```

By default, the `DeferredScalarSubscriber.onSubscribe()` requests `Long.MAX_VALUE` from the upstream (but the method can be overridden in subclasses).

## Single-element post-complete

Some operators have to modulate a sequence of elements in a 1:1 fashion but when the upstream terminates, they need to produce a final element followed by a terminal event (usually `onComplete`).

```java
final class OnCompleteEndWith implements Subscriber<T>, Subscription {
    final Subscriber<? super T> child;

    final T finalElement;

    Subscription s;

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        this.child = child;
        this.finalElement = finalElement;
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        child.onNext(t);
    }

    @Overide
    public void onError(Throwable t) {
        child.onError(t);
    }

    @Override
    public void onComplete() {
        child.onNext(finalElement);
        child.onComplete();
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    @Override
    public void cancel() {
        s.cancel();
    }
}
```

This works if the downstream request more than the upstream produces + 1, otherwise the call to `onComplete` may overflow the child `Subscriber`.

Heavyweight solutions such as queue-drain or `SubscriptionArbiter` with `ScalarSubscriber` can be used here, however, there is a more elegant solution to the problem.

The idea is that request amounts occupy only 63 bits of a 64 bit (atomic) long type. If we'd mask out the lower 63 bits when working with the amount, we can use the most significant bit to indicate the upstream sequence has finished and then on, any 0 to n request amount change can trigger the emission of the `finalElement`. Since a downstream `request()` can race with an upstream `onComplete`, marking the bit atomically via a compare-and-set ensures correct state transition.

For this, the `OnCompleteEndWith` has to be changed by adding an `AtomicLong` for accounting requests, a long for counting the production, then updating `request()` and `onComplete()` methods:

```java

final class OnCompleteEndWith 
extends AtomicLong
implements FlowableSubscriber<T>, Subscription {
    final Subscriber<? super T> child;

    final T finalElement;

    Subscription s;

    long produced;

    static final class long REQUEST_MASK = Long.MAX_VALUE;  // 0b01111...111L
    static final class long COMPLETE_MASK = Long.MIN_VALUE; // 0b10000...000L

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        this.child = child;
        this.finalElement = finalElement;
    }

    @Override
    public void onSubscribe(Subscription s) { ... }

    @Override
    public void onNext(T t) {
        produced++;                // <------------------------
        child.onNext(t);
    }

    @Overide
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() {
        long p = produced;
        if (p != 0L) {
            produced = 0L;
            BackpressureHelper.produced(this, p);
        }

        for (;;) {
            long current = get();
            if ((current & COMPLETE_MASK) != 0) {
                break;
            }
            if ((current & REQUEST_MASK) != 0) {
                lazySet(Long.MIN_VALUE + 1);
                child.onNext(finalElement); 
                child.onComplete();
                return;
            }
            if (compareAndSet(current, COMPLETE_MASK)) {
                break;
            }
        }
    }

    @Override
    public void request(long n) {
        for (;;) {
            long current = get();
            if ((current & COMPLETE_MASK) != 0) {
                if (compareAndSet(current, COMPLETE_MASK + 1)) {
                    child.onNext(finalElement);
                    child.onComplete();
                }
                break;
            }
            long u = BackpressureHelper.addCap(current, n);
            if (compareAndSet(current, u)) {
                s.request(n);
                break;
            }
        }
    }

    @Override
    public void cancel() { ... }
}
```

RxJava 2 has a couple of operators, `materialize`, `mapNotification`, `onErrorReturn`, that require this type of behavior and for that, the (internal) `SinglePostCompleteSubscriber` class captures the algorithms above:

```java
final class OnCompleteEndWith<T> extends SinglePostCompleteSubscriber<T, T> {
    final Subscriber<? super T> child;

    public OnCompleteEndWith(Subscriber<? super T> child, T finalElement) {
        this.child = child;
        this.value = finalElement;
    }

    @Override
    public void onNext(T t) {
        produced++;                // <------------------------
        child.onNext(t);
    }
    
    @Overide
    public void onError(Throwable t) { ... }

    @Override
    public void onComplete() {
        complete(value);
    }
}
```

## Multi-element post-complete

Certain operators may need to emit multiple elements after the main sequence completes, which may or may not relay elements from the live upstream before its termination. An example operator is `buffer(int, int)` when the skip < size yielding overlapping buffers. In this operator, it is possible when the upstream completes, several overlapping buffers are waiting to be emitted to the child but that has to happen only when the child actually requested more buffers.

The state machine for this case is complicated but RxJava has two (internal) utility methods on `QueueDrainHelper` for dealing with the situation:

```java
<T> void postComplete(Subscriber<? super T> actual,
                      Queue<T> queue,
                      AtomicLong state,
                      BooleanSupplier isCancelled);

<T> boolean postCompleteRequest(long n,
                                Subscriber<? super T> actual,
                                Queue<T> queue,
                                AtomicLong state,
                                BooleanSupplier isCancelled);
```

They take the child `Subscriber`, the queue to drain from, the state holding the current request amount and a callback to see if the downstream cancelled the sequence.

Usage of these methods is as follows:

```java
final class EmitTwice<T> extends AtomicLong
implements FlowableSubscriber<T>, Subscription, BooleanSupplier {
    final Subscriber<? super T> child;

    final ArrayDeque<T> buffer;

    volatile boolean cancelled;

    Subscription s;

    long produced;

    public EmitTwice(Subscriber<? super T> child) {
        this.child = child;
        this.buffer = new ArrayDeque<>();
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        produced++;
        buffer.offer(t);
        child.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        buffer.clear();
        child.onError(t);
    }

    @Override
    public void onComplete() {
        long p = produced;
        if (p != 0L) {
            produced = 0L;
            BackpressureHelper.produced(this, p);
        }
        QueueDrainHelper.postComplete(child, buffer, this, this);
    }

    @Override
    public boolean getAsBoolean() {
        return cancelled;
    }

    @Override
    public void cancel() {
        cancelled = true;
        s.cancel();
    }

    @Override
    public void request(long n) {
        if (!QueueDrainHelper.postCompleteRequest(n, child, buffer, this, this)) {
            s.request(n);
        }
    }
}
```

# Creating operator classes

Creating operator implementations in 2.x is simpler than in 1.x and incurs less allocation as well. You have the choice to implement your operator as a `Subscriber`-transformer to be used via `lift` or as a fully-fledged base reactive class.

## Operator by extending a base reactive class

In 1.x, extending `Observable` was possible but convoluted because you had to implement the `OnSubscribe` interface separately and pass it to `Observable.create()` or to the `Observable(OnSubscribe)` protected constructor.

In 2.x, all base reactive classes are abstract and you can extend them directly without any additional indirection:

```java
public final class FlowableMyOperator extends Flowable<Integer> {
    final Publisher<Integer> source;
 
    public FlowableMyOperator(Publisher<Integer> source) {
        this.source = source;
    }
    
    @Override
    protected void subscribeActual(Subscriber<? super Integer> s) {
        source.map(v -> v + 1).subscribe(s);
    }
}
```

When taking other reactive types as inputs in these operators, it is recommended one defines the base reactive interfaces instead of the abstract classes, allowing better interoperability between libraries (especially with `Flowable` operators and other Reactive-Streams `Publisher`s). To recap, these are the class-interface pairs:

  - `Flowable` - `Publisher` - `FlowableSubscriber`/`Subscriber`
  - `Observable` - `ObservableSource` - `Observer`
  - `Single` - `SingleSource` - `SingleObserver`
  - `Completable` - `CompletableSource` - `CompletableObserver`
  - `Maybe` - `MaybeSource` - `MaybeObserver`

RxJava 2.x locks down `Flowable.subscribe` (and the same methods in the other types) in order to provide runtime hooks into the various flows, therefore, implementors are given the `subscribeActual()` to be overridden. When it is invoked, all relevant hooks and wrappers have been applied. Implementors should avoid throwing unchecked exceptions as the library generally can't deliver it to the respective `Subscriber` due to lifecycle restrictions of the Reactive-Streams specification and sends it to the global error consumer via `RxJavaPlugins.onError`.

Unlike in 1.x, In the example above, the incoming `Subscriber` is simply used directly for subscribing again (but still at most once) without any kind of wrapping. In 1.x, one needs to call `Subscribers.wrap` to avoid double calls to `onStart` and cause unexpected double initialization or double-requesting.

Unless one contributes a new operator to RxJava, working with such classes may become tedious, especially if they are intermediate operators:

```java
new FlowableThenSome(
    new FlowableOther(
        new FlowableMyOperator(Flowable.range(1, 10).map(v -> v * v))
    )
)
```

This is an unfortunate effect of Java lacking extension method support. A possible ease on this burden is by using `compose` to have fluent inline application of the custom operator:

```java
Flowable.range(1, 10).map(v -> v * v)
.compose(f -> new FlowableOperatorWithParameter(f, 10));

Flowable.range(1, 10).map(v -> v * v)
.compose(FlowableMyOperator::new);
```

## Operator targeting lift()

The alternative to the fluent application problem is to have a `Subscription`-transformer implemented instead of extending the whole reactive base class and use the respective type's `lift()` operator to get it into the sequence.

First one has to implement the respective `XOperator` interface:

```java
public final class MyOperator implements FlowableOperator<Integer, Integer> {

    @Override
    public Subscriber<? super Integer> apply(Subscriber<? super Integer> child) {
        return new Op(child);
    }

    static final class Op implements FlowableSubscriber<Integer>, Subscription {
        final Subscriber<? super Integer> child;

        Subscription s;

        public Op(Subscriber<? super Integer> child) {
            this.child = child;
        }

        @Override
        pubic void onSubscribe(Subscription s) {
            this.s = s;
            child.onSubscribe(this);
        }

        @Override
        public void onNext(Integer v) {
            child.onNext(v * v);
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onComplete() {
            child.onComplete();
        }

        @Override
        public void cancel() {
            s.cancel();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }
    }
}
```

You may recognize that implementing operators via extension or lifting looks quite similar. In both cases, one usually implements a `FlowableSubscriber` (`Observer`, etc) that takes a downstream `Subscriber`, implements the business logic in the `onXXX` methods and somehow (manually or as part of `lift()`'s lifecycle) gets subscribed to an upstream source.

The benefit of applying the Reactive-Streams design to all base reactive types is that each consumer type is now an interface and can be applied to operators that have to extend some class. This was a pain in 1.x because `Subscriber` and `SingleSubscriber` are classes themselves, plus `Subscriber.request()` is a protected-final method and an operator's `Subscriber` can't implement the `Producer` interface at the same time. In 2.x there is no such problem and one can have both `Subscriber`, `Subscription` or even `Observer` together in the same consumer type.

# Operator fusion

Operator fusion has the premise that certain operators can be combined into one single operator (macro-fusion) or their internal data structures shared between each other (micro-fusion) that allows fewer allocations, lower overhead and better performance.

This advanced concept was invented, worked out and studied in the [Reactive-Streams-Commons](https://github.com/reactor/reactive-streams-commons) research project manned by the leads of RxJava and Project Reactor. Both libraries use the results in their implementation, which look the same but are incompatible due to different classes and packages involved. In addition, RxJava 2.x's approach is a more polished version of the invention due to delays between the two project's development.

Since operator-fusion is optional, you may chose to not bother making your operator fusion-enabled. The `DeferredScalarSubscription` is fusion-enabled and needs no additional development in this regard though.

If you chose to ignore operator-fusion, you still have to follow the requirement of never forwarding a `Subscription`/`Disposable` coming through `onSubscribe` of `Subscriber`/`Observer` as this may break the fusion protocol and may skip your operator's business logic entirely:

```java
final class SomeOp<T> implements Subscriber<T>, Subscription { 
   
    // ...
    Subscription s;

    public void onSubscribe(Subscription s) {
        this.s = s;
        child.onSubscribe(this);                // <---------------------------
    }

    @Override
    public void cancel() {
        s.cancel();
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    // ...
}
```

Yes, this adds one more indirection between operators but it is still cheap (and would be necessary for the operator anyway) but enables huge performance gains with the right chain of operators.

## Generations

Given this novel approach, a generation number can be assigned to various implementation styles of reactive architectures:

#### Generation 0
These are the classical libraries that either use `java.util.Observable` or are listener based (Java Swing's `ActionListener`). Their common property is that they don't support composition (of events and cancellation). See also **Google Agera**.

#### Generation 1
This is the level of the **Rx.NET** library (even up to 3.x) that supports composition, but has no notion for backpressure and doesn't properly support synchronous cancellation. Many JavaScript libraries such as **RxJS 5** are still on this level. See also **Google gRPC**.

#### Generation 2
This is what **RxJava 1.x** is categorized, it supports composition, backpressure and synchronous cancellation along with the ability to lift an operator into a sequence.

#### Generation 3
This is the level of the Reactive-Streams based libraries such as **Reactor 2** and **Akka-Stream**. They are based upon a specification that evolved out of RxJava but left behind its drawbacks (such as the need to return anything from `subscribe()`). This is incompatible with RxJava 1.x and thus 2.x had to be rewritten from scratch.

#### Generation 4
This level expands upon the Reactive-Streams interfaces with operator-fusion (in a compatible fashion, that is, op-fusion is optional between two stages and works without them). **Reactor 3** and **RxJava 2** are at this level. The material around **Akka-Stream** mentions operator-fusion as well, however, **Akka-Stream** is not a native Reactive-Streams implementation (requires a materializer to get a `Publisher` out) and as such it is only Gen 3.

There are discussions among the 4th generation library providers to have the elements of operator-fusion standardized in Reactive-Streams 2.0 specification (or in a neighboring extension) and have **RxJava 3** and **Reactor 4** work together on that aspect as well.

## Components

### Callable and ScalarCallable

Certain `Flowable` sources, similar to `Single` or `Completable` are known to ever emit zero or one item and that single item is known to be constant or is computed synchronously. Well known examples of this are `just()`, `empty()` and `fromCallable`. Subscribing to these sources, like any other sources, adds the same infrastructure overhead which can often be avoided if the consumer could just pick or have the item calculated on the spot. 

For example, `just` and `empty` appears as the mapping result of a `flatMap` operation:

```java
source.flatMap(v -> {
    if (v % 2 == 0) {
        return just(v);
    }
    return empty();
})
```

Here, if we'd somehow recognize that `empty()` won't emit a value but only `onComplete` we could simply avoid subscribing to it inside `flatMap`, saving on the overhead. Similarly, recognizing that `just` emits exactly one item we can route it differently inside `flatMap` and again, avoiding creating a lot of objects to get to the same single item.

In other times, knowing the emission property can simplify or chose a different operator instead of the applied one. For example, applying `flatMap` to an `empty()` source has no use since there won't be any item to be flattened into a sequence; the whole flattened sequence is going to be empty. Knowing that a source is `just` to `flatMap`, there is no need for the complicated inner mechanisms as there is going to be only one mapped inner source and one can subscribe the downstream's `Subscriber` to it directly.

```java
Flowable.just(1).flatMap(v -> Flowable.range(v, 5)).subscribe(...);

// in some specialized operator:

T value; // from just()

@Override
public void subscribeActual(Subscriber<? super T> s) {
    mapper.apply(value).subscribe(s);
}
```

There could be other sources with these properties, therefore, RxJava 2 uses the `io.reactivex.internal.fusion.ScalarCallable` and `java.util.Callable` interfaces to indicate a source is a constant or sequentially computable. When a source `Flowable` or `Observable` is marked with one of these interfaces, many fusion enabled operators will perform special actions to avoid the overhead of a normal and general source.

We use Java's own and preexisting `java.util.Callable` interface to indicate a synchronously computable source. The `ScalarCallable` is an extension to this interface by which it suppresses the `throws Exception` of `Callable.call()`:

```java
interface Callable<T> {
    T call() throws Exception;
}

interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}
```

The reason for the two separate interfaces is that if a source is constant, like `just`, one can perform assembly-time optimizations with it knowing that each regular `subscribe` invocation would have resulted in the same single value.

`Callable` denotes sources, such as `fromCallable` that indicates the single value has to be calculated at runtime of the flow. By this logic, you can see that `ScalarCallable` is a `Callable` on its own right because the constant can be "calculated" as late as the runtime phase of the flow.

Since Reactive-Streams forbids using `null`s as emission values, we can use `null` in `(Scalar)Callable` marked sources to indicate there is no value to be emitted, thus one can't mistake an user's `null` with the empty indicator `null`. For example, this is how `empty()` is implemented:

```java
final class FlowableEmpty extends Flowable<Object> implements ScalarCallable<Object> {
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        EmptySubscription.complete(s);
    }

    @Override
    public Object call() {
        return null; // interpreted as no value available
    }
}
```

Sources implementing `Callable` may throw checked exceptions from `call()` which is handled by the consumer operators as an indication to signal `onError` in an operator specific manner (such as delayed).

```java
final class FlowableIOException extends Flowable<Object> implements Callable<Object> {
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        EmptySubscription.error(new IOException(), s);
    }

    @Override
    public Object call() throws Exception {
        throw new IOException();
    }
}
```

However, implementors of `ScalarCallable` should avoid throwing any exception and limit the code in `call()` be constant or simple computation that can be legally executed during assembly time.

As the consumer of sources, one may want to deal with such kind of special `Flowable`s or `Observable`s. For example, if you create an operator that can leverage the knowledge of a single element source as its main input, you can check the types and extract the value of a `ScalarCallable` at assembly time right in the operator:

```java
// Flowable.java
public final Flowable<Integer> plusOne() {
    if (this instanceof ScalarCallable) {
        Integer value = ((ScalarCallable<Integer>)this).call();
        if (value == null) {
            return empty();
        }
        return just(value + 1);
    }
    return cast(Integer.class).map(v -> v + 1);
}
```

or as a `FlowableTransformer`:

```java
FlowableTransformer<Integer, Integer> plusOneTransformer = source -> {
    if (source instanceof ScalarCallable) {
        Integer value = ((ScalarCallable<Integer>)source).call();
        if (value == null) {
            return empty();
        }
        return just(value + 1);
    }
    return source.map(v -> v + 1);
};
```

However, it is not mandatory to handle `ScalarCallable`s and `Callable`s separately. Since the former extends the latter, the type check can be deferred till subscription time and handled with the same code path:

```java
final class FlowablePlusOne extends Flowable<Integer> {
    final Publisher<Integer> source;

    FlowablePlusOne(Publisher<Integer> source) {
        this.source = source;
    }

    @Override
    public void subscribeActual(Subscriber<? super Integer> s) {
        if (source instanceof Callable) {
            Integer value;

            try {
                value = ((Callable<Integer>)source).call();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                EmptySubscription.error(ex, s);
                return;
            }

            s.onSubscribe(new ScalarSubscription<Integer>(s, value + 1));
        } else {
            new FlowableMap<>(source, v -> v + 1).subscribe(s);
        }
    }
}
```

### ConditionalSubscriber

TBD

### QueueSubscription and QueueDisposable

TBD

# Example implementations

TBD

## `map` + `filter` hybrid

TBD

## Ordered `merge`

TBD