1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
|
# (c) 2002 LOGILAB S.A. (Paris, FRANCE).
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA 02111-1307
# USA.
"""The code of the constraint propagation algorithms"""
from __future__ import generators
__revision__ = '$Id: propagation.py,v 1.36 2005/07/13 08:48:22 ludal Exp $'
from logilab.constraint.interfaces import DomainInterface, ConstraintInterface
import bisect, operator
from time import strftime
from logilab.constraint.psyco_wrapper import Psyobj
try:
enumerate = enumerate
except NameError:
from logilab.common.compat import enumerate
class ConsistencyFailure(Exception):
    """Signal that the repository is not in a consistent state.

    Raised by the propagation code when narrowing cannot succeed (for
    instance when a domain has been emptied).
    """
class Repository(Psyobj):
"""Stores variables, domains and constraints
Propagates domain changes to constraints
Manages the constraint evaluation queue"""
def __init__(self, variables, domains, constraints = None):
# encode unicode
for i, var in enumerate(variables):
if type(var) == type(u''):
variables[i] = var.encode()
self._variables = variables # list of variable names
self._domains = domains # maps variable name to domain object
self._constraints = [] # list of constraint objects
# self._queue = [] # queue of constraints waiting to be processed
self._variableListeners = {}
for var in self._variables:
self._variableListeners[var] = []
assert self._domains.has_key(var)
for constr in constraints or ():
self.addConstraint(constr)
def __repr__(self):
return '<Repository nb_constraints=%d domains=%s>' % \
(len(self._constraints), self._domains)
def vcg_draw(self, filename, title='Constraints graph'):
""" draw a constraints graph readable by vcg
"""
from logilab.common.vcgutils import VCGPrinter, EDGE_ATTRS
stream = open(filename, 'w')
printer = VCGPrinter(stream)
printer.open_graph(title=title, textcolor='black'
# layoutalgorithm='dfs',
# manhattan_edges='yes'
# port_sharing='no'
# late_edge_labels='yes'
)
for var in self._variables:
printer.node(var, shape='ellipse')
type_colors = {}
color_index = 2
i = 0
for constraint in self._constraints:
key = constraint.type
if not type_colors.has_key(key):
type_colors[key] = color_index
color_index += 1
affected_vars = constraint.affectedVariables()
if len(affected_vars) <= 1:
continue
if len(affected_vars) == 2:
var1 = affected_vars[0]
var2 = affected_vars[1]
printer.edge(var1, var2, arrowstyle='none',
color=EDGE_ATTRS['color'][type_colors[key]])
continue
n_id = 'N_aire%i' % i
i += 1
printer.node(n_id, shape='triangle')
for var1 in affected_vars:
printer.edge(var1, n_id, arrowstyle='none',
color=EDGE_ATTRS['color'][type_colors[key]])
# print legend
for node_type, color in type_colors.items():
printer.node(node_type, shape='box',
color=EDGE_ATTRS['color'][color])
printer.close_graph()
stream.close()
def addConstraint(self, constraint):
if isinstance(constraint, BasicConstraint):
# Basic constraints are processed just once
# because they are straight away entailed
var = constraint.getVariable()
constraint.narrow({var: self._domains[var]})
else:
self._constraints.append(constraint)
for var in constraint.affectedVariables():
self._variableListeners[var].append(constraint)
def _removeConstraint(self, constraint):
self._constraints.remove(constraint)
for var in constraint.affectedVariables():
try:
self._variableListeners[var].remove(constraint)
except ValueError:
raise ValueError('Error removing constraint from listener',
var,
self._variableListeners[var],
constraint)
def getDomains(self):
return self._domains
def distribute(self, distributor, verbose=0):
"""Create new repository using the distributor and self """
for domains in distributor.distribute(self._domains, verbose):
yield Repository(self._variables, domains, self._constraints)
# alf 20041216 -- I tried the following to avoid the cost of the
# creation of new Repository objects. It resulted in functional, but
# slightly slower code. If you want to try to improve it, I keep the
# commented out version in the source, but the version above stays
# active as it is both simpler and faster.
## backup_constraints = self._constraints[:]
## for domains in distributor.distribute(self._domains, verbose):
## self._domains = domains
## self._constraints = []
## self._queue = []
## for var in self._variables:
## self._variableListeners[var] = []
## for constraint in backup_constraints:
## self.addConstraint(constraint)
## yield self
def consistency(self, verbose=0):
"""Prunes the domains of the variables
This method calls constraint.narrow() and queues constraints
that are affected by recent changes in the domains.
Returns True if a solution was found"""
if verbose:
print strftime('%H:%M:%S'), '** Consistency **'
_queue = [ (constr.estimateCost(self._domains),
constr) for constr in self._constraints ]
_affected_constraints = {}
while True:
if not _queue:
# refill the queue if some constraints have been affected
_queue = [ (constr.estimateCost(self._domains),
constr) for constr in _affected_constraints ]
if not _queue:
break
_queue.sort()
_affected_constraints.clear()
if verbose > 2:
print strftime('%H:%M:%S'), 'Queue', _queue
cost, constraint = _queue.pop(0)
if verbose > 1:
print strftime('%H:%M:%S'),
print 'Trying to entail constraint',
print constraint, '[cost:%d]' % cost
entailed = constraint.narrow(self._domains)
for var in constraint.affectedVariables():
# affected constraints are listeners of
# affected variables of this constraint
dom = self._domains[var]
if not dom.hasChanged():
continue
if verbose > 1 :
print strftime('%H:%M:%S'),
print ' -> New domain for variable', var, 'is', dom
for constr in self._variableListeners[var]:
if constr is not constraint:
_affected_constraints[constr]=True
dom.resetFlags()
if entailed:
if verbose:
print strftime('%H:%M:%S'),
print "--> Entailed constraint", constraint
self._removeConstraint(constraint)
if constraint in _affected_constraints:
del _affected_constraints[constraint]
for domain in self._domains.itervalues():
if domain.size() != 1:
return 0
return 1
class Solver(Psyobj):
def __init__(self, distributor=None):
if distributor is None:
from logilab.constraint.distributors import DefaultDistributor
distributor = DefaultDistributor()
self._distributor = distributor
self.verbose = 1
self.max_depth = 0
def solve_one(self, repository, verbose=0):
"""Generates only one solution"""
self.verbose = verbose
self.max_depth = 0
try:
# XXX FIXME: this is a workaround a bug in psyco-1.4
## return self._solve(repository).next()
return self._solve(repository, 0).next()
except StopIteration:
return
def solve_best(self, repository, cost_func, verbose=0):
"""Generates solution with an improving cost"""
self.verbose = verbose
self.max_depth = 0
best_cost = None
# XXX FIXME: this is a workaround a bug in psyco-1.4
## for solution in self._solve(repository):
for solution in self._solve(repository, 0):
cost = cost_func(**solution)
if best_cost is None or cost <= best_cost:
best_cost = cost
yield solution, cost
def solve_all(self, repository, verbose=0):
"""Generates all solutions"""
self.verbose = verbose
self.max_depth = 0
for solution in self._solve(repository):
yield solution
def solve(self, repository, verbose=0):
self.max_depth = 0
solutions = []
for solution in self.solve_all(repository, verbose):
solutions.append(solution)
return solutions
def _solve(self, repository, recursion_level=0):
"""main generator"""
solve = self._solve
verbose = self.verbose
if recursion_level > self.max_depth:
self.max_depth = recursion_level
if verbose:
print strftime('%H:%M:%S'),
print '*** [%d] Solve called with repository' % recursion_level,
print repository
try:
foundSolution = repository.consistency(verbose)
except ConsistencyFailure, exc:
if verbose:
print strftime('%H:%M:%S'), exc
pass
else:
if foundSolution:
solution = {}
for variable, domain in repository.getDomains().items():
solution[variable] = domain.getValues()[0]
if verbose:
print strftime('%H:%M:%S'), '### Found Solution', solution
print '-'*80
yield solution
else:
for repo in repository.distribute(self._distributor,
verbose):
for solution in solve(repo, recursion_level+1):
if solution is not None:
yield solution
if recursion_level == 0 and self.verbose:
print strftime('%H:%M:%S'),'Finished search'
print strftime('%H:%M:%S'),
print 'Maximum recursion depth = ', self.max_depth
class BasicConstraint(Psyobj):
    """A BasicConstraint, which is never queued by the Repository.

    A BasicConstraint affects only one variable, and will be entailed
    on the first call to narrow().
    """

    __implements__ = ConstraintInterface

    def __init__(self, variable, reference, operator):
        """Build the constraint.

        variable  -- name of the single variable the constraint applies to
        reference -- value each domain value is compared against
        operator  -- callable invoked as operator(value, reference); values
                     for which it returns a false result are removed from
                     the domain by narrow()
        """
        self._variable = variable
        self._reference = reference
        self._operator = operator

    def __repr__(self):
        return '<%s %s %s>'% (self.__class__, self._variable, self._reference)

    def isVariableRelevant(self, variable):
        """Tell whether `variable` is the variable of this constraint."""
        return variable == self._variable

    def estimateCost(self, domains):
        """Return 0, whatever the domains are."""
        return 0 # get in the first place in the queue

    def affectedVariables(self):
        """Return a one-element list holding the constrained variable."""
        return [self._variable]

    def getVariable(self):
        """Return the name of the constrained variable."""
        return self._variable

    def narrow(self, domains):
        """Remove from the variable's domain the values failing the test.

        domains -- mapping from variable name to domain object; only the
                   entry for this constraint's variable is used

        Returns 1 (the constraint is always entailed afterwards).
        Raises ConsistencyFailure, with a message naming this constraint,
        when a ConsistencyFailure is raised while removing values.
        """
        domain = domains[self._variable]
        accepts = self._operator
        ref = self._reference
        try:
            # Iterate over a snapshot: removing values from a container
            # while iterating over it would skip elements if getValues()
            # returned the domain's live storage.
            for val in list(domain.getValues()):
                if not accepts(val, ref):
                    domain.removeValue(val)
        except ConsistencyFailure:
            raise ConsistencyFailure('inconsistency while applying %s' % \
                                     repr(self))
        return 1
class AbstractDomain(Psyobj):
    """Implements the functionality related to the changed flag.

    Can be used as a starting point for concrete domains.  This class
    calls self.size() but does not define it.
    """

    __implements__ = DomainInterface

    def __init__(self):
        self.__changed = 0

    def resetFlags(self):
        """Clear the changed flag."""
        self.__changed = 0

    def hasChanged(self):
        """Return the changed flag (1 once a value has been removed since
        the last reset, 0 otherwise)."""
        return self.__changed

    def _valueRemoved(self):
        """The implementation of removeValue should call this method.

        Sets the changed flag, then raises ConsistencyFailure if the
        domain has become empty.
        """
        self.__changed = 1
        if self.size() != 0:
            return
        raise ConsistencyFailure()
class AbstractConstraint(Psyobj):
    """Base class for constraints that apply to a list of variables."""

    __implements__ = ConstraintInterface

    def __init__(self, variables):
        """variables is a list of variables which appear in the formula"""
        self._variables = variables

    def affectedVariables(self):
        """ Return a list of all variables affected by this constraint """
        return self._variables

    def isVariableRelevant(self, variable):
        """Tell whether `variable` is one of the constraint's variables."""
        return variable in self._variables

    def estimateCost(self, domains):
        """Return an estimate of the cost of the narrowing of the constraint

        The estimate is the product of the sizes of the domains of the
        constraint's variables; it is 1 when the constraint has no
        variable.
        """
        # The initial value 1 keeps reduce() from raising TypeError on an
        # empty variable list and leaves every other product unchanged.
        return reduce(operator.mul,
                      [domains[var].size() for var in self._variables],
                      1)
|