File: optimiser.py

package info (click to toggle)
python-cogent 1.4.1-1.2
  • links: PTS, VCS
  • area: non-free
  • in suites: squeeze
  • size: 13,260 kB
  • ctags: 20,087
  • sloc: python: 116,163; ansic: 732; makefile: 74; sh: 9
file content (182 lines) | stat: -rw-r--r-- 6,583 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python
"""
optimiser.py

Contains a base class for numerical optimisers.
"""

import numpy
Float = numpy.core.numerictypes.sctype2char(float)
import random
import logging

from cogent.util.checkpointing import Checkpointer

__author__ = "Andrew Butterfield"
__copyright__ = "Copyright 2007-2009, The Cogent Project"
__credits__ = ["Andrew Butterfield", "Peter Maxwell", "Edward Lang",
                    "Gavin Huttley"]
__license__ = "GPL"
__version__ = "1.4.1"
__maintainer__ = "Gavin Huttley"
__email__ = "gavin.huttley@anu.edu.au"
__status__ = "Production"

LOG = logging.getLogger('cogent')

class ParameterOutOfBoundsError(Exception):
    pass

epsilon = 1e-9 # intended for dealing with rounding errors

# The following functions are used to wrap the optimised function to
# adapt it to the optimiser in various ways.  They can be combined.

def bounded_function(f, lower_bounds, upper_bounds):
    """Returns a function that return +inf on out-of-bounds input rather than
    bothering the real function with invalid input.  This is enough to get some
    unbounded optimisers working on bounded problems"""
    
    def _wrapper(x, **kw):
        if numpy.alltrue(numpy.logical_and(lower_bounds <= x, x <= upper_bounds)):
            return f(x, **kw)
        else:
            raise ParameterOutOfBoundsError((lower_bounds, x, upper_bounds))
    
    return _wrapper

def bounds_exception_catching_function(f, direction):
    """Like bounded_function, except that the out-of-bounds parameter isn't caught
    here, but deeper in the calculation, and reported as an exception"""
    if direction < 0:
        out_of_bounds_value = numpy.inf
        acceptable_inf = numpy.isposinf
    else:
        out_of_bounds_value = -numpy.inf
        acceptable_inf = numpy.isneginf
    
    def _wrapper(x, **kw):
        try:
            result = f(x, **kw)
            if not numpy.isfinite(result):
                if not acceptable_inf(result):
                    LOG.warning('Non-finite f %s from %s' % (result, x))
                    raise ParameterOutOfBoundsError
        except (ArithmeticError, ParameterOutOfBoundsError), detail:
            result = out_of_bounds_value
        return result
    
    return _wrapper


def reversed_function(f):
    """Likelihood needs maximising, but our optimisers are minimisers"""
    def _wrapper(x, **kw):
        return -f(x, **kw)
    
    return _wrapper


class OptimiserBase(object):
    """Maximises (or minimises if 'direction'<0) optimisable_object
    .testoptparvector(x) for x within optimisable_object.getBoundsVectors().
    For some optimisers random numbers are used to choose the search path,
    so for deterministic behaviour provide either 'random_series' or 'seed'.
    """
    # Subclass must provide _setdefaults, setConditions, runInner
    # runInner will get passed a function to minimise, so it does
    # not need to call any OptimiserBase methods.
    
    def __init__(self, f, xinit, bounds, direction=1, random_series=None,
            seed=None, **kw):
        self.restore = True
        self._random_series = random_series or random.Random()
        if seed is not None:
            self._random_series.seed(seed)
        self.__total_evaluations = 0
        # flag to determine the direction. We assume that all wrapped
        # optimiser functions are minimisers.
        self.__reverse_result = direction * self.algorithm_direction < 0
        self.setCheckpointing()
        self._setdefaults()
        self.setConditions(**kw)
        self.setVectorBounds(*bounds)
        self.vector_length = len(xinit)   #risks irrelevance
        self.vector = numpy.array(xinit, Float)
        self.__original_f = f
    
    def setCheckpointing(self, filename = None, interval = 1800, restore = None):
        """
        Set the checkpointing filename and time interval.
        Arguments:
        - filename: name of the file to which data will be written. If None, no
          checkpointing will be done.
        - interval: time expressed in seconds
        - restore: flag to restore from this filenamne or not. will be set to 0 after
          restoration
        """
        self.checkpointer = Checkpointer(filename, interval)
        if restore is not None:
            self.restore = restore
    
    def setVectorBounds(self, lower_bounds = None, upper_bounds = None):
        """Set the bounds within which the vector values can lie."""
        if lower_bounds is not None:
            self.lower_bounds = lower_bounds
        if upper_bounds is not None:
            self.upper_bounds = upper_bounds
    
    def getEvaluationCount(self):
        """Return the total number of evaluations taken."""
        
        return self.__total_evaluations
    
    def run(self, show_progress = True):
        """
        In principle this would call the virtually overriden function, ie be the
        public interface to the protected overridable.
        
        Arguments:
        - show_progress: whether the function values are printed as
          the optimisation proceeds. Default is True.
        
        Returns the optimised function value and the corresponding parameter
        vector.
        """
        
        f = self.__original_f
        vector = self.vector
        
        # adapt the function to the optimiser
        if self.__reverse_result:
            f = reversed_function(f)
        f = bounded_function(f, self.lower_bounds, self.upper_bounds)
        try:
            fval = f(vector)
        except (ArithmeticError, ParameterOutOfBoundsError), detail:
            raise ValueError("Initial parameter values must be valid %s" % repr(detail.args))
        if not numpy.isfinite(fval):
            if numpy.isinf(fval) and self.__reverse_result:
                fval = -1 * fval
            raise ValueError("Initial parameter values must evaluate to a finite likelihood, not %s. %s" % (fval, vector))
        
        f = bounds_exception_catching_function(f, self.algorithm_direction)
        
        (fval, xopt, func_calls, elapsed_time) = self.runInner(f, vector,
                show_progress=show_progress, random_series = self._random_series)
        
        # ensure state of calculator reflects optimised result
        f(xopt)
        
        self.__total_evaluations += func_calls
        self.elapsed_time = elapsed_time
        
        if self.__reverse_result:
            fval = -fval
        
        return fval, xopt
    
    def setVector(self, vector):
        self.vector = vector
        self.__original_f(vector)