1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
dbf2sqlite - convert dbf files into sqlite database
Ole Martin Bjørndalen
University of Tromsø
Todo:
- -v --verbose option
- handle existing table (-f option?)
- primary key option? (make first column primary key)
- create only option?
- insert only option?
- options to select columns to insert?
"""
import os
import sys
import argparse
import sqlite3
import traceback
from dbfread import DBF
typemap = {
'F': 'FLOAT',
'L': 'BOOLEAN',
'I': 'INTEGER',
'C': 'TEXT',
'N': 'REAL', # because it can be integer or float
'M': 'TEXT',
'D': 'DATE',
'T': 'DATETIME',
'0': 'INTEGER',
}
def add_table(cursor, table):
"""Add a dbase table to an open sqlite database."""
cursor.execute('drop table if exists %s' % table.name)
field_types = {}
for f in table.fields:
field_types[f.name] = typemap.get(f.type, 'TEXT')
#
# Create the table
#
defs = ', '.join(['"%s" %s' % (f, field_types[f])
for f in table.field_names])
sql = 'create table "%s" (%s)' % (table.name, defs)
cursor.execute(sql)
# Create data rows
refs = ', '.join([':' + f for f in table.field_names])
sql = 'insert into "%s" values (%s)' % (table.name, refs)
for rec in table:
cursor.execute(sql, list(rec.values()))
def parse_args():
parser = argparse.ArgumentParser(
description='usage: %prog [OPTIONS] table1.dbf ... tableN.dbf')
arg = parser.add_argument
arg('-o', '--output-file',
action='store',
dest='output_file',
default=None,
help='sqlite database to write to '
'(default is to print schema to stdout)')
arg('-e', '--encoding',
action='store',
dest='encoding',
default=None,
help='character encoding in DBF file')
arg('--char-decode-errors',
action='store',
dest='char_decode_errors',
default='strict',
help='how to handle decode errors (see pydoc bytes.decode)')
arg('tables',
metavar='TABLE',
nargs='+',
help='tables to add to sqlite database')
return parser.parse_args()
def main():
args = parse_args()
conn = sqlite3.connect(args.output_file or ':memory:')
cursor = conn.cursor()
for table_file in args.tables:
try:
add_table(cursor, DBF(table_file,
lowernames=True,
encoding=args.encoding,
char_decode_errors=args.char_decode_errors))
except UnicodeDecodeError as err:
traceback.print_exc()
sys.exit('Please use --encoding or --char-decode-errors.')
conn.commit()
#
# Dump SQL schema and data to stdout if no
# database file was specified.
#
# This currently only works in Python 3,
# since Python 2 somehow defaults to 'ascii'
# encoding.
#
if not args.output_file:
for line in conn.iterdump():
print(line)
if __name__ == '__main__':
main()
|