File: 240-opt-list-meta.py

package info (click to toggle)
libnbd 1.24.0-2.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,956 kB
  • sloc: ansic: 55,158; ml: 12,325; sh: 8,811; python: 4,757; makefile: 3,038; perl: 165; cpp: 24
file content (140 lines) | stat: -rw-r--r-- 3,750 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# libnbd Python bindings
# Copyright Red Hat
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import nbd
import os

# Server binary to launch; the NBDKIT environment variable overrides the
# default of plain "nbdkit".
nbdkit = os.environ.get("NBDKIT", "nbdkit")

# Bookkeeping updated by the f() callback: the number of times it was
# invoked, and whether nbd.CONTEXT_BASE_ALLOCATION was among the names.
count, seen = 0, False


def f(user_data, name):
    """Record one context name passed to the list callback.

    Verifies that the closure data arrived intact (it must equal 42),
    bumps the global ``count`` of invocations, and sets the global
    ``seen`` flag when ``name`` is nbd.CONTEXT_BASE_ALLOCATION.
    """
    global count, seen
    assert user_data == 42
    count += 1
    if name == nbd.CONTEXT_BASE_ALLOCATION:
        seen = True


def must_fail(f, *args, **kwds):
    """Call ``f(*args, **kwds)`` and require that it raises nbd.Error.

    An nbd.Error is swallowed, any other exception propagates unchanged,
    and a call that returns normally raises AssertionError.

    The failure is raised explicitly instead of via ``assert False`` so
    that the check is not compiled away when Python runs with -O, and
    it lives in the ``else`` clause so the ``try`` body covers only the
    call under test.
    """
    try:
        f(*args, **kwds)
    except nbd.Error:
        pass
    else:
        raise AssertionError("%r did not raise nbd.Error" % (f,))


# Get into negotiating state.
h = nbd.NBD()
h.set_opt_mode(True)
h.connect_command([nbdkit, "-s", "--exit-with-parent", "-v",
                   "memory", "size=1M"])

# Every pass below resets the callback bookkeeping (count, seen), runs
# opt_list_meta_context with f() as the callback (42 is the closure data
# f() checks for), then compares the return value with what f() recorded.

# First pass: empty query should give at least "base:allocation".
count = 0
seen = False
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r == count
assert r >= 1
assert seen is True
# Remember the size of the unfiltered listing; the final pass uses it as
# an upper bound.  (Named max_contexts so the builtin max() is not shadowed.)
max_contexts = count

# Second pass: bogus query has no response.
count = 0
seen = False
h.add_meta_context("x-nosuch:")
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r == 0
assert r == count
assert seen is False

# Third pass: specific query should have one match.
count = 0
seen = False
h.add_meta_context(nbd.CONTEXT_BASE_ALLOCATION)
# The bogus query from the second pass is still registered, so the
# handle now holds two requests and the new one sits at index 1.
assert h.get_nr_meta_contexts() == 2
assert h.get_meta_context(1) == nbd.CONTEXT_BASE_ALLOCATION
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r == 1
assert count == 1
assert seen is True

# Fourth pass: opt_list_meta_context is stateless, so it should
# not wipe status learned during opt_info
count = 0
seen = False
# Before opt_info these queries are expected to fail.
must_fail(h.can_meta_context, nbd.CONTEXT_BASE_ALLOCATION)
must_fail(h.get_size)
h.opt_info()
assert h.get_size() == 1024*1024
assert h.can_meta_context(nbd.CONTEXT_BASE_ALLOCATION) is True
# Replace the request list with only the bogus query, list again, and
# check that the size and context status from opt_info are unchanged.
h.clear_meta_contexts()
h.add_meta_context("x-nosuch:")
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r == 0
assert count == 0
assert seen is False
assert h.get_size() == 1048576
assert h.can_meta_context(nbd.CONTEXT_BASE_ALLOCATION) is True

# Final pass: "base:" query should get at least "base:allocation"
count = 0
seen = False
h.add_meta_context("base:")
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r >= 1
# The "base:" query must not report more than the unfiltered first pass.
assert r <= max_contexts
assert r == count
assert seen is True

h.opt_abort()

# Repeat but this time without structured replies. Deal gracefully
# with older servers that don't allow the attempt.
h = nbd.NBD()
h.set_opt_mode(True)
h.set_request_structured_replies(False)
h.connect_command([nbdkit, "-s", "--exit-with-parent", "-v",
                   "memory", "size=1M"])
# Baseline of the bytes-sent counter, taken before the listing attempt.
# (Named bytes_sent so the builtin bytes type is not shadowed.)
bytes_sent = h.stats_bytes_sent()

try:
    count = 0
    seen = False
    r = h.opt_list_meta_context(lambda *args: f(42, *args))
    assert r == count
    assert r >= 1
    assert seen is True
except nbd.Error as ex:
    # A failure is tolerated only if the counter grew, i.e. the request
    # was sent and then failed rather than never being sent at all.
    assert h.stats_bytes_sent() > bytes_sent
    print("ignoring failure from old server: %s" % ex.string)

# Now enable structured replies, and a retry should pass.
r = h.opt_structured_reply()
assert r is True

count = 0
seen = False
r = h.opt_list_meta_context(lambda *args: f(42, *args))
assert r == count
assert r >= 1
assert seen is True

h.opt_abort()