File: utils.py

package info (click to toggle)
python-os-xenapi 0.3.4-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 1,012 kB
  • sloc: python: 8,137; sh: 2,154; makefile: 45
file content (518 lines) | stat: -rw-r--r-- 17,052 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
# Copyright (c) 2012 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace
# which means the Nova xenapi plugins must use only Python 2.4 features

"""Various utilities used by XenServer plugins."""

try:
    import cPickle as pickle
except ImportError:
    import pickle

import errno
import logging
import os
import shutil
import signal
import subprocess
import tempfile

import XenAPIPlugin

# Module-level logger shared by all helpers in this file.
LOG = logging.getLogger(__name__)
# Size (in bytes) of each read/write when streaming tarball data in
# create_tarball / extract_tarball.
CHUNK_SIZE = 8192


class CommandNotFound(Exception):
    """Raised when the executable for a subprocess cannot be found."""


def delete_if_exists(path):
    """Remove *path*, treating an already-missing file as success."""
    try:
        os.unlink(path)
    except OSError as e:  # noqa
        # ENOENT means someone beat us to it; anything else is a real error.
        if e.errno != errno.ENOENT:
            raise
        LOG.warning("'%s' was already deleted, skipping delete", path)


def _link(src, dst):
    """Hard-link *src* to *dst* (both must be on the same filesystem)."""
    LOG.info("Hard-linking file '%s' -> '%s'", src, dst)
    os.link(src, dst)


def _rename(src, dst):
    """Rename *src* to *dst*, logging a hint on cross-device failures."""
    LOG.info("Renaming file '%s' -> '%s'", src, dst)
    try:
        os.rename(src, dst)
    except OSError as e:  # noqa
        # os.rename cannot cross filesystems; point the operator at the
        # likely fix before re-raising.
        if e.errno == errno.EXDEV:
            LOG.error("Invalid cross-device link.  Perhaps %s and %s should "
                      "be symlinked on the same filesystem?", src, dst)
        raise


def make_subprocess(cmdline, stdout=False, stderr=False, stdin=False,
                    universal_newlines=False, close_fds=True, env=None):
    """Make a subprocess according to the given command-line string

    :param cmdline: command line as a list of argument strings
    :param stdout: when True, attach a pipe to the child's stdout
    :param stderr: when True, attach a pipe to the child's stderr
    :param stdin: when True, attach a pipe to the child's stdin
    :param universal_newlines: passed through to subprocess.Popen
    :param close_fds: passed through to subprocess.Popen
    :param env: environment mapping for the child, or None to inherit
    :returns: the subprocess.Popen object
    :raises CommandNotFound: if the executable does not exist
    """
    LOG.info("Running cmd '%s'", " ".join(cmdline))
    kwargs = {}
    # Conditional expressions instead of the dated `X and PIPE or None` idiom.
    kwargs['stdout'] = subprocess.PIPE if stdout else None
    kwargs['stderr'] = subprocess.PIPE if stderr else None
    kwargs['stdin'] = subprocess.PIPE if stdin else None
    kwargs['universal_newlines'] = universal_newlines
    kwargs['close_fds'] = close_fds
    kwargs['env'] = env
    try:
        proc = subprocess.Popen(cmdline, **kwargs)
    except OSError as e:  # noqa
        if e.errno == errno.ENOENT:
            # Include the missing executable name so callers can report a
            # useful message instead of a bare exception class.
            raise CommandNotFound(cmdline[0])
        else:
            raise
    return proc


class SubprocessException(Exception):
    """Raised when a subprocess exits with an unexpected return code."""

    def __init__(self, cmdline, ret, out, err):
        msg = ("'%s' returned non-zero exit code: "
               "retcode=%i, out='%s', stderr='%s'" % (cmdline, ret, out, err))
        Exception.__init__(self, msg)
        # Keep the raw pieces available for programmatic inspection.
        self.cmdline = cmdline
        self.ret = ret
        self.out = out
        self.err = err


def finish_subprocess(proc, cmdline, cmd_input=None, ok_exit_codes=None):
    """Ensure that the process returned a zero exit code indicating success

    :param proc: the subprocess.Popen object to wait on
    :param cmdline: the command line (list) used, for error reporting
    :param cmd_input: optional data fed to the process on stdin
    :param ok_exit_codes: acceptable exit codes; defaults to [0]
    :returns: the process's stdout output
    :raises SubprocessException: if the exit code is not acceptable
    """
    acceptable = ok_exit_codes if ok_exit_codes is not None else [0]
    out, err = proc.communicate(cmd_input)

    ret = proc.returncode
    if ret in acceptable:
        return out

    LOG.error("Command '%(cmdline)s' with process id '%(pid)s' expected "
              "return code in '%(ok)s' but got '%(rc)s': %(err)s",
              {'cmdline': cmdline, 'pid': proc.pid, 'ok': acceptable,
               'rc': ret, 'err': err})
    raise SubprocessException(' '.join(cmdline), ret, out, err)


def run_command(cmd, cmd_input=None, ok_exit_codes=None):
    """Issue a system command and return its stdout.

    If the command exits with a code outside ok_exit_codes, an exception
    carrying stdout/stderr is raised; otherwise stdout is returned.

    cmd_input is passed to the process on standard input.
    """
    # Pipe all three standard streams so we can feed input and capture
    # both output channels.
    child = make_subprocess(cmd, stdout=True, stderr=True, stdin=True,
                            close_fds=True)
    return finish_subprocess(child, cmd, cmd_input=cmd_input,
                             ok_exit_codes=ok_exit_codes)


def try_kill_process(proc):
    """Sends the given process the SIGKILL signal (best-effort)."""
    target = proc.pid
    LOG.info("Killing process %s", target)
    try:
        os.kill(target, signal.SIGKILL)
    except Exception:
        # The process may already be gone; log and carry on.
        LOG.exception("Failed to kill %s", target)


def make_staging_area(sr_path):
    """Create and return a staging directory inside the SR.

    The staging area is a place where we temporarily store and manipulate
    VHDs, and its use differs between download and upload:

    Download
    ========

    Downloaded tarballs contain VHDs named like "snap.vhd" and "image.vhd".
    They must be assigned UUIDs before moving into the SR, and since
    'image.vhd' may be a base_copy it must be linked to 'snap.vhd' (via
    vhd-util modify) first — otherwise the SR.scan would delete 'image.vhd'.
    The staging area is where these fixups happen before the files are moved
    into the SR, scanned, and registered with XenServer.

    Upload
    ======

    On upload we rename the VHDs to reflect what they are ('snap.vhd' for the
    snapshot, 'image.vhd' for the base_copy). The staging area lets us create
    hard-links under those names without touching what's in the SR.

    NOTE
    ====

    The staging area is created as a subdirectory within the SR to guarantee
    it resides on the same filesystem, permitting hard-linking and cheap
    file moves.
    """
    return tempfile.mkdtemp(dir=sr_path)


def cleanup_staging_area(staging_path):
    """Remove the staging area directory if it still exists.

    On upload, the staging area contains hard-links to the VHDs in the SR;
    removing it is safe because the SR keeps the link count > 0, so the VHDs
    in the SR are not deleted.
    """
    if not os.path.exists(staging_path):
        return
    shutil.rmtree(staging_path)


def _handle_old_style_images(staging_path):
    """Rename files to conform to new image format, if needed.

    Old-Style:

        snap.vhd -> image.vhd -> base.vhd

    New-Style:

        0.vhd -> 1.vhd -> ... (n-1).vhd

    The New-Style format has the benefit of being able to support a VDI chain
    of arbitrary length.
    """
    file_num = 0
    for filename in ('snap.vhd', 'image.vhd', 'base.vhd'):
        path = os.path.join(staging_path, filename)
        if os.path.exists(path):
            _rename(path, os.path.join(staging_path, "%d.vhd" % file_num))
            file_num += 1

    # Rename any format of name to 0.vhd when there is only single one
    contents = os.listdir(staging_path)
    if len(contents) == 1:
        filename = contents[0]
        if filename != '0.vhd' and filename.endswith('.vhd'):
            _rename(
                os.path.join(staging_path, filename),
                os.path.join(staging_path, '0.vhd'))


def _assert_vhd_not_hidden(path):
    """Sanity check to ensure that only appropriate VHDs are marked as hidden.

    If this flag is incorrectly set, then when we move the VHD into the SR, it
    will be deleted out from under us.
    """
    out = run_command(["vhd-util", "query", "-n", path, "-f"])

    # Scan for the "hidden: <n>" field in the query output.
    hidden_lines = [ln for ln in out.splitlines()
                    if ln.lower().startswith('hidden')]
    for ln in hidden_lines:
        if ln.split(':')[1].strip() == "1":
            raise Exception(
                "VHD %s is marked as hidden without child" % path)


def _vhd_util_check(vdi_path):
    """Run ``vhd-util check`` on *vdi_path*.

    Exit code 22 is accepted alongside 0; callers distinguish the failure
    modes by inspecting the output text.

    :returns: tuple of (full command output, stripped first output line)
    """
    out = run_command(["vhd-util", "check", "-n", vdi_path, "-p"],
                      ok_exit_codes=[0, 22])
    return out, out.splitlines()[0].strip()


def _validate_vhd(vdi_path):
    """This checks for several errors in the VHD structure.

    Most notably, it checks that the timestamp in the footer is correct, but
    may pick up other errors also.

    This check ensures that the timestamps listed in the VHD footer aren't in
    the future.  This can occur during a migration if the clocks on the two
    Dom0's are out-of-sync. This would corrupt the SR if it were imported, so
    generate an exception to bail.
    """
    out, first_line = _vhd_util_check(vdi_path)

    if 'invalid' in first_line:
        # One repair attempt before giving up.
        LOG.warning("VHD invalid, attempting repair.")
        run_command(["vhd-util", "repair", "-n", vdi_path])
        out, first_line = _vhd_util_check(vdi_path)

    if 'invalid' not in first_line:
        LOG.info("VDI is valid: %s", vdi_path)
        return

    # Identify which piece of the VHD is broken.
    if 'footer' in first_line:
        part = 'footer'
    elif 'header' in first_line:
        part = 'header'
    else:
        part = 'setting'

    # Everything after the first ':' is the human-readable detail.
    pieces = first_line.split(':', 1)
    details = pieces[1] if len(pieces) == 2 else first_line

    extra = ''
    if 'timestamp' in first_line:
        extra = (" ensure source and destination host machines have "
                 "time set correctly")

    LOG.info("VDI Error details: %s", out)

    raise Exception(
        "VDI '%(vdi_path)s' has an invalid %(part)s: '%(details)s'"
        "%(extra)s" % {'vdi_path': vdi_path, 'part': part,
                       'details': details, 'extra': extra})


def _validate_vdi_chain(vdi_path):
    """Check VDI chain

    This check ensures that the parent pointers on the VHDs are valid
    before we move the VDI chain to the SR. This is *very* important
    because a bad parent pointer will corrupt the SR causing a cascade of
    failures.
    """
    def _parent_of(path):
        # Exit code 22 is tolerated; the conditions are distinguished by
        # the output text below.
        out = run_command(["vhd-util", "query", "-n", path, "-p"],
                          ok_exit_codes=[0, 22])
        first_line = out.splitlines()[0].strip()

        if first_line.endswith(".vhd"):
            return first_line
        if 'has no parent' in first_line:
            return None
        if 'query failed' in first_line:
            raise Exception("VDI '%s' not present which breaks"
                            " the VDI chain, bailing out" % path)
        raise Exception("Unexpected output '%s' from vhd-util" % out)

    # Walk from the leaf up to the base copy, validating each link.
    cur_path = vdi_path
    while cur_path:
        _validate_vhd(cur_path)
        cur_path = _parent_of(cur_path)


def _validate_sequenced_vhds(staging_path):
    # This check ensures that the VHDs in the staging area are sequenced
    # properly from 0 to n-1 with no gaps.
    seq_num = 0
    filenames = os.listdir(staging_path)
    for filename in filenames:
        if not filename.endswith('.vhd'):
            continue

        # Ignore legacy swap embedded in the image, generated on-the-fly now
        if filename == "swap.vhd":
            continue

        vhd_path = os.path.join(staging_path, "%d.vhd" % seq_num)
        if not os.path.exists(vhd_path):
            raise Exception("Corrupt image. Expected seq number: %d. Files: %s"
                            % (seq_num, filenames))

        seq_num += 1


def import_vhds(sr_path, staging_path, uuid_stack):
    """Move VHDs from staging area into the SR.

    The staging area is necessary because we need to perform some fixups
    (assigning UUIDs, relinking the VHD chain) before moving into the SR,
    otherwise the SR manager process could potentially delete the VHDs out from
    under us.

    :param sr_path: path of the SR that will receive the VHDs
    :param staging_path: staging directory containing the downloaded VHDs
    :param uuid_stack: list of free UUIDs; one is popped per VHD imported
    :returns: A dict of imported VHDs:

        {'root': {'uuid': 'ffff-aaaa'}}
    """
    _handle_old_style_images(staging_path)
    _validate_sequenced_vhds(staging_path)

    files_to_move = []

    # Collect sequenced VHDs and assign UUIDs to them
    seq_num = 0
    while True:
        orig_vhd_path = os.path.join(staging_path, "%d.vhd" % seq_num)
        if not os.path.exists(orig_vhd_path):
            break

        # Rename (0, 1 .. N).vhd -> aaaa-bbbb-cccc-dddd.vhd
        vhd_uuid = uuid_stack.pop()
        vhd_path = os.path.join(staging_path, "%s.vhd" % vhd_uuid)
        _rename(orig_vhd_path, vhd_path)

        if seq_num == 0:
            # 0.vhd is the leaf of the chain; remember it for the sanity
            # checks and the return value below.
            leaf_vhd_path = vhd_path
            leaf_vhd_uuid = vhd_uuid

        files_to_move.append(vhd_path)
        seq_num += 1

    # Previously an empty staging area fell through to a confusing
    # NameError on leaf_vhd_path; fail with an explicit message instead.
    if not files_to_move:
        raise Exception("Corrupt image. No VHDs found in staging area %s"
                        % staging_path)

    # Re-link VHDs, in reverse order, from base-copy -> leaf
    parent_path = None
    for vhd_path in reversed(files_to_move):
        if parent_path:
            # Link to parent
            modify_cmd = ["vhd-util", "modify", "-n", vhd_path,
                          "-p", parent_path]
            run_command(modify_cmd)

        parent_path = vhd_path

    # Sanity check the leaf VHD
    _assert_vhd_not_hidden(leaf_vhd_path)
    _validate_vdi_chain(leaf_vhd_path)

    # Move files into SR
    for orig_path in files_to_move:
        new_path = os.path.join(sr_path, os.path.basename(orig_path))
        _rename(orig_path, new_path)

    imported_vhds = dict(root=dict(uuid=leaf_vhd_uuid))
    return imported_vhds


def prepare_staging_area(sr_path, staging_path, vdi_uuids, seq_num=0):
    """Hard-link VHDs into staging area, numbering them from seq_num."""
    for offset, vdi_uuid in enumerate(vdi_uuids):
        source = os.path.join(sr_path, "%s.vhd" % vdi_uuid)
        target = os.path.join(staging_path, "%d.vhd" % (seq_num + offset))
        _link(source, target)


def create_tarball(fileobj, path, callback=None, compression_level=None):
    """Create a tarball from a given path.

    :param fileobj: a file-like object holding the tarball byte-stream.
                    If None, then only the callback will be used.
    :param path: path to create tarball from
    :param callback: optional callback to call on each chunk written
    :param compression_level: compression level, e.g., 9 for gzip -9.
    """
    tar_cmd = ["tar", "-zc", "--directory=%s" % path, "."]
    env = os.environ.copy()
    if compression_level and 1 <= compression_level <= 9:
        # gzip reads default options from the GZIP environment variable.
        env["GZIP"] = "-%d" % compression_level
    tar_proc = make_subprocess(tar_cmd, stdout=True, stderr=True, env=env)

    try:
        while True:
            chunk = tar_proc.stdout.read(CHUNK_SIZE)
            # The pipe is binary, so EOF is b'' on Python 3; comparing
            # against '' never matched there and looped forever.  Testing
            # truthiness handles both str (py2) and bytes (py3) streams.
            if not chunk:
                break

            if callback:
                callback(chunk)

            if fileobj:
                fileobj.write(chunk)
    except Exception:
        # Don't leave a half-finished tar process behind on error.
        try_kill_process(tar_proc)
        raise

    finish_subprocess(tar_proc, tar_cmd)


def extract_tarball(fileobj, path, callback=None):
    """Extract a tarball to a given path.

    :param fileobj: a file-like object holding the tarball byte-stream
    :param path: path to extract tarball into
    :param callback: optional callback to call on each chunk read
    """
    tar_cmd = ["tar", "-zx", "--directory=%s" % path]
    tar_proc = make_subprocess(tar_cmd, stderr=True, stdin=True)

    # Bind the pid up front: the except-clause below logs it, and
    # previously a failure on the very first fileobj.read() raised a
    # NameError because tar_pid was only assigned inside the loop.
    tar_pid = tar_proc.pid

    try:
        while True:
            chunk = fileobj.read(CHUNK_SIZE)
            # EOF is b'' for binary streams on Python 3, which never
            # compared equal to '' — test truthiness instead.
            if not chunk:
                break

            if callback:
                callback(chunk)

            tar_proc.stdin.write(chunk)

            # NOTE(tpownall): If we do not poll for the tar process exit
            # code when tar has exited pre maturely there is the chance
            # that tar will become a defunct zombie child under glance plugin
            # and re parented under init forever waiting on the stdin pipe to
            # close.  Polling for the exit code allows us to break the pipe.
            returncode = tar_proc.poll()
            if returncode is not None:
                LOG.error("tar extract with process id '%(pid)s' "
                          "exited early with '%(rc)s'",
                          {'pid': tar_pid, 'rc': returncode})
                raise SubprocessException(
                    ' '.join(tar_cmd), returncode, "", "")

    except SubprocessException:
        # no need to kill already dead process
        raise
    except Exception:
        LOG.exception("Failed while sending data to tar pid: %s", tar_pid)
        try_kill_process(tar_proc)
        raise

    finish_subprocess(tar_proc, tar_cmd)


def make_dev_path(dev, partition=None, base='/dev'):
    """Return a path to a particular device.

    >>> make_dev_path('xvdc')
    '/dev/xvdc'

    >>> make_dev_path('xvdc', 1)
    '/dev/xvdc1'

    :param dev: device name, e.g. 'xvdc'
    :param partition: optional partition number to append to the path
    :param base: directory the device nodes live under
    """
    # NOTE: the doctest outputs above previously lacked quotes, so they
    # failed under the doctest runner (the function returns a str).
    path = os.path.join(base, dev)
    if partition:
        path += str(partition)
    return path


def _handle_serialization(func):
    def wrapped(session, params):
        params = pickle.loads(params['params'])
        rv = func(session, *params['args'], **params['kwargs'])
        return pickle.dumps(rv)
    return wrapped


def register_plugin_calls(*funcs):
    """Wrapper around XenAPIPlugin.dispatch which handles pickle serialization.

    """
    dispatch_table = dict((func.__name__, _handle_serialization(func))
                          for func in funcs)
    XenAPIPlugin.dispatch(dispatch_table)