1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
import time, resource
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.test.util import gc_collect
from sqlalchemy.test import profiling
db = create_engine('sqlite://')
metadata = MetaData(db)
Person_table = Table('Person', metadata,
Column('id', Integer, primary_key=True),
Column('type', String(10)),
Column('name', String(40)),
Column('sex', Integer),
Column('age', Integer))
Employee_table = Table('Employee', metadata,
Column('id', Integer, ForeignKey('Person.id'), primary_key=True),
Column('foo', String(40)),
Column('bar', Integer),
Column('bat', Integer))
class RawPerson(object): pass
class Person(object): pass
mapper(Person, Person_table)
class JoinedPerson(object):pass
class Employee(JoinedPerson):pass
mapper(JoinedPerson, Person_table, \
polymorphic_on=Person_table.c.type, polymorphic_identity='person')
mapper(Employee, Employee_table, \
inherits=JoinedPerson, polymorphic_identity='employee')
compile_mappers()
def setup():
metadata.create_all()
i = Person_table.insert()
data = [{'name':'John Doe','sex':1,'age':35, 'type':'employee'}] * 100
for j in xrange(500):
i.execute(data)
# note we arent fetching from employee_table,
# so we can leave it empty even though its "incorrect"
#i = Employee_table.insert()
#data = [{'foo':'foo', 'bar':'bar':'bat':'bat'}] * 100
#for j in xrange(500):
# i.execute(data)
print "Inserted 50,000 rows"
def sqlite_select(entity_cls):
conn = db.connect().connection
cr = conn.cursor()
cr.execute("SELECT id, name, sex, age FROM Person")
people = []
for row in cr.fetchall():
person = entity_cls()
person.id = row[0]
person.name = row[1]
person.sex = row[2]
person.age = row[3]
people.append(person)
cr.close()
conn.close()
def sql_select(entity_cls):
people = []
for row in Person_table.select().execute().fetchall():
person = entity_cls()
person.id = row['id']
person.name = row['name']
person.sex = row['sex']
person.age = row['age']
people.append(person)
#@profiling.profiled(report=True, always=True)
def orm_select():
session = create_session()
people = session.query(Person).all()
#@profiling.profiled(report=True, always=True)
def joined_orm_select():
session = create_session()
people = session.query(JoinedPerson).all()
def all():
setup()
try:
t, t2 = 0, 0
def usage(label):
now = resource.getrusage(resource.RUSAGE_SELF)
print "%s: %0.3fs real, %0.3fs user, %0.3fs sys" % (
label, t2 - t,
now.ru_utime - usage.last.ru_utime,
now.ru_stime - usage.last.ru_stime)
usage.snap(now)
usage.snap = lambda stats=None: setattr(
usage, 'last', stats or resource.getrusage(resource.RUSAGE_SELF))
gc_collect()
usage.snap()
t = time.clock()
sqlite_select(RawPerson)
t2 = time.clock()
usage('sqlite select/native')
gc_collect()
usage.snap()
t = time.clock()
sqlite_select(Person)
t2 = time.clock()
usage('sqlite select/instrumented')
gc_collect()
usage.snap()
t = time.clock()
sql_select(RawPerson)
t2 = time.clock()
usage('sqlalchemy.sql select/native')
gc_collect()
usage.snap()
t = time.clock()
sql_select(Person)
t2 = time.clock()
usage('sqlalchemy.sql select/instrumented')
gc_collect()
usage.snap()
t = time.clock()
orm_select()
t2 = time.clock()
usage('sqlalchemy.orm fetch')
gc_collect()
usage.snap()
t = time.clock()
joined_orm_select()
t2 = time.clock()
usage('sqlalchemy.orm "joined" fetch')
finally:
metadata.drop_all()
if __name__ == '__main__':
all()
|