1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
#!/usr/bin/env python
import openturns as ot
import openturns.testing as ott
import os
ot.TESTPREAMBLE()
# ot.Log.Show(ot.Log.INFO)
# Time grid parameters
T = 3.0
NT = 32
tg = ot.RegularGrid(0.0, T / NT, NT)
# Toy function to link input processes to the output process
in_dim = 4
out_dim = 1
spatial_dim = 1
class pyf2p(ot.OpenTURNSPythonFieldToPointFunction):
def __init__(self, mesh):
super(pyf2p, self).__init__(mesh, in_dim, out_dim)
self.setInputDescription(["x1", "x2", "x3", "x4"])
self.setOutputDescription(["g"])
def _exec(self, X):
Xs = ot.Sample(X)
x1, x2, x3, x4 = Xs.computeMean()
y = x1 + x2 + x3 - x4 + x1 * x2 - x3 * x4 - 0.1 * x1 * x2 * x3
return [y]
f = ot.FieldToPointFunction(pyf2p(tg))
# First process: elementary process based on a bivariate random vector
f1 = ot.SymbolicFunction(["t"], ["sin(t)"])
f2 = ot.SymbolicFunction(["t"], ["cos(t)^2"])
myBasis = ot.Basis([f1, f2])
coefDis = ot.Normal([1.0] * 2, [0.6] * 2, ot.CorrelationMatrix(2))
p1 = ot.FunctionalBasisProcess(coefDis, myBasis, tg)
# Second process: smooth Gaussian process
p2 = ot.GaussianProcess(ot.SquaredExponential([1.0], [T / 4.0]), tg)
# Third process: elementary process based on a bivariate random vector
randomParameters = ot.JointDistribution([ot.Uniform(), ot.Normal()])
p3 = ot.FunctionalBasisProcess(
randomParameters,
ot.Basis(
[ot.SymbolicFunction(["t"], ["1", "0"]), ot.SymbolicFunction(["t"], ["0", "1"])]
),
)
X = ot.AggregatedProcess([p1, p2, p3])
X.setMesh(tg)
N = 1000
x = X.getSample(N)
y = f(x)
# run algo
algo = ot.FieldToPointFunctionalChaosAlgorithm(x, y)
algo.setThreshold(4e-2)
algo.setRecompress(True)
ot.ResourceMap.SetAsUnsignedInteger("FunctionalChaosAlgorithm-BasisSize", N)
ot.ResourceMap.SetAsBool("FunctionalChaosAlgorithm-Sparse", True)
algo.run()
result = algo.getResult()
# check metamodel
metamodel = result.getFieldToPointMetaModel()
assert metamodel.getInputDimension() == x.getDimension(), "wrong in dim"
assert metamodel.getOutputDimension() == y.getDimension(), "wrong out dim"
# test single evaluation
xm = x.computeMean()
print("f(xm)=", f(xm))
fhat_xm = metamodel(xm)
print("f^(xm)=", fhat_xm)
ott.assert_almost_equal(fhat_xm, [1.09018], 1e-3, 1e-3)
# test MSE
resultFCE = result.getFCEResult()
xFCE = resultFCE.getInputSample()
yFCE = resultFCE.getOutputSample()
validation = ot.MetaModelValidation(yFCE, resultFCE.getMetaModel()(xFCE))
mse = validation.computeMeanSquaredError()
print("MSE", mse)
assert mse.norm() < 1e-2, "MSE too large"
# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"
# rerun with block indices
blockIndices = [[0], [1], [2, 3]]
algo.setBlockIndices(blockIndices)
algo.run()
result = algo.getResult()
# test MSE
resultFCE = result.getFCEResult()
xFCE = resultFCE.getInputSample()
yFCE = resultFCE.getOutputSample()
validation = ot.MetaModelValidation(yFCE, resultFCE.getMetaModel()(xFCE))
mse = validation.computeMeanSquaredError()
print("MSE", mse)
assert mse.norm() < 1e-2, "MSE too large"
# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"
# serialization
if ot.PlatformInfo.HasFeature("libxml2"):
study = ot.Study()
fname = "study_p2f_result.xml"
study.setStorageManager(ot.XMLStorageManager(fname))
study.add("algo", algo)
study.save()
study = ot.Study()
study.setStorageManager(ot.XMLStorageManager(fname))
study.load()
algo = ot.FieldToPointFunctionalChaosAlgorithm()
study.fillObject("algo", algo)
print(algo)
os.remove(fname)
|