File: test_webhdfs.py

package info (click to toggle)
smart-open 7.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 980 kB
  • sloc: python: 8,054; sh: 90; makefile: 14
file content (68 lines) | stat: -rw-r--r-- 2,041 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#
"""
Sample code for WebHDFS integration tests.
Running it requires a working WebHDFS instance reachable on your network:
simply set the SO_WEBHDFS_BASE_URL environment variable to a WebHDFS URL
you have write access to.

For example on Amazon EMR WebHDFS is accessible on driver port 14000, so
it may look like:

$ export SO_WEBHDFS_BASE_URL=webhdfs://hadoop@your-emr-driver:14000/tmp/
$ py.test integration-tests/test_webhdfs.py
"""
import json
import os
import smart_open
from smart_open.webhdfs import WebHdfsException
import pytest

# Base URL under which the tests create their files; read from the
# environment so that no cluster address is hard-coded here.  The module
# refuses to import without it because make_url() depends on this value.
_SO_WEBHDFS_BASE_URL = os.getenv("SO_WEBHDFS_BASE_URL")
assert _SO_WEBHDFS_BASE_URL is not None, (
    "please set the SO_WEBHDFS_BASE_URL environment variable"
)


def make_url(path):
    """Return the URL for *path* below the configured WebHDFS base URL.

    Trailing slashes on the base URL and leading slashes on *path* are
    stripped first, so the two parts are joined by exactly one "/".
    """
    prefix = _SO_WEBHDFS_BASE_URL.rstrip("/")
    suffix = path.lstrip("/")
    return "/".join((prefix, suffix))


def test_write_and_read():
    """Write a line of text to test2.txt and read the same text back."""
    url = make_url("test2.txt")
    payload = "write_test\n"

    with smart_open.open(url, "w") as fout:
        fout.write(payload)

    with smart_open.open(url, "r") as fin:
        assert fin.read() == payload


def test_binary_write_and_read():
    """Write bytes to test3.txt in binary mode and read the same bytes back."""
    url = make_url("test3.txt")
    payload = b"binary_write_test\n"

    with smart_open.open(url, "wb") as fout:
        fout.write(payload)

    with smart_open.open(url, "rb") as fin:
        assert fin.read() == payload


def test_not_found():
    """Reading a path that does not exist raises WebHdfsException with 404."""
    missing_url = make_url("not_existing")

    with pytest.raises(WebHdfsException) as raised:
        with smart_open.open(missing_url, "r") as fin:
            # The exception is expected from the open or the read; the
            # assert only forces the read to actually happen.
            assert fin.read()

    assert raised.value.status_code == 404


def test_quoted_path():
    """A "%40" in the URL path must end up as "@" in the stored file name."""
    with smart_open.open(make_url("test_%40_4.txt"), "w") as fout:
        fout.write("write_test\n")

    # Read the LISTSTATUS response for the base directory and collect the
    # name of every entry it reports.
    with smart_open.open(make_url("?op=LISTSTATUS"), "r") as listing:
        statuses = json.load(listing)["FileStatuses"]["FileStatus"]
        suffixes = [status["pathSuffix"] for status in statuses]
        assert "test_@_4.txt" in suffixes