1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
|
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @uqfoundation)
# Copyright (c) 2018-2024 The Uncertainty Quantification Foundation.
# License: 3-clause BSD. The full license text is available at:
# - https://github.com/uqfoundation/mystic/blob/master/LICENSE
"""
calculate graphical distance of (interpolated) function from a dataset
converters for klepto.archives.dir_archive and mystic.math.legacydata.dataset
read (params,cost) from LoggingMonitor archive or klepto.archive.dir_archive
"""
def _getitem(data, axis):
"""get the selected axis of the tuple-valued dataset
Inputs:
data: a mystic.math.legacydata.dataset of i points, M inputs, N outputs
axis: int, the desired index the tuple-valued dataset [0,N]
"""
if len(data.values) and type(data.values[0]) not in (tuple,list):
msg = "cannot get axis %s for single-valued dataset.values" % axis
raise ValueError(msg)
# assumes axis is an int; select values corresponding to axis
from mystic.math.legacydata import dataset, datapoint as datapt
ds = dataset()
for pt in data:
ds.append(datapt(pt.position, pt.value[axis], pt.id, pt.cone.slopes))
return ds
# somewhat hacky include of tuple-valued dataset
def interpolate(data, step=None, **kwds):
"""generate interpolated function y=f(x) from data (x,y)
Inputs:
data: mystic.math.legacydata.dataset of i points, M inputs, N outputs
step: int, stepsize for interpolation (default: do not skip any points)
Additional Inputs:
method: string for kind of interpolator
maxpts: int, maximum number of points (x,z) to use from the monitor
noise: float, amplitude of gaussian noise to remove duplicate x
extrap: if True, extrapolate a bounding box (can reduce # of nans)
arrays: if True, return a numpy array; otherwise don't return arrays
axis: int in [0,N], index of z on which to interpolate (all, by default)
NOTE:
if scipy is not installed, will use np.interp for 1D (non-rbf),
or mystic's rbf otherwise. default method is 'nearest' for
1D and 'linear' otherwise. method can be one of ('rbf','linear',
'nearest','cubic','inverse','gaussian','quintic','thin_plate').
NOTE:
additional keyword arguments (epsilon, smooth, norm) are avaiable
for use with a Rbf interpolator. See mystic.math.interpolate.Rbf
for more details. if initialization produces a singlular matrix,
try non-zero smooth.
"""
# interpolate for each member of tuple-valued data, unless axis provided
axis = kwds.pop('axis', None) # axis for tuple-valued data
if axis is None:
if len(data.values) and type(data.values[0]) in (tuple,list):
# iterate over each axis, build a 'combined' interpf
def objective(x, axis=None):
fs = objective.__axis__
if axis is None:
return tuple(fi(x) for fi in fs)
return fs[axis](x)
objective.__axis__ = [interpolate(data, axis=ax, **kwds) for ax,val in enumerate(data.values[0])]
return objective
# else: data is single-valued
else: # axis is not None
data = _getitem(data, axis)
#XXX: what if dataset is empty? (i.e. len(data.values) == 0)
from interpolator import interpolate as interp
ii = interp(data.coords[::step], z=data.values[::step], **kwds)
from mystic.math.interpolate import _to_objective
function = _to_objective(ii.function)
function.__axis__ = axis #XXX: bad idea: list of funcs, or int/None ?
return function
def distance(data, function=None, hausdorff=True, **kwds):
"""get graphical distance between function y=f(x) and a dataset
Inputs:
data: a mystic.math.legacydata.dataset of i points, M inputs, N outputs
function: a function y=f(*x) of data (x,y)
hausdorff: if True, use Hausdorff norm
Additional Inputs:
method: string for kind of interpolator
maxpts: int, maximum number of points (x,z) to use from the monitor
noise: float, amplitude of gaussian noise to remove duplicate x
extrap: if True, extrapolate a bounding box (can reduce # of nans)
arrays: if True, return a numpy array; otherwise don't return arrays
axis: int in [0,N], index of z on which to interpolate (all, by default)
NOTE:
if scipy is not installed, will use np.interp for 1D (non-rbf),
or mystic's rbf otherwise. default method is 'nearest' for
1D and 'linear' otherwise. method can be one of ('rbf','linear',
'nearest','cubic','inverse','gaussian','quintic','thin_plate').
NOTE:
data and function may provide tuple-valued or single-valued output.
Distance will be measured component-wise, resulting in a tuple of
distances, unless an 'axis' is selected. If an axis is selected,
then return distance for the selected component (i.e. axis) only.
"""
""" #FIXME: the following is generally true (doesn't account for 'fax')
# function is multi-value & has its components
# data is multi-value
# axis is None => apply function component-wise to data
# axis is int => apply function component-wise to single axis
# data is single-value
# axis is None => ERROR: unknown which function component to apply
# axis is int => apply selected function component to data
# function is single-value (int or None)
# data is multi-value
# axis is None => ERROR: unknown which axis to apply function
# axis is int => apply function to selected axis of data
# data is single-value
# axis is None => apply function to data
# axis is int => ERROR: can't take axis of single-valued data
# function is None
# data is multi-value
# axis is None => [fmv] => apply function component-wise to data
# axis is int => [fsv] => apply function to selected axis of data
# data is single-value
# axis is None => [fsv] => apply function to data
# axis is int => ERROR: can't take axis of single-valued data
"""
axis = kwds.get('axis', None) # axis for tuple-valued data and/or function
if function is None:
function = interpolate(data, **kwds)
import mystic.math.distance as md #FIXME: simplify axis vs fax
from mystic.math.interpolate import _to_objective
from numbers import Integral
fax = getattr(function, '__axis__', None) # axis for tuple-valued function
if axis is None:
if len(data.values) and type(data.values[0]) in (tuple,list):
if type(fax) is list: # multi-value func, multi-value data
import numpy as np
return np.array([md.graphical_distance(_to_objective(j), _getitem(data, i), hausdorff=hausdorff) for i,j in enumerate(fax)])
elif isinstance(fax, Integral):# single-value func, multi-value data
return md.graphical_distance(_to_objective(function), _getitem(data, fax), hausdorff=hausdorff)
else: # single-value func, multi-value data
msg = "axis required for multi-valued dataset.values"
raise ValueError(msg)
else:
if type(fax) is list: # multi-value func, singe-value data
msg = "axis required for multi-valued function"
raise ValueError(msg)
else: # single-value func, singe-value data
return md.graphical_distance(_to_objective(function), data, hausdorff=hausdorff)
else:
if len(data.values) and type(data.values[0]) in (tuple,list):
if type(fax) is list: # multi-value func, multi-value data
return md.graphical_distance(_to_objective(fax[axis]), _getitem(data, axis), hausdorff=hausdorff)
elif isinstance(fax, Integral) and fax != axis: # single-value func, multi-value data
msg = "inconsistent axis for multi-valued dataset.values"
raise ValueError(msg)
else: # single-value func, multi-value data
return md.graphical_distance(_to_objective(function), _getitem(data, axis), hausdorff=hausdorff)
else:
if type(fax) is list: # multi-value func, singe-value data
return md.graphical_distance(_to_objective(fax[axis]), data, hausdorff=hausdorff)
elif isinstance(fax, Integral):
if fax == axis: # single-value func, singe-value data
return md.graphical_distance(_to_objective(function), data, hausdorff=hausdorff)
msg = "inconsistent axis for multi-valued function"
raise ValueError(msg)
else: # single-value func, singe-value data
_getitem(data, axis) # raise ValueError
return NotImplemented # should never get here
# reader for logfile(s)
def read_logfile(filename):
"""read 'parameters' and 'cost' from LoggingMonitor archive
Inputs:
filename: str path to location of klepto.archives.dir_archive
"""
from mystic.munge import read_trajectories
return read_trajectories(filename, iter=False)
# reader for archive(s)
def read_archive(filename, axis=None): #NOTE: could return iterators
"""read 'parameters' and 'cost' from klepto.dir_archive
Inputs:
filename: str path to location of klepto.archives.dir_archive
axis: int, the desired index the tuple-valued dataset [0,N]
"""
from klepto.archives import dir_archive
arch = dir_archive(filename, cached=True)
return for_monitor(arch, axis=axis)
def for_monitor(archive, inverted=False, axis=None):
"""convert klepto.dir_archive to param,cost for a monitor
Inputs:
archive: a klepto.archive instance
inverted: if True, invert the z-axis of the Monitor (i.e. z => -z)
axis: int, the desired index the tuple-valued dataset [0,N]
"""
arxiv = getattr(archive, '__archive__', archive)
# param = list(arxiv.keys())
# param = list(list(k[-1]) for k in arxiv.keys())
param = list(list(k) for k in arxiv.keys())
if inverted:
if axis is None:
cost = list(tuple(-i for i in k) for k in arxiv.values())
else:
cost = list(-k[axis] for k in arxiv.values())
else:
cost = arxiv.values()
if axis is None:
cost = cost if type(cost) is list else list(cost)
else:
cost = list(k[axis] for k in arxiv.values())
return (param, cost)
def from_archive(cache, data=None, **kwds):
'''convert a klepto.archive to a mystic.math.legacydata.dataset
Inputs:
cache: a klepto.archive instance
data: a mystic.math.legacydata.dataset of i points, M inputs, N outputs
axis: int, the desired index the tuple-valued dataset [0,N]
ids: a list of ids for the data, or a function ids(x,y) to generate ids
'''
axis = kwds.get('axis', None)
ids = kwds.get('ids', _iargsort) #XXX: better None?
import numpy as np #FIXME: accept lipshitz coeffs as input
if data is None:
from mystic.math.legacydata import dataset
data = dataset()
# import klepto as kl
# ca = kl.archives.dir_archive('__objective_5D_cache__', cached=False)
# k = keymap=kl.keymaps.keymap()
# c = kl.inf_cache(keymap=k, tol=1, ignore=('**','out'), cache=ca)
# memo = c(lambda *args, **kwds: kwds['out'])
# cache = memo.__cache__()
y = np.array(list(cache.items()), dtype=object).T #NOTE: cache.items() slow
if not len(y): return data
if callable(ids): #XXX: don't repeat tolist
ids = ids(y[0].tolist(), y[1].tolist(), axis=axis)
if axis is None: #XXX: dataset should be single valued
return data.load(y[0].tolist(), y[1].tolist(), ids=ids)
return data.load(y[0].tolist(), [i[axis] for i in y[1]], ids=ids)
as_dataset = from_archive #NOTE: just an alias
def argsort(x, axis=None):
"""generate a list of indices i that sort x, so x[i] is in increasing order
Inputs:
x: an array of shape (npts, dim) or (npts,)
axis: int in [0,dim], the primary index of x to sort
NOTE:
if axis=None and dim > 1, the 0th index will be used as the primary
"""
if not len(x):
return []
import numpy as np
from mystic import isiterable
if axis is None:
if isiterable(x[0]): # multi-value data, defaulting to axis=0
return np.argsort(x, axis=0).T[0].tolist() #XXX: better to error?
else: # single-value data
return np.argsort(x).tolist()
if isiterable(x[0]): # multi-value data
return np.argsort(np.array(x)[:,axis]).tolist()
msg = "cannot get axis %s for single-valued data" % axis
raise ValueError(msg)
def iargsort(x, axis=None):
"""generate a list of indices that, when sorted, sort x in increasing order
Inputs:
x: an array of shape (npts, dim) or (npts,)
axis: int in [0,dim], the primary index of x to sort
NOTE:
if axis=None and dim > 1, the 0th index will be used as the primary
"""
import numpy as np
i = argsort(x, axis=axis)
ii = np.empty_like(i)
ii[i] = np.arange(len(i))
return ii.tolist()
def counting(x):
"""generate a list of counting indices of len(x)'
Inputs:
x: an array of shape (npts, dim) or (npts,)
"""
return list(range(len(x)))
# interface f(x,y,axis) for use with ids (in from_archive)
_argsort = lambda x,y,axis=None: argsort(x,axis=axis)
_argsort.__doc__ = argsort.__doc__
_iargsort = lambda x,y,axis=None: iargsort(x,axis=axis)
_iargsort.__doc__ = iargsort.__doc__
_counting = lambda x,y,axis=None: counting(x)
_counting.__doc__ = counting.__doc__
|