File: test_history.py

package info (click to toggle)
ipython 8.35.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,696 kB
  • sloc: python: 42,461; sh: 376; makefile: 243
file content (306 lines) | stat: -rw-r--r-- 11,698 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# coding: utf-8
"""Tests for the IPython history machinery.
"""
#-----------------------------------------------------------------------------
# Module imports
#-----------------------------------------------------------------------------

# stdlib
import io
import sys
import tempfile
from datetime import datetime
from pathlib import Path

from tempfile import TemporaryDirectory
# our own packages
from traitlets.config.loader import Config

from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges


def test_proper_default_encoding():
    """The interpreter's default string encoding must be UTF-8."""
    default_encoding = sys.getdefaultencoding()
    assert default_encoding == "utf-8"

def test_history():
    """Exercise ``HistoryManager`` against a throwaway history file.

    Covers storing inputs and outputs, ``_get_range_session``, ``get_range``,
    ``get_tail``, ``search``, the ``%hist`` and ``%save`` magics, and the
    switch to a new session when a line number is stored twice.
    """
    ip = get_ipython()
    with TemporaryDirectory() as tmpdir:
        tmp_path = Path(tmpdir)
        hist_manager_ori = ip.history_manager
        hist_file = tmp_path / "history.sqlite"
        # Build the temporary manager *before* entering the try block: if
        # construction fails there is nothing to clean up, and the cleanup
        # below must never stop the save thread or close the database of the
        # shell's original manager.
        hist_manager = HistoryManager(shell=ip, hist_file=hist_file)
        ip.history_manager = hist_manager
        try:
            hist = ["a=1", "def f():\n    test = 1\n    return test", "b='€Æ¾÷ß'"]
            for i, h in enumerate(hist, start=1):
                ip.history_manager.store_inputs(i, h)

            ip.history_manager.db_log_output = True
            # Doesn't match the input, but we'll just check it's stored.
            ip.history_manager.output_hist_reprs[3] = "spam"
            ip.history_manager.store_output(3)

            assert ip.history_manager.input_hist_raw == [""] + hist

            # Detailed tests for _get_range_session
            grs = ip.history_manager._get_range_session
            assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
            assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
            assert list(grs(output=True)) == list(
                zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
            )

            # Check whether specifying a range beyond the end of the current
            # session results in an error (gh-804)
            ip.run_line_magic("hist", "2-500")

            # Check that we can write non-ascii characters to a file
            ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
            ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
            ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
            ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))

            # New session
            ip.history_manager.reset()
            newcmds = ["z=5", "class X(object):\n    pass", "k='p'", "z=5"]
            for i, cmd in enumerate(newcmds, start=1):
                ip.history_manager.store_inputs(i, cmd)
            gothist = ip.history_manager.get_range(start=1, stop=4)
            assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
            # Previous session:
            gothist = ip.history_manager.get_range(-1, 1, 4)
            assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))

            # Expected (session, line, source) tuples for the second session.
            newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]

            # Check get_hist_tail
            gothist = ip.history_manager.get_tail(
                5, output=True, include_latest=True
            )
            expected = [(1, 3, (hist[-1], "spam"))] \
                + [(s, n, (c, None)) for (s, n, c) in newhist]
            assert list(gothist) == expected

            gothist = ip.history_manager.get_tail(2)
            expected = newhist[-3:-1]
            assert list(gothist) == expected

            # Check get_hist_search

            gothist = ip.history_manager.search("*test*")
            assert list(gothist) == [(1, 2, hist[1])]

            gothist = ip.history_manager.search("*=*")
            assert list(gothist) == [
                (1, 1, hist[0]),
                (1, 2, hist[1]),
                (1, 3, hist[2]),
                newhist[0],
                newhist[2],
                newhist[3],
            ]

            gothist = ip.history_manager.search("*=*", n=4)
            assert list(gothist) == [
                (1, 3, hist[2]),
                newhist[0],
                newhist[2],
                newhist[3],
            ]

            gothist = ip.history_manager.search("*=*", unique=True)
            assert list(gothist) == [
                (1, 1, hist[0]),
                (1, 2, hist[1]),
                (1, 3, hist[2]),
                newhist[2],
                newhist[3],
            ]

            gothist = ip.history_manager.search("*=*", unique=True, n=3)
            assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]

            gothist = ip.history_manager.search("b*", output=True)
            assert list(gothist) == [(1, 3, (hist[2], "spam"))]

            # Cross testing: check that magic %save can get previous session.
            testfilename = (tmp_path / "test.py").resolve()
            ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
            with io.open(testfilename, encoding="utf-8") as testfile:
                assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"

            # Duplicate line numbers - check that it doesn't crash, and
            # gets a new session
            ip.history_manager.store_inputs(1, "rogue")
            ip.history_manager.writeout_cache()
            assert ip.history_manager.session_number == 3

            # Check that session and line values are not just max values
            sessid, lineno, entry = newhist[-1]
            assert lineno > 1
            ip.history_manager.reset()
            lineno = 1
            ip.history_manager.store_inputs(lineno, entry)
            gothist = ip.history_manager.search("*=*", unique=True)
            # Use a dedicated name instead of rebinding ``hist`` (the list of
            # first-session inputs) to a single result tuple.
            last_entry = list(gothist)[-1]
            assert sessid < last_entry[0]
            assert last_entry[1:] == (lineno, entry)
        finally:
            try:
                # Ensure saving thread is shut down before we try to clean up the files
                hist_manager.save_thread.stop()
                # Forcibly close database rather than relying on garbage collection
                hist_manager.db.close()
            finally:
                # Restore history manager, even if the shutdown above failed
                ip.history_manager = hist_manager_ori


def test_extract_hist_ranges():
    """A multi-range spec string is parsed into the expected range tuples."""
    range_spec = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
    # In the first field 0 stands for the current session; a trailing None
    # stands for "up to the end".
    wanted = [
        (0, 1, 2),
        (2, 3, 4),
        (-4, 5, 7),
        (-4, 7, 10),
        (-9, 2, None),
        (-8, 1, None),
        (-7, 1, 6),
        (-10, 1, None),
    ]
    parsed = list(extract_hist_ranges(range_spec))
    assert parsed == wanted


def test_extract_hist_ranges_empty_str():
    """An empty spec string yields a single range over the current session."""
    # 0 == current session, None == to end
    wanted = [(0, 1, None)]
    parsed = list(extract_hist_ranges(""))
    assert parsed == wanted


def test_magic_rerun():
    """Simple test for %rerun (no args -> rerun last line)"""
    shell = get_ipython()
    # Seed the history with an assignment followed by an increment.
    for source in ("a = 10", "a += 1"):
        shell.run_cell(source, store_history=True)
    assert shell.user_ns["a"] == 11
    # Re-running the last line should apply the increment a second time.
    shell.run_cell("%rerun", store_history=True)
    assert shell.user_ns["a"] == 12

def test_timestamp_type():
    """The second field of the session info must be a ``datetime``."""
    session_info = get_ipython().history_manager.get_session_info()
    timestamp = session_info[1]
    assert isinstance(timestamp, datetime)

def test_hist_file_config():
    """A ``hist_file`` supplied through ``Config`` is the one the manager uses."""
    cfg = Config()
    # Only the path of the temporary file is needed.  Close the handle right
    # away so it is not leaked and does not keep the file locked while the
    # history manager opens it or while it is deleted below.
    with tempfile.NamedTemporaryFile(delete=False) as tfile:
        hist_path = Path(tfile.name)
    cfg.HistoryManager.hist_file = hist_path
    hm = None
    try:
        hm = HistoryManager(shell=get_ipython(), config=cfg)
        assert hm.hist_file == cfg.HistoryManager.hist_file
    finally:
        if hm is not None:
            # Same shutdown sequence as the other tests in this module: stop
            # the saving thread, then close the database explicitly instead
            # of relying on garbage collection.
            hm.save_thread.stop()
            hm.db.close()
        try:
            hist_path.unlink()
        except OSError:
            # same catch as in testing.tools.TempFileMixin
            # Deliberately best-effort: on Windows the file may still be
            # impossible to delete at this point.
            pass

def test_histmanager_disabled():
    """Ensure that disabling the history manager doesn't create a database."""
    cfg = Config()
    cfg.HistoryAccessor.enabled = False

    ip = get_ipython()
    with TemporaryDirectory() as tmpdir:
        hist_manager_ori = ip.history_manager
        hist_file = Path(tmpdir) / "history.sqlite"
        cfg.HistoryManager.hist_file = hist_file
        try:
            ip.history_manager = HistoryManager(shell=ip, config=cfg)
            hist = ["a=1", "def f():\n    test = 1\n    return test", "b='€Æ¾÷ß'"]
            for i, h in enumerate(hist, start=1):
                ip.history_manager.store_inputs(i, h)
            # Inputs are still tracked in memory even though nothing is saved.
            assert ip.history_manager.input_hist_raw == [""] + hist
            ip.history_manager.reset()
            ip.history_manager.end_session()
        finally:
            ip.history_manager = hist_manager_ori

        # hist_file should not be created.  This must be checked while the
        # temporary directory still exists: once the ``with`` block exits the
        # whole directory is removed and the check could never fail.
        assert not hist_file.exists()


def test_get_tail_session_awareness():
    """Check the session scoping of ``get_tail()`` and ``get_last_session_id()``.

    Both are expected to be session specific on a ``HistoryManager`` and
    session agnostic on a ``HistoryAccessor``.
    """
    shell = get_ipython()

    def sources(entries):
        # Keep only the element at index 2 of every returned history tuple.
        return [entry[2] for entry in entries]

    with TemporaryDirectory() as tmpdir:
        db_path = Path(tmpdir) / "history.sqlite"
        hm1 = hm2 = ha = None
        try:
            # First manager opens a session and records entries; the
            # accessor on the same file is expected to see them.
            hm1 = HistoryManager(shell=shell, hist_file=db_path)
            ha = HistoryAccessor(hist_file=db_path)

            first_batch = ["a=1", "b=1", "c=1"]
            for lineno, source in enumerate(first_batch + [""], start=1):
                hm1.store_inputs(lineno, source)
            assert sources(hm1.get_tail()) == first_batch
            assert sources(ha.get_tail()) == first_batch
            sid1 = hm1.get_last_session_id()
            assert sid1 is not None
            assert ha.get_last_session_id() == sid1

            # Second manager opens its own session on the same file; the
            # accessor is expected to follow it.
            hm2 = HistoryManager(shell=shell, hist_file=db_path)

            second_batch = ["a=2", "b=2", "c=2"]
            for lineno, source in enumerate(second_batch + [""], start=1):
                hm2.store_inputs(lineno, source)
            assert sources(hm2.get_tail(n=3)) == second_batch
            assert sources(ha.get_tail(n=3)) == second_batch
            sid2 = hm2.get_last_session_id()
            assert sid2 is not None
            assert ha.get_last_session_id() == sid2
            assert sid2 != sid1

            # The first manager keeps its own point of reference ...
            assert hm1.get_last_session_id() == sid1
            assert sources(hm1.get_tail(n=3)) == first_batch

            # ... and adding more entries to it leaves the views of the
            # second manager and of the accessor unchanged.
            third_batch = ["a=3", "b=3", "c=3"]
            for lineno, source in enumerate(third_batch + [""], start=5):
                hm1.store_inputs(lineno, source)
            assert sources(hm1.get_tail(n=7)) == first_batch + [""] + third_batch
            assert sources(hm2.get_tail(n=3)) == second_batch
            assert sources(ha.get_tail(n=3)) == second_batch
            assert hm1.get_last_session_id() == sid1
            assert hm2.get_last_session_id() == sid2
            assert ha.get_last_session_id() == sid2
        finally:
            # Shut down whatever was created, in creation order: stop each
            # manager's saving thread before closing its database.
            for manager in (hm1, hm2):
                if manager:
                    manager.save_thread.stop()
                    manager.db.close()
            if ha:
                ha.db.close()