File: t_FieldToPointFunctionalChaosAlgorithm_std.py

package info (click to toggle)
openturns 1.26-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 67,708 kB
  • sloc: cpp: 261,605; python: 67,030; ansic: 4,378; javascript: 406; sh: 185; xml: 164; makefile: 101
file content (133 lines) | stat: -rwxr-xr-x 3,847 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python

import openturns as ot
import openturns.testing as ott
import os

ot.TESTPREAMBLE()
# ot.Log.Show(ot.Log.INFO)

# Time grid parameters
T = 3.0
NT = 32
tg = ot.RegularGrid(0.0, T / NT, NT)

# Toy function to link input processes to the output process
in_dim = 4
out_dim = 1
spatial_dim = 1


class pyf2p(ot.OpenTURNSPythonFieldToPointFunction):
    def __init__(self, mesh):
        super(pyf2p, self).__init__(mesh, in_dim, out_dim)
        self.setInputDescription(["x1", "x2", "x3", "x4"])
        self.setOutputDescription(["g"])

    def _exec(self, X):
        Xs = ot.Sample(X)
        x1, x2, x3, x4 = Xs.computeMean()
        y = x1 + x2 + x3 - x4 + x1 * x2 - x3 * x4 - 0.1 * x1 * x2 * x3
        return [y]


f = ot.FieldToPointFunction(pyf2p(tg))

# First process: elementary process based on a bivariate random vector
f1 = ot.SymbolicFunction(["t"], ["sin(t)"])
f2 = ot.SymbolicFunction(["t"], ["cos(t)^2"])
myBasis = ot.Basis([f1, f2])
coefDis = ot.Normal([1.0] * 2, [0.6] * 2, ot.CorrelationMatrix(2))
p1 = ot.FunctionalBasisProcess(coefDis, myBasis, tg)

# Second process: smooth Gaussian process
p2 = ot.GaussianProcess(ot.SquaredExponential([1.0], [T / 4.0]), tg)

# Third process: elementary process based on a bivariate random vector
randomParameters = ot.JointDistribution([ot.Uniform(), ot.Normal()])
p3 = ot.FunctionalBasisProcess(
    randomParameters,
    ot.Basis(
        [ot.SymbolicFunction(["t"], ["1", "0"]), ot.SymbolicFunction(["t"], ["0", "1"])]
    ),
)

X = ot.AggregatedProcess([p1, p2, p3])
X.setMesh(tg)

N = 1000
x = X.getSample(N)
y = f(x)


# run algo
algo = ot.FieldToPointFunctionalChaosAlgorithm(x, y)
algo.setThreshold(4e-2)
algo.setRecompress(True)
ot.ResourceMap.SetAsUnsignedInteger("FunctionalChaosAlgorithm-BasisSize", N)
ot.ResourceMap.SetAsBool("FunctionalChaosAlgorithm-Sparse", True)
algo.run()
result = algo.getResult()

# check metamodel
metamodel = result.getFieldToPointMetaModel()
assert metamodel.getInputDimension() == x.getDimension(), "wrong in dim"
assert metamodel.getOutputDimension() == y.getDimension(), "wrong out dim"

# test single evaluation
xm = x.computeMean()
print("f(xm)=", f(xm))
fhat_xm = metamodel(xm)
print("f^(xm)=", fhat_xm)
ott.assert_almost_equal(fhat_xm, [1.09018], 1e-3, 1e-3)

# test MSE
resultFCE = result.getFCEResult()
xFCE = resultFCE.getInputSample()
yFCE = resultFCE.getOutputSample()
validation = ot.MetaModelValidation(yFCE, resultFCE.getMetaModel()(xFCE))
mse = validation.computeMeanSquaredError()
print("MSE", mse)
assert mse.norm() < 1e-2, "MSE too large"

# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"

# rerun with block indices
blockIndices = [[0], [1], [2, 3]]
algo.setBlockIndices(blockIndices)
algo.run()
result = algo.getResult()

# test MSE
resultFCE = result.getFCEResult()
xFCE = resultFCE.getInputSample()
yFCE = resultFCE.getOutputSample()
validation = ot.MetaModelValidation(yFCE, resultFCE.getMetaModel()(xFCE))
mse = validation.computeMeanSquaredError()
print("MSE", mse)
assert mse.norm() < 1e-2, "MSE too large"

# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"

# serialization
if ot.PlatformInfo.HasFeature("libxml2"):
    study = ot.Study()
    fname = "study_p2f_result.xml"
    study.setStorageManager(ot.XMLStorageManager(fname))
    study.add("algo", algo)
    study.save()
    study = ot.Study()
    study.setStorageManager(ot.XMLStorageManager(fname))
    study.load()
    algo = ot.FieldToPointFunctionalChaosAlgorithm()
    study.fillObject("algo", algo)
    print(algo)
    os.remove(fname)