1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
import re
import cgi
import sys
import urllib
from sqlalchemy import exceptions
"""Provide the URL object as well as the make_url parsing function."""
class URL(object):
    """Represent the components of a URL used to connect to a database.

    This object is suitable to be passed directly to a
    ``create_engine()`` call.  The fields of the URL are parsed from a
    string by the module-level ``make_url()`` function.  The string
    format of the URL is an RFC-1738-style string.

    Attributes on URL include:

    drivername
      The name of the database backend.  This name will correspond to
      a module in sqlalchemy/databases.

    username
      The user name for the connection.

    password
      The database password.

    host
      The name of the host.

    port
      The port number (stored as an ``int``, or ``None``).

    database
      The database.

    query
      A dictionary containing key/value pairs representing the URL's
      query string.
    """

    def __init__(self, drivername, username=None, password=None, host=None, port=None, database=None, query=None):
        """Store the URL components.

        ``port`` is coerced with ``int()`` when given; ``query``
        defaults to a new empty dictionary when omitted or empty.
        """
        self.drivername = drivername
        self.username = username
        self.password = password
        self.host = host
        if port is not None:
            self.port = int(port)
        else:
            self.port = None
        self.database = database
        # A fresh dict per instance; never share a mutable default.
        self.query = query or {}

    def __str__(self):
        """Render the URL as ``driver://user:password@host:port/database?k=v``.

        Components that are ``None`` are omitted.  Only the password is
        URL-quoted; query parameters are emitted sorted by key so the
        output is deterministic.
        """
        parts = [self.drivername, "://"]
        if self.username is not None:
            parts.append(self.username)
            # The password is only meaningful (and only emitted) together
            # with a username.
            if self.password is not None:
                parts.append(':' + urllib.quote_plus(self.password))
            parts.append("@")
        if self.host is not None:
            parts.append(self.host)
        if self.port is not None:
            parts.append(':' + str(self.port))
        if self.database is not None:
            parts.append('/' + self.database)
        if self.query:
            pairs = ["%s=%s" % (k, self.query[k]) for k in sorted(self.query)]
            parts.append('?' + "&".join(pairs))
        return "".join(parts)

    def get_dialect(self):
        """Return the SQLAlchemy database dialect class corresponding to this URL's driver name.

        The ``ansi`` driver maps to ``sqlalchemy.ansisql.dialect``.  Any
        other name is first looked up as a module under
        ``sqlalchemy.databases``; if that module cannot be imported, the
        ``sqlalchemy.databases`` setuptools entry points are searched.

        Raises ``ImportError`` if no dialect is found, or if the database
        module exists but fails while importing something itself.
        """
        dialect = None
        if self.drivername == 'ansi':
            import sqlalchemy.ansisql
            return sqlalchemy.ansisql.dialect

        try:
            module = getattr(__import__('sqlalchemy.databases.%s' % self.drivername).databases, self.drivername)
            dialect = module.dialect
        except ImportError:
            # tb_next is None when the ImportError was raised directly by
            # the __import__ call above (no deeper traceback frame), i.e.
            # the module itself was not found.  A deeper traceback means
            # the module was found but failed internally, so re-raise.
            if sys.exc_info()[2].tb_next is None:
                import pkg_resources
                for res in pkg_resources.iter_entry_points('sqlalchemy.databases'):
                    if res.name == self.drivername:
                        dialect = res.load()
            else:
                raise
        if dialect is not None:
            return dialect
        raise ImportError('unknown database %r' % self.drivername)

    def translate_connect_args(self, names):
        """Translate this URL's attributes into a dictionary of connection arguments.

        Given a list of argument names corresponding, in order, to the
        URL attributes (`host`, `database`, `username`, `password`,
        `port`), will assemble the attribute values of this URL into
        the dictionary using the given names.

        A name of ``None`` skips the attribute in that position, and
        attributes whose value is empty or ``None`` are left out of the
        result.  Names beyond the fifth have no corresponding attribute
        and are ignored.
        """
        attribute_names = ('host', 'database', 'username', 'password', 'port')
        args = {}
        # zip() pairs positionally and stops at the shorter sequence, so
        # an over-long ``names`` list cannot run off the attribute list.
        for arg_name, attr_name in zip(names, attribute_names):
            if arg_name is None:
                continue
            value = getattr(self, attr_name, None)
            if value:
                args[arg_name] = value
        return args
def make_url(name_or_url):
    """Produce a URL instance from a string, or pass a non-string through.

    A string or unicode argument is parsed according to the rfc1738
    spec and the resulting new URL object is returned.  Any other
    argument (for example an existing URL object) is returned
    unchanged.
    """
    if not isinstance(name_or_url, basestring):
        # Not a string: hand the object back untouched.
        return name_or_url
    return _parse_rfc1738_args(name_or_url)
# Compiled once at import time.  Groups, in order: driver name, username,
# password, host, port, and everything after the first '/' (database plus
# optional query string).
_RFC1738_PATTERN = re.compile(r'''
            (\w+)://
            (?:
                ([^:/]*)
                (?::([^/]*))?
            @)?
            (?:
                ([^/:]*)
                (?::([^/]*))?
            )?
            (?:/(.*))?
            '''
            , re.X)


def _parse_rfc1738_args(name):
    """Parse an RFC-1738-style database URL string into a ``URL``.

    The expected form is
    ``driver://username:password@host:port/database?key=value&...``
    where everything after ``driver://`` is optional.  The password is
    decoded with ``urllib.unquote_plus``; the query string, if any, is
    parsed into a dictionary (``None`` when absent or empty).

    Raises ``exceptions.ArgumentError`` if ``name`` does not match the
    expected form.
    """
    m = _RFC1738_PATTERN.match(name)
    if m is None:
        raise exceptions.ArgumentError("Could not parse rfc1738 URL from string '%s'" % name)

    (drivername, username, password, host, port, database) = m.group(1, 2, 3, 4, 5, 6)

    query = None
    if database is not None:
        # Only the first '?' separates the database from the query string;
        # limiting the split to one keeps any later '?' inside the query
        # instead of silently discarding what follows it.
        tokens = database.split('?', 1)
        database = tokens[0]
        if len(tokens) > 1:
            # Keys are encoded to ASCII byte strings -- presumably so they
            # can be used as keyword-argument names; confirm with callers.
            # An empty parse result is normalised to None.
            query = dict([(k.encode('ascii'), v) for (k, v) in cgi.parse_qsl(tokens[1])]) or None

    if password is not None:
        password = urllib.unquote_plus(password)

    opts = {'username': username, 'password': password, 'host': host, 'port': port, 'database': database, 'query': query}
    return URL(drivername, **opts)
def _parse_keyvalue_args(name):
    """Parse a ``driver://key=value&key=value`` string into a ``URL``.

    The text after ``driver://`` is parsed as a query string and each
    key/value pair is passed to ``URL`` as a keyword argument, so the
    keys must be valid ``URL`` constructor parameter names.

    Returns ``None`` if ``name`` does not start with ``driver://``.
    """
    m = re.match(r'(\w+)://(.*)', name)
    if m is None:
        return None
    (drivername, args) = m.group(1, 2)
    opts = dict(cgi.parse_qsl(args))
    # Keyword expansion: a single '*' would pass the dict's *keys* as
    # positional arguments in arbitrary order and lose the values.
    return URL(drivername, **opts)
|