File: load-file.lisp

package info (click to toggle)
pgloader 3.6.10-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,060 kB
  • sloc: sql: 32,321; lisp: 14,793; makefile: 435; sh: 85; python: 26
file content (133 lines) | stat: -rw-r--r-- 5,792 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
;;;
;;; Generic API for pgloader sources
;;; Methods for source types with multiple files input
;;;

(in-package :pgloader.load)

(defmethod copy-database ((copy md-copy)
                          &key
                            (on-error-stop *on-error-stop*)
                            truncate
                            disable-triggers
			    drop-indexes

                            max-parallel-create-index

                            ;; generic API, but ignored here
                            (worker-count 4)
                            (concurrency 1)

                            data-only
			    schema-only
                            create-tables
			    include-drop
                            foreign-keys
			    create-indexes
			    reset-sequences
                            materialize-views
                            set-table-oids
                            including
                            excluding)
  "Copy the contents of the COPY formated file to PostgreSQL."
  (declare (ignore data-only schema-only
                   create-tables include-drop foreign-keys
                   create-indexes reset-sequences materialize-views
                   set-table-oids including excluding))

  (let* ((*on-error-stop* on-error-stop)
         (pgconn (target-db copy))
         pgsql-catalog)

    (handler-case
        (with-pgsql-connection (pgconn)
          (setf pgsql-catalog
                (fetch-pgsql-catalog (db-name pgconn)
                                     :table (target copy)
                                     :variant (pgconn-variant pgconn)
                                     :pgversion (pgconn-major-version pgconn)))

          ;; if the user didn't tell us the column list of the table, now is
          ;; a proper time to set it in the copy object
          (unless (and (slot-boundp copy 'columns)
                       (slot-value copy 'columns))
            (setf (columns copy)
                  (mapcar (lambda (col)
                            ;; we need to handle the md-copy format for the
                            ;; column list, which allow for user given
                            ;; options: each column is a list which car is
                            ;; the column name.
                            (list (column-name col)))
                          (table-field-list (first (table-list pgsql-catalog))))))

          (log-message :data "CATALOG: ~s" pgsql-catalog)

          ;; this sets (table-index-list (target copy))
          (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes)

          ;; now is the proper time to truncate, before parallel operations
          (when truncate
            (truncate-tables pgsql-catalog)))

      (cl-postgres:database-error (e)
        (log-message :fatal "Failed to prepare target PostgreSQL table.")
        (log-message :fatal "~a" e)
        (return-from copy-database)))

    ;; Keep the PostgreSQL table target around in the copy instance,
    ;; with the following subtleties to deal with:
    ;;   1. the catalog fetching did fill-in PostgreSQL columns as fields
    ;;   2. we might target fewer pg columns than the table actually has
    (let ((table (first (table-list pgsql-catalog))))
      (setf (table-column-list table)
            (loop :for column-name :in (mapcar #'first (columns copy))
               :collect (find column-name (table-field-list table)
                              :key #'column-name
                              :test #'string=)))
      (setf (target copy) table))

    ;; expand the specs of our source, we might have to care about several
    ;; files actually.
    (let* ((lp:*kernel* (make-kernel worker-count))
           (channel     (lp:make-channel))
           (path-list   (expand-spec (source copy)))
           (task-count  0))
      (with-stats-collection ("Files Processed" :section :post
                                                :use-result-as-read t
                                                :use-result-as-rows t)
        (loop :for path-spec :in path-list
           :count t
           :do (let ((table-source (clone-copy-for copy path-spec)))
                 (when (and (header table-source) (null (fields table-source)))
                   (parse-header table-source))
                 (incf task-count
                       (copy-from table-source
                                  :concurrency concurrency
                                  :kernel lp:*kernel*
                                  :channel channel
                                  :on-error-stop on-error-stop
                                  :disable-triggers disable-triggers)))))

      ;; end kernel
      (with-stats-collection ("COPY Threads Completion" :section :post
                                                        :use-result-as-read t
                                                        :use-result-as-rows t)
        (loop :repeat task-count
           :do (handler-case
                   (destructuring-bind (task table seconds)
                       (lp:receive-result channel)
                     (log-message :debug
                                  "Finished processing ~a for ~s ~50T~6$s"
                                  task (format-table-name table) seconds))
                 (condition (e)
                   (log-message :fatal "~a" e)))
           :finally (progn
                      (lp:end-kernel :wait nil)
                      (return task-count))))
      (lp:end-kernel :wait t))

    ;; re-create the indexes from the target table entry
    (create-indexes-again (target-db copy)
                          pgsql-catalog
                          :max-parallel-create-index max-parallel-create-index
                          :drop-indexes drop-indexes)))