File: test_ftp.py

package info (click to toggle)
smart-open 7.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 980 kB
  • sloc: python: 8,054; sh: 90; makefile: 14
file content (123 lines) | stat: -rw-r--r-- 4,724 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import unicode_literals
import gzip
import pytest
from smart_open import open
import ssl
from functools import partial

# localhost has self-signed cert, see ci_helpers/helpers.sh:create_ftp_ftps_servers
ssl.create_default_context = partial(ssl.create_default_context, cafile="/etc/vsftpd.pem")


@pytest.fixture(params=[("ftp", 21), ("ftps", 90)])
def server_info(request):
    return request.param

def test_nonbinary(server_info):
    server_type = server_info[0]
    port_num = server_info[1]
    file_contents = "Test Test \n new test \n another tests"
    appended_content1 = "Added \n to end"

    with open(f"{server_type}://user:123@localhost:{port_num}/file", "w") as f:
        f.write(file_contents)

    with open(f"{server_type}://user:123@localhost:{port_num}/file", "r") as f:
        read_contents = f.read()
        assert read_contents == file_contents
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file", "a") as f:
        f.write(appended_content1)
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file", "r") as f:
        read_contents = f.read()
        assert read_contents == file_contents + appended_content1

def test_binary(server_info):
    server_type = server_info[0]
    port_num = server_info[1]
    file_contents = b"Test Test \n new test \n another tests"
    appended_content1 = b"Added \n to end"

    with open(f"{server_type}://user:123@localhost:{port_num}/file2", "wb") as f:
        f.write(file_contents)

    with open(f"{server_type}://user:123@localhost:{port_num}/file2", "rb") as f:
        read_contents = f.read()
        assert read_contents == file_contents
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file2", "ab") as f:
        f.write(appended_content1)
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file2", "rb") as f:
        read_contents = f.read()
        assert read_contents == file_contents + appended_content1

def test_compression(server_info):
    server_type = server_info[0]
    port_num = server_info[1]
    file_contents = "Test Test \n new test \n another tests"
    appended_content1 = "Added \n to end"

    with open(f"{server_type}://user:123@localhost:{port_num}/file.gz", "w") as f:
        f.write(file_contents)

    with open(f"{server_type}://user:123@localhost:{port_num}/file.gz", "r") as f:
        read_contents = f.read()
        assert read_contents == file_contents
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file.gz", "a") as f:
        f.write(appended_content1)
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file.gz", "r") as f:
        read_contents = f.read()
        assert read_contents == file_contents + appended_content1

    # ftp socket makefile returns a file whose name attribute is fileno() which is int
    # that can't be used to infer compression extension, so the calls above would
    # silently not use any compression (neither reading nor writing) so they would pass
    # pytest suppresses the logging.warning('unable to transparently decompress...')
    # so check here explicitly that the bytes on server are gzip compressed
    with open(
        f"{server_type}://user:123@localhost:{port_num}/file.gz",
        "rb",
        compression='disable',
    ) as f:
        read_contents = gzip.decompress(f.read()).decode()
        assert read_contents == file_contents + appended_content1

def test_line_endings_non_binary(server_info):
    server_type = server_info[0]
    port_num = server_info[1]
    B_CLRF = b'\r\n'
    CLRF = '\r\n'
    file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}"

    with open(f"{server_type}://user:123@localhost:{port_num}/file3", "w") as f:
        f.write(file_contents)

    with open(f"{server_type}://user:123@localhost:{port_num}/file3", "r") as f:    
        for line in f:
            assert not CLRF in line
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file3", "rb") as f:    
        for line in f:
            assert B_CLRF in line

def test_line_endings_binary(server_info):
    server_type = server_info[0]
    port_num = server_info[1]
    B_CLRF = b'\r\n'
    CLRF = '\r\n'
    file_contents = f"Test Test {CLRF} new test {CLRF} another tests{CLRF}".encode('utf-8')

    with open(f"{server_type}://user:123@localhost:{port_num}/file4", "wb") as f:
        f.write(file_contents)

    with open(f"{server_type}://user:123@localhost:{port_num}/file4", "r") as f:    
        for line in f:
            assert not CLRF in line
    
    with open(f"{server_type}://user:123@localhost:{port_num}/file4", "rb") as f:    
        for line in f:
            assert B_CLRF in line