1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
(in-package :cl-postgres)
(defclass bulk-copier ()
((own-connection :initarg :own-connection :reader bulk-copier-own-connection)
(database :initarg :database :reader copier-database)
(table :initarg :table :reader copier-table)
(columns :initarg :columns :reader copier-columns)
(count :initform 0 :accessor copier-count)))
(defmethod print-object ((self bulk-copier) stream)
(print-unreadable-object (self stream :type t :identity t)
(format stream "~a ~a" (copier-table self)
(copier-columns self))))
(defun open-db-writer (db-spec table columns)
(let* ((own-connection (listp db-spec))
(copier (make-instance 'bulk-copier
:own-connection own-connection
:database (if own-connection (apply 'open-database db-spec) db-spec)
:table table
:columns columns)))
(initialize-copier copier)
copier))
(defun close-db-writer (self &key (abort nil))
(unwind-protect
(let* ((connection (copier-database self))
(socket (connection-socket connection)))
(with-reconnect-restart connection
(using-connection connection
(send-copy-done socket))))
(when (or abort (bulk-copier-own-connection self))
(close-database (copier-database self))))
(copier-count self))
(defun db-write-row (self row &optional (data (prepare-row self row)))
(let* ((connection (copier-database self))
(socket (connection-socket connection)))
(with-reconnect-restart connection
(using-connection connection
(with-syncing
(copy-data-message socket data)))))
(incf (copier-count self)))
(defun copy-query (self)
(format nil "~%copy ~a ~@[(~{~a~^,~})~] ~a ~a"
(copier-table self)
(copier-columns self)
"FROM"
"STDIN"))
(defun send-copy-start (socket query)
(with-syncing
(query-message socket query)
(flush-message socket)
(force-output socket)
(message-case socket
;; Ignore the field formats because we're only supporting plain
;; text for now
(#\G (read-uint1 socket)
(skip-bytes socket (* 2 (read-uint2 socket)))))))
(defun initialize-copier (self)
(let* ((query (copy-query self))
(connection (copier-database self))
(socket (connection-socket connection)))
(with-reconnect-restart connection
(using-connection connection
(send-copy-start socket query)))))
(defun copier-write-value (s val)
(typecase val
(string (let ((pg-string (with-output-to-string (str)
(loop for byte across (cl-postgres-trivial-utf-8:string-to-utf-8-bytes val) do
(case (code-char byte)
(#\Space (princ " " str))
((#\Newline #\Tab) (format str "\\~a" (code-char byte)))
(#\\ (progn (princ #\\ str) (princ #\\ str)))
(otherwise (if (and (< 32 byte)
(> 127 byte))
(write-char (code-char byte) str)
(princ (format nil "\\~o" byte) str))))))))
#+nil(print `(:loading ,pg-string))
(princ pg-string s)))
(number (princ val s))
(null (princ "false" s))
(symbol (case val
(:null (princ "\\N" s))
((t) (princ "true" s))
(otherwise (error "copier-write-val: Symbols shouldn't be getting this far ~a" val))))))
(defun copier-write-sequence (s vector)
(write-char #\{ s)
(loop for (item . more-p) on (coerce vector 'list)
do (cond
((null item) (copier-write-value s :null))
((atom item) (copier-write-value s item))
(t (copier-write-sequence s item)))
when more-p
do (write-char #\, s))
(write-char #\} s))
(defmethod prepare-row (self row)
(declare (ignore self))
(with-output-to-string (s)
(loop for (val . more-p) on row
do (progn
(if (typep val '(or string
(not vector)))
(copier-write-value s val)
(copier-write-sequence s val)))
if more-p do (write-char #\Tab s)
finally
(write-char #\Newline s))))
(defun send-copy-done (socket)
(with-syncing
(setf sync-sent t)
(copy-done-message socket)
(force-output socket)
(message-case socket
(#\C (let* ((command-tag (read-str socket))
(space (position #\Space command-tag :from-end t)))
(when space
(parse-integer command-tag :junk-allowed t :start (1+ space))))))
(block find-ready
(loop (message-case socket
(#\Z (read-uint1 socket)
(return-from find-ready))
(t :skip))))))
|