1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
test(() => {
const results = [];
const indices = [];
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
const mapped = source.map((value, i) => {
indices.push(i);
return value * 2;
});
assert_true(mapped instanceof Observable, "map() returns an Observable");
assert_array_equals(results, [], "Does not map until subscribed (values)");
assert_array_equals(indices, [], "Does not map until subscribed (indices)");
mapped.subscribe({
next: (value) => results.push(value),
error: () => results.push('error'),
complete: () => results.push('complete'),
});
assert_array_equals(results, [2, 4, 6, 'complete']);
assert_array_equals(indices, [0, 1, 2]);
}, "map(): Maps values correctly");
test(() => {
const error = new Error("error");
const results = [];
let teardownCalled = false;
const source = new Observable((subscriber) => {
subscriber.addTeardown(() => teardownCalled = true);
subscriber.next(1);
assert_false(teardownCalled,
"Teardown not called until until map unsubscribes due to error");
subscriber.next(2);
assert_true(teardownCalled, "Teardown called once map unsubscribes due to error");
assert_false(subscriber.active, "Unsubscription makes Subscriber inactive");
subscriber.next(3);
subscriber.complete();
});
const mapped = source.map((value) => {
if (value === 2) {
throw error;
}
return value * 2;
});
mapped.subscribe({
next: (value) => results.push(value),
error: (error) => results.push(error),
complete: () => results.push("complete"),
});
assert_array_equals(results, [2, error],
"Mapper errors are emitted to Observer error() handler");
}, "map(): Mapper errors are emitted to Observer error() handler");
test(() => {
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
subscriber.next(2);
});
let mapperCalls = 0;
const results = [];
source.map(v => {
mapperCalls++;
return v * 2;
}).subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push('complete'),
});
assert_equals(mapperCalls, 1, "Mapper is not called after complete()");
assert_array_equals(results, [2, "complete"]);
}, "map(): Passes complete() through from source Observable");
test(() => {
const source = new Observable(subscriber => {
subscriber.next(1);
subscriber.error('error');
subscriber.next(2);
});
let mapperCalls = 0;
const results = [];
source.map(v => {
mapperCalls++;
return v * 2;
}).subscribe({
next: v => results.push(v),
error: e => results.push(e),
complete: () => results.push('complete'),
});
assert_equals(mapperCalls, 1, "Mapper is not called after error()");
assert_array_equals(results, [2, "error"]);
}, "map(): Passes error() through from source Observable");
// This is mostly ensuring that the ordering in
// https://wicg.github.io/observable/#dom-subscriber-complete is consistent.
//
// That is, the `Subscriber#complete()` method *first* closes itself and signals
// abort on its own `Subscriber#signal()` and *then* calls whatever supplied
// completion algorithm exists. In the case of `map()`, the "supplied completion
// algorithm" is simply a set of internal observer steps that call
// `Subscriber#complete()` on the *outer* mapper's Observer. This means the
// outer Observer is notified of completion *after* the source Subscriber's
// signal is aborted / torn down.
test(() => {
const results = [];
const source = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('source teardown'));
subscriber.signal.addEventListener('abort',
() => results.push('source abort event'));
subscriber.complete();
});
source.map(() => results.push('mapper called')).subscribe({
complete: () => results.push('map observable complete'),
});
assert_array_equals(results,
['source abort event', 'source teardown', 'map observable complete']);
}, "map(): Upon source completion, source Observable teardown sequence " +
"happens before downstream mapper complete() is called");
test(() => {
const results = [];
let sourceSubscriber = null;
const source = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('source teardown'));
sourceSubscriber = subscriber;
subscriber.next(1);
});
const controller = new AbortController();
source.map(v => v * 2).subscribe({
next: v => {
results.push(v);
// Triggers unsubscription to `source`.
controller.abort();
// Does nothing, since `source` is already torn down.
sourceSubscriber.next(100);
},
complete: () => results.push('mapper complete'),
error: e => results.push('mapper error'),
}, {signal: controller.signal});
assert_array_equals(results, [2, 'source teardown']);
}, "map(): Map observable unsubscription causes source Observable " +
"unsubscription. Mapper Observer's complete()/error() are not called");
|