File: _compat.py

package info (click to toggle)
python-pyxs 0.4.2~git20190115.97f14313-4
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 332 kB
  • sloc: python: 1,196; makefile: 93
file content (122 lines) | stat: -rw-r--r-- 3,509 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
"""
    pyxs._compat
    ~~~~~~~~~~~~

    This module implements compatibility interface for scripts,
    using ``xen.lowlevel.xs``.

    :copyright: (c) 2011 by Selectel.
    :copyright: (c) 2016 by pyxs authors and contributors, see AUTHORS
                for more details.
    :license: LGPL, see LICENSE for more details.
"""

from __future__ import absolute_import

__all__ = ["xs", "Error"]

import errno

from .client import Client
from .exceptions import PyXSError as Error


class xs(object):
    """XenStore client with a backward compatible interface, useful for
    switching from ``xen.lowlevel.xs``.
    """
    def __init__(self):
        self.client = Client()
        self.client.connect()
        self.monitor = self.client.monitor()
        self.token_aliases = {}

    def close(self):
        self.client.close()

    def get_permissions(self, tx_id, path):
        self.client.tx_id = int(tx_id or 0)
        self.client.get_perms(path)

    def set_permissions(self, tx_id, path, perms):
        self.client.tx_id = int(tx_id or 0)
        self.client.set_perms(path, perms)

    def ls(self, tx_id, path):
        self.client.tx_id = int(tx_id or 0)
        try:
            return self.client.list(path)
        except Error as e:
            if e.args[0] == errno.ENOENT:
                return

            raise

    def mkdir(self, tx_id, path):
        self.client.tx_id = int(tx_id or 0)
        self.client.mkdir(path)

    def rm(self, tx_id, path):
        self.client.tx_id = int(tx_id or 0)
        self.client.delete(path)

    def read(self, tx_id, path):
        self.client.tx_id = int(tx_id or 0)
        return self.client.read(path)

    def write(self, tx_id, path, value):
        self.client.tx_id = int(tx_id or 0)
        return self.client.write(path, value)

    def get_domain_path(self, domid):
        return self.client.get_domain_path(domid)

    def introduce_domain(self, domid, mfn, eventchn):
        self.client.introduce_domain(domid, mfn, eventchn)

    def release_domain(self, domid):
        self.client.release_domain(domid)

    def resume_domain(self, domid):
        self.client.resume_domain(domid)

    def set_target(self, domid, target):
        self.client.set_target(domid, target)

    def transaction_start(self):
        return str(self.client.transaction()).encode()

    def transaction_end(self, tx_id, abort=0):
        self.client.tx_id = int(tx_id or 0)
        if abort:
            self.client.rollback()
        else:
            try:
                self.client.commit()
            except Error as e:
                if e.args[0] == errno.EAGAIN:
                    return False

                raise

        return True

    def watch(self, path, token):
        # Even though ``xs.watch`` docstring states that token should be
        # a string, it in fact can be any Python object. We mimic this
        # behaviour here, but it can be a source of hard to find bugs.
        # See http://lists.xen.org/archives/html/xen-devel/2016-03/msg00228
        # for discussion.
        stub = str(id(token)).encode()
        self.token_aliases[stub] = token
        return self.monitor.watch(path, stub)

    def unwatch(self, path, token):
        stub = str(id(token)).encode()
        self.monitor.unwatch(path, stub)
        del self.token_aliases[stub]

    def read_watch(self):
        event = next(self.monitor.wait())
        return event._replace(token=self.token_aliases[event.token])