1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
import math
from typing import Any
from reactivex import Observable
def determine_median(sorted_list):
if len(sorted_list) == 0:
raise Exception("The input sequence was empty")
if len(sorted_list) % 2 == 1:
return sorted_list[int((len(sorted_list) + 1) / 2) - 1]
else:
median_1 = sorted_list[int((len(sorted_list) + 1) / 2) - 1]
median_2 = sorted_list[int((len(sorted_list) + 1) / 2)]
return float(median_1 + median_2) / 2.0
def median(source: Observable) -> Observable:
"""
Calculates the statistical median on numerical emissions. The sequence must be finite.
"""
return source.to_sorted_list().map(lambda l: determine_median(l))
def mode(source: Observable[Any]) -> Observable[Any]:
"""
Returns the most frequently emitted value (or "values" if they have the same number of occurrences).
The sequence must be finite.
"""
return (
source.group_by(lambda v: v)
.flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct)))
.to_sorted_list(lambda t: t[1], reverse=True)
.flat_map(lambda l: Observable.from_(l).take_while(lambda t: t[1] == l[0][1]))
.map(lambda t: t[0])
)
def variance(source: Observable) -> Observable:
"""
Returns the statistical variance of the numerical emissions.
The sequence must be finite.
"""
squared_values = (
source.to_list()
.flat_map(
lambda l: Observable.from_(l)
.average()
.flat_map(lambda avg: Observable.from_(l).map(lambda i: i - avg))
)
.map(lambda i: i * i)
.publish()
.auto_connect(2)
)
return Observable.zip(
squared_values.sum(), squared_values.count(), lambda sum, ct: sum / (ct - 1)
)
def standard_deviation(source: Observable) -> Observable:
"""
Returns the standard deviation of the numerical emissions:
The sequence must be finite.
"""
return source.variance().map(lambda i: math.sqrt(i))
|