File: state.py

package info (click to toggle)
pius 3.0.0-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 432 kB
  • sloc: python: 2,963; perl: 178; makefile: 2; sh: 1
file content (160 lines) | stat: -rw-r--r-- 5,395 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# vim:shiftwidth=4:tabstop=4:expandtab:textwidth=80:softtabstop=4:ai:
import json
import os
import shutil

from libpius.util import PiusUtil


class SignState:
    # States for our intention/state on other keys
    kSIGNED = "SIGNED"
    kWILL_NOT_SIGN = "WILL_NOT_SIGN"
    kNOT_SIGNED = "NOT_SIGNED"

    # States for other's intention on our keys
    kIGNORE = "IGNORE"
    kNO_IGNORE = "NO_IGNORE"

    # Direction
    kOUTBOUND = "OUTBOUND"
    kINBOUND = "INBOUND"

    kSIGNED_KEYS_DB_NAME = "signed_keys"

    # metadata for current format
    kFILE_METADATA = {"_meta": {"version": 3}}

    def __init__(self):
        self.signed_keys_db = os.path.join(
            PiusUtil.statedir(), self.kSIGNED_KEYS_DB_NAME
        )
        self.state = {}
        self._load()
        self.modified = False

    def _load(self):
        self.state = self.load_signed_keys()

    def _validate_value(self, direction, val):
        if direction == self.kOUTBOUND:
            assert val in [self.kSIGNED, self.kWILL_NOT_SIGN, self.kNOT_SIGNED]
        else:
            assert val in [self.kIGNORE, self.kNO_IGNORE, None]

    def __iter__(self):
        return self.state.__iter__()

    def signed(self, key):
        return (
            key in self.state
            and self.state[key][self.kOUTBOUND] == self.kSIGNED
        )

    def will_not_sign(self, key):
        return (
            key in self.state
            and self.state[key][self.kOUTBOUND] == self.kWILL_NOT_SIGN
        )

    def expect_sig(self, key):
        return not (
            key in self.state and self.state[key][self.kINBOUND] == self.kIGNORE
        )

    def update_outbound(self, key, val):
        return self.update(key, self.kOUTBOUND, val)

    def update_inbound(self, key, val):
        return self.update(key, self.kINBOUND, val)

    def update(self, key, direction, val):
        # we don't store NOT_SIGNED, it's meaningless.
        if direction == self.kOUTBOUND and val == self.kNOT_SIGNED:
            return
        self._validate_value(direction, val)
        if key not in self.state:
            self.state[key] = {self.kOUTBOUND: None, self.kINBOUND: None}
        self.state[key][direction] = val
        self.modified = True

    def save(self):
        self.store_signed_keys(self.state)

    def convert_from_v2(self, signstate):
        newstate = {}
        for k, v in signstate.items():
            newstate[k] = {self.kOUTBOUND: v, self.kINBOUND: None}
        return newstate

    def convert_from_v1(self, data):
        """
    If it is not JSON, it's the original format of one signed key per line
    """
        return dict(
            (key, {self.kOUTBOUND: "SIGNED", self.kINBOUND: None})
            for key in data.strip().split("\n")
        )

    def load_signed_keys(self):
        PiusUtil.handle_path_migration(
            self.signed_keys_db,
            [
                os.path.join(x, self.kSIGNED_KEYS_DB_NAME)
                for x in PiusUtil.previous_statedirs()
            ],
        )

        if not os.path.exists(self.signed_keys_db):
            return dict()
        with open(self.signed_keys_db, "r") as fp:
            data = fp.read()
            # We have had multiple versions of the state file.
            #
            # v1 was just one key per line, each key was "signed"
            #
            # v2 was a hash taking the form:
            #   key: (SIGNED | WILL_NOTSIGN | NOT_SIGNED)
            #
            # v3 was the first one with a version identifier in it. It includes
            # a `meta` entry for the version and any other future metadata.
            # The data itself takes the format of:
            #   key: {
            #     'OUTBOUND': (SIGNED | WILL_NOTSIGN | NOT_SIGNED | None),
            #     'OUTBOUND': (None | Ignore),
            #   }
            try:
                signstate = json.loads(data)
                if "_meta" in signstate and signstate["_meta"]["version"] == 3:
                    PiusUtil.debug("Loading v3 PIUS statefile")
                    del (signstate["_meta"])
                elif "_meta" not in signstate:
                    PiusUtil.debug("Loading v2 PIUS statefile")
                    signstate = self.convert_from_v2(signstate)
            except ValueError:
                PiusUtil.debug("Loading v1 PIUS statefile")
                signstate = self.convert_from_v1(data)
        return signstate

    def store_signed_keys(self, signstate):
        # re-read in the list and merge it...
        prev_signstate = self.load_signed_keys()
        # merge the two with the one we're passed in winning
        prev_signstate.update(signstate)
        prev_signstate.update(self.kFILE_METADATA)
        self.write_file(prev_signstate)

    def write_file(self, data):
        """Separated out for easier unittesting"""
        if not os.path.exists(PiusUtil.statedir()):
            os.mkdir(PiusUtil.statedir(), 0o750)
        if not os.path.isdir(PiusUtil.statedir()):
            print(
                "WARNING: There is a %s which is not a directory."
                " Not storing state." % PiusUtil.statedir()
            )
            return
        if os.path.exists(self.signed_keys_db):
            shutil.copy(self.signed_keys_db, self.signed_keys_db + ".save")
        with open(self.signed_keys_db, "w") as fp:
            fp.write(json.dumps(data))