File: test-atomictempfile.py

package info (click to toggle)
mercurial 4.8.2-1%2Bdeb10u1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 41,932 kB
  • sloc: python: 150,616; ansic: 39,675; tcl: 3,715; lisp: 1,448; sh: 1,285; makefile: 569; cpp: 291; xml: 36; sql: 30
file content (125 lines) | stat: -rw-r--r-- 4,308 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import absolute_import

import glob
import os
import shutil
import stat
import tempfile
import unittest

from mercurial import (
    pycompat,
    util,
)
atomictempfile = util.atomictempfile

if pycompat.ispy3:
    xrange = range

class testatomictempfile(unittest.TestCase):
    def setUp(self):
        self._testdir = tempfile.mkdtemp(b'atomictempfiletest')
        self._filename = os.path.join(self._testdir, b'testfilename')

    def tearDown(self):
        shutil.rmtree(self._testdir, True)

    def testsimple(self):
        file = atomictempfile(self._filename)
        self.assertFalse(os.path.isfile(self._filename))
        tempfilename = file._tempname
        self.assertTrue(tempfilename in glob.glob(
            os.path.join(self._testdir, b'.testfilename-*')))

        file.write(b'argh\n')
        file.close()

        self.assertTrue(os.path.isfile(self._filename))
        self.assertTrue(tempfilename not in glob.glob(
            os.path.join(self._testdir, b'.testfilename-*')))

    # discard() removes the temp file without making the write permanent
    def testdiscard(self):
        file = atomictempfile(self._filename)
        (dir, basename) = os.path.split(file._tempname)

        file.write(b'yo\n')
        file.discard()

        self.assertFalse(os.path.isfile(self._filename))
        self.assertTrue(basename not in os.listdir(b'.'))

    # if a programmer screws up and passes bad args to atomictempfile, they
    # get a plain ordinary TypeError, not infinite recursion
    def testoops(self):
        with self.assertRaises(TypeError):
            atomictempfile()

    # checkambig=True avoids ambiguity of timestamp
    def testcheckambig(self):
        def atomicwrite(checkambig):
            f = atomictempfile(self._filename, checkambig=checkambig)
            f.write(b'FOO')
            f.close()

        # try some times, because reproduction of ambiguity depends on
        # "filesystem time"
        for i in xrange(5):
            atomicwrite(False)
            oldstat = os.stat(self._filename)
            if oldstat[stat.ST_CTIME] != oldstat[stat.ST_MTIME]:
                # subsequent changing never causes ambiguity
                continue

            repetition = 3

            # repeat atomic write with checkambig=True, to examine
            # whether st_mtime is advanced multiple times as expected
            for j in xrange(repetition):
                atomicwrite(True)
            newstat = os.stat(self._filename)
            if oldstat[stat.ST_CTIME] != newstat[stat.ST_CTIME]:
                # timestamp ambiguity was naturally avoided while repetition
                continue

            # st_mtime should be advanced "repetition" times, because
            # all atomicwrite() occurred at same time (in sec)
            oldtime = (oldstat[stat.ST_MTIME] + repetition) & 0x7fffffff
            self.assertTrue(newstat[stat.ST_MTIME] == oldtime)
            # no more examination is needed, if assumption above is true
            break
        else:
            # This platform seems too slow to examine anti-ambiguity
            # of file timestamp (or test happened to be executed at
            # bad timing). Exit silently in this case, because running
            # on other faster platforms can detect problems
            pass

    def testread(self):
        with open(self._filename, 'wb') as f:
            f.write(b'foobar\n')
        file = atomictempfile(self._filename, mode=b'rb')
        self.assertTrue(file.read(), b'foobar\n')
        file.discard()

    def testcontextmanagersuccess(self):
        """When the context closes, the file is closed"""
        with atomictempfile(b'foo') as f:
            self.assertFalse(os.path.isfile(b'foo'))
            f.write(b'argh\n')
        self.assertTrue(os.path.isfile(b'foo'))

    def testcontextmanagerfailure(self):
        """On exception, the file is discarded"""
        try:
            with atomictempfile(b'foo') as f:
                self.assertFalse(os.path.isfile(b'foo'))
                f.write(b'argh\n')
                raise ValueError
        except ValueError:
            pass
        self.assertFalse(os.path.isfile(b'foo'))

if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)