File: ranges.py

Package: virtnbdbackup 2.42-1
#!/usr/bin/python3
"""
Copyright (C) 2023  Michael Ablassmeier <abi@grinser.de>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
import os
import logging
import json
from typing import List, Dict, Tuple, IO
from libvirtnbdbackup import common as lib
from libvirtnbdbackup import output
from libvirtnbdbackup.output.exceptions import OutputException
from libvirtnbdbackup.exceptions import RestoreError
from libvirtnbdbackup.sparsestream.exceptions import StreamFormatException


def _parse(stream, sTypes, reader) -> Tuple[List, Dict]:
    """Read block offsets from backup stream image"""
    try:
        _, _, length = stream.readFrame(reader)
        meta = stream.loadMetadata(reader.read(length))
    except StreamFormatException as errmsg:
        logging.error("Unable to read metadata header: %s", errmsg)
        raise RestoreError from errmsg

    if lib.isCompressed(meta):
        logging.error("Mapping compressed images currently not supported.")
        raise RestoreError

    assert reader.read(len(sTypes.TERM)) == sTypes.TERM

    dataRanges: List = []
    count: int = 0
    while True:
        kind, start, length = stream.readFrame(reader)
        if kind == sTypes.STOP:
            # The last block has no successor; guard against a stream that
            # contains no block frames at all.
            if dataRanges:
                dataRanges[-1]["nextBlockOffset"] = None
            break

        blockInfo = {
            "count": count,
            "offset": reader.tell(),
            "originalOffset": start,
            "nextOriginalOffset": start + length,
            "length": length,
            "data": kind == sTypes.DATA,
            "file": os.path.abspath(reader.name),
            "inc": meta["incremental"],
        }

        if kind == sTypes.DATA:
            reader.seek(length, os.SEEK_CUR)
            assert reader.read(len(sTypes.TERM)) == sTypes.TERM

        nextBlockOffset = reader.tell() + sTypes.FRAME_LEN
        blockInfo["nextBlockOffset"] = nextBlockOffset
        dataRanges.append(blockInfo)
        count += 1

    return dataRanges, meta
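
# Illustrative shape of one entry in the dataRanges list returned above
# (values are made up; the keys match the assignments in _parse):
#
#   {
#       "count": 0,
#       "offset": 4096,              # payload position inside the backup file
#       "originalOffset": 0,         # block offset inside the original image
#       "nextOriginalOffset": 65536,
#       "length": 65536,
#       "data": True,                # False for frames without payload
#       "file": "/path/to/sda.full.data",
#       "inc": False,                # taken from the metadata header
#       "nextBlockOffset": 69683,    # the next entry's "offset" (None if last)
#   }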


def get(args, stream, sTypes, dataFiles: List) -> List:
    """Get data ranges for each file specified"""
    dataRanges = []
    for dFile in dataFiles:
        try:
            reader = output.openfile(dFile, "rb")
        except OutputException as e:
            logging.error("[%s]: [%s]", dFile, e)
            raise RestoreError from e

        # _parse() raises RestoreError on any malformed header or stream, so
        # a valid (ranges, metadata) tuple is guaranteed here.
        fileRanges, _ = _parse(stream, sTypes, reader)
        dataRanges.extend(fileRanges)

        if args.verbose:
            logging.info(json.dumps(dataRanges, indent=4))
        else:
            logging.info(
                "Parsed [%s] block offsets from file [%s]", len(fileRanges), dFile
            )
        reader.close()

    return dataRanges
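
# Hedged sketch of a caller (illustration only; the stream and sTypes objects
# are whatever this package's sparsestream module provides, and args merely
# needs a boolean "verbose" attribute):
#
#   from argparse import Namespace
#   blockMap = get(Namespace(verbose=False), stream, sTypes,
#                  ["sda.full.data", "sda.inc.1.data"])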


def dump(tfile: IO, dataRanges: List) -> bool:
    """Dump block map to temporary file"""
    try:
        tfile.write(json.dumps(dataRanges, indent=4).encode())
        return True
    except OSError as e:
        logging.error("Unable to write blockmap file: %s", e)
        return False
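
# Minimal sketch of persisting the block map via dump(), assuming a standard
# library temporary file; note dump() returns False rather than raising:
#
#   import tempfile
#   with tempfile.NamedTemporaryFile(suffix=".blockmap") as tfile:
#       if not dump(tfile, blockMap):
#           raise RestoreError("Unable to write block map")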