File: util.py

package info (click to toggle)
python-cogent 1.5.3-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 16,424 kB
  • ctags: 24,343
  • sloc: python: 134,200; makefile: 100; ansic: 17; sh: 10
file content (133 lines) | stat: -rw-r--r-- 4,025 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import re
from sqlalchemy import create_engine, MetaData, Table

__author__ = "Gavin Huttley"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Gavin Huttley"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Gavin Huttley"
__email__ = "Gavin.Huttley@anu.edu.au"
__status__ = "alpha"

class DisplayString(str):
    """provides a mechanism for customising the str() and repr() of objects"""
    def __new__(cls, arg, num_words=None, repr_length=None, with_quotes=False):
        new = str.__new__(cls, str(arg))
        new.num_words = num_words
        new.repr_length = repr_length or len(str(arg))
        new.with_quotes = with_quotes
        return new
    
    def __repr__(self):
        if self.num_words is not None:
            new = " ".join(self.split()[:self.num_words])
        elif self.repr_length != len(self):
            new = self[:self.repr_length]
        else:
            new = self
        if len(self) > len(new):
            new += '...'
        new = [new, "'%s'" % new][self.with_quotes]
        return new
    

class CaseInsensitiveString(str):
    """A case insensitive string class. Comparisons are case insensitive."""
    def __new__(cls, arg, h=None):
        n = str.__new__(cls, str(arg))
        n._hash=hash(''.join(list(n)).lower())
        n._lower = ''.join(list(n)).lower()
        return n
    
    def __eq__(self, other):
        return self._lower == ''.join(list(other)).lower()
    
    def __hash__(self):
        # dict hashing done via lower case
        return self._hash
    
    def __str__(self):
        return ''.join(list(self))

class LazyRecord(object):
    """a convenience class for conducting lazy evaluations of class
    properties"""
    NULL_VALUE = None
    def __init__(self):
        """blind constructor of caches"""
        self._cached = {}
        self._table_rows = {}
    
    def _get_cached_value(self, attr_name, get_attr_func):
        if attr_name not in self._cached:
            get_attr_func()
        
        return self._cached[attr_name]
    
    def _populate_cache_from_record(self, attr_column, table_name):
        """attr_column: the attribute name <-> table column key mappin
        table_name: the key in _table_rows"""
        table = self._table_rows[table_name]
        for attr, column, func in attr_column:
            if attr not in self._cached:
                self._cached[attr] = func(table[column])
    
    def _set_null_values(self, attrs, table_name = None):
        for attr in attrs:
            self._cached[attr] = self.NULL_VALUE
        
        if table_name:
            self._table_rows[table_name] = self.NULL_VALUE
    

class NoItemError(Exception):
    def __init__(self, value):
        self.value = value
    
    def __str__(self):
        return repr(self.value)
    

def convert_strand(val):
    """ensures a consistent internal representation of strand"""
    if isinstance(val, str):
        assert val in '-+', 'unknown strand "%s"' % val
        val = [-1,1][val == '+']
    elif val is not None:
        val = [-1,1][val > 0]
    else:
        val = 1
    return val

def asserted_one(items):
    """asserts items has a single value and returns it"""
    one = False
    for item in items:
        if one:
            raise ValueError('More than one: [%s]' % item.items())
        one = True
    if one:
        return item
    else:
        raise NoItemError('No items')

def what_columns(table):
    """shows what columns are in a table"""
    print [c.name for c in table.c]

def yield_selected(sqlalchemy_select, limit=100):
    """takes a SQLAlchemy select condition, yielding limit per db query
    purpose is to not get all records from the db in one go
    """
    offset = 0
    while 1:
        results = sqlalchemy_select.offset(offset).limit(limit).execute()
        count = 0
        for result in results:
            yield result
            count += 1
        offset += limit
        if count == 0 or count < limit:
            break