File: _support.py

package info (click to toggle)
python-biopython 1.42-2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 17,584 kB
  • ctags: 12,272
  • sloc: python: 80,461; xml: 13,834; ansic: 7,902; cpp: 1,855; sql: 1,144; makefile: 203
file content (158 lines) | stat: -rw-r--r-- 5,615 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright 2002 by Jeffrey Chang, Andrew Dalke.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

# This is based on some older code by Andrew Dalke.

"""Support code for dealing with registries.

Functions:
find_submodules   Find all the modules in a package.
load_module       Load a module and return it.  Raise ImportError if not found.
safe_load_module  Like load_module, but returns None if not found.

make_rate_limited_function   Limit the rate at which a function can run.
make_timed_function          Limit the amount of time a function can run.

make_cached_expression       Caches the make_parser method of expressions.

"""
import sys
import os
import time

from Bio.WWW import RequestLimiter

def find_submodules(modulename):
    """find_submodules(modulename) -> list of module names

    Look inside a package or module and recursively find all the
    modules that exist within it.
    
    """
    # First, figure out where this module exists in the file system.
    module = safe_load_module(modulename)
    if not module:   # This is not a valid python module or package.
        return []
    filename = module.__file__

    # If this is an actual module (rather than a package), then
    # there's no more submodules and we're done.
    if not filename.endswith("__init__.py") and \
       not filename.endswith("__init__.pyc") and \
       not filename.endswith("__init__.pyo"):
        return [modulename]

    # Since it's a package, get a list of all the modules inside it
    # and recurse on those.
    dirname = os.path.dirname(filename)
    submodulenames = {}    # prevent duplicates
    for filename in os.listdir(dirname):
        filename = os.path.splitext(filename)[0]
        if filename == '__init__':
            continue
        elif not filename:
            continue
        name = "%s.%s" % (modulename, filename)
        submodulenames[name] = 1
    submodulenames = submodulenames.keys()
    submodulenames.sort()

    submodules = []
    for name in submodulenames:
        try:
            x = find_submodules(name)
        except ImportError, x:
            raise
            pass   # ignore things that aren't valid modules (e.g. CVS)
        else:
            submodules.extend(x)

    return submodules

def load_module(modulename):
    """load_module(modulename) -> module"""
    try:
        module = __import__(modulename, {}, {}, modulename.split(".")[:-1])
    except SyntaxError, exc:
        raise
    except ImportError, exc:
        raise ImportError("%s during import of %r" % (exc, modulename)), \
              None, sys.exc_info()[2]
    return module

def safe_load_module(modulename):
    """safe_load_module(modulename) -> module or None"""
    try:
        module = load_module(modulename)
    except ImportError, x:
        if str(x).find("during import of") == -1:
            raise
        module = None
    return module

class make_rate_limited_function:
    """make_rate_limited_function(function, delay) -> callable object

    Create a version of function that does not run more often than
    once every delay seconds.

    """
    def __init__(self, function, delay):
        self.fn = function
        self.limiter = RequestLimiter(delay)
    def __call__(self, *args, **keywds):
        self.limiter.wait()
        return self.fn(*args, **keywds)

class make_timed_function:
    """make_timed_function(function, timeout[, retval2pickleable_fn][, pickleable2retval_fn]) -> callable object

    Create a version of function that times out if it does not
    complete within timeout seconds.

    Currently, there's an implementation limitation such that function
    must return a pickleable object (or nothing).  If the function
    returns an object that's not pickleable, then please set
    retval2pickleable_fn and pickleable2retval_fn to a pair of
    callbacks to convert the return value of the function to a
    pickleable form.  If this is impossible, then this function should
    not be used.

    """
    def __init__(self, function, timeout,
                 retval2pickleable_fn=None, pickleable2retval_fn=None):
        self.fn = function
        self.timeout = timeout
        self.retval2pickleable_fn = retval2pickleable_fn or (lambda x: x)
        self.pickleable2retval_fn = pickleable2retval_fn or (lambda x: x)
    def _call_fn(self, *args, **keywds):
        retval = self.fn(*args, **keywds)
        return self.retval2pickleable_fn(retval)
    def __call__(self, *args, **keywds):
        from Bio.MultiProc.copen import copen_fn
        end_time = time.time() + self.timeout
        handle = copen_fn(self._call_fn, *args, **keywds)
        while time.time() < end_time:
            if handle.poll():
                break
            time.sleep(0.01)
        else:
            handle.close()
            raise IOError, "timed out"
        pickleable = handle.read()
        return self.pickleable2retval_fn(pickleable)

# Only caches parsers for make_parser, not iterators
class make_cached_expression:
    """make_cached_expression(expression) -> cached expression object"""
    def __init__(self, expression):
        self.expression = expression
        self._parsers = {}   # debug_level -> parser
    def make_parser(self, debug_level=0):
        if self._parsers.get(debug_level) is None:
            parser = self.expression.make_parser(debug_level=debug_level)
            self._parsers[debug_level] = parser
        return self._parsers[debug_level].copy()