1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
;;;
;;; Generic API for pgloader sources
;;; Methods for source types that take multiple input files
;;;
(in-package :pgloader.load)
;;; COPY-DATABASE method specialized on MD-COPY sources.
;;;
;;; Steps, in order:
;;;   1. connect to (target-db copy) and fetch the catalog for (target copy);
;;;   2. default (columns copy) from the catalog when the user gave none;
;;;   3. call maybe-drop-indexes, then truncate-tables when TRUNCATE is true;
;;;   4. replace (target copy) with the catalog table, restricted to the
;;;      columns listed in (columns copy);
;;;   5. expand (source copy) into a list of paths and call copy-from on a
;;;      clone of COPY for each path, accumulating the returned counts;
;;;   6. receive that many results from the channel, then end the kernel;
;;;   7. call create-indexes-again on the catalog.
;;;
;;; Keyword arguments that are actually used:
;;;   ON-ERROR-STOP              rebinds *on-error-stop*; forwarded to copy-from
;;;   TRUNCATE                   when true, truncate-tables is called
;;;   DISABLE-TRIGGERS           forwarded to copy-from
;;;   DROP-INDEXES               forwarded to maybe-drop-indexes and
;;;                              create-indexes-again
;;;   MAX-PARALLEL-CREATE-INDEX  forwarded to create-indexes-again
;;;   WORKER-COUNT               forwarded to make-kernel (default 4)
;;;   CONCURRENCY                forwarded to copy-from (default 1)
;;; All remaining keywords are accepted and declared ignored.
;;;
;;; Returns whatever create-indexes-again returns, or NIL when a
;;; cl-postgres:database-error is caught while preparing the target table.
(defmethod copy-database ((copy md-copy)
&key
(on-error-stop *on-error-stop*)
truncate
disable-triggers
drop-indexes
max-parallel-create-index
;; used below: WORKER-COUNT sizes the kernel, CONCURRENCY goes to copy-from
(worker-count 4)
(concurrency 1)
;; generic API, but ignored here (see the DECLARE IGNORE form below)
data-only
schema-only
create-tables
include-drop
foreign-keys
create-indexes
reset-sequences
materialize-views
set-table-oids
including
excluding)
"Copy the contents of the COPY formated file to PostgreSQL."
(declare (ignore data-only schema-only
create-tables include-drop foreign-keys
create-indexes reset-sequences materialize-views
set-table-oids including excluding))
;; rebind the special *on-error-stop* for the dynamic extent of this call;
;; pgsql-catalog starts out as NIL and is assigned inside the connection
;; block below
(let* ((*on-error-stop* on-error-stop)
(pgconn (target-db copy))
pgsql-catalog)
;; everything that prepares the target table happens inside this
;; handler-case, so that a database error aborts the whole method
(handler-case
(with-pgsql-connection (pgconn)
(setf pgsql-catalog
(fetch-pgsql-catalog (db-name pgconn)
:table (target copy)
:variant (pgconn-variant pgconn)
:pgversion (pgconn-major-version pgconn)))
;; if the user didn't tell us the column list of the table, now is
;; a proper time to set it in the copy object
(unless (and (slot-boundp copy 'columns)
(slot-value copy 'columns))
(setf (columns copy)
(mapcar (lambda (col)
;; we need to handle the md-copy format for the
;; column list, which allows for user given
;; options: each column is a list whose car is
;; the column name.
(list (column-name col)))
;; only the first table of the catalog is considered
(table-field-list (first (table-list pgsql-catalog))))))
(log-message :data "CATALOG: ~s" pgsql-catalog)
;; this sets (table-index-list (target copy))
(maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes)
;; now is the proper time to truncate, before parallel operations
(when truncate
(truncate-tables pgsql-catalog)))
;; on a database error: log it twice at :fatal level (a fixed message,
;; then the condition itself) and leave the method, returning NIL
(cl-postgres:database-error (e)
(log-message :fatal "Failed to prepare target PostgreSQL table.")
(log-message :fatal "~a" e)
(return-from copy-database)))
;; Keep the PostgreSQL table target around in the copy instance,
;; with the following subtleties to deal with:
;; 1. the catalog fetching did fill-in PostgreSQL columns as fields
;; 2. we might target fewer pg columns than the table actually has
(let ((table (first (table-list pgsql-catalog))))
;; build the column list in the order given by (columns copy), looking
;; each name up among the table fields with string=
;; NOTE(review): find returns NIL when no field has that name, so an
;; unknown column name ends up as a NIL entry in table-column-list --
;; confirm this is caught or tolerated downstream
(setf (table-column-list table)
(loop :for column-name :in (mapcar #'first (columns copy))
:collect (find column-name (table-field-list table)
:key #'column-name
:test #'string=)))
(setf (target copy) table))
;; expand the specs of our source, we might have to care about several
;; files actually.
;; lp:*kernel* is bound for the extent of this let* only; task-count
;; accumulates what copy-from returns for each file
(let* ((lp:*kernel* (make-kernel worker-count))
(channel (lp:make-channel))
(path-list (expand-spec (source copy)))
(task-count 0))
;; the body's value is the loop's :count, i.e. the number of entries in
;; path-list; presumably that is what gets reported as read/rows given
;; the :use-result-as-* options -- confirm against with-stats-collection
(with-stats-collection ("Files Processed" :section :post
:use-result-as-read t
:use-result-as-rows t)
(loop :for path-spec :in path-list
:count t
;; each file gets its own clone of COPY, so per-file state such as
;; the parsed header is set on the clone rather than on COPY itself
:do (let ((table-source (clone-copy-for copy path-spec)))
;; only parse the header when the source has one and no field
;; list was given
(when (and (header table-source) (null (fields table-source)))
(parse-header table-source))
;; copy-from's return value is added to task-count, and exactly
;; that many results are received from CHANNEL below; presumably
;; it is the number of tasks submitted on the channel -- confirm
(incf task-count
(copy-from table-source
:concurrency concurrency
:kernel lp:*kernel*
:channel channel
:on-error-stop on-error-stop
:disable-triggers disable-triggers)))))
;; wait for every submitted task, then end the kernel; the body's value
;; is task-count (returned from the loop's :finally clause)
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(loop :repeat task-count
;; any condition signaled while receiving or destructuring a result
;; is logged at :fatal level and the loop moves on to the next result
:do (handler-case
(destructuring-bind (task table seconds)
(lp:receive-result channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds))
(condition (e)
(log-message :fatal "~a" e)))
:finally (progn
;; NOTE(review): the kernel is ended here with :wait nil and
;; end-kernel is called again with :wait t right after this
;; stats block -- confirm the second call still has a kernel
;; to wait on
(lp:end-kernel :wait nil)
(return task-count))))
;; NOTE(review): there is no unwind-protect around the kernel, so a
;; non-local exit from the forms above skips both end-kernel calls --
;; confirm whether that can leave worker threads behind
(lp:end-kernel :wait t))
;; re-create the indexes from the target table entry
(create-indexes-again (target-db copy)
pgsql-catalog
:max-parallel-create-index max-parallel-create-index
:drop-indexes drop-indexes)))
|