File: stats_kstest2.py

package info (click to toggle)
pyferret 7.6.5-10
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 138,136 kB
  • sloc: fortran: 240,609; ansic: 25,235; python: 24,026; sh: 1,618; makefile: 1,123; pascal: 569; csh: 307; awk: 18
file content (128 lines) | stat: -rw-r--r-- 5,005 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Performs a two-sided Kolmogorov-Smirnov test that two samples
come from the same continuous probability distribution.
"""

from __future__ import print_function

import numpy
import pyferret
import scipy.stats


def ferret_init(id):
    """
    Initialization for the stats_kstest2 PyEF
    """
    axes_values = [ pyferret.AXIS_DOES_NOT_EXIST ] * pyferret.MAX_FERRET_NDIM
    axes_values[0] = pyferret.AXIS_CUSTOM
    false_influences = [ False ] * pyferret.MAX_FERRET_NDIM
    retdict = { "numargs": 2,
                "descript": "Returns two-sided Kolmogorov-Smirnov test stat. and prob. " \
                            "that two samples comes from the same prob. distrib.",
                "axes": axes_values,
                "argnames": ( "SAMPLEA", "SAMPLEB", ),
                "argdescripts": ( "First sample data array",
                                  "Second sample data array", ),
                "argtypes": ( pyferret.FLOAT_ARRAY, pyferret.FLOAT_ARRAY, ),
                "influences": ( false_influences, false_influences, ),
              }
    return retdict


def ferret_custom_axes(id):
    """
    Define custom axis of the stats_kstest2 Ferret PyEF
    """
    axis_defs = [ None ] * pyferret.MAX_FERRET_NDIM
    axis_defs[0] = ( 1, 2, 1, "KS,P", False )
    return axis_defs


def ferret_compute(id, result, resbdf, inputs, inpbdfs):
    """
    Performs a two-sided Kolmogorov-Smirnov test that two samples come
    from the same continuous probability distribution.  The samples are
    given in inputs[0] and inputs[1].  The test statistic value and
    two-tailed probability are returned in result.  Undefined data given
    in each sample are removed (independently from each other) before
    performing the test.  Note that the samples do not need to be the
    same size; thus there are no restrictions on the relative dimensions
    of sample arrays.
    """
    badmask = ( numpy.fabs(inputs[0] - inpbdfs[0]) < 1.0E-5 )
    badmask = numpy.logical_or(badmask, numpy.isnan(inputs[0]))
    goodmask = numpy.logical_not(badmask)
    sampa = inputs[0][goodmask]
    badmask = ( numpy.fabs(inputs[1] - inpbdfs[1]) < 1.0E-5 )
    badmask = numpy.logical_or(badmask, numpy.isnan(inputs[1]))
    goodmask = numpy.logical_not(badmask)
    sampb = inputs[1][goodmask]
    fitparams = scipy.stats.ks_2samp(sampa, sampb)
    result[:] = resbdf
    # Kolmogorov-Smirnov test statistic
    result[0] = fitparams[0]
    # probability
    result[1] = fitparams[1]


#
# The rest of this is just for testing this module at the command line
#
if __name__ == "__main__":
    # make sure ferret_init and ferret_custom_axes do not have problems
    info = ferret_init(0)
    info = ferret_custom_axes(0)

    # Get two random samples from the same distribution
    ydim = 250
    zdim = 150
    distf = scipy.stats.weibull_min(2.0, 5.0)
    sampa = distf.rvs(ydim * zdim)
    sampb = distf.rvs(ydim * zdim)

    # Get a distribution from a different distribution
    sampu = scipy.stats.uniform(loc=7.0, scale=5.0).rvs(ydim * zdim)

    # setup for the call to ferret_compute
    inpbdfs = numpy.array([-9999.0, -8888.0], dtype=numpy.float64)
    resbdf = numpy.array([-7777.0], dtype=numpy.float64)
    arraya = numpy.empty((1, ydim, zdim, 1, 1, 1), dtype=numpy.float64, order='F')
    arrayb = numpy.empty((ydim, 1, 1, zdim, 1, 1), dtype=numpy.float64, order='F')
    arrayu = numpy.empty((ydim, 1, 1, zdim, 1, 1), dtype=numpy.float64, order='F')
    index = 0
    for j in range(ydim):
        for k in range(zdim):
            if (index % 23) == 3:
                arraya[0, j, k, 0, 0, 0] = inpbdfs[0]
            else:
                arraya[0, j, k, 0, 0, 0] = sampa[index]
            if (index % 53) == 3:
                arrayb[j, 0, 0, k, 0, 0] = inpbdfs[1]
                arrayu[j, 0, 0, k, 0, 0] = inpbdfs[1]
            else:
                arrayb[j, 0, 0, k, 0, 0] = sampb[index]
                arrayu[j, 0, 0, k, 0, 0] = sampu[index]
            index += 1
    resultb = -6666.0 * numpy.ones((2, 1, 1, 1, 1, 1), dtype=numpy.float64, order='F')
    resultu = -6666.0 * numpy.ones((2, 1, 1, 1, 1, 1), dtype=numpy.float64, order='F')

    # call ferret_compute with the samples from the same distribution and check the results
    ferret_compute(0, resultb, resbdf, (arraya, arrayb), inpbdfs)
    resultb = resultb.reshape(-1)
    print("from same dist result: %s" % str(resultb))
    if (resultb[0] < 0.00) or (resultb[0] > 0.01) or \
       (resultb[1] < 0.10) or (resultb[1] > 1.00):
        raise ValueError("Unexpected result")

    # call ferret_compute with data from different distributions and check the results
    ferret_compute(0, resultu, resbdf, (sampa, sampu), inpbdfs)
    resultu = resultu.reshape(-1)
    print("from diff dist result: %s" % str(resultu))
    if (resultu[0] < 0.98) or (resultu[0] > 1.00) or \
       (resultu[1] < 0.00) or (resultu[1] > 0.01):
        raise ValueError("Unexpected result")

    # All successful
    print("Success")