File: speciation.py

package info (click to toggle)
deap 1.3.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 3,500 kB
  • sloc: python: 8,558; ansic: 1,054; cpp: 592; makefile: 94; sh: 5
file content (181 lines) | stat: -rw-r--r-- 6,672 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#    This file is part of DEAP.
#
#    DEAP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    DEAP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with DEAP. If not, see <http://www.gnu.org/licenses/>.

"""Implementation of the Speciation Particle Swarm Optimization algorithm as
presented in *Li, Blackwell, and Branke, 2006, Particle Swarm with Speciation
and Adaptation in a Dynamic Environment.*
"""

import itertools
import math
import operator
import random

import numpy

try:
    from itertools import imap
except:
    # Python 3 nothing to do
    pass
else:
    map = imap

from deap import base
from deap.benchmarks import movingpeaks
from deap import creator
from deap import tools

scenario = movingpeaks.SCENARIO_2

NDIM = 5
BOUNDS = [scenario["min_coord"], scenario["max_coord"]]

mpb = movingpeaks.MovingPeaks(dim=NDIM, **scenario)

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Particle", list, fitness=creator.FitnessMax, speed=list, 
    best=None, bestfit=creator.FitnessMax)

def generate(pclass, dim, pmin, pmax, smin, smax):
    part = pclass(random.uniform(pmin, pmax) for _ in range(dim)) 
    part.speed = [random.uniform(smin, smax) for _ in range(dim)]
    return part

def convert_quantum(swarm, rcloud, centre):
    dim = len(swarm[0])
    for part in swarm:
        position = [random.gauss(0, 1) for _ in range(dim)]
        dist = math.sqrt(sum(x**2 for x in position))
        
        # Gaussian distribution
        # u = abs(random.gauss(0, 1.0/3.0))
        # part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]
        
        # UVD distribution
        # u = random.random()
        # part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]
        
        # NUVD distribution
        u = abs(random.gauss(0, 1.0/3.0))
        part[:] = [(rcloud * x * u / dist) + c for x, c in zip(position, centre)]
        
        del part.fitness.values
        del part.bestfit.values
        part.best = None

    return swarm

def updateParticle(part, best, chi, c):
    ce1 = (c*random.uniform(0, 1) for _ in range(len(part)))
    ce2 = (c*random.uniform(0, 1) for _ in range(len(part)))
    ce1_p = map(operator.mul, ce1, map(operator.sub, best, part))
    ce2_g = map(operator.mul, ce2, map(operator.sub, part.best, part))
    a = map(operator.sub,
                      map(operator.mul,
                                    itertools.repeat(chi),
                                    map(operator.add, ce1_p, ce2_g)),
                      map(operator.mul,
                                     itertools.repeat(1-chi),
                                     part.speed))
    part.speed = list(map(operator.add, part.speed, a))
    part[:] = list(map(operator.add, part, part.speed))
    
toolbox = base.Toolbox()
toolbox.register("particle", generate, creator.Particle, dim=NDIM,
    pmin=BOUNDS[0], pmax=BOUNDS[1], smin=-(BOUNDS[1] - BOUNDS[0])/2.0,
    smax=(BOUNDS[1] - BOUNDS[0])/2.0)
toolbox.register("swarm", tools.initRepeat, list, toolbox.particle)
toolbox.register("update", updateParticle, chi=0.729843788, c=2.05)
toolbox.register("convert", convert_quantum)
toolbox.register("evaluate", mpb)

def main(verbose=True):
    NPARTICLES = 100
    RS = (BOUNDS[1] - BOUNDS[0]) / (50**(1.0/NDIM))    # between 1/20 and 1/10 of the domain's range
    PMAX = 10
    RCLOUD = 1.0    # 0.5 times the move severity

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", numpy.mean)
    stats.register("std", numpy.std)
    stats.register("min", numpy.min)
    stats.register("max", numpy.max)
    
    logbook = tools.Logbook()
    logbook.header = "gen", "nswarm", "evals", "error", "offline_error", "avg", "max"
    
    swarm = toolbox.swarm(n=NPARTICLES)
    
    generation = 0
    while mpb.nevals < 5e5:
        # Evaluate each particle in the swarm
        for part in swarm:
            part.fitness.values = toolbox.evaluate(part)
            if not part.best or part.bestfit < part.fitness:
                part.best = toolbox.clone(part[:])         # Get the position
                part.bestfit.values = part.fitness.values  # Get the fitness
        

        # Sort swarm into species, best individual comes first
        sorted_swarm = sorted(swarm, key=lambda ind: ind.bestfit, reverse=True)
        species = []
        while sorted_swarm:
            found = False
            for s in species:
                dist = math.sqrt(sum((x1 - x2)**2 for x1, x2 in zip(sorted_swarm[0].best, s[0].best)))
                if dist <= RS:
                    found = True
                    s.append(sorted_swarm[0])
                    break
            if not found:
                species.append([sorted_swarm[0]])
            sorted_swarm.pop(0)
        
        record = stats.compile(swarm)
        logbook.record(gen=generation, evals=mpb.nevals, nswarm=len(species),
                       error=mpb.currentError(), offline_error=mpb.offlineError(), **record)

        if verbose:
            print(logbook.stream)

        # Detect change
        if any(s[0].bestfit.values != toolbox.evaluate(s[0].best) for s in species):
            # Convert particles to quantum particles
            for s in species:
                s[:] = toolbox.convert(s, rcloud=RCLOUD, centre=s[0].best)

        else:
            # Replace exceeding particles in a species with new particles
            for s in species:
                if len(s) > PMAX:
                    n = len(s) - PMAX
                    del s[PMAX:]
                    s.extend(toolbox.swarm(n=n))
            
            # Update particles that have not been reinitialized
            for s in species[:-1]:
                for part in s[:PMAX]:
                    toolbox.update(part, s[0].best)
                    del part.fitness.values
        
        # Return all but the worst species' updated particles to the swarm
        # The worst species is replaced by new particles
        swarm = list(itertools.chain(toolbox.swarm(n=len(species[-1])), *species[:-1]))
        generation += 1
        
if __name__ == '__main__':
    main()