File: caching.py

# MySQL Connector/Python - MySQL driver written in Python.
# Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.

# MySQL Connector/Python is licensed under the terms of the GPLv2
# <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>, like most
# MySQL Connectors. There are special exceptions to the terms and
# conditions of the GPLv2 as it is applied to this software, see the
# FOSS License Exception
# <http://www.mysql.com/about/legal/licensing/foss-exception.html>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Implementing caching mechanisms for MySQL Fabric"""


from datetime import datetime, timedelta
from hashlib import sha1
import logging
import threading

from . import FabricShard

_LOGGER = logging.getLogger('myconnpy-fabric')
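# Default time-to-live for cache entries, in seconds; entries older than this
# are reported as stale by CacheEntry.invalid.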
_CACHE_TTL = 1 * 60  # 1 minute


class CacheEntry(object):

    """Base class for MySQL Fabric cache entries"""

    def __init__(self, version=None, fabric_uuid=None, ttl=_CACHE_TTL):
        self.version = version
        self.fabric_uuid = fabric_uuid
        self.last_updated = datetime.utcnow()
        self._ttl = ttl

    @classmethod
    def hash_index(cls, part1, part2=None):
        """Create hash for indexing"""
        raise NotImplementedError

    @property
    def invalid(self):
        """Returns True if entry is not valid any longer

        This property returns True when the entry is not valid any longer.
        The entry is no longer valid when now > (last updated + ttl),
        where ttl is given in seconds.
        """
        if not self.last_updated:
            return False
        atime = self.last_updated + timedelta(seconds=self._ttl)
        return datetime.utcnow() > atime

    def reset_ttl(self):
        """Reset the time to live by updating the last updated timestamp"""
        self.last_updated = datetime.utcnow()

    def invalidate(self):
        """Invalidates the cache entry"""
        self.last_updated = None


class CacheShardTable(CacheEntry):

    """Cache entry for a Fabric sharded table"""

    def __init__(self, shard, version=None, fabric_uuid=None):
        if not isinstance(shard, FabricShard):
            raise ValueError("shard argument must be a FabricShard instance")
        super(CacheShardTable, self).__init__(version=version,
                                              fabric_uuid=fabric_uuid)
        self.partitioning = {}
        self._shard = shard

        if shard.key and shard.group:
            self.add_partition(shard.key, shard.group)

    def __getattr__(self, attr):
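        # Attributes not defined on the cache entry itself (database, table,
        # column, shard_type, ...) are looked up on the wrapped FabricShard.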
        return getattr(self._shard, attr)

    def add_partition(self, key, group):
        """Add sharding information for a group"""
        if self.shard_type == 'RANGE':
            key = int(key)
        self.partitioning[key] = {
            'group': group,
        }
        self.reset_ttl()

    @classmethod
    def hash_index(cls, part1, part2=None):
        """Create hash for indexing"""
        return sha1(part1.encode('utf-8') + part2.encode('utf-8')).hexdigest()

    def __repr__(self):
        return "{class_}({database}.{table}.{column})".format(
            class_=self.__class__,
            database=self.database,
            table=self.table,
            column=self.column
        )


class CacheGroup(CacheEntry):
    """Cache entry for a Fabric group"""
    def __init__(self, group_name, servers):
        super(CacheGroup, self).__init__(version=None, fabric_uuid=None)
        self.group_name = group_name
        self.servers = servers

    @classmethod
    def hash_index(cls, part1, part2=None):
        """Create hash for indexing"""
        return sha1(part1.encode('utf-8')).hexdigest()

    def __repr__(self):
        return "{class_}({group})".format(
            class_=self.__class__,
            group=self.group_name,
        )


class FabricCache(object):
    """Singleton class for caching Fabric data

    Only one instance of this class can exist globally.
    """
    def __init__(self, ttl=_CACHE_TTL):
        self._ttl = ttl
        self._sharding = {}
        self._groups = {}
        self.__sharding_lock = threading.Lock()
        self.__groups_lock = threading.Lock()

    def remove_group(self, entry_hash):
        """Remove cache entry for group"""
        with self.__groups_lock:
            try:
                del self._groups[entry_hash]
            except KeyError:
                # not cached, that's OK
                pass
            else:
                _LOGGER.debug("Group removed from cache")

    def remove_shardtable(self, entry_hash):
        """Remove cache entry for shard"""
        with self.__sharding_lock:
            try:
                del self._sharding[entry_hash]
            except KeyError:
                # not cached, that's OK
                pass

    def sharding_cache_table(self, shard, version=None, fabric_uuid=None):
        """Cache information about a shard"""
        entry_hash = CacheShardTable.hash_index(shard.database, shard.table)

        with self.__sharding_lock:
            try:
                entry = self._sharding[entry_hash]
                entry.add_partition(shard.key, shard.group)
            except KeyError:
                # New cache entry
                entry = CacheShardTable(shard, version=version,
                                        fabric_uuid=fabric_uuid)
                self._sharding[entry_hash] = entry

    def cache_group(self, group_name, servers):
        """Cache information about a group"""
        entry_hash = CacheGroup.hash_index(group_name)

        with self.__groups_lock:
            try:
                entry = self._groups[entry_hash]
                entry.servers = servers
                entry.reset_ttl()
                _LOGGER.debug("Recaching group {0} with {1}".format(
                    group_name, servers))
            except KeyError:
                # New cache entry
                entry = CacheGroup(group_name, servers)
                self._groups[entry_hash] = entry
                _LOGGER.debug("Caching group {0} with {1}".format(
                    group_name, servers))

    def sharding_search(self, database, table):
        """Search cache for a shard based on database and table"""
        entry_hash = CacheShardTable.hash_index(database, table)

        entry = None
        try:
            entry = self._sharding[entry_hash]
            if entry.invalid:
                _LOGGER.debug("{entry} invalidated".format(entry=entry))
                self.remove_shardtable(entry_hash)
                return None
        except KeyError:
            # Nothing in cache
            return None

        return entry

    def group_search(self, group_name):
        """Search cache for a group based on its name"""
        entry_hash = CacheGroup.hash_index(group_name)

        entry = None
        try:
            entry = self._groups[entry_hash]
            if entry.invalid:
                _LOGGER.debug("{entry} invalidated".format(entry=entry))
                self.remove_group(entry_hash)
                return None
        except KeyError:
            # Nothing in cache
            return None

        return entry

    def __repr__(self):
        return "{class_}(groups={nrgroups},shards={nrshards})".format(
            class_=self.__class__,
            nrgroups=len(self._groups),
            nrshards=len(self._sharding)
        )
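

# A minimal usage sketch (not part of the original module), restricted to the
# group API so that no FabricShard instance needs to be constructed here; the
# server list content below is only an illustrative assumption:
#
#     cache = FabricCache(ttl=30)
#     cache.cache_group('group1', ['server1:3306', 'server2:3306'])
#
#     # group_search() returns the CacheGroup entry, or None on a miss or
#     # once the TTL has expired.
#     entry = cache.group_search('group1')
#     if entry is not None:
#         print(entry.servers)
#
#     # Explicit removal is keyed on the same hash used by cache_group().
#     cache.remove_group(CacheGroup.hash_index('group1'))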