File: controller.py

package info (click to toggle)
gringo 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 32,128 kB
  • sloc: cpp: 210,867; ansic: 37,507; python: 11,271; yacc: 825; javascript: 627; sh: 368; xml: 364; makefile: 102
file content (143 lines) | stat: -rwxr-xr-x 4,599 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python

import atexit
import os
import readline
import signal
from threading import Condition

from clingo import Control, Function, Number


class Controller:
    """Interactive readline prompt that drives an asynchronous solver.

    The controller reads commands from standard input, forwards them to the
    solver registered via :meth:`register_solver`, and lets the user cancel
    a running search with Ctrl-C.
    """

    # Commands offered by tab completion and understood by :meth:`run`.
    COMMANDS = ("more_pigeon_please", "less_pigeon_please", "solve", "exit")

    def __init__(self):
        """Set up readline history and completion and the solve-state flags."""
        histfile = os.path.join(os.path.expanduser("~"), ".controller")
        try:
            readline.read_history_file(histfile)
        except IOError:
            # Best effort: the history file may simply not exist yet.
            pass
        readline.parse_and_bind("tab: complete")
        readline.set_completer(
            lambda text, state: self._complete(self.COMMANDS, text, state)
        )
        atexit.register(readline.write_history_file, histfile)
        self.solver = None
        self.message = ""
        self.solving = False
        # Guards `solving`; on_finish notifies it once the search has ended.
        self.condition = Condition()

    @staticmethod
    def _complete(commands, text, state):
        """Return the ``state``-th command starting with ``text``, or None.

        readline calls the completer repeatedly with state 0, 1, 2, ... until
        it gets None, so the candidate list has to be available for every
        state and not only for the first call.
        """
        matches = [c for c in commands if c.startswith(text)]
        return matches[state] if state < len(matches) else None

    def register_solver(self, solver):
        """Attach the solver object that the prompt commands operate on."""
        self.solver = solver

    def interrupt(self, a, b):
        """SIGINT handler used while solving: cancel the running search.

        ``a`` and ``b`` are the positional arguments a signal handler
        receives; they are unused. Further SIGINTs are ignored before the
        solver is asked to stop.
        """
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.solver.stop()

    def on_finish(self, ret):
        """Record the result summary and wake up the waiting prompt loop.

        ``ret`` must be convertible with ``str`` and expose a boolean
        ``interrupted`` attribute.
        """
        self.message = (
            "finish: " + str(ret) + (" (INTERRUPTED)" if ret.interrupted else "")
        )
        with self.condition:
            self.solving = False
            self.condition.notify()

    def _solve(self):
        """Start a search, block until it has finished, and print the outcome."""
        print("Solving...")
        self.solving = True
        self.solver.start(self.on_finish)
        # From here on Ctrl-C cancels the search instead of being ignored.
        signal.signal(signal.SIGINT, self.interrupt)
        with self.condition:
            while self.solving:
                # NOTE: we need a timeout to catch signals
                self.condition.wait(1000)
        self.solver.finish()
        for model in self.solver.models:
            print("model: " + model)
        print(self.message)

    def run(self):
        """Print the help text and process commands until ``exit`` or EOF."""
        print("")
        print("this prompt accepts the following commands:")
        print("  solve              - start solving")
        print("  exit/EOF           - terminate the solver")
        print("  Ctrl-C             - interrupt current search")
        print("  less_pigeon_please - select an easy problem")
        print("  more_pigeon_please - select a difficult problem")
        print("")

        # Use raw_input where it exists, otherwise fall back to input.
        try:
            input_ = raw_input
        except NameError:
            input_ = input

        pyInt = signal.getsignal(signal.SIGINT)
        while True:
            # Restore the original handler so Ctrl-C at the prompt raises
            # KeyboardInterrupt; SIGINT is ignored again once input is done.
            signal.signal(signal.SIGINT, pyInt)
            try:
                line = input_("> ")
            except EOFError:
                line = "exit"
                print(line)
            except KeyboardInterrupt:
                # Move to a fresh line before showing the prompt again.
                print("")
                continue
            finally:
                signal.signal(signal.SIGINT, signal.SIG_IGN)

            if line == "solve":
                self._solve()
            elif line == "exit":
                break
            elif line == "less_pigeon_please":
                self.solver.set_more_pigeon(False)
            elif line == "more_pigeon_please":
                self.solver.set_more_pigeon(True)
            else:
                print("unknown command: " + line)


class Solver:
    """Wrapper around a clingo ``Control`` object loaded from ``client.lp``.

    The program parts ``pigeon`` and ``sleep(k)`` are grounded and the
    external atom ``sleep(k)`` is set to true. Each call to :meth:`start`
    that follows a conclusive result advances ``k`` by one step.
    """

    def __init__(self):
        """Load and ground the program and enable the first ``sleep`` step."""
        self.k = 0
        self.prg = Control()
        self.prg.load("client.lp")
        self.prg.ground([("pigeon", []), ("sleep", [Number(self.k)])])
        self.prg.assign_external(Function("sleep", [Number(self.k)]), True)
        # Result of the most recently finished solve call, None before that.
        self.ret = None
        # Handle of the running or last asynchronous solve call.
        self.future = None
        # String representations of the models found by the current search.
        self.models = []

    def on_model(self, model):
        """Model callback: remember the model's string representation."""
        self.models.append(str(model))

    def start(self, on_finish):
        """Start an asynchronous search.

        If the previous search produced a conclusive (not unknown) result,
        the next ``sleep`` step is grounded first and the external atom is
        moved from the old step to the new one.

        ``on_finish`` is passed to clingo as the finish callback.
        """
        # `unknown` is a property of the solve result, not a method.
        if self.ret is not None and not self.ret.unknown:
            self.k = self.k + 1
            # Program parameters have to be symbols, as in __init__.
            self.prg.ground([("sleep", [Number(self.k)])])
            self.prg.release_external(Function("sleep", [Number(self.k - 1)]))
            self.prg.assign_external(Function("sleep", [Number(self.k)]), True)
        # Drop models of earlier searches so they are not reported again.
        self.models = []
        self.future = self.prg.solve(
            on_model=self.on_model, on_finish=on_finish, async_=True
        )

    def stop(self):
        """Cancel the running search; a no-op if none was ever started."""
        if self.future is not None:
            self.future.cancel()

    def finish(self):
        """Fetch the result of the search started by :meth:`start`.

        The result is stored in ``self.ret`` so the next :meth:`start` can
        decide whether to advance to the next step, and is also returned.
        """
        self.ret = self.future.get()
        return self.ret

    def set_more_pigeon(self, more):
        """Assign the truth value ``more`` to the external atom ``p``."""
        self.prg.assign_external(Function("p"), more)


# Script entry: build the solver (loads and grounds client.lp) and the
# interactive controller, connect them, and enter the prompt loop.
st = Solver()
ct = Controller()
ct.register_solver(st)
# Blocks until the user enters "exit" or sends EOF.
ct.run()