1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
#!/usr/bin/env python
import openturns as ot
import openturns.experimental as otexp
import openturns.testing as ott
import os
ot.TESTPREAMBLE()
# ot.Log.Show(ot.Log.INFO)
# Time grid parameters
T = 3.0
NT = 32
tg = ot.RegularGrid(0.0, T / NT, NT)
# Toy function to link input processes to the output process
in_dim = 4
out_dim = 1
spatial_dim = 1
class pyf2p(ot.OpenTURNSPythonFieldToPointFunction):
def __init__(self, mesh):
super(pyf2p, self).__init__(mesh, in_dim, out_dim)
self.setInputDescription(["x1", "x2", "x3", "x4"])
self.setOutputDescription(["g"])
def _exec(self, X):
Xs = ot.Sample(X)
x1, x2, x3, x4 = Xs.computeMean()
y = x1 + x2 + x3 - x4 + x1 * x2 - x3 * x4 - 0.1 * x1 * x2 * x3
return [y]
f = ot.FieldToPointFunction(pyf2p(tg))
# First process: elementary process based on a bivariate random vector
f1 = ot.SymbolicFunction(["t"], ["sin(t)"])
f2 = ot.SymbolicFunction(["t"], ["cos(t)^2"])
myBasis = ot.Basis([f1, f2])
coefDis = ot.Normal([1.0] * 2, [0.6] * 2, ot.CorrelationMatrix(2))
p1 = ot.FunctionalBasisProcess(coefDis, myBasis, tg)
# Second process: smooth Gaussian process
p2 = ot.GaussianProcess(ot.SquaredExponential([1.0], [T / 4.0]), tg)
# Third process: elementary process based on a bivariate random vector
randomParameters = ot.JointDistribution([ot.Uniform(), ot.Normal()])
p3 = ot.FunctionalBasisProcess(
randomParameters,
ot.Basis(
[ot.SymbolicFunction(["t"], ["1", "0"]), ot.SymbolicFunction(["t"], ["0", "1"])]
),
)
X = ot.AggregatedProcess([p1, p2, p3])
X.setMesh(tg)
N = 1000
x = X.getSample(N)
y = f(x)
# run algo
algo = otexp.FieldToPointFunctionalChaosAlgorithm(x, y)
algo.setThreshold(4e-2)
algo.setRecompress(True)
ot.ResourceMap.SetAsUnsignedInteger("FunctionalChaosAlgorithm-BasisSize", N)
ot.ResourceMap.SetAsBool("FunctionalChaosAlgorithm-Sparse", True)
algo.run()
result = algo.getResult()
# check metamodel
metamodel = result.getFieldToPointMetaModel()
assert metamodel.getInputDimension() == x.getDimension(), "wrong in dim"
assert metamodel.getOutputDimension() == y.getDimension(), "wrong out dim"
# test single evaluation
xm = x.computeMean()
print("f(xm)=", f(xm))
fhat_xm = metamodel(xm)
print("f^(xm)=", fhat_xm)
ott.assert_almost_equal(fhat_xm, [1.09018], 1e-3, 1e-3)
# test residual
residuals = result.getFCEResult().getResiduals()
print("residuals", residuals)
assert residuals.norm() < 5e-3, "residual too large"
# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"
# rerun with block indices
blockIndices = [[0], [1], [2, 3]]
algo.setBlockIndices(blockIndices)
algo.run()
result = algo.getResult()
# test residual
residuals = result.getFCEResult().getResiduals()
print("residuals", residuals)
assert residuals.norm() < 5e-3, "residual too large"
# check modes retained
kl_results = result.getInputKLResultCollection()
n_modes = [len(res.getEigenvalues()) for res in kl_results]
print(f"n_modes={n_modes}")
assert sum(n_modes) == 6, "wrong modes"
# serialization
if ot.PlatformInfo.HasFeature("libxml2"):
study = ot.Study()
fname = "study_p2f_result.xml"
study.setStorageManager(ot.XMLStorageManager(fname))
study.add("algo", algo)
study.save()
study = ot.Study()
study.setStorageManager(ot.XMLStorageManager(fname))
study.load()
algo = otexp.FieldToPointFunctionalChaosAlgorithm()
study.fillObject("algo", algo)
print(algo)
os.remove(fname)
|