File: ua_convert.py

package info (click to toggle)
python-fake-useragent 2.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,788 kB
  • sloc: python: 596; makefile: 146
file content (218 lines) | stat: -rwxr-xr-x 7,059 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Author: Melroy van den Berg

"""Description: Convert the user-agents.json file to JSONlines and directly remaps the keys."""
import argparse
import gzip
import json
from collections.abc import Iterable
from multiprocessing.pool import Pool
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional, TypedDict

import requests
from ua_parser import parse

from fake_useragent.utils import BrowserUserAgentData, find_browser_json_path

# URL used when `--download` is given without an explicit value. The path ends in
# `.gz`, so `download_and_extract` decompresses the payload before parsing it.
DEFAULT_URL = (
    "https://raw.githubusercontent.com/intoli/user-agents/main/src/user-agents.json.gz"
)


class SourceItem(TypedDict):
    """The schema for the source item that the source file must (at least) follow.

    Items loaded from the source file may carry more keys than the ones listed
    here; only the keys below are read when an item is converted by
    `process_item`.
    """

    # Parsed with `ua_parser` and copied verbatim to the `useragent` output key.
    userAgent: str
    """The user agent string."""
    # Multiplied by 100 and stored under the `percent` output key.
    weight: float
    """Sampling probability for this user agent when random sampling. Currently has no effect."""
    # Copied verbatim to the `type` output key.
    deviceCategory: str
    """The device type for this user agent."""
    # Copied verbatim to the `platform` output key.
    platform: str
    """System name for the user agent."""


def download_and_extract(source_url: str) -> list[SourceItem]:
    """Download the user-agents.json file from the given URL and extract it if necessary.

    Args:
        source_url (str): The URL to the user-agents.json file. A URL ending in
            ``.gz`` is treated as gzip-compressed and is decompressed after the
            download; any other URL is parsed as plain JSON.

    Returns:
        list[SourceItem]: The source file loaded as a list of `SourceItem`s. In reality, the
            returned elements have more keys than the `SourceItem` schema, but we only use the
            keys defined in the schema.

    Raises:
        requests.HTTPError: If the server responds with an error status code.
        gzip.BadGzipFile: If the URL ends in ``.gz`` but the payload is not valid gzip data.
        json.JSONDecodeError: If the (decompressed) payload is not valid JSON.
    """
    response = requests.get(source_url, timeout=10)
    response.raise_for_status()

    contents = response.content
    if source_url.endswith(".gz"):
        # Decompress in memory. Writing the payload to a temporary file and
        # reopening it by name is fragile: the write buffer may not have been
        # flushed to disk yet, and an open temporary file cannot be reopened by
        # name on every platform.
        contents = gzip.decompress(contents)

    return json.loads(contents)


def process_item(item: SourceItem) -> Optional[BrowserUserAgentData]:
    """Process a single item and return the transformed item.

    Args:
        item (SourceItem): One entry of the source file.

    Returns:
        Optional[BrowserUserAgentData]: The entry remapped to our key names and
            enriched with the parsed browser, OS and device information, or
            ``None`` when the entry has to be skipped. An entry is skipped when
            no browser could be parsed from the user agent string, when the
            browser has neither a major nor a minor version, or when the
            ``major.minor`` version is not a valid number.
    """
    # Parse the user agent string
    ua_result = parse(item["userAgent"])
    # Example output:
    # Result(
    #     user_agent=UserAgent(
    #         family="Mobile Safari", major="16", minor="2", patch=None, patch_minor=None
    #     ),
    #     os=OS(family="iOS", major="16", minor="2", patch=None, patch_minor=None),
    #     device=Device(family="iPhone", brand="Apple", model="iPhone"),
    #     string="Mozilla/5.0 (iPhone; CPU iPhone OS 16_2 like Mac OS X)
    #       AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Mobile/15E148 Safari/604.1",
    # )

    browser = ua_result.user_agent
    if not browser:
        return None  # Skip this user-agent string

    major_minor_version = _join_version_parts(browser.major, browser.minor)
    if not major_minor_version:
        return None  # Skip this user-agent string

    # The major_minor_version gets converted to a float to make it easier to compare
    try:
        browser_version_major_minor = float(major_minor_version)
    except ValueError:
        # A non-numeric version component must not abort the whole conversion
        # run; treat it like any other unusable user-agent string.
        return None

    os_info = ua_result.os
    if os_info:
        os_version = _join_version_parts(
            os_info.major, os_info.minor, os_info.patch, os_info.patch_minor
        )
    else:
        os_version = None

    return {
        "useragent": item["userAgent"],
        "percent": item["weight"] * 100,
        "type": item["deviceCategory"],
        "device_brand": ua_result.device.brand if ua_result.device else None,
        "browser": browser.family,
        "browser_version": _join_version_parts(
            browser.major, browser.minor, browser.patch, browser.patch_minor
        ),
        "browser_version_major_minor": browser_version_major_minor,
        "os": os_info.family if os_info else None,
        "os_version": os_version,
        "platform": item["platform"],
    }


def _join_version_parts(*parts: Optional[str]) -> str:
    """Join the version components that are not None with dots, in the given order."""
    return ".".join(part for part in parts if part is not None)


def convert_useragents_formats(
    data: Iterable[SourceItem],
) -> list[BrowserUserAgentData]:
    """Convert the items in Intoli's format to items in our format.

    Every item is handed to `process_item` through a multiprocessing pool; items
    for which `process_item` returns ``None`` are left out of the result.

    Args:
        data (Iterable[SourceItem]): The updated user agent data in Intoli's format,
            from their [user-agents](https://github.com/intoli/user-agents) library.

    Returns:
        list[BrowserUserAgentData]: The user agent data in our format.
    """
    with Pool() as workers:
        print(f"Using pool with {workers._processes} processes.")  # type: ignore[reportAttributeAccessIssue]; Pool has this attribute.
        converted = workers.map(process_item, data)

    # Drop the entries that process_item decided to skip.
    return list(filter(lambda entry: entry is not None, converted))


if __name__ == "__main__":
    # Command line entry point: load the source data (from a local file or a
    # download), convert it, and write the result as one JSON object per line.
    parser = argparse.ArgumentParser(
        description="Convert Intoli's user agent data to our JSONL format."
    )

    input_group = parser.add_argument_group(
        "Input source", "Define where to get the source data from."
    )
    # Exactly one of --input / --download has to be given.
    exclusive_group = input_group.add_mutually_exclusive_group(required=True)
    exclusive_group.add_argument(
        "-i",
        "--input",
        help="Input JSON file path (default: %(const)s)",
        nargs="?",
        const=Path("user-agents.json"),
        type=Path,
    )
    exclusive_group.add_argument(
        "-d",
        "--download",
        help=(
            "Download source file from URL. Supports gzipped and non-gzipped files "
            "(default: %(const)s)"
        ),
        nargs="?",
        const=DEFAULT_URL,
        type=str,
    )

    parser.add_argument(
        "-o",
        "--output",
        help="Output JSONL file. Default overwrites current package file (default: %(default)s)",
        default=find_browser_json_path(),
        type=Path,
    )

    # argparse only applies `type` to strings given on the command line, so the
    # None default passes through untouched and plain `int` is sufficient. It
    # also yields a readable "invalid int value" error message.
    parser.add_argument(
        "-l",
        "--parse-limit",
        help="How many of the fetched user agent lines to parse (default: %(default)s)",
        default=None,
        type=int,
    )

    args = parser.parse_args()

    if args.download:
        print(f"Downloading data from {args.download}")
        data = download_and_extract(args.download)
    else:
        print(f"Reading data from {args.input}")
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default encoding.
        with open(args.input, "r", encoding="utf-8") as f:
            data = json.load(f)

    # A limit of 0 (or no limit at all) means: parse everything.
    if args.parse_limit:
        print(f"Parsing only the first {args.parse_limit} items")
        data = data[: args.parse_limit]

    print("Processing data...")
    jsonl_converted = convert_useragents_formats(data)

    print(f"Writing data to {args.output}")
    # Explicit encoding keeps the output file identical across platforms.
    with open(args.output, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(item) + "\n" for item in jsonl_converted)
    print("Done!")