File: test_chunking.py

package info (click to toggle)
graypy 2.1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 312 kB
  • sloc: python: 1,175; sh: 114; makefile: 3
file content (172 lines) | stat: -rw-r--r-- 5,079 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""pytests for various GELF UDP message chunkers"""

import json
import logging
import struct
import zlib

import pytest

from graypy.handler import (
    GELFTruncatingChunker,
    GELFWarningChunker,
    BaseGELFChunker,
    BaseGELFHandler,
    SYSLOG_LEVELS,
    GELFChunkOverflowWarning,
    GELFTruncationFailureWarning,
)


@pytest.mark.parametrize(
    "gelf_chunker",
    [BaseGELFChunker, GELFWarningChunker, GELFTruncatingChunker],
)
def test_gelf_chunking(gelf_chunker):
    """Verify the layout of every chunk produced for a five byte message.

    The message is split with ``chunk_size=2``; each resulting chunk must
    carry the two magic bytes at offsets 0-1, its sequence index at offset
    10, the total chunk count at offset 11 and its payload from offset 12.
    """
    magic_bytes = b"\x1e\x0f"
    total_field = struct.pack("b", 3)
    wanted = [
        (struct.pack("b", position), total_field, payload)
        for position, payload in enumerate((b"12", b"34", b"5"))
    ]

    produced = list(gelf_chunker(chunk_size=2).chunk_message(b"12345"))

    assert len(produced) == len(wanted)

    # Lengths are asserted equal above, so zip pairs every chunk exactly once
    for datagram, (sequence_field, count_field, body) in zip(produced, wanted):
        assert magic_bytes == datagram[:2]
        assert sequence_field == datagram[10:11]
        assert count_field == datagram[11:12]
        assert body == datagram[12:]


def rebuild_gelf_bytes_from_udp_chunks(chunks):
    """Reassemble a GELF payload from an ordered sequence of UDP chunks.

    Every chunk is expected to start with a fixed-size chunk header: two
    magic bytes followed by an 8 byte message id, a 1 byte sequence number
    and a 1 byte chunk count. The header is dropped from each chunk and the
    remaining payload bytes are concatenated in the order given.

    :param chunks: chunked GELF datagrams, already in sequence order
    :type chunks: list[bytes]

    :return: the concatenated payload bytes (``b""`` if ``chunks`` is empty)
    :rtype: bytes
    """
    # "!" selects standard sizes without alignment padding: 8 + 1 + 1 bytes
    header_size = 2 + struct.calcsize("!QBB")
    # Slice at the fixed header offset rather than sizing the slice from the
    # length difference to the first chunk: that difference only equals the
    # payload length of a short final chunk for one particular chunk size.
    return b"".join(chunk[header_size:] for chunk in chunks)


@pytest.mark.parametrize(
    "gelf_chunker",
    [BaseGELFChunker, GELFWarningChunker, GELFTruncatingChunker],
)
def test_gelf_chunkers(gelf_chunker):
    """Chunk a short pickled log record and check the chunk count limit.

    A record with a ten character message is serialized by a default
    ``BaseGELFHandler`` and split with ``chunk_size=2``; no more than 128
    chunks may be produced.
    """
    handler = BaseGELFHandler()
    record = logging.LogRecord(
        name="test_gelf_chunkers",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg="1" * 10,
        args=None,
        exc_info=None,
    )
    pickled = handler.makePickle(record)
    chunker = gelf_chunker(chunk_size=2)
    produced = list(chunker.chunk_message(pickled))
    assert len(produced) <= 128


@pytest.mark.parametrize(
    "gelf_chunker",
    [BaseGELFChunker, GELFWarningChunker, GELFTruncatingChunker],
)
def test_gelf_chunkers_overflow(gelf_chunker):
    """Chunk a large pickled log record one byte at a time.

    A record with a 1000 character message is serialized by a default
    ``BaseGELFHandler`` and split with ``chunk_size=1``; each chunker must
    still yield no more than 128 chunks.
    """
    handler = BaseGELFHandler()
    record = logging.LogRecord(
        name="test_gelf_chunkers_overflow",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg="1" * 1000,
        args=None,
        exc_info=None,
    )
    pickled = handler.makePickle(record)
    chunker = gelf_chunker(chunk_size=1)
    produced = list(chunker.chunk_message(pickled))
    assert len(produced) <= 128


def test_chunk_overflow_truncate_uncompressed():
    """Check truncation of an oversized, uncompressed GELF message.

    Chunking must raise ``GELFChunkOverflowWarning`` and yield at most 128
    chunks. The reassembled JSON must be flagged with ``_chunk_overflow``,
    carry a ``short_message`` that is a substring of the original text and
    report the level that ``logging.ERROR`` maps to.
    """
    original_text = "1" * 1000
    handler = BaseGELFHandler(compress=False)
    record = logging.LogRecord(
        name="test_chunk_overflow_truncate_uncompressed",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg=original_text,
        args=None,
        exc_info=None,
    )
    message = handler.makePickle(record)

    with pytest.warns(GELFChunkOverflowWarning):
        chunker = GELFTruncatingChunker(chunk_size=2, compress=False)
        chunks = list(chunker.chunk_message(message))

    assert len(chunks) <= 128

    raw_payload = rebuild_gelf_bytes_from_udp_chunks(chunks)
    gelf_json = json.loads(raw_payload.decode("UTF-8"))
    expected_level = SYSLOG_LEVELS.get(logging.ERROR, logging.ERROR)

    assert gelf_json["_chunk_overflow"] is True
    assert gelf_json["short_message"] in original_text
    assert gelf_json["level"] == expected_level


def test_chunk_overflow_truncate_compressed():
    """Check truncation of an oversized, zlib-compressed GELF message.

    Chunking must raise ``GELFChunkOverflowWarning`` and yield at most 128
    chunks. Once reassembled and decompressed, the JSON must be flagged
    with ``_chunk_overflow``, carry a ``short_message`` that is a substring
    of the original text and report the level that ``logging.ERROR`` maps to.
    """
    original_text = "123412345" * 5000
    handler = BaseGELFHandler(compress=True)
    record = logging.LogRecord(
        name="test_chunk_overflow_truncate_compressed",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg=original_text,
        args=None,
        exc_info=None,
    )
    message = handler.makePickle(record)

    with pytest.warns(GELFChunkOverflowWarning):
        chunker = GELFTruncatingChunker(chunk_size=2, compress=True)
        chunks = list(chunker.chunk_message(message))

    assert len(chunks) <= 128

    compressed = rebuild_gelf_bytes_from_udp_chunks(chunks)
    gelf_json = json.loads(zlib.decompress(compressed).decode("UTF-8"))
    expected_level = SYSLOG_LEVELS.get(logging.ERROR, logging.ERROR)

    assert gelf_json["_chunk_overflow"] is True
    assert gelf_json["short_message"] in original_text
    assert gelf_json["level"] == expected_level


def test_chunk_overflow_truncate_fail():
    """Expect ``GELFTruncationFailureWarning`` when chunking one byte at a time.

    A record with a 1000 character message is serialized by a default
    ``BaseGELFHandler`` and fed to ``GELFTruncatingChunker(1)``.
    """
    handler = BaseGELFHandler()
    record = logging.LogRecord(
        name="test_chunk_overflow_truncate_fail",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg="1" * 1000,
        args=None,
        exc_info=None,
    )
    message = handler.makePickle(record)

    with pytest.warns(GELFTruncationFailureWarning):
        # Drain the generator; only the emitted warning matters here
        for _ in GELFTruncatingChunker(1).chunk_message(message):
            pass


def test_chunk_overflow_truncate_fail_large_inherited_field():
    """Expect ``GELFTruncationFailureWarning`` when the facility is huge.

    The log message itself is short; the handler is configured with a very
    long ``facility`` value and the result is fed to
    ``GELFTruncatingChunker(2)``.
    """
    handler = BaseGELFHandler(facility="this is a really long facility" * 5000)
    # NOTE(review): the record name omits the ``_large_inherited_field``
    # suffix of this test's name; kept as-is since it is runtime data.
    record = logging.LogRecord(
        name="test_chunk_overflow_truncate_fail",
        level=logging.INFO,
        pathname=None,
        lineno=None,
        msg="reasonable message",
        args=None,
        exc_info=None,
    )
    message = handler.makePickle(record)

    with pytest.warns(GELFTruncationFailureWarning):
        # Drain the generator; only the emitted warning matters here
        for _ in GELFTruncatingChunker(2).chunk_message(message):
            pass