File: test_mongodb.py

package info (click to toggle)
ipython 2.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 28,032 kB
  • ctags: 15,433
  • sloc: python: 73,792; makefile: 428; sh: 297
file content (56 lines) | stat: -rw-r--r-- 1,563 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Tests for mongodb backend

Authors:

* Min RK
"""

#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import os

from unittest import TestCase

from nose import SkipTest

from pymongo import Connection
from IPython.parallel.controller.mongodb import MongoDB

from . import test_db

conn_kwargs = {}
if 'DB_IP' in os.environ:
    conn_kwargs['host'] = os.environ['DB_IP']
if 'DBA_MONGODB_ADMIN_URI' in os.environ:
    # On ShiningPanda, we need a username and password to connect. They are
    # passed in a mongodb:// URI.
    conn_kwargs['host'] = os.environ['DBA_MONGODB_ADMIN_URI']
if 'DB_PORT' in os.environ:
    conn_kwargs['port'] = int(os.environ['DB_PORT'])

try:
    c = Connection(**conn_kwargs)
except Exception:
    c=None

class TestMongoBackend(test_db.TaskDBTest, TestCase):
    """MongoDB backend tests"""

    def create_db(self):
        try:
            return MongoDB(database='iptestdb', _connection=c)
        except Exception:
            raise SkipTest("Couldn't connect to mongodb")

def teardown(self):
    if c is not None:
        c.drop_database('iptestdb')