File: test_tiktoken.py

package info (click to toggle)
tokenizers 0.20.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 5,480 kB
  • sloc: python: 4,499; javascript: 419; makefile: 124
file content (133 lines) | stat: -rw-r--r-- 5,071 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import time
import argparse
from datasets import load_dataset
from tiktoken.load import load_tiktoken_bpe
import tiktoken
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download
from typing import Tuple, List
from multiprocessing import Process

MODEL_ID = "meta-llama/Meta-Llama-3.1-8B"
DATASET = "facebook/xnli"
DATASET_CONFIG = "all_languages"
DEFAULT_THREADS = [2**i for i in range(8) if 2**i <= os.cpu_count()]


def format_byte_size(num_bytes: int) -> Tuple[str, str]:
    """Convert bytes to a human-readable format (KB, MB, GB)."""
    num_bytes_f = float(num_bytes)
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if num_bytes_f < 1024:
            return f"{num_bytes_f:.2f} {unit}", unit
        num_bytes_f /= 1024
    return f"{num_bytes_f:.2f} PB", "PB"


def benchmark_batch(model: str, documents: list[str], num_threads: int, document_length: float) -> None:
    os.environ["RAYON_NUM_THREADS"] = str(num_threads)
    num_bytes = sum(map(len, map(str.encode, documents)))
    readable_size, unit = format_byte_size(num_bytes)
    print(f"==============")
    print(f"num_threads: {num_threads}, data size: {readable_size}, documents: {len(documents)} Avg Length: {document_length:.0f}")
    filename = hf_hub_download(MODEL_ID, "original/tokenizer.model")
    mergeable_ranks = load_tiktoken_bpe(filename)
    pat_str = r"(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+"
    num_reserved_special_tokens = 256
    special_tokens = [
        "<|begin_of_text|>",
        "<|end_of_text|>",
        "<|reserved_special_token_0|>",
        "<|reserved_special_token_1|>",
        "<|reserved_special_token_2|>",
        "<|reserved_special_token_3|>",
        "<|start_header_id|>",
        "<|end_header_id|>",
        "<|reserved_special_token_4|>",
        "<|eot_id|>",  # end of turn
    ] + [
        f"<|reserved_special_token_{i}|>"
        for i in range(5, num_reserved_special_tokens - 5)
    ]
    num_base_tokens = len(mergeable_ranks)
    special_tokens = {
        token: num_base_tokens + i for i, token in enumerate(special_tokens)
    }
    enc = tiktoken.Encoding(
            name=model,
            pat_str=pat_str,
            mergeable_ranks=mergeable_ranks,
            special_tokens=special_tokens,
        )
    out = enc.encode("This is a test")

    hf_enc = Tokenizer.from_pretrained(model)
    out2 = hf_enc.encode("This is a test", add_special_tokens=False).ids

    assert out == out2, "sanity check"

    start = time.perf_counter_ns()
    enc.encode_ordinary_batch(documents, num_threads=num_threads)
    end = time.perf_counter_ns()

    readable_size, unit = format_byte_size(num_bytes / (end - start) * 1e9)
    print(f"tiktoken \t{readable_size}  / s")


    start = time.perf_counter_ns()
    hf_enc.encode_batch_fast(documents)
    end = time.perf_counter_ns()
    readable_size, unit = format_byte_size(num_bytes / (end - start) * 1e9)
    print(f"huggingface \t{readable_size} / s")


def test(model: str, dataset: str, dataset_config: str, threads: List[int]):
    dataset_xnli = load_dataset(dataset, dataset_config)

    input_lengths = [(10, False, True), (10_000, False, True), (10_000, False, False)]

    for num_threads in threads:
        for length, fuse, long in input_lengths:
            documents = []
            for i, item in enumerate(dataset_xnli["train"]):
                if i >= length:
                    break
                if long:
                    documents.append("".join(item["premise"].values()))
                else:
                    documents.append(item["premise"]["en"])
            if fuse:
                documents=["".join(documents)]

            document_length = sum(len(d) for d in documents) / len(documents)

            # Rayon thread pool is global to a process, we need to launch
            # separate processes in order to accurately use the correct number of threads.
            # Otherwise, we're simply running tokenizers in whatever tests comes first.
            # tokenizers does NOT provide a method to change the number of threads during
            # runtime.
            p = Process(target=benchmark_batch, args=(model, documents, num_threads, document_length))
            p.start()
            p.join()

            # benchmark_batch(model, documents, num_threads)


def main():

    parser = argparse.ArgumentParser(
                    prog='bench_tokenizer',
                    description='Getting a feel for speed when tokenizing',
    )
    parser.add_argument('-m', '--model', default=MODEL_ID, type=str)
    parser.add_argument('-d', '--dataset', default=DATASET, type=str)
    parser.add_argument('-ds', '--dataset-config', default=DATASET_CONFIG, type=str)
    parser.add_argument('-t', '--threads', nargs='+', default=DEFAULT_THREADS, type=int)
    args = parser.parse_args()
    test(args.model, args.dataset, args.dataset_config, args.threads)


# Call the function to run the benchmark
if __name__ == "__main__":
    main()