1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from __future__ import print_function
import csv
import os
import sys
try:
from urllib.error import HTTPError
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen, HTTPError
import zipfile
from argparse import ArgumentParser
from io import BytesIO, TextIOWrapper
try:
import argcomplete
except ImportError:
argcomplete = None
try:
from tqdm import tqdm
except ImportError:
tqdm = None
try:
from proteus import Model, config
except ImportError:
prog = os.path.basename(sys.argv[0])
sys.exit("proteus must be installed to use %s" % prog)
def _progress(iterable, **kwargs):
    """Return *iterable*, wrapped in a tqdm progress bar when possible.

    When the optional ``tqdm`` import failed (the module-level name is then
    ``None``) the iterable is handed back untouched. Otherwise any keyword
    arguments are forwarded to ``tqdm``.
    """
    if not tqdm:
        return iterable
    # Per the tqdm documentation, disable=None turns the bar off on
    # non-TTY output.
    return tqdm(iterable, disable=None, **kwargs)
def clean(code):
    """Delete the stored postal codes whose country has the code *code*.

    Progress is reported on stderr: 'Cleaning' before the work starts and
    '.' once the deletion has been issued.
    """
    sys.stderr.write('Cleaning')
    PostalCode = Model.get('country.postal_code')
    existing = PostalCode.find([('country.code', '=', code)])
    ids = [record.id for record in existing]
    # Delete by id through the model proxy in a single call.
    # NOTE(review): the empty dict is presumably the call context - confirm.
    PostalCode._proxy.delete(ids, {})
    print('.', file=sys.stderr)
def fetch(code):
    """Download the postal code data for the country *code*.

    The archive ``<code>.zip`` is fetched from downloads.tryton.org and the
    content of its ``<code>.txt`` member is returned as bytes.

    Returns ``None`` (after printing a message on stderr) when the server
    answers with an HTTP error, e.g. because no archive exists for *code*.
    Any other failure (network error, invalid archive, missing member) is
    not handled here and propagates to the caller.
    """
    sys.stderr.write('Fetching')
    url = 'https://downloads.tryton.org/geonames/%s.zip' % code
    try:
        response = urlopen(url)
    except HTTPError as e:
        print("Code '%s' not available: %s" % (code, e.reason),
            file=sys.stderr)
        print('.', file=sys.stderr)
        return
    # Read the whole archive in memory and always release the connection.
    try:
        data = response.read()
    finally:
        response.close()
    with zipfile.ZipFile(BytesIO(data)) as zf:
        data = zf.read('%s.txt' % code)
    print('.', file=sys.stderr)
    return data
def import_(data):
    """Create postal code records from the raw bytes *data*.

    *data* is decoded as UTF-8 and parsed as tab-separated rows whose
    columns are named after ``_fieldnames``. All the records are built
    first and saved with a single ``PostalCode.save`` call at the end.

    Exits the program through ``sys.exit`` when a row refers to a country
    code that does not match exactly one country.
    """
    PostalCode = Model.get('country.postal_code')
    Country = Model.get('country.country')
    Subdivision = Model.get('country.subdivision')
    print('Importing', file=sys.stderr)

    # Caches of the records already looked up, keyed by code.
    countries = {}
    subdivisions = {}

    def get_country(code):
        """Return the country with *code*, exiting if it cannot be found."""
        country = countries.get(code)
        if not country:
            try:
                country, = Country.find([('code', '=', code)])
            except ValueError:
                sys.exit("Error missing country with code %s" % code)
            countries[code] = country
        return country

    def get_subdivision(country, code):
        """Return the subdivision '<country>-<code>' or None if not found."""
        code = '%s-%s' % (country, code)
        if code not in subdivisions:
            try:
                subdivision, = Subdivision.find([('code', '=', code)])
            except ValueError:
                # Remember the miss too, otherwise every row carrying an
                # unknown (or empty) code triggers a new search.
                subdivision = None
            subdivisions[code] = subdivision
        return subdivisions[code]

    f = TextIOWrapper(BytesIO(data), encoding='utf-8')
    codes = []
    for row in _progress(csv.DictReader(
                f, fieldnames=_fieldnames, delimiter='\t'),
            total=data.count(b'\n')):
        country = get_country(row['country'])
        # A record is always created for 'code1' (possibly without
        # subdivision) and one more for each of 'code2' and 'code3' that
        # matches a subdivision.
        # NOTE(review): a single row can thus yield up to three records -
        # confirm this is intended.
        for code in ['code1', 'code2', 'code3']:
            subdivision = get_subdivision(row['country'], row[code])
            if code == 'code1' or subdivision:
                codes.append(
                    PostalCode(country=country, subdivision=subdivision,
                        postal_code=row['postal'], city=row['place']))
    PostalCode.save(codes)
# Column names assigned, in order, to the tab-separated fields of each row
# parsed by import_() (passed as ``fieldnames`` to csv.DictReader).
# NOTE(review): presumably mirrors the column layout of the GeoNames postal
# code files - confirm against the upstream format description.
_fieldnames = ['country', 'postal', 'place', 'name1', 'code1',
'name2', 'code2', 'name3', 'code3', 'latitude', 'longitude', 'accuracy']
def main(database, codes, config_file=None):
    """Configure proteus for *database* and import the given *codes*.

    *config_file* is forwarded to ``config.set_trytond``; *codes* is the
    iterable of country codes handed to ``do_import``.
    """
    config.set_trytond(database, config_file=config_file)
    do_import(codes)
def do_import(codes):
    """Replace the stored postal codes of each country code in *codes*.

    Each code is echoed on stderr as given, then upper-cased. The existing
    records for it are removed before the download is attempted, so they
    stay removed even when nothing could be fetched.
    """
    for requested in codes:
        print(requested, file=sys.stderr)
        country_code = requested.upper()
        clean(country_code)
        payload = fetch(country_code)
        if not payload:
            # Nothing was downloaded (or the file was empty): skip import.
            continue
        import_(payload)
def run():
    """Parse the command line and start the import.

    Expects a mandatory ``-d/--database``, an optional ``-c/--config`` and
    at least one positional country code. Shell completion is enabled when
    the optional ``argcomplete`` module could be imported.
    """
    parser = ArgumentParser()
    parser.add_argument('-d', '--database', dest='database', required=True)
    parser.add_argument(
        '-c', '--config', dest='config_file',
        help='the trytond config file')
    parser.add_argument('codes', nargs='+')
    if argcomplete:
        argcomplete.autocomplete(parser)

    options = parser.parse_args()
    main(options.database, options.codes, options.config_file)
# Start the command-line entry point only when executed as a script.
if __name__ == '__main__':
    run()
|