1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
"""
Estimate a multivariate distribution
====================================
"""
# %%
# In this example we are going to estimate a joint distribution from a multivariate sample by fitting marginals and finding a set of copulas.
#
# While the estimation of marginals is quite straightforward, the estimation of the dependency structure takes several steps:
#
# - find the dependent components
# - estimate a copula on each dependent bloc
# - assemble the estimated copulas
#
# %%
import openturns as ot
import math as m
# %%
# Generate some multivariate data to estimate, with correlation
cop1 = ot.AliMikhailHaqCopula(0.6)
cop2 = ot.ClaytonCopula(2.5)
copula = ot.BlockIndependentCopula([cop1, cop2])
marginals = [
ot.Uniform(5.0, 6.0),
ot.Arcsine(),
ot.Normal(-40.0, 3.0),
ot.Triangular(100.0, 150.0, 300.0),
]
distribution = ot.JointDistribution(marginals, copula)
sample = distribution.getSample(10000).getMarginal([0, 2, 3, 1])
# %%
# Estimate marginals
dimension = sample.getDimension()
marginalFactories = []
for factory in ot.DistributionFactory.GetContinuousUniVariateFactories():
if str(factory).startswith("Histogram"):
# ~ non-parametric
continue
marginalFactories.append(factory)
estimated_marginals = [
ot.FittingTest.BestModelBIC(sample.getMarginal(i), marginalFactories)[0]
for i in range(dimension)
]
estimated_marginals
# %%
# Find connected components of a graph defined from its adjacency matrix
# %%
def find_neighbours(head, covariance, to_visit, visited):
visited[head] = 1
to_visit.remove(head)
current_component = [head]
for i in to_visit:
# If i is connected to head and has not yet been visited
if covariance[head, i] > 0:
# Add i to the current component
component = find_neighbours(i, covariance, to_visit, visited)
current_component += component
return current_component
def connected_components(covariance):
N = covariance.getDimension()
to_visit = list(range(N))
visited = [0] * N
all_components = []
for head in range(N):
if visited[head] == 0:
component = find_neighbours(head, covariance, to_visit, visited)
all_components.append(sorted(component))
return all_components
# %%
# Estimate the copula
# -------------------
#
# First find the dependent components : we compute the `Spearman` correlation
# %%
C = sample.computeSpearmanCorrelation()
print(C)
# %%
# We filter and consider only significantly non-zero correlations.
# %%
epsilon = 2.0 / m.sqrt(sample.getSize())
for j in range(dimension):
for i in range(j):
C[i, j] = 1.0 if abs(C[i, j]) > epsilon else 0.0
print(C)
# %%
# Note that we can apply the `HypothesisTest.Spearman` test. As the null hypothesis of the test is the `independence`, we must take the complementary of the binary measure as follow:
#
# >>> M = ot.SymmetricMatrix(dimension)
# >>> for i in range(dimension):
# >>> M[i,i] = 1
# >>> for j in range(i):
# >>> M[i, j] = 1 - ot.HypothesisTest.Spearman(sample[:,i], sample[:,j]).getBinaryQualityMeasure()
#
# %%
# Now we find the independent blocs:
# %%
blocs = connected_components(C)
blocs
# %%
# For each dependent block, we estimate the most accurate parameteric copula.
#
# To do this, we first need to transform the sample in such a way as to keep the copula intact but make all marginal samples follow the uniform distribution on [0,1].
# %%
copula_sample = ot.Sample(sample.getSize(), sample.getDimension())
copula_sample.setDescription(sample.getDescription())
for index in range(sample.getDimension()):
copula_sample[:, index] = estimated_marginals[index].computeCDF(sample[:, index])
# %%
copulaFactories = []
for factory in ot.DistributionFactory.GetContinuousMultiVariateFactories():
if not factory.build().isCopula():
continue
if factory.getImplementation().getClassName() == "BernsteinCopulaFactory":
continue
copulaFactories.append(factory)
estimated_copulas = [
ot.FittingTest.BestModelBIC(copula_sample.getMarginal(bloc), copulaFactories)[0]
for bloc in blocs
]
estimated_copulas
# %%
# Finally we assemble the copula
# %%
estimated_copula_perm = ot.BlockIndependentCopula(estimated_copulas)
# %%
# Take care of the order of each bloc vs the order of original components !
# %%
permutation = []
for bloc in blocs:
permutation.extend(bloc)
inverse_permutation = [-1] * dimension
for i in range(dimension):
inverse_permutation[permutation[i]] = i
estimated_copula = estimated_copula_perm.getMarginal(inverse_permutation)
estimated_copula
# %%
# We build joint distribution from marginal distributions and dependency structure:
# %%
estimated_distribution = ot.JointDistribution(estimated_marginals, estimated_copula)
estimated_distribution
|