File: bulk-copy.lisp

package info (click to toggle)
cl-postmodern 20180430-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 804 kB
  • sloc: lisp: 7,423; makefile: 2
file content (134 lines) | stat: -rw-r--r-- 4,885 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134

(in-package :cl-postgres)

(defclass bulk-copier ()
  ((own-connection :initarg :own-connection :reader bulk-copier-own-connection)
   (database :initarg :database :reader copier-database)
   (table :initarg :table :reader copier-table)
   (columns :initarg :columns :reader copier-columns)
   (count :initform 0 :accessor copier-count)))

(defmethod print-object ((self bulk-copier) stream)
  (print-unreadable-object (self stream :type t :identity t)
    (format stream "~a ~a" (copier-table self)
	    (copier-columns self))))


(defun open-db-writer (db-spec table columns)
  (let* ((own-connection (listp db-spec))
         (copier (make-instance 'bulk-copier
                   :own-connection own-connection
                   :database (if own-connection (apply 'open-database db-spec) db-spec)
                   :table table
                   :columns columns)))
    (initialize-copier copier)
    copier))

(defun close-db-writer (self &key (abort nil))
  (unwind-protect
       (let* ((connection (copier-database self))
              (socket (connection-socket connection)))
         (with-reconnect-restart connection
           (using-connection connection
             (send-copy-done socket))))
    (when (or abort (bulk-copier-own-connection self))
      (close-database (copier-database self))))
  (copier-count self))

(defun db-write-row (self row &optional (data (prepare-row self row)))
  (let* ((connection (copier-database self))
	 (socket (connection-socket connection)))
    (with-reconnect-restart connection
      (using-connection connection
        (with-syncing
          (copy-data-message socket data)))))
  (incf (copier-count self)))

(defun copy-query (self)
  (format nil "~%copy ~a ~@[(~{~a~^,~})~] ~a ~a"
    (copier-table self)
    (copier-columns self)
    "FROM"
    "STDIN"))

(defun send-copy-start (socket query)
  (with-syncing
    (query-message socket query)
    (flush-message socket)
    (force-output socket)
    (message-case socket
      ;; Ignore the field formats because we're only supporting plain
      ;; text for now
      (#\G (read-uint1 socket)
	   (skip-bytes socket (* 2 (read-uint2 socket)))))))

(defun initialize-copier (self)
  (let* ((query (copy-query self))
	 (connection (copier-database self))
	 (socket (connection-socket connection)))
    (with-reconnect-restart connection
      (using-connection connection
	(send-copy-start socket query)))))


(defun copier-write-value (s val)
  (typecase val
    (string (let ((pg-string (with-output-to-string (str)
                               (loop for byte across (cl-postgres-trivial-utf-8:string-to-utf-8-bytes val) do
                                    (case (code-char byte)
                                      (#\Space (princ " " str))
                                      ((#\Newline #\Tab) (format str "\\~a" (code-char byte)))
                                      (#\\ (progn (princ #\\ str) (princ #\\ str)))
                                      (otherwise (if (and (< 32 byte)
                                                          (> 127 byte))
                                                     (write-char (code-char byte) str)
                                                     (princ (format nil "\\~o" byte) str))))))))
              #+nil(print `(:loading ,pg-string))
              (princ pg-string s)))
    (number (princ val s))
    (null (princ "false" s))
    (symbol (case val
              (:null (princ "\\N" s))
              ((t) (princ "true" s))
              (otherwise (error "copier-write-val: Symbols shouldn't be getting this far ~a" val))))))

(defun copier-write-sequence (s vector)
  (write-char #\{ s)
  (loop for (item . more-p) on (coerce vector 'list)
     do (cond
          ((null item) (copier-write-value s :null))
          ((atom item) (copier-write-value s item))
          (t (copier-write-sequence s item)))
     when more-p
     do (write-char #\, s))
  (write-char #\} s))

(defmethod prepare-row (self row)
  (declare (ignore self))
  (with-output-to-string (s)
    (loop for (val . more-p) on row
		   do (progn
            (if (typep val '(or string
                             (not vector)))
                (copier-write-value s val)
                (copier-write-sequence s val)))
		   if more-p do (write-char #\Tab s)
		   finally
         (write-char #\Newline s))))

(defun send-copy-done (socket)
  (with-syncing
    (setf sync-sent t)
    (copy-done-message socket)
    (force-output socket)
    (message-case socket
      (#\C (let* ((command-tag (read-str socket))
		  (space (position #\Space command-tag :from-end t)))
	     (when space
	       (parse-integer command-tag :junk-allowed t :start (1+ space))))))
    (block find-ready
      (loop (message-case socket
              (#\Z (read-uint1 socket)
                   (return-from find-ready))
              (t :skip))))))