1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
|
# -*- coding: utf-8 -*-
"""
This module contains a MOArchiving3obj class for storing a set of non-dominated points in 3
objective space and efficiently calculating hypervolume with respect to the given reference point.
"""
from moarchiving.moarchiving import BiobjectiveNondominatedSortedList as MOArchive2obj
from moarchiving.moarchiving_utils import (DLNode, ArchiveSortedList, compute_area_simple, remove_from_z,
restart_list_y, lexicographic_less, one_contribution_3_obj,
weakly_dominates, strictly_dominates)
from moarchiving.moarchiving_parent import MOArchiveParent
import math
import warnings as _warnings
try:
from sortedcontainers import SortedList
except ImportError:
pass
try:
import fractions
except ImportError:
_warnings.warn('`fractions` module not installed, arbitrary precision hypervolume computation not available')
inf = float('inf')
class MOArchive3obj(MOArchiveParent):
""" Class for storing a set of non-dominated points in 3 objective space and efficiently calculating
hypervolume with respect to the given reference point.
The archive is implemented as a doubly linked list, and can be modified using functions
add and remove. Points of the archive can be accessed as a list of points order by the third
coordinate using function points_list.
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [3, 2, 1]])
>>> list(moa) # returns the list of points in the archive sorted by the third coordinate
[[3, 2, 1], [1, 2, 3]]
>>> moa.add([2, 2, 2]) # add a new point to the archive
True
>>> moa.add([3, 3, 3])
False
>>> moa = get_mo_archive([[1, 2, 3], [2, 3, 4], [3, 2, 1]],
... reference_point=[4, 4, 4], infos=["A", "B", "C"])
>>> moa.infos # returns the list of infos for each point in the archive
['C', 'A']
>>> moa.hypervolume
Fraction(10, 1)
>>> get_mo_archive.hypervolume_final_float_type = float
>>> get_mo_archive.hypervolume_computation_float_type = float
>>> moa2 = get_mo_archive([[1, 2, 3], [2, 3, 4], [3, 2, 1]], reference_point=[4, 4, 4])
>>> moa2.hypervolume
10.0
"""
try:
hypervolume_final_float_type = fractions.Fraction
hypervolume_computation_float_type = fractions.Fraction
except:
hypervolume_final_float_type = float
hypervolume_computation_float_type = float
def __init__(self, list_of_f_vals=None, reference_point=None, infos=None,
hypervolume_final_float_type=None,
hypervolume_computation_float_type=None):
""" Create a new 3 objective archive object.
f-vals beyond the `reference_point` are pruned away. The `reference_point` is also used
to compute the hypervolume.
infos are an optional list of additional information about the points in the archive.
"""
hypervolume_final_float_type = MOArchive3obj.hypervolume_final_float_type \
if hypervolume_final_float_type is None else hypervolume_final_float_type
hypervolume_computation_float_type = MOArchive3obj.hypervolume_computation_float_type \
if hypervolume_computation_float_type is None else hypervolume_computation_float_type
super().__init__(list_of_f_vals=list_of_f_vals,
reference_point=reference_point,
infos=infos,
n_obj=3,
hypervolume_final_float_type=hypervolume_final_float_type,
hypervolume_computation_float_type=hypervolume_computation_float_type)
self._removed = []
self.preprocessing()
hv = self._set_HV()
self._length = len(list(self))
if hv is not None and hv > 0:
self._hypervolume_plus = self._hypervolume
else:
if list_of_f_vals is None or len(list_of_f_vals) == 0:
self._hypervolume_plus = -inf
else:
self._hypervolume_plus = -min([self.distance_to_hypervolume_area(f)
for f in list_of_f_vals])
def add(self, f_vals, info=None, update_hypervolume=True):
""" Adds a new point to the archive, and updates the hypervolume if needed.
Returns True if the point was added, False if it was dominated
by another point in the archive
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive(reference_point=[4, 4, 4])
>>> moa.add([2, 3, 4])
False
>>> moa.add([1, 2, 3])
True
>>> list(moa)
[[1, 2, 3]]
>>> moa.add([3, 2, 1])
True
>>> list(moa)
[[3, 2, 1], [1, 2, 3]]
>>> moa.add([2, 2, 2])
True
>>> list(moa)
[[3, 2, 1], [2, 2, 2], [1, 2, 3]]
"""
if len(f_vals) != self.n_obj:
raise ValueError(f"argument `f_vals` must be of length {self.n_obj}, was ``{f_vals}``")
if self.reference_point is not None and self.hypervolume_plus is not None:
dist_to_hv_area = self.distance_to_hypervolume_area(f_vals)
if -dist_to_hv_area > self._hypervolume_plus:
self._hypervolume_plus = -dist_to_hv_area
# q is the current point (so that we are consistent with the paper),
# stop is the head of the list, and first_iter is a flag to check if we are at the
# first iteration (since the first and last points are the same)
q = self.head
stop = self.head
first_iter = True
# Add 0.0 for 3d points so that it matches the original C code and create a new node object
if self.n_obj == 3:
f_vals = f_vals + [0.0]
u = DLNode(x=f_vals, info=info)
di = self.n_obj - 1
# loop over all the points in the archive and save the best candidates for cx and cy,
# and check if the new point is dominated by any of the points in the archive
dominated = False
best_cx_candidates = None
best_cy_candidates = None
inserted = False
removed = []
while q != stop or first_iter:
first_iter = False
# check if the new point is dominated by the current point
if all(q.x[i] <= u.x[i] for i in range(self.n_obj)):
dominated = True
break
# check if the new point dominates the current point
if all(u.x[i] <= q.x[i] for i in range(self.n_obj)):
q_next = q.next[di]
remove_from_z(q, archive_dim=self.n_obj)
removed.append(q.x[:3])
q = q_next
continue
"""
1) Set u.cx to the point q ∈ Q with the smallest q_x > u_x
such that q_y < u_y and q <L u. If such a point is not
unique, the alternative with the smallest q_y is preferred
"""
if lexicographic_less(q.x, u.x) and q.x[0] > u.x[0] and q.x[1] < u.x[1]:
if best_cx_candidates is None or q.x[0] < best_cx_candidates.x[0]:
best_cx_candidates = q
elif q.x[0] == best_cx_candidates.x[0] and q.x[1] < best_cx_candidates.x[1]:
best_cx_candidates = q
if lexicographic_less(q.x, u.x) and q.x[0] < u.x[0] and q.x[1] > u.x[1]:
if best_cy_candidates is None or q.x[1] < best_cy_candidates.x[1]:
best_cy_candidates = q
elif q.x[1] == best_cy_candidates.x[1] and q.x[0] < best_cy_candidates.x[0]:
best_cy_candidates = q
"""
2) For q ∈ Q, set q.cx to u iff u_y < q_y and u <L q, and
either q_x < u_x < (q.cx)_x or u_x = (q.cx)_x and u_y ≤
(q.cx)_y.
"""
if u.x[1] < q.x[1] and lexicographic_less(u.x, q.x):
if (q.x[0] < u.x[0] < q.closest[0].x[0] or
(u.x[0] == q.closest[0].x[0] and u.x[1] <= q.closest[0].x[1])):
q.closest[0] = u
if u.x[0] < q.x[0] and lexicographic_less(u.x, q.x):
if (q.x[1] < u.x[1] < q.closest[1].x[1] or
(u.x[1] == q.closest[1].x[1] and u.x[0] <= q.closest[1].x[0])):
q.closest[1] = u
"""
3) Insert u into Q immediately before the point q ∈ Q with
the lexicographically smallest q such that u <L q.
"""
# If the point is not dominated by any other point in the archive so far,
# then it also won't be dominated by the points that come after it in the archive,
# as the points are sorted in lexicographic order
if lexicographic_less(u.x, q.x) and not inserted and not dominated:
u.next[di] = q
u.prev[di] = q.prev[di]
q.prev[di].next[di] = u
q.prev[di] = u
inserted = True
q = q.next[di]
if not dominated:
u.closest[0] = best_cx_candidates
u.closest[1] = best_cy_candidates
self._length += 1 - len(removed)
self._removed = removed
self._kink_points = None
if update_hypervolume and not dominated:
self._set_HV()
return not dominated
def remove(self, f_vals):
""" Removes a point from the archive, and updates the hypervolume.
If the point is not found, it raises a ValueError.
Returns the info of the removed point.
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [2, 2, 2], [3, 2, 1]], reference_point=[4, 4, 4],
... infos=["A", "B", "C"])
>>> moa.remove([2, 2, 2])
'B'
>>> list(moa)
[[3, 2, 1], [1, 2, 3]]
>>> moa.remove([1, 2, 3])
'A'
>>> list(moa)
[[3, 2, 1]]
"""
di = self.n_obj - 1 # Dimension index for sorting (z-axis in 3D)
current = self.head.next[di]
stop = self.head.prev[di]
# Using SortedList to manage nodes by their y-coordinate, supporting custom sorting needs
T = SortedList(key=lambda node: (node.x[1], node.x[0]))
# Include sentinel nodes to manage edge conditions
T.add(self.head) # self.head is a left sentinel
T.add(self.head.prev[di]) # right sentinel
remove_node = None
while current != stop:
if current.x[:3] == f_vals:
remove_node = current
current = current.next[di]
continue
T.add(current)
# Remove nodes dominated by the current node
nodes_to_remove = [node for node in T if node != current and
strictly_dominates(current.x, node.x, n_obj=2)]
for node in nodes_to_remove:
T.remove(node)
if current.closest[0].x[:3] == f_vals:
# For every p ∈ Q \ {u} such that p.cx = u, set p.cx to the
# point q ∈ Q \ {u} with the smallest q_x > p_x such that
# q_y < p_y and q <L p
current.closest[1] = current.closest[1]
cx_candidates = [node for node in T if node.x[0] > current.x[0] and node.x[1] < current.x[1]]
if cx_candidates:
current.closest[0] = min(cx_candidates, key=lambda node: node.x[0])
else:
current.closest[0] = self.head
if current.closest[1].x[:3] == f_vals:
# For every p ∈ Q \ {u} such that p.cy = u, set p.cy to the
# point q ∈ Q \ {u} with the smallest q_y > p_y such that
# q_x < p_x and q <L p
current.closest[1] = current.closest[1]
cy_candidates = [node for node in T if node.x[1] > current.x[1] and node.x[0] < current.x[0]]
if cy_candidates:
current.closest[1] = min(cy_candidates, key=lambda node: node.x[1])
else:
current.closest[1] = self.head.prev[di]
current = current.next[di]
if remove_node is not None:
remove_from_z(remove_node, archive_dim=self.n_obj)
self._kink_points = None
self._set_HV()
self._length -= 1
return remove_node.info
else:
raise ValueError(f"Point {f_vals} not found in the archive")
def add_list(self, list_of_f_vals, infos=None, add_method="compare"):
""" Adds a list of points to the archive, and updates the hypervolume.
points are added with the `add_method` method:
- compare: compares the number of points to add with the number of points in the archive
and uses the most efficient method based on that
- one_by_one: adds the points one by one to the archive
- reinit: reinitializes the archive with the new points
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive(reference_point=[4, 4, 4])
>>> moa.add_list([[2, 3, 3], [1, 2, 3]], infos=["A", "B"])
>>> list(moa), moa.infos
([[1, 2, 3]], ['B'])
>>> moa.add_list([[3, 2, 1], [2, 2, 2], [3, 3, 3]], infos=["C", "D", "E"])
>>> list(moa), moa.infos
([[3, 2, 1], [2, 2, 2], [1, 2, 3]], ['C', 'D', 'B'])
>>> moa.add_list([[1, 1, 1]])
>>> list(moa), moa.infos
([[1, 1, 1]], [None])
"""
s = len(list_of_f_vals)
if add_method == "compare":
n = len(self)
add_method = "one_by_one" if s == 1 or (n > 0 and s < math.log2(n) / 2) else "reinit"
if infos is None:
infos = [None] * s
if add_method == "one_by_one":
for f_val, info in zip(list_of_f_vals, infos):
self.add(f_val, info=info, update_hypervolume=False)
self._set_HV()
elif add_method == "reinit":
self.__init__(list(self) + list_of_f_vals, self.reference_point, self.infos + infos)
else:
raise ValueError(f"Unknown add method: {add_method}, "
f"should be one of: 'compare', 'one_by_one', 'reinit'")
def copy(self):
""" Returns a copy of the archive
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [2, 2, 2], [3, 2, 1]], reference_point=[4, 4, 4],
... infos=["A", "B", "C"])
>>> moa2 = moa.copy()
>>> list(moa2), moa2.infos
([[3, 2, 1], [2, 2, 2], [1, 2, 3]], ['C', 'B', 'A'])
>>> moa.remove([2, 2, 2])
'B'
>>> moa2.add([1.5, 1.5, 1.5], "D")
True
>>> list(moa2), moa2.infos
([[3, 2, 1], [1.5, 1.5, 1.5], [1, 2, 3]], ['C', 'D', 'A'])
>>> list(moa), moa.infos
([[3, 2, 1], [1, 2, 3]], ['C', 'A'])
"""
return MOArchive3obj(list(self), self.reference_point, self.infos)
def _get_kink_points(self):
""" Function that returns the kink points of the archive.
Kink point are calculated by making a sweep of the archive, where the state is one
2 objective archive of all possible kink points found so far, and another 2 objective
archive which stores the non-dominated points so far in the sweep
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [2, 2, 2], [3, 2, 1]], reference_point=[4, 4, 4])
>>> moa._get_kink_points()
[[4, 4, 1], [3, 4, 2], [2, 4, 3], [1, 4, 4], [4, 2, 4]]
"""
if self.reference_point is None:
ref_point = [inf] * self.n_obj
else:
ref_point = self.reference_point
# initialize the two states, one for points and another for kink points
points_state = MOArchive2obj([[ref_point[0], -inf], [-inf, ref_point[1]]])
kink_candidates = MOArchive2obj([ref_point[:2]])
# initialize the point dictionary, which will store the third coordinate of the points
point_dict = {
tuple(ref_point[:2]): -inf
}
kink_points = []
for point in self:
# add the point to the kink state to get the dominated kink points, then take it out
if kink_candidates.add(point[:2]) is not None:
removed = kink_candidates._removed.copy()
for removed_point in removed:
z = point_dict[tuple(removed_point)]
if z < point[2] and point[0] < removed_point[0] and point[1] < removed_point[1]:
kink_points.append([removed_point[0], removed_point[1], point[2]])
kink_candidates._removed.clear()
kink_candidates.remove(point[:2])
# add the point to the point state, and get two new kink point candidates
idx = points_state.add(point[:2])
for i in range(2):
p = [points_state[idx + i][0], points_state[idx - 1 + i][1]]
point_dict[tuple(p)] = point[2]
kink_candidates.add(p)
# add all the remaining kink points to the list
for point in kink_candidates:
kink_points.append([point[0], point[1], ref_point[2]])
return kink_points
def hypervolume_improvement(self, f_vals):
""" Returns the hypervolume improvement of adding a point to the archive
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [3, 2, 1]], reference_point=[4, 4, 4])
>>> moa.hypervolume_improvement([2, 2, 2])
2.0
>>> moa.hypervolume_improvement([3, 3, 4])
-1.0
"""
if f_vals in self:
return 0
if self.dominates(f_vals):
return -1 * self.distance_to_pareto_front(f_vals)
return one_contribution_3_obj(self.head, DLNode(x=f_vals),
self.hypervolume_computation_float_type)
def compute_hypervolume(self, reference_point=None):
""" Compute the hypervolume of the current state of archive
>>> from moarchiving.get_archive import get_mo_archive
>>> moa = get_mo_archive([[1, 2, 3], [3, 2, 1]], reference_point=[4, 4, 4])
>>> moa.compute_hypervolume()
10.0
"""
if reference_point is not None:
_warnings.warn("Reference point given at the initialization is used")
Fc = self.hypervolume_computation_float_type
p = self.head
area = Fc(0)
volume = Fc(0)
restart_list_y(self.head)
p = p.next[2].next[2]
stop = self.head.prev[2]
while p != stop:
if p.ndomr < 1:
p.cnext[0] = p.closest[0]
p.cnext[1] = p.closest[1]
area += compute_area_simple(p.x, 1, p.cnext[0], p.cnext[0].cnext[1], Fc)
p.cnext[0].cnext[1] = p
p.cnext[1].cnext[0] = p
else:
remove_from_z(p, archive_dim=self.n_obj)
volume += area * (Fc(p.next[2].x[2]) - Fc(p.x[2]))
p = p.next[2]
return self.hypervolume_final_float_type(volume)
def preprocessing(self):
""" Preprocessing step to determine the closest points in x and y directions,
as described in the paper and implemented in the original C code. """
di = self.n_obj - 1
t = ArchiveSortedList(iterable=[self.head, self.head.next[di]],
key=lambda node: (node.x[1], node.x[0]))
p = self.head.next[di].next[di]
stop = self.head.prev[di]
while p != stop:
s = t.outer_delimiter_x(p)
if weakly_dominates(s.x, p.x, self.n_obj) or weakly_dominates(t.next_y(s).x, p.x, self.n_obj):
p.ndomr = 1
p = p.next[di]
continue
t.remove_dominated_y(p, s)
p.closest[0] = s
p.closest[1] = t.next_y(s)
t.add_y(p, s)
p = p.next[di]
t.clear()
if __name__ == "__main__":
import doctest
print('doctest.testmod() in moarchiving3obj.py')
print(doctest.testmod())
|