File: util_tests.py

package info (click to toggle)
intake 0.6.6-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 6,552 kB
  • sloc: python: 12,408; makefile: 37; sh: 14
file content (65 lines) | stat: -rw-r--r-- 1,761 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------

from contextlib import contextmanager
import os
import requests
import shutil
import subprocess
import sys
import tempfile
import time
import yaml

from .utils import make_path_posix

# Interpreter running the current process; reused to launch the server.
ex = sys.executable

# Directory containing this module.
here = os.path.dirname(__file__)

# Default catalog file handed to the server when no other one is given.
defcat = make_path_posix(
    os.path.join(here, 'cli', 'server', 'tests', 'catalog1.yml')
)

# True when the running interpreter is Python 2.
PY2 = sys.version_info.major == 2

@contextmanager
def tempdir():
    """Create a temporary directory and delete it when the context exits.

    Yields the path of a directory made with ``tempfile.mkdtemp``.  On exit,
    whether normal or through an exception, the directory tree is removed
    unless it no longer exists.
    """
    tmp_path = tempfile.mkdtemp()
    try:
        yield tmp_path
    finally:
        # The caller may already have removed the directory; only clean up
        # when it is still present.
        still_present = os.path.exists(tmp_path)
        if still_present:
            shutil.rmtree(tmp_path)


@contextmanager
def temp_conf(conf):
    """Dump ``conf`` as YAML into a temporary file and yield the file path.

    The file is called ``conf.yaml`` and is created inside a directory
    obtained from ``tempdir``, so it exists only for the duration of the
    context.
    """
    with tempdir() as workdir:
        conf_path = os.path.join(workdir, 'conf.yaml')
        with open(conf_path, 'w') as stream:
            yaml.dump(conf, stream)
        yield conf_path


@contextmanager
def server(args=None, cat=None, env=None, wait=None, timeout=25):
    """Run ``intake.cli.server`` in a subprocess for the life of the context.

    Parameters
    ----------
    args : iterable of str, optional
        Extra command-line arguments, placed before the catalog path.
    cat : str, optional
        Catalog path passed as the last argument; defaults to ``defcat``.
    env : dict, optional
        Environment for the subprocess.  When omitted, an *empty*
        environment is used (the parent's environment is not inherited).
    wait : int, optional
        Port number.  When given, block until
        ``http://localhost:<wait>/v1/info`` answers before yielding.
    timeout : float
        Polling budget, in seconds, used when ``wait`` is set.  It is
        decremented by the sleep interval after every failed attempt.

    Yields
    ------
    subprocess.Popen
        The server process; its stderr is merged into a piped stdout.

    Raises
    ------
    AssertionError
        If the server has not answered by the time ``timeout`` is used up.
    """
    cat = cat if cat is not None else defcat
    env = env if env is not None else {}
    extra_args = list(args) if args is not None else []
    cmd = [ex, '-m', 'intake.cli.server'] + extra_args + [cat]
    p = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # From here on the process exists, so the startup polling lives inside
    # the try/finally too: a start-up timeout must not leak the subprocess.
    try:
        if wait is not None:
            # Built outside the retry loop so that a bad ``wait`` value
            # fails immediately instead of being retried until timeout.
            url = 'http://localhost:%i/v1/info' % wait
            while True:
                try:
                    requests.get(url)
                    break
                except requests.exceptions.RequestException:
                    # Server not reachable yet: pause, then try again.
                    time.sleep(0.1)
                    timeout -= 0.1
                    if timeout <= 0:
                        # Explicit raise (not ``assert``) so that the
                        # timeout still fires under ``python -O``.
                        raise AssertionError(
                            'server did not respond at %s before the '
                            'timeout expired' % url)
        yield p
    finally:
        p.terminate()