1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
|
"""Structure optimization. """
import time
import warnings
from collections.abc import Callable
from math import sqrt
from os.path import isfile
from typing import IO, Any, Dict, List, Optional, Tuple, Union
from ase import Atoms
from ase.calculators.calculator import PropertyNotImplementedError
from ase.filters import UnitCellFilter
from ase.parallel import world
from ase.utils import IOContext, lazyproperty
from ase.utils.abc import Optimizable
DEFAULT_MAX_STEPS = 100_000_000
class RestartError(RuntimeError):
pass
class OptimizableAtoms(Optimizable):
def __init__(self, atoms):
self.atoms = atoms
def get_positions(self):
return self.atoms.get_positions()
def set_positions(self, positions):
self.atoms.set_positions(positions)
def get_forces(self):
return self.atoms.get_forces()
@lazyproperty
def _use_force_consistent_energy(self):
# This boolean is in principle invalidated if the
# calculator changes. This can lead to weird things
# in multi-step optimizations.
try:
self.atoms.get_potential_energy(force_consistent=True)
except PropertyNotImplementedError:
# warnings.warn(
# 'Could not get force consistent energy (\'free_energy\'). '
# 'Please make sure calculator provides \'free_energy\', even '
# 'if equal to the ordinary energy. '
# 'This will raise an error in future versions of ASE.',
# FutureWarning)
return False
else:
return True
def get_potential_energy(self):
force_consistent = self._use_force_consistent_energy
return self.atoms.get_potential_energy(
force_consistent=force_consistent)
def iterimages(self):
# XXX document purpose of iterimages
return self.atoms.iterimages()
def __len__(self):
# TODO: return 3 * len(self.atoms), because we want the length
# of this to be the number of DOFs
return len(self.atoms)
class Dynamics(IOContext):
"""Base-class for all MD and structure optimization classes."""
def __init__(
self,
atoms: Atoms,
logfile: Optional[Union[IO, str]] = None,
trajectory: Optional[str] = None,
append_trajectory: bool = False,
master: Optional[bool] = None,
comm=world,
*,
loginterval: int = 1,
):
"""Dynamics object.
Parameters
----------
atoms : Atoms object
The Atoms object to operate on.
logfile : file object or str
If *logfile* is a string, a file with that name will be opened.
Use '-' for stdout.
trajectory : Trajectory object or str
Attach trajectory object. If *trajectory* is a string a
Trajectory will be constructed. Use *None* for no
trajectory.
append_trajectory : bool
Defaults to False, which causes the trajectory file to be
overwriten each time the dynamics is restarted from scratch.
If True, the new structures are appended to the trajectory
file instead.
master : bool
Defaults to None, which causes only rank 0 to save files. If set to
true, this rank will save files.
comm : Communicator object
Communicator to handle parallel file reading and writing.
loginterval : int, default: 1
Only write a log line for every *loginterval* time steps.
"""
self.atoms = atoms
self.optimizable = atoms.__ase_optimizable__()
self.logfile = self.openfile(file=logfile, comm=comm, mode='a')
self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = []
self.nsteps = 0
self.max_steps = 0 # to be updated in run or irun
self.comm = comm
if trajectory is not None:
if isinstance(trajectory, str):
from ase.io.trajectory import Trajectory
mode = "a" if append_trajectory else "w"
trajectory = self.closelater(Trajectory(
trajectory, mode=mode, master=master, comm=comm
))
self.attach(
trajectory,
interval=loginterval,
atoms=self.optimizable,
)
self.trajectory = trajectory
def todict(self) -> Dict[str, Any]:
raise NotImplementedError
def get_number_of_steps(self):
return self.nsteps
def insert_observer(
self, function, position=0, interval=1, *args, **kwargs
):
"""Insert an observer.
This can be used for pre-processing before logging and dumping.
Examples
--------
>>> from ase.build import bulk
>>> from ase.calculators.emt import EMT
>>> from ase.optimize import BFGS
...
...
>>> def update_info(atoms, opt):
... atoms.info["nsteps"] = opt.nsteps
...
...
>>> atoms = bulk("Cu", cubic=True) * 2
>>> atoms.rattle()
>>> atoms.calc = EMT()
>>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
... opt.insert_observer(update_info, atoms=atoms, opt=opt)
... opt.run(fmax=0.05, steps=10)
True
"""
if not isinstance(function, Callable):
function = function.write
self.observers.insert(position, (function, interval, args, kwargs))
def attach(self, function, interval=1, *args, **kwargs):
"""Attach callback function.
If *interval > 0*, at every *interval* steps, call *function* with
arguments *args* and keyword arguments *kwargs*.
If *interval <= 0*, after step *interval*, call *function* with
arguments *args* and keyword arguments *kwargs*. This is
currently zero indexed."""
if hasattr(function, "set_description"):
d = self.todict()
d.update(interval=interval)
function.set_description(d)
if not isinstance(function, Callable):
function = function.write
self.observers.append((function, interval, args, kwargs))
def call_observers(self):
for function, interval, args, kwargs in self.observers:
call = False
# Call every interval iterations
if interval > 0:
if (self.nsteps % interval) == 0:
call = True
# Call only on iteration interval
elif interval <= 0:
if self.nsteps == abs(interval):
call = True
if call:
function(*args, **kwargs)
def irun(self, steps=DEFAULT_MAX_STEPS):
"""Run dynamics algorithm as generator.
Parameters
----------
steps : int, default=DEFAULT_MAX_STEPS
Number of dynamics steps to be run.
Yields
------
converged : bool
True if the forces on atoms are converged.
Examples
--------
This method allows, e.g., to run two optimizers or MD thermostats at
the same time.
>>> opt1 = BFGS(atoms)
>>> opt2 = BFGS(StrainFilter(atoms)).irun()
>>> for _ in opt2:
... opt1.run()
"""
# update the maximum number of steps
self.max_steps = self.nsteps + steps
# compute the initial step
self.optimizable.get_forces()
# log the initial step
if self.nsteps == 0:
self.log()
# we write a trajectory file if it is None
if self.trajectory is None:
self.call_observers()
# We do not write on restart w/ an existing trajectory file
# present. This duplicates the same entry twice
elif len(self.trajectory) == 0:
self.call_observers()
# check convergence
is_converged = self.converged()
yield is_converged
# run the algorithm until converged or max_steps reached
while not is_converged and self.nsteps < self.max_steps:
# compute the next step
self.step()
self.nsteps += 1
# log the step
self.log()
self.call_observers()
# check convergence
is_converged = self.converged()
yield is_converged
def run(self, steps=DEFAULT_MAX_STEPS):
"""Run dynamics algorithm.
This method will return when the forces on all individual
atoms are less than *fmax* or when the number of steps exceeds
*steps*.
Parameters
----------
steps : int, default=DEFAULT_MAX_STEPS
Number of dynamics steps to be run.
Returns
-------
converged : bool
True if the forces on atoms are converged.
"""
for converged in Dynamics.irun(self, steps=steps):
pass
return converged
def converged(self):
"""" a dummy function as placeholder for a real criterion, e.g. in
Optimizer """
return False
def log(self, *args):
""" a dummy function as placeholder for a real logger, e.g. in
Optimizer """
return True
def step(self):
"""this needs to be implemented by subclasses"""
raise RuntimeError("step not implemented.")
class Optimizer(Dynamics):
"""Base-class for all structure optimization classes."""
# default maxstep for all optimizers
defaults = {'maxstep': 0.2}
_deprecated = object()
def __init__(
self,
atoms: Atoms,
restart: Optional[str] = None,
logfile: Optional[Union[IO, str]] = None,
trajectory: Optional[str] = None,
append_trajectory: bool = False,
**kwargs,
):
"""
Parameters
----------
atoms: :class:`~ase.Atoms`
The Atoms object to relax.
restart: str
Filename for restart file. Default value is *None*.
logfile: file object or str
If *logfile* is a string, a file with that name will be opened.
Use '-' for stdout.
trajectory: Trajectory object or str
Attach trajectory object. If *trajectory* is a string a
Trajectory will be constructed. Use *None* for no
trajectory.
append_trajectory: bool
Appended to the trajectory file instead of overwriting it.
kwargs : dict, optional
Extra arguments passed to :class:`~ase.optimize.optimize.Dynamics`.
"""
super().__init__(
atoms=atoms,
logfile=logfile,
trajectory=trajectory,
append_trajectory=append_trajectory,
**kwargs,
)
self.restart = restart
self.fmax = None
if restart is None or not isfile(restart):
self.initialize()
else:
self.read()
self.comm.barrier()
def read(self):
raise NotImplementedError
def todict(self):
description = {
"type": "optimization",
"optimizer": self.__class__.__name__,
}
# add custom attributes from subclasses
for attr in ('maxstep', 'alpha', 'max_steps', 'restart',
'fmax'):
if hasattr(self, attr):
description.update({attr: getattr(self, attr)})
return description
def initialize(self):
pass
def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
"""Run optimizer as generator.
Parameters
----------
fmax : float
Convergence criterion of the forces on atoms.
steps : int, default=DEFAULT_MAX_STEPS
Number of optimizer steps to be run.
Yields
------
converged : bool
True if the forces on atoms are converged.
"""
self.fmax = fmax
return Dynamics.irun(self, steps=steps)
def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
"""Run optimizer.
Parameters
----------
fmax : float
Convergence criterion of the forces on atoms.
steps : int, default=DEFAULT_MAX_STEPS
Number of optimizer steps to be run.
Returns
-------
converged : bool
True if the forces on atoms are converged.
"""
self.fmax = fmax
return Dynamics.run(self, steps=steps)
def converged(self, forces=None):
"""Did the optimization converge?"""
if forces is None:
forces = self.optimizable.get_forces()
return self.optimizable.converged(forces, self.fmax)
def log(self, forces=None):
if forces is None:
forces = self.optimizable.get_forces()
fmax = sqrt((forces ** 2).sum(axis=1).max())
e = self.optimizable.get_potential_energy()
T = time.localtime()
if self.logfile is not None:
name = self.__class__.__name__
if self.nsteps == 0:
args = (" " * len(name), "Step", "Time", "Energy", "fmax")
msg = "%s %4s %8s %15s %12s\n" % args
self.logfile.write(msg)
args = (name, self.nsteps, T[3], T[4], T[5], e, fmax)
msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args
self.logfile.write(msg)
self.logfile.flush()
def dump(self, data):
from ase.io.jsonio import write_json
if self.comm.rank == 0 and self.restart is not None:
with open(self.restart, 'w') as fd:
write_json(fd, data)
def load(self):
from ase.io.jsonio import read_json
with open(self.restart) as fd:
try:
from ase.optimize import BFGS
if not isinstance(self, BFGS) and isinstance(
self.atoms, UnitCellFilter
):
warnings.warn(
"WARNING: restart function is untested and may result "
"in unintended behavior. Namely orig_cell is not "
"loaded in the UnitCellFilter. Please test on your own"
" to ensure consistent results."
)
return read_json(fd, always_array=False)
except Exception as ex:
msg = ('Could not decode restart file as JSON. '
'You may need to delete the restart file '
f'{self.restart}')
raise RestartError(msg) from ex
|