File: database.py

package info (click to toggle)
miro 3.0.3-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 36,056 kB
  • ctags: 9,438
  • sloc: python: 52,860; cpp: 832; ansic: 692; xml: 432; sh: 403; makefile: 62
file content (553 lines) | stat: -rw-r--r-- 18,714 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
# -*- mode: python -*-

# Miro - an RSS based video player application
# Copyright (C) 2005-2010 Participatory Culture Foundation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
#
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.

import logging
import traceback
import threading

from miro import app
from miro import signals

class DatabaseConstraintError(Exception):
    """Signals that a DDBObject did not pass its constraint checks.

    Raised while DDBObject.signal_change() is validating an object.
    """

class DatabaseConsistencyError(Exception):
    """Signals an internal consistency problem detected by the database."""

class DatabaseThreadError(Exception):
    """Raised when the database is used from the wrong thread.

    confirm_db_thread() raises this when no database event thread has
    been set, or when it is called from a thread other than the event
    thread.
    """

class DatabaseVersionError(StandardError):
    """Signals an attempt to restore a database that is newer than the
    newest version this code supports.
    """

class ObjectNotFoundError(StandardError):
    """Signals that a lookup was attempted for an object that does not
    exist.
    """

class TooManyObjects(StandardError):
    """Signals that a singleton lookup matched more than one row."""

class NotRootDBError(StandardError):
    """Signals a call to a function that may only be called from the
    root database.
    """

class NoValue(object):
    """Placeholder meaning "no value supplied".

    Exists so that None itself can be treated as a valid value.
    """

# begin* and end* no longer actually lock the database.  Instead,
# confirm_db_thread() prints a stack trace and raises DatabaseThreadError
# if it's run from any thread other than the registered event thread.
# This check can be removed from releases for speed purposes.

# Thread allowed to use the database; stays None until set_thread() is called.
event_thread = None


def set_thread(thread):
    """Record ``thread`` as the database event thread.

    Only the first call has an effect; once an event thread has been
    recorded, later calls leave it unchanged.
    """
    global event_thread
    if event_thread is not None:
        return
    event_thread = thread

def confirm_db_thread():
    """Check that the caller is running on the database event thread.

    Returns None when the current thread is the thread recorded in the
    module-level ``event_thread``.

    Raises:
        DatabaseThreadError: if no event thread has been set, or if the
            current thread is not the event thread.  The current stack is
            printed with traceback.print_stack() before raising.
    """
    current = threading.currentThread()
    if event_thread is not None and event_thread == current:
        return

    if event_thread is None:
        error_string = "Database event thread not set"
    else:
        error_string = "Database called from %s" % current
    traceback.print_stack()
    # Call form of raise: valid on both Python 2 and Python 3, unlike the
    # legacy "raise Class, message" statement.
    raise DatabaseThreadError(error_string)

class View(object):
    """A stored query description for one DDBObject class.

    The constructor only records its arguments; iterating, counting and
    tracking hand them to app.db / ViewTracker when requested.
    """

    def __init__(self, klass, where, values, order_by, joins, limit):
        (self.klass, self.where, self.values) = (klass, where, values)
        (self.order_by, self.joins, self.limit) = (order_by, joins, limit)

    def __iter__(self):
        """Return whatever app.db.query() returns for this view."""
        results = app.db.query(self.klass, self.where, self.values,
                               self.order_by, self.joins, self.limit)
        return results

    def count(self):
        """Return the result of app.db.query_count() for this view."""
        total = app.db.query_count(self.klass, self.where, self.values,
                                   self.joins, self.limit)
        return total

    def get_singleton(self):
        """Return the only object in this view.

        Raises ObjectNotFoundError when the view is empty and
        TooManyObjects when it holds more than one object.
        """
        matches = list(self)
        if len(matches) == 0:
            raise ObjectNotFoundError("Can't find singleton")
        if len(matches) != 1:
            raise TooManyObjects("Too many results returned")
        return matches[0]

    def make_tracker(self):
        """Build a ViewTracker for this view.

        Raises ValueError when the view has a limit.
        """
        if self.limit is None:
            return ViewTracker(self.klass, self.where, self.values,
                               self.joins)
        raise ValueError("tracking views with limits not supported")

class ViewTrackerManager(object):
    """Registry of view trackers, grouped by table name, that forwards
    object changes and removals to the trackers of the affected table.
    """

    def __init__(self):
        # maps table_name to trackers
        self.table_to_tracker = {}
        # maps joined tables to trackers
        self.joined_table_to_tracker = {}

    def trackers_for_table(self, table_name):
        """Return the set of trackers for table_name, creating and storing
        an empty set the first time a table is requested.
        """
        return self.table_to_tracker.setdefault(table_name, set())

    def trackers_for_ddb_class(self, klass):
        """Return the tracker set for the table app.db maps klass to."""
        table_name = app.db.table_name(klass)
        return self.trackers_for_table(table_name)

    def update_view_trackers(self, obj):
        """Tell every tracker for obj's class that obj changed."""
        trackers = self.trackers_for_ddb_class(obj.__class__)
        for tracker in trackers:
            tracker.object_changed(obj)

    def bulk_update_view_trackers(self, table_name):
        """Ask every tracker for table_name to re-check all its objects."""
        trackers = self.trackers_for_table(table_name)
        for tracker in trackers:
            tracker.check_all_objects()

    def bulk_remove_from_view_trackers(self, table_name, objects):
        """Remove objects from every tracker for table_name."""
        trackers = self.trackers_for_table(table_name)
        for tracker in trackers:
            tracker.remove_objects(objects)

    def remove_from_view_trackers(self, obj):
        """Remove obj from every tracker for obj's class."""
        trackers = self.trackers_for_ddb_class(obj.__class__)
        for tracker in trackers:
            tracker.remove_object(obj)

class ViewTracker(signals.SignalEmitter):
    """Keeps the set of object ids matching a query and emits 'added',
    'removed' and 'changed' signals as objects enter, leave or change
    inside that set.
    """

    def __init__(self, klass, where, values, joins):
        signals.SignalEmitter.__init__(self, 'added', 'removed', 'changed')
        self.klass = klass
        self.where = where
        # _obj_in_view() concatenates a tuple with values, so a list is
        # rejected up front.
        if isinstance(values, list):
            raise TypeError("values must be a tuple")
        self.values = values
        self.joins = joins
        matching_ids = app.db.query_ids(klass, where, values, joins=joins)
        self.current_ids = set(matching_ids)
        self.table_name = app.db.table_name(klass)
        # register with the manager so object changes reach this tracker
        registry = app.view_tracker_manager.trackers_for_table(
            self.table_name)
        registry.add(self)

    def unlink(self):
        """Unregister this tracker from app.view_tracker_manager."""
        registry = app.view_tracker_manager.trackers_for_table(
            self.table_name)
        registry.discard(self)

    def _obj_in_view(self, obj):
        """Return True if a count query restricted to obj.id (and to this
        tracker's where clause, when there is one) finds a row.
        """
        id_clause = '%s.id = ?' % (self.table_name,)
        if self.where:
            where = id_clause + ' AND (%s)' % (self.where,)
        else:
            where = id_clause
        # the id placeholder comes first, so obj.id leads the values
        values = (obj.id,) + self.values
        matches = app.db.query_count(self.klass, where, values, self.joins)
        return matches > 0

    def object_changed(self, obj):
        """Re-evaluate obj's membership after it changed."""
        self.check_object(obj)

    def remove_object(self, obj):
        """Drop obj from the view and emit 'removed' if it was a member."""
        if obj.id not in self.current_ids:
            return
        self.current_ids.remove(obj.id)
        self.emit('removed', obj)

    def remove_objects(self, objects):
        """Drop each of objects that is currently a member, emitting
        'removed' for every one dropped.
        """
        by_id = {}
        for candidate in objects:
            by_id[candidate.id] = candidate
        candidate_ids = set(by_id)
        for removed_id in self.current_ids.intersection(candidate_ids):
            self.current_ids.remove(removed_id)
            self.emit('removed', by_id[removed_id])

    def check_object(self, obj):
        """Compare obj's previous and current membership and emit the
        matching signal; nothing is emitted when obj was not and is not a
        member.
        """
        was_member = obj.id in self.current_ids
        is_member = self._obj_in_view(obj)
        if was_member and is_member:
            self.emit('changed', obj)
        elif was_member:
            self.current_ids.remove(obj.id)
            self.emit('removed', obj)
        elif is_member:
            self.current_ids.add(obj.id)
            self.emit('added', obj)

    def check_all_objects(self):
        """Re-run the id query, replace the stored id set, and emit
        'added', 'removed' and 'changed' for the differences.
        """
        old_ids = self.current_ids
        new_ids = set(app.db.query_ids(self.klass, self.where,
                                       self.values, joins=self.joins))
        self.current_ids = new_ids
        for id_ in new_ids - old_ids:
            self.emit('added', app.db.get_obj_by_id(id_))
        for id_ in old_ids - new_ids:
            self.emit('removed', app.db.get_obj_by_id(id_))
        # XXX this hits all the IDs, but there doesn't seem to be
        # a way to check if the objects have actually been
        # changed.  luckily, this isn't called very often.
        for id_ in old_ids & new_ids:
            self.emit('changed', app.db.get_obj_by_id(id_))

    def __len__(self):
        """Number of ids currently in the view."""
        return len(self.current_ids)

class BulkSQLManager(object):
    """Queues object inserts and removes so they can be sent to the
    database in bulk.

    While ``active`` is True, callers queue work with add_insert() and
    add_remove(); finish() commits everything queued and deactivates the
    manager.
    """

    def __init__(self):
        self.active = False
        # table name -> list of objects waiting to be inserted / removed
        self.to_insert = {}
        self.to_remove = {}
        # every object passed to add_insert() since the last completed
        # commit(), minus those cancelled by add_remove()
        self.pending_inserts = set()

    def start(self):
        """Activate the manager.

        Raises ValueError if it is already active.
        """
        if self.active:
            raise ValueError("BulkSQLManager.start() called twice")
        self.active = True

    def finish(self):
        """Commit all queued work and deactivate the manager.

        Raises ValueError if the manager is not active.
        """
        if not self.active:
            raise ValueError("BulkSQLManager.finish() called twice")
        self.commit()
        self.active = False

    def commit(self):
        """Flush the queued inserts and removes.

        Flushing can queue more work (through the callbacks run in
        _commit_sql() and _update_view_trackers()), so the flush is
        repeated until both queues stay empty.

        Raises AssertionError if the queues are still not empty after
        100 rounds.
        """
        for _ in range(100):
            # Swap in fresh queues first, so that work added while we
            # flush lands in the next round.
            to_insert = self.to_insert
            to_remove = self.to_remove
            self.to_insert = {}
            self.to_remove = {}
            self._commit_sql(to_insert, to_remove)
            self._update_view_trackers(to_insert, to_remove)
            if not self.to_insert and not self.to_remove:
                break
            # inside _commit_sql() or _update_view_trackers(), we were asked
            # to insert or remove more items, repeat the process again
        else:
            raise AssertionError("Called _commit_sql 100 times and still "
                    "have items to commit.  Are we in a circular loop?")
        self.to_insert = {}
        self.to_remove = {}
        self.pending_inserts = set()

    def _commit_sql(self, to_insert, to_remove):
        """Bulk insert/remove the given objects, then run each object's
        inserted_into_db() / removed_from_db() hook.
        """
        for table_name, objects in to_insert.items():
            logging.debug('bulk insert: %s %s', table_name, len(objects))
            app.db.bulk_insert(objects)
            for obj in objects:
                obj.inserted_into_db()

        for table_name, objects in to_remove.items():
            logging.debug('bulk remove: %s %s', table_name, len(objects))
            app.db.bulk_remove(objects)
            for obj in objects:
                obj.removed_from_db()

    def _update_view_trackers(self, to_insert, to_remove):
        """Bring the view trackers up to date with a flushed batch."""
        for table_name in to_insert:
            app.view_tracker_manager.bulk_update_view_trackers(table_name)

        for table_name, objects in to_remove.items():
            if table_name in to_insert:
                # already updated the view above
                continue
            app.view_tracker_manager.bulk_remove_from_view_trackers(
                    table_name, objects)

    def add_insert(self, obj):
        """Queue obj for insertion at the next commit()."""
        table_name = app.db.table_name(obj.__class__)
        self.to_insert.setdefault(table_name, []).append(obj)
        self.pending_inserts.add(obj)

    def will_insert(self, obj):
        """Return True if obj is recorded in pending_inserts."""
        return obj in self.pending_inserts

    def add_remove(self, obj):
        """Queue obj for removal at the next commit().

        If obj is still waiting in the insert queue, the queued insert is
        cancelled instead and nothing is queued for removal.
        """
        table_name = app.db.table_name(obj.__class__)
        if self.will_insert(obj):
            self.pending_inserts.remove(obj)
            queued = self.to_insert.get(table_name, ())
            if obj in queued:
                # never reached the database: just forget the insert
                queued.remove(obj)
                return
            # commit() has already taken obj out of the insert queue
            # (this happens when remove() is called from a callback
            # during the flush), so there is no queued insert to cancel.
            # Fall through and queue a real removal.
        self.to_remove.setdefault(table_name, []).append(obj)

class AttributeUpdateTracker(object):
    """Used by DDBObject to track changes to attributes.

    A simple implementation of the python descriptor protocol.  The
    value is kept in the instance's __dict__ under ``name``; setting a
    value that differs from the stored one adds ``name`` to the
    instance's ``changed_attributes`` set.
    """

    # Unique marker for "no value stored yet".  Being a private object, it
    # can never compare equal to a value a caller assigns, so the first
    # assignment is always recorded as a change.
    _UNSET = object()

    def __init__(self, name):
        self.name = name

    def __get__(self, instance, owner):
        """Return the stored value.

        Raises AttributeError when accessed on the class itself, or when
        no value has been stored on the instance yet.
        """
        if instance is None:
            raise AttributeError(
                "Can't access '%s' as a class attribute" % self.name)
        try:
            return instance.__dict__[self.name]
        except KeyError:
            raise AttributeError(self.name)

    def __set__(self, instance, value):
        """Store value, recording the attribute as changed when it was
        unset or held a different value.
        """
        previous = instance.__dict__.get(self.name, self._UNSET)
        if previous is self._UNSET or previous != value:
            instance.changed_attributes.add(self.name)
        instance.__dict__[self.name] = value

class DDBObject(signals.SignalEmitter):
    """Dynamic Database object

    Base class for objects stored through app.db.  Creating an instance
    assigns it the next id and inserts it into the database (directly, or
    through app.bulk_sql_manager while that manager is active).  Passing
    ``restored_data`` as the only keyword argument rebuilds an existing
    object instead.  Emits the 'removed' signal when removed from the
    database.
    """
    # The last ID used in this class
    lastID = 0

    def __init__(self, *args, **kwargs):
        # signal_change() is a no-op while this flag is set
        self.in_db_init = True
        signals.SignalEmitter.__init__(self, 'removed')
        self.changed_attributes = set()

        # We are restoring an object when restored_data is the one and
        # only argument.  list(kwargs) gives the keyword names as a real
        # list regardless of what kwargs.keys() returns.
        restoring = (len(args) == 0 and list(kwargs) == ['restored_data'])

        if restoring:
            self.__dict__.update(kwargs['restored_data'])
            app.db.remember_object(self)
            self.setup_restored()
        else:
            self.id = DDBObject.lastID = DDBObject.lastID + 1
            # call remember_object so that id_exists will return True
            # when setup_new() is being run
            app.db.remember_object(self)
            self.setup_new(*args, **kwargs)

        # handle setup_restored() / setup_new() calling remove()
        if not self.id_exists():
            return

        self.in_db_init = False

        if not restoring:
            self._insert_into_db()

    def _insert_into_db(self):
        """Insert this object now, or queue it on the bulk SQL manager
        when that manager is active.
        """
        if not app.bulk_sql_manager.active:
            app.db.insert_obj(self)
            self.inserted_into_db()
            app.view_tracker_manager.update_view_trackers(self)
        else:
            app.bulk_sql_manager.add_insert(self)

    def inserted_into_db(self):
        """Run the constraint check and the on_db_insert() hook."""
        self.check_constraints()
        self.on_db_insert()

    @classmethod
    def make_view(cls, where=None, values=None, order_by=None, joins=None,
            limit=None):
        """Return a View over this class; ``values`` defaults to an empty
        tuple.
        """
        if values is None:
            values = ()
        return View(cls, where, values, order_by, joins, limit)

    @classmethod
    def get_by_id(cls, id_):
        """Return the object with id ``id_``.

        Raises ObjectNotFoundError when the object found in memory does
        not belong to this class's table.  When the in-memory lookup
        raises KeyError, a view query is used instead, and
        View.get_singleton() may raise ObjectNotFoundError.
        """
        try:
            # try memory first before going to sqlite.
            obj = app.db.get_obj_by_id(id_)
            if app.db.object_from_class_table(obj, cls):
                return obj
            else:
                raise ObjectNotFoundError(id_)
        except KeyError:
            return cls.make_view('id=?', (id_,)).get_singleton()

    @classmethod
    def delete(cls, where, values=None):
        """Forward to app.db.delete() for this class."""
        return app.db.delete(cls, where, values)

    @classmethod
    def select(cls, columns, where=None, values=None, convert=True):
        """Forward to app.db.select() for this class."""
        return app.db.select(cls, columns, where, values, convert)

    def setup_new(self):
        """Initialize a newly created object."""
        pass

    def setup_restored(self):
        """Initialize an object restored from disk."""
        pass

    def on_db_insert(self):
        """Called after an object has been inserted into the db."""
        pass

    @classmethod
    def track_attribute_changes(cls, name):
        """Set up tracking when attributes get set.

        Call this on a DDBObject subclass to track changes to certain
        attributes.  Each DDBObject has a changed_attributes set,
        which contains the attributes that have changed.

        This is used by the SQLite storage layer to track which
        attributes are changed between SQL UPDATE statements.

        For example::

            MyDDBObjectSubclass.track_attribute_changes('foo')
            MyDDBObjectSubclass.track_attribute_changes('bar')
            obj = MyDDBObjectSubclass()
            obj.changed_attributes    # -> set([])
            obj.foo = obj.bar = obj.baz = 3
            obj.changed_attributes    # -> set(['foo', 'bar'])
        """
        # The AttributeUpdateTracker class does all the work
        setattr(cls, name, AttributeUpdateTracker(name))

    def reset_changed_attributes(self):
        """Forget all recorded attribute changes."""
        self.changed_attributes = set()

    def get_id(self):
        """Returns unique integer associated with this object
        """
        return self.id

    def id_exists(self):
        """Return True if get_by_id() finds this object's id, False if it
        raises ObjectNotFoundError.
        """
        try:
            self.get_by_id(self.id)
        except ObjectNotFoundError:
            return False
        else:
            return True

    def remove(self):
        """Call this after you've removed all references to the object

        Removes the object from the database now, or queues the removal
        on the bulk SQL manager when that manager is active.
        """
        if not app.bulk_sql_manager.active:
            app.db.remove_obj(self)
            self.removed_from_db()
            app.view_tracker_manager.remove_from_view_trackers(self)
        else:
            app.bulk_sql_manager.add_remove(self)

    def removed_from_db(self):
        """Emit the 'removed' signal."""
        self.emit('removed')

    def confirm_db_thread(self):
        """Call this before you grab data from an object

        Usage::

            view.confirm_db_thread()
            ...
        """
        # delegates to the module-level confirm_db_thread() function
        confirm_db_thread()

    def check_constraints(self):
        """Subclasses can override this method to do constraint
        checking before they get saved to disk.  They should raise a
        DatabaseConstraintError on problems.
        """
        pass

    def signal_change(self, needs_save=True):
        """Call this after you change the object

        ``needs_save`` controls whether app.db.update_obj() is called;
        the view trackers are updated either way.

        Raises DatabaseConstraintError if the object's id no longer
        exists.
        """
        if self.in_db_init:
            # signal_change called while we were setting up a object,
            # just ignore it.
            return
        if not self.id_exists():
            msg = ("signal_change() called on non-existant object (id is %s)"
                   % self.id)
            # Call form of raise: valid on both Python 2 and Python 3.
            raise DatabaseConstraintError(msg)
        self.on_signal_change()
        self.check_constraints()
        if app.bulk_sql_manager.will_insert(self):
            # Don't need to send an UPDATE SQL command, or check the view
            # trackers in this case.  Both will be done when the
            # BulkSQLManager.finish() is called.
            return
        if needs_save:
            app.db.update_obj(self)
        app.view_tracker_manager.update_view_trackers(self)

    def on_signal_change(self):
        """Hook run by signal_change() before the constraint check; does
        nothing by default.
        """
        pass

def update_last_id():
    """Set DDBObject.lastID to the value returned by app.db.get_last_id()."""
    newest_id = app.db.get_last_id()
    DDBObject.lastID = newest_id

def setup_managers():
    """Install a fresh ViewTrackerManager and BulkSQLManager on ``app``."""
    tracker_manager = ViewTrackerManager()
    app.view_tracker_manager = tracker_manager
    bulk_manager = BulkSQLManager()
    app.bulk_sql_manager = bulk_manager

def initialize():
    """Run the module's setup steps: refresh the last used id, then
    create the managers.
    """
    for step in (update_last_id, setup_managers):
        step()