File: threaded_compile.py

package info (click to toggle)
sqlalchemy 0.6.3-3%2Bsqueeze1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 10,744 kB
  • ctags: 15,132
  • sloc: python: 93,431; ansic: 787; makefile: 137; xml: 17
file content (75 lines) | stat: -rw-r--r-- 1,761 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""test that mapper compilation is threadsafe, including
when additional mappers are created while the existing
collection is being compiled."""

from sqlalchemy import *
from sqlalchemy.orm import *
import thread, time
import threading
from sqlalchemy.orm import mapperlib


# MetaData constructed with a SQLite URL that points at the file foo.db.
meta = MetaData('sqlite:///foo.db')

# t1: integer primary key plus a 30-character string column.
t1 = Table(
    't1', meta,
    Column('c1', Integer, primary_key=True),
    Column('c2', String(30)),
)

# t2: same two columns as t1, plus a foreign key column referencing t1.c1.
# The foreign key column is declared with None in the type position.
t2 = Table(
    't2', meta,
    Column('c1', Integer, primary_key=True),
    Column('c2', String(30)),
    Column('t1c1', None, ForeignKey('t1.c1')),
)

# t3: standalone table; run3() maps throwaway classes against it.
t3 = Table(
    't3', meta,
    Column('c1', Integer, primary_key=True),
    Column('c2', String(30)),
)

# Create every table registered on ``meta``.
meta.create_all()

class T1(object):
    """Empty class that this script maps to table ``t1``."""

class T2(object):
    """Empty class that this script maps to table ``t2``."""

class FakeLock(object):
    """Lock stand-in whose acquire() and release() do nothing.

    Meant to be assigned to ``mapperlib._COMPILE_MUTEX`` (see the
    commented-out line further down) so that no real locking takes place.
    """

    def acquire(self):
        """Do nothing; returns None."""
        return None

    def release(self):
        """Do nothing; returns None."""
        return None

# Uncomment the assignment below to replace the mapper compilation mutex
# with the no-op FakeLock; doing so should produce thread collisions.
#mapperlib._COMPILE_MUTEX = FakeLock()

def run1():
    for i in range(50):
        print "T1", thread.get_ident()
        class_mapper(T1)
        time.sleep(.05)

def run2():
    for i in range(50):
        print "T2", thread.get_ident()
        class_mapper(T2)
        time.sleep(.057)

def run3():
    for i in range(50):
        def foo():
            print "FOO", thread.get_ident()
            class Foo(object):pass
            mapper(Foo, t3)
            class_mapper(Foo).compile()
        foo()
        time.sleep(.05)

mapper(T1, t1, properties={'t2':relationship(T2, backref="t1")})
mapper(T2, t2)
print "START"
for j in range(0, 5):
    thread.start_new_thread(run1, ())
    thread.start_new_thread(run2, ())
    thread.start_new_thread(run3, ())
    thread.start_new_thread(run3, ())
    thread.start_new_thread(run3, ())
print "WAIT"
time.sleep(5)