File: ftpserver.py

package info (click to toggle)
linkchecker 10.6.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,132 kB
  • sloc: python: 13,154; makefile: 134; sh: 71; xml: 36; sql: 20; javascript: 19; php: 2
file content (110 lines) | stat: -rw-r--r-- 3,281 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Copyright (C) 2010-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Define FTP test support classes for LinkChecker tests.
"""
import os
import time
import threading
from ftplib import FTP

from tests import skip
from . import LinkCheckTest


# Timeout in seconds, used both for the FTP client connections made by the
# helpers below and as the timeout configured on the server-side handler.
TIMEOUT = 5


class FtpServerTest(LinkCheckTest):
    """Test case base class that runs a throwaway FTP server for each test.

    ``setUp`` starts the server through ``start_server`` and stores the port
    it listens on in ``self.port``; ``tearDown`` asks that server to shut
    down again through ``stop_server``.
    """

    def __init__(self, methodName="runTest"):
        """Init test class; the server port is unknown until ``setUp`` runs."""
        super().__init__(methodName=methodName)
        self.host = "localhost"
        self.port = None

    def setUp(self):
        """Start a new FTP server in a new thread and remember its port."""
        super().setUp()
        # Port 0 is requested; start_server returns the port actually bound.
        self.port = start_server(self.host, 0)
        self.assertIsNotNone(self.port)

    def tearDown(self):
        """Send a stop request to the server, then run the parent teardown."""
        try:
            # Without a port no server was started, so there is nothing to
            # stop.
            if self.port is not None:
                stop_server(self.host, self.port)
        except Exception:
            # Deliberately best effort: a server that is already gone or
            # unreachable must not turn a finished test into an error.
            pass
        finally:
            # Mirror setUp(), which chains to the parent class as well.
            super().tearDown()


def start_server(host, port):
    """Start a pyftpdlib based FTP server in a background thread.

    The server allows anonymous logins on the ``data`` directory located
    next to this file. It can be shut down by sending it a line containing
    ``kill`` (see ``stop_server``).

    Args:
        host: Host name or address to listen on.
        port: Port to listen on; the caller in this file passes 0 and uses
            the returned value as the real port.

    Returns:
        The port number reported by the server (``server.address[1]``), or
        ``None`` when pyftpdlib cannot be imported and ``skip()`` returns
        instead of aborting the test.
    """
    try:
        from pyftpdlib.authorizers import DummyAuthorizer
        from pyftpdlib.handlers import FTPHandler
        from pyftpdlib.servers import FTPServer
        from pyftpdlib import __ver__ as pyftpdlib_version
    except ImportError:
        skip("pyftpdlib is not available")
        return None

    anonymous_authorizer = DummyAuthorizer()
    datadir = os.path.join(os.path.dirname(__file__), "data")
    anonymous_authorizer.add_anonymous(datadir)

    # Configure a private subclass instead of assigning attributes on
    # FTPHandler itself, so the settings do not leak to any other user of
    # pyftpdlib in the same process.
    class _KillableFTPHandler(FTPHandler):
        """FTP handler for tests that aborts the server on a "kill" line."""

        authorizer = anonymous_authorizer
        timeout = TIMEOUT
        # Customized banner (string returned when a client connects)
        banner = "pyftpdlib %s based ftpd ready." % pyftpdlib_version

        def logline(self, msg, *args, **kwargs):
            # Replaces the handler's line logging. Extra arguments are
            # accepted and ignored so that the override keeps working if the
            # handler passes more than the message.
            # NOTE(review): the KeyboardInterrupt is presumably what makes
            # serve_forever() return - confirm against pyftpdlib.
            if "kill" in msg:
                raise KeyboardInterrupt()

    # Instantiate FTP server class and listen to host:port
    server = FTPServer((host, port), _KillableFTPHandler)
    port = server.address[1]

    # A daemon thread cannot keep the interpreter alive at exit when the
    # stop request never reaches the server.
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()

    # Wait for the server to start up: probe it with an anonymous login,
    # at most five times with half a second between failed attempts.
    for _ in range(5):
        probe = FTP()
        try:
            probe.connect(host, port, TIMEOUT)
            probe.login()
            ready = True
        except Exception:
            ready = False
        finally:
            # Always release the probe socket, also when login() failed.
            probe.close()
        if ready:
            break
        time.sleep(0.5)
    # The port is returned even if no probe succeeded, as before.
    return port


def stop_server(host, port):
    """Stop a running FTP server by sending it the ``kill`` command.

    Logs in anonymously on ``host``:``port`` and sends ``kill``. The test
    server started by ``start_server`` raises ``KeyboardInterrupt`` when it
    sees that word, so no reply is expected and an ``EOFError`` from the
    command is ignored.

    Args:
        host: Host name or address of the FTP server.
        port: Port the FTP server listens on.

    Raises:
        Exception: Whatever ``ftplib`` raises when connecting or logging in
            fails; only the ``EOFError`` of the ``kill`` command itself is
            suppressed.
    """
    ftp = FTP()
    try:
        ftp.connect(host, port, TIMEOUT)
        ftp.login()
        try:
            ftp.sendcmd("kill")
        except EOFError:
            # Expected: the server drops the connection instead of replying.
            pass
    finally:
        # Release the control connection on every path, including failures
        # in connect(), login() or sendcmd().
        ftp.close()