File: torrent.py

package info (click to toggle)
mat2 0.14.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,428 kB
  • sloc: python: 3,758; makefile: 7
file content (128 lines) | stat: -rw-r--r-- 4,340 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import logging
from typing import Union, Dict, List, Tuple

from . import abstract


class TorrentParser(abstract.AbstractParser):
    """
    Parser for bencoded `.torrent` files.

    The file is decoded once at construction time into `self.dict_repr`.
    Top-level keys outside of `allowlist` are what `get_meta` reports
    and what `remove_all` drops.

    NOTE(review): `self.filename` and `self.output_filename` are not set
    in this class; presumably `abstract.AbstractParser.__init__` provides
    them -- confirm against the base class.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level keys that are kept when cleaning the file.
    allowlist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        """
        Decode `filename` and keep its dictionary representation.

        :raises ValueError: if the file isn't valid bencode, or if its
            top-level value isn't a dictionary.
        """
        super().__init__(filename)
        with open(self.filename, 'rb') as f:
            self.dict_repr = _BencodeHandler().bdecode(f.read())
        # `bdecode` returns None on malformed input, but it happily returns
        # an int, a list or a byte string for well-formed input that isn't a
        # dictionary. Every other method here calls `.items()` on the result,
        # so anything but a dict has to be rejected right away.
        if not isinstance(self.dict_repr, dict):
            raise ValueError("%s is not a bencoded dictionary" % filename)

    def get_meta(self) -> Dict[str, Union[str, Dict]]:
        """
        Return every top-level entry whose key isn't in `allowlist`.

        Keys are decoded as UTF-8 for display; values are returned as
        decoded from the file.
        """
        # Keys are arbitrary byte strings: escape undecodable bytes instead
        # of raising, so that odd keys can still be listed.
        return {
            key.decode('utf-8', errors='backslashreplace'): value
            for key, value in self.dict_repr.items()
            if key not in self.allowlist
        }

    def remove_all(self) -> bool:
        """
        Write a copy of the torrent holding only the `allowlist` keys to
        `self.output_filename`, and keep that cleaned dictionary as the
        new `self.dict_repr`.

        :return: always True
        """
        cleaned = {
            key: value
            for key, value in self.dict_repr.items()
            if key in self.allowlist
        }
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        self.dict_repr = cleaned  # since we're stateful
        return True


class _BencodeHandler:
    """
    Since bencode isn't that hard to parse,
    mat2 comes with its own parser, based on the spec
    https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding

    The public entry points are `bdecode` and `bencode`. The private
    decoders all take the whole input plus the offset of the value to
    decode, and return the decoded value along with the offset of the
    first byte after it. Working with offsets avoids copying the
    remaining input after every single element.
    """
    def __init__(self):
        # Decoders, indexed by the first byte of the encoded value.
        self.__decode_func = {
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
            ord('l'): self.__decode_list,
        }
        # A byte string starts with its length, hence with a digit.
        # Iterating over `bytes` yields integers.
        for digit in b'0123456789':
            self.__decode_func[digit] = self.__decode_string

        # Encoders, indexed by the exact type of the value.
        self.__encode_func = {
            bytes: self.__encode_string,
            dict: self.__encode_dict,
            int: self.__encode_int,
            list: self.__encode_list,
        }

    @staticmethod
    def __decode_int(s: bytes, pos: int) -> Tuple[int, int]:
        """
        Decode the `i<number>e` value starting at `s[pos]`.

        :raises ValueError: on a missing terminator or a malformed number
        """
        end = s.index(b'e', pos)  # raises ValueError if there is no `e`
        body = s[pos + 1:end]
        negative = body.startswith(b'-')
        digits = body[1:] if negative else body
        # `int()` alone is too lenient: it accepts surrounding whitespace,
        # a leading `+` and underscores, none of which are valid bencode.
        if not digits.isdigit():
            raise ValueError("malformed integer")
        # No negative zero, and no leading zero except for zero itself.
        if digits.startswith(b'0') and (negative or len(digits) > 1):
            raise ValueError("integer with a leading or negative zero")
        return int(body), end + 1

    @staticmethod
    def __decode_string(s: bytes, pos: int) -> Tuple[bytes, int]:
        """
        Decode the `<length>:<data>` value starting at `s[pos]`.

        :raises ValueError: on a missing colon, a malformed length, or
            when fewer than `length` bytes are left in the input
        """
        colon = s.index(b':', pos)  # raises ValueError if there is no `:`
        length = s[pos:colon]
        # Same strictness as for integers, see `__decode_int`.
        if not length.isdigit():
            raise ValueError("malformed string length")
        if length.startswith(b'0') and len(length) > 1:
            raise ValueError("string length with a leading zero")
        start = colon + 1
        end = start + int(length)
        # Slicing past the end silently yields a shorter string, so a
        # truncated file has to be detected explicitly.
        if end > len(s):
            raise ValueError("truncated string")
        return s[start:end], end

    def __decode_list(self, s: bytes, pos: int) -> Tuple[List, int]:
        """
        Decode the `l<values>e` value starting at `s[pos]`.

        Running off the end of the input raises IndexError, and an
        unknown type marker raises KeyError.
        """
        ret = []
        pos += 1  # skip leading `l`
        while s[pos] != ord('e'):
            value, pos = self.__decode_func[s[pos]](s, pos)
            ret.append(value)
        return ret, pos + 1  # skip trailing `e`

    def __decode_dict(self, s: bytes, pos: int) -> Tuple[Dict, int]:
        """
        Decode the `d<key><value>...e` value starting at `s[pos]`.

        Keys are always decoded as byte strings. Running off the end of
        the input raises IndexError, and an unknown type marker raises
        KeyError.
        """
        ret = {}
        pos += 1  # skip leading `d`
        while s[pos] != ord('e'):
            key, pos = self.__decode_string(s, pos)
            ret[key], pos = self.__decode_func[s[pos]](s, pos)
        return ret, pos + 1  # skip trailing `e`

    @staticmethod
    def __encode_int(x: int) -> bytes:
        """Encode an integer as `i<number>e`."""
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: bytes) -> bytes:
        """Encode a byte string as `<length>:<data>`."""
        return bytes(str(len(x)), 'utf-8') + b':' + x

    def __encode_list(self, x: list) -> bytes:
        """Encode a list as `l<values>e`, keeping the order of its items."""
        items = b''.join(self.__encode_func[type(i)](i) for i in x)
        return b'l' + items + b'e'

    def __encode_dict(self, x: dict) -> bytes:
        """Encode a dictionary as `d<key><value>...e`, with sorted keys."""
        chunks = []
        for key, value in sorted(x.items()):
            chunks.append(self.__encode_func[type(key)](key))
            chunks.append(self.__encode_func[type(value)](value))
        return b'd' + b''.join(chunks) + b'e'

    def bencode(self, s: Union[Dict, List, bytes, int]) -> bytes:
        """
        Return the bencoded form of `s`.

        :raises KeyError: if `s`, or anything nested in it, has a type
            other than exactly dict, list, bytes or int
        """
        return self.__encode_func[type(s)](s)

    def bdecode(self, s: bytes) -> Union[Dict, List, bytes, int, None]:
        """
        Decode the bencoded value `s`.

        :return: the decoded value, or None (after logging a warning) if
            `s` isn't a single well-formed bencoded value
        """
        try:
            ret, end = self.__decode_func[s[0]](s, 0)
        # RecursionError: lists and dictionaries are decoded recursively,
        # so a deeply nested input must not crash the caller.
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            logging.warning("Not a valid bencoded string: %s", e)
            return None
        if end != len(s):
            logging.warning("Invalid bencoded value (data after valid prefix)")
            return None
        return ret