File: TestRunner.py

package info (click to toggle)
offlineimap3 0.0~git20211018.e64c254%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,240 kB
  • sloc: python: 8,089; sh: 586; makefile: 81
file content (253 lines) | stat: -rw-r--r-- 10,135 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Copyright (C) 2012- Sebastian Spaeth & contributors
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
import os
import re
import shutil
import subprocess
import tempfile
import random
import imaplib2 as imaplib
from offlineimap.CustomConfig import CustomConfigParser
from . import default_conf

# Seed the shared `random` generator without an explicit seed value; the
# generator is used further down to pick file names for generated test mails.
random.seed()


class OLITestLib:
    """Helper library used by the test suite to drive offlineimap.

    All state (credentials file, test directory, command) lives on the
    class itself and every helper is a classmethod, so there is exactly
    one test environment per process. The class is not suited for running
    several tests in parallel.
    """
    # Path of the configuration snippet holding the IMAP test credentials
    cred_file = None
    # Absolute path of the current temporary test directory
    testdir = None
    # Command that will be executed to invoke offlineimap
    cmd = None

    # find UID in a maildir filename
    re_uidmatch = re.compile(r',U=(\d+)')

    # Picks the mailbox name off the end of one IMAP LIST response line.
    _re_listfolder = re.compile(br'''
        [ ]                     # space
        (?P<dir>
        (?P<quote>"?)           # starting quote
        ([^"]|\\")*             # a non-quote or a backslashded quote
        (?P=quote))$            # ending quote
        ''', flags=re.VERBOSE)

    # Mail written by create_mail() when the caller supplies no content.
    _default_mail = (b'From: test <test@offlineimap.org>\n'
                     b'Subject: Boo\n'
                     b'Date: 1 Jan 1980\n'
                     b'To: test@offlineimap.org\n'
                     b'\n'
                     b'Content here.')

    def __init__(self, cred_file=None, cmd='offlineimap'):
        """Register the credentials file and the offlineimap command.

        :param cred_file: file of the configuration
            snippet for authenticating against the test IMAP server(s).
        :param cmd: command that will be executed to invoke offlineimap
        :raises UserWarning: if `cred_file` is None or not an existing file.
        """
        # Check for None first: os.path.isfile(None) raises TypeError, which
        # would hide the helpful message below.
        if cred_file is None or not os.path.isfile(cred_file):
            raise UserWarning("Please copy 'credentials.conf.sample' to '%s' "
                              "and set your credentials there." % cred_file)
        OLITestLib.cred_file = cred_file
        OLITestLib.cmd = cmd

    @classmethod
    def create_test_dir(cls, suffix=''):
        """Creates a test directory and places OLI config there

        Note that this is a class method. There can only be one test
        directory at a time. OLITestLib is not suited for running
        several tests in parallel.  The user is responsible for
        cleaning that up herself.

        :param suffix: text embedded in the directory name ('tmp_<suffix>_').
        :returns: absolute path of the new test directory."""
        assert cls.cred_file is not None
        # creating temporary dir for testing in same dir as credentials.conf
        cls.testdir = os.path.abspath(
            tempfile.mkdtemp(prefix='tmp_%s_' % suffix,
                             dir=os.path.dirname(cls.cred_file)))
        cls.write_config_file()
        return cls.testdir

    @classmethod
    def get_default_config(cls):
        """Creates a default ConfigParser file and returns it

        The defaults are read from `default_conf`, then overlaid with the
        credentials file, and 'metadata' in [general] is pointed at the
        test directory. The returned config can be manipulated and then
        saved with write_config_file()"""
        # TODO, only do first time and cache then for subsequent calls?
        assert cls.cred_file is not None
        assert cls.testdir is not None
        config = CustomConfigParser()
        try:
            # readfp() was removed in Python 3.12; read_file() replaces it.
            config.read_file(default_conf)
        finally:
            # rewind config_file to start so the next call can read it
            # again, even if parsing failed half-way
            default_conf.seek(0)
        config.read(cls.cred_file)
        config.set("general", "metadata", cls.testdir)
        return config

    @classmethod
    def write_config_file(cls, config=None):
        """Creates a OLI configuration file

        It is created in testdir (so create_test_dir has to be called
        earlier) using the credentials information given (so they had
        to be set earlier). Failure to do either of them will raise an
        AssertionError. If config is None, a default one will be
        used via get_default_config, otherwise it needs to be a config
        object derived from that.

        The 'localfolders' option of section 'Repository Maildir' is always
        overwritten to point at '<testdir>/mail'."""
        # Checked here as well: with an explicit config the assertions in
        # get_default_config() are never reached.
        assert cls.testdir is not None
        if config is None:
            config = cls.get_default_config()
        localfolders = os.path.join(cls.testdir, 'mail')
        config.set("Repository Maildir", "localfolders", localfolders)
        with open(os.path.join(cls.testdir, 'offlineimap.conf'), "wt") as f:
            config.write(f)

    @classmethod
    def delete_test_dir(cls):
        """Deletes the current test directory

        Does nothing if no test directory was created or it is already
        gone. The users is responsible for cleaning that up herself."""
        if cls.testdir is not None and os.path.isdir(cls.testdir):
            shutil.rmtree(cls.testdir)

    @classmethod
    def run_OLI(cls):
        """Runs OfflineImap

        Uses the configuration file written to the test directory.

        :returns: (rescode, stdout (as unicode))
        """
        assert cls.testdir is not None
        conffile = os.path.join(cls.testdir, 'offlineimap.conf')
        try:
            output = subprocess.check_output(
                [cls.cmd, "-c%s" % conffile], shell=False)
        except subprocess.CalledProcessError as e:
            # non-zero exit: hand back the exit code with whatever was printed
            return e.returncode, e.output.decode('utf-8')
        return 0, output.decode('utf-8')

    @classmethod
    def _quoted_folders(cls, data):
        """Turn the data of an IMAP LIST response into quoted folder names

        :param data: list of response items; each is a bytes line, a
            (header, literal) tuple, or an empty/None placeholder.
        :returns: list of str folder names, each wrapped in double quotes."""
        folders = []
        for item in data:
            if not item:
                continue  # None, '' or b'': nothing was listed
            if isinstance(item, tuple):
                # literal (unquoted): the name is the second tuple element
                name = item[1]
                if isinstance(name, bytes):
                    name = name.decode('utf-8')
                folders.append('"%s"' % name.replace('"', '\\"'))
                continue
            if isinstance(item, str):
                item = item.encode('utf-8')
            m = cls._re_listfolder.search(item)
            if m is None:
                continue  # not a line we can parse, so nothing to delete
            folder = m.group('dir').decode('utf-8')
            if not m.group('quote'):
                folder = '"%s"' % folder
            folders.append(folder)
        return folders

    @classmethod
    def delete_remote_testfolders(cls, reponame=None):
        """Delete all INBOX.OLItest* folders on the remote IMAP repository

        Folders named INBOX/OLItest* are removed as well.

        reponame: All on `reponame` or all IMAP-type repositories if None"""
        config = cls.get_default_config()
        if reponame:
            sections = ['Repository {0}'.format(reponame)]
        else:
            sections = [s for s in config.sections()
                        if s.startswith('Repository')
                        and config.get(s, 'Type').lower() == 'imap']
        for sec in sections:
            # Connect to each IMAP repo and delete all folders
            # matching the folderfilter setting. We only allow basic
            # settings and no fancy password getting here...
            # 1) connect and get dir listing
            host = config.get(sec, 'remotehost')
            user = config.get(sec, 'remoteuser')
            passwd = config.get(sec, 'remotepass')
            imapobj = imaplib.IMAP4(host)
            try:
                imapobj.login(user, passwd)
                res_t, data = imapobj.list()
                assert res_t == 'OK'
                # 2) filter out those not starting with INBOX.OLItest and del...
                for folder in cls._quoted_folders(data):
                    if not folder.startswith(('"INBOX.OLItest',
                                              '"INBOX/OLItest')):
                        continue
                    res_t, data = imapobj.delete(folder)
                    assert res_t == 'OK', \
                        "Folder deletion of {0} failed with error" \
                        ":\n{1} {2}".format(folder, res_t, data)
            finally:
                # always close the connection, also when something failed
                imapobj.logout()

    @classmethod
    def create_maildir(cls, folder):
        """Create empty maildir 'folder' in our test maildir

        Does not fail if it already exists"""
        assert cls.testdir is not None
        maildir = os.path.join(cls.testdir, 'mail', folder)
        # creating the subdirs also creates the maildir itself
        for subdir in ('tmp', 'cur', 'new'):
            os.makedirs(os.path.join(maildir, subdir), exist_ok=True)

    @classmethod
    def delete_maildir(cls, folder):
        """Delete maildir 'folder' in our test maildir

        Does not fail if not existing"""
        assert cls.testdir is not None
        maildir = os.path.join(cls.testdir, 'mail', folder)
        shutil.rmtree(maildir, ignore_errors=True)

    @classmethod
    def create_mail(cls, folder, mailfile=None, content=None):
        """Create a mail in  maildir 'folder'/new

        :param folder: maildir folder below the test maildir; its 'new'
            directory has to exist already (see create_maildir).
        :param mailfile: file name of the mail. If None, an unused random
            name of the form '<number>:2,' is chosen. An existing file of
            the given name is overwritten.
        :param content: mail content as bytes, or as str (written UTF-8
            encoded). If None, a small default mail is written."""
        assert cls.testdir is not None
        newdir = os.path.join(cls.testdir, 'mail', folder, 'new')
        if mailfile is None:
            while True:  # Loop till we found a unique filename
                mailfile = '{0}:2,'.format(random.randint(0, 999999999))
                if not os.path.isfile(os.path.join(newdir, mailfile)):
                    break
        if content is None:
            content = cls._default_mail
        elif isinstance(content, str):
            content = content.encode('utf-8')
        with open(os.path.join(newdir, mailfile), "wb") as mailf:
            mailf.write(content)

    @classmethod
    def count_maildir_mails(cls, folder):
        """Returns the number of maildirs and mails in maildir 'folder'

        Counting only those in cur&new (ignoring tmp).

        :returns: (boxes, mails). A directory counts as a box when its
            subdirectories are exactly 'cur', 'new' and 'tmp'."""
        assert cls.testdir is not None
        maildir = os.path.join(cls.testdir, 'mail', folder)

        boxes, mails = 0, 0
        for dirpath, dirs, files in os.walk(maildir, False):
            if set(dirs) == {'cur', 'new', 'tmp'}:
                # New maildir folder
                boxes += 1
            # compare the last path component rather than a hard-coded '/'
            if os.path.basename(dirpath) in ('cur', 'new'):
                mails += len(files)
        return boxes, mails

    @classmethod
    def get_maildir_uids(cls, folder):
        """Returns a list of maildir mail uids, 'None' if no valid uid

        There is one entry per file found in a 'new' or 'cur' directory
        below 'folder'; uids are returned as strings."""
        assert cls.testdir is not None
        mailfilepath = os.path.join(cls.testdir, 'mail', folder)
        assert os.path.isdir(mailfilepath)
        ret = []
        for dirpath, _dirs, files in os.walk(mailfilepath):
            if os.path.basename(dirpath) not in ('new', 'cur'):
                continue  # only /new /cur are interesting
            for name in files:
                m = cls.re_uidmatch.search(name)
                ret.append(m.group(1) if m else None)
        return ret