File: db.py

package info (click to toggle)
python-werkzeug 2.2.2-3%2Bdeb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 3,248 kB
  • sloc: python: 22,177; javascript: 304; makefile: 32; xml: 16; sh: 10
file content (67 lines) | stat: -rw-r--r-- 1,668 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
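"""A simple object database backed by :mod:`dbm` and :mod:`pickle`.

Loaded and assigned values are cached in memory per process, so this is
only good enough as long as the server is not running in multiprocess mode.
"""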
import dbm
from pickle import dumps
from pickle import loads
from threading import Lock


class Database:
    """A small persistent key/value store layered over a :mod:`dbm` file.

    Values are pickled into the dbm file by :meth:`sync` and unpickled the
    first time they are read.  Every value that has been read or assigned
    is kept in an in-memory cache (``_local``) and written back on each
    :meth:`sync`.  Access to the cache and to the dbm handle is serialised
    by a single non-reentrant lock.

    NOTE(security): stored values are decoded with :func:`pickle.loads`,
    so only open database files that come from a trusted source.
    """

    def __init__(self, filename):
        """Open the dbm database at *filename*, creating it if needed."""
        self.filename = filename
        # Plain attributes are created before the file is opened so that
        # ``close`` (also run from ``__del__``) sees a consistent object
        # even when opening the file raises.
        self._local = {}
        self._lock = Lock()
        self._closed = False
        self._fs = self._open_store(filename)

    @staticmethod
    def _open_store(filename):
        """Open the dbm file, using gdbm's fast mode where it is supported."""
        try:
            return dbm.open(filename, "cf")
        except (ValueError, OSError):
            # "f" is understood by dbm.gnu only; the other dbm backends
            # reject the combined flag string.  Retry with the portable
            # flag; a genuine I/O problem is simply raised again here.
            return dbm.open(filename, "c")

    def __getitem__(self, key):
        """Return the value stored under *key*; raise ``KeyError`` if absent."""
        with self._lock:
            return self._load_key(key)

    def _load_key(self, key):
        """Return the value for *key*, unpickling and caching it on a miss.

        Callers must already hold ``self._lock``.  Raises ``KeyError`` when
        the key is neither cached nor present in the dbm file.
        """
        if key in self._local:
            return self._local[key]
        rv = loads(self._fs[key])
        self._local[key] = rv
        return rv

    def __setitem__(self, key, value):
        """Store *value* under *key* in the cache; persisted on ``sync``."""
        # Take the lock like every other accessor: ``sync`` iterates the
        # cache under the lock, and an unlocked insert from another thread
        # could change the dict's size in the middle of that iteration.
        with self._lock:
            self._local[key] = value

    def __delitem__(self, key):
        """Remove *key* from the cache and the dbm file; missing keys are ignored."""
        with self._lock:
            self._local.pop(key, None)
            if key in self._fs:
                del self._fs[key]

    def __del__(self):
        self.close()

    def __contains__(self, key):
        """Return whether *key* is cached or stored in the dbm file."""
        with self._lock:
            try:
                # Loading has the side effect of caching the value, so the
                # membership test below covers both places.
                self._load_key(key)
            except KeyError:
                pass
            return key in self._local

    def setdefault(self, key, factory):
        """Return the value for *key*, creating it with ``factory()`` if absent.

        *factory* is called with no arguments while the lock is held, so it
        must not call back into this database.
        """
        with self._lock:
            try:
                rv = self._load_key(key)
            except KeyError:
                self._local[key] = rv = factory()
            return rv

    def sync(self):
        """Pickle every cached value into the dbm file and flush it."""
        with self._lock:
            for key, value in self._local.items():
                self._fs[key] = dumps(value, 2)
            # Not every dbm backend exposes ``sync``; those that do not are
            # left to persist their writes in their own way.
            backend_sync = getattr(self._fs, "sync", None)
            if backend_sync is not None:
                backend_sync()

    def close(self):
        """Flush pending values and close the dbm file.

        Never raises, because it also runs from ``__del__``.  Calling it
        again, or on an object whose file was never opened, does nothing.
        """
        fs = getattr(self, "_fs", None)
        if fs is None or getattr(self, "_closed", False):
            return
        self._closed = True
        try:
            self.sync()
        except Exception:
            # Deliberately best-effort: a failed flush must not escape
            # from interpreter teardown.
            pass
        finally:
            # Release the handle even when the flush above failed.
            try:
                fs.close()
            except Exception:
                pass