1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
import random
import threading
import time
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.fixtures import fixture_session
class ConcurrentUseDeclMappingTest(fixtures.TestBase):
    """Race a declarative class definition against a query that needs it.

    One thread declares the mapped class ``B`` while a second thread builds
    a query joining ``A`` to ``B`` through ``A.bs``.  The test fails only if
    the querying thread observes ``UnmappedClassError``.
    """

    def teardown_test(self):
        """Call ``clear_mappers()`` so no mappers from this test linger."""
        clear_mappers()

    @classmethod
    def make_a(cls, Base):
        """Declare mapped class ``A`` on ``Base`` and store it as ``cls.A``.

        :param Base: declarative base class that ``A`` derives from.
        """

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String)
            # "B" is referenced by name only; it is declared in make_b()
            bs = relationship("B")

        # hold a strong reference on the test class so that A is not
        # garbage collected
        cls.A = A

    @classmethod
    def query_a(cls, Base, result):
        """Build ``query(A).join(A.bs)`` and record the outcome.

        :param Base: declarative base; not used by this method.
        :param result: one-element list.  ``result[0]`` is set to the
         ``UnmappedClassError`` instance if one is raised, otherwise to
         ``True``.
        """
        session = fixture_session()
        # short random delay so the interleaving with make_b() varies
        time.sleep(random.random() / 100)
        mapped_a = cls.A

        # covers both "no conflict" and the tolerated InvalidRequestError
        outcome = True
        try:
            session.query(mapped_a).join(mapped_a.bs)
        except orm_exc.UnmappedClassError as unmapped:
            # the failure mode under test: declarative is still handling B,
            # which is already in the registry but has no mapper yet.
            outcome = unmapped
        except exc.InvalidRequestError:
            # reached when make_b() starts too slowly and B is not in the
            # registry yet.  The library can't guard against this, since a
            # class may refer to a name that doesn't exist and that has to
            # raise; it is therefore not treated as a failure.
            pass

        result[0] = outcome

    @classmethod
    def make_b(cls, Base):
        """Declare mapped class ``B`` on ``Base`` and store it as ``cls.B``.

        :param Base: declarative base class that ``B`` derives from.
        """

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)

            @declared_attr
            def data(cls):
                # sleep while this attribute is being evaluated, giving the
                # other thread time to run in the meantime
                time.sleep(0.001)
                return Column(String)

            a_id = Column(ForeignKey("a.id"))

        cls.B = B

    def test_concurrent_create(self):
        """Run the declare-vs-query race 50 times.

        Re-raises the ``UnmappedClassError`` captured by ``query_a()`` if any
        iteration produced one.
        """
        for _ in range(50):
            decl_base = declarative_base()
            clear_mappers()

            self.make_a(decl_base)
            result = [False]

            declare_thread = threading.Thread(
                target=self.make_b, args=(decl_base,)
            )
            query_thread = threading.Thread(
                target=self.query_a, args=(decl_base, result)
            )

            declare_thread.start()
            query_thread.start()

            declare_thread.join()
            query_thread.join()

            outcome = result[0]
            if isinstance(outcome, orm_exc.UnmappedClassError):
                raise outcome
|