File: ia_copy.py

package info (click to toggle)
python-internetarchive 5.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,048 kB
  • sloc: python: 8,208; makefile: 180; xml: 180
file content (164 lines) | stat: -rw-r--r-- 6,458 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
ia_copy.py

'ia' subcommand for copying files on archive.org
"""

# Copyright (C) 2012-2024 Internet Archive
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import argparse
import sys
from typing import Optional
from urllib.parse import quote

from requests import Response

import internetarchive as ia
from internetarchive.cli.cli_utils import MetadataAction, QueryStringAction
from internetarchive.utils import get_s3_xml_text, merge_dictionaries


def setup(subparsers):
    """
    Register the 'copy' subcommand (alias 'cp') and all of its arguments.

    Args:
        subparsers: subparser object passed from ia.py
    """
    copy_parser = subparsers.add_parser(
        "copy",
        aliases=["cp"],
        help="Copy files from archive.org items",
    )

    # Positional arguments: SOURCE and DESTINATION share the same layout.
    for dest, label in (("source", "Source"), ("destination", "Destination")):
        copy_parser.add_argument(
            dest,
            metavar=dest.upper(),
            help=f"{label} file formatted as: identifier/file",
        )

    # Help texts for the options, kept together for easier proofreading.
    metadata_help = ("Metadata to add to your new item, if you are moving the "
                     "file to a new item")
    replace_metadata_help = ("Only use metadata specified as argument, do not copy any "
                             "from the source item")
    no_backup_help = ("Turn off archive.org backups, "
                      "clobbered files will not be saved to "
                      "'history/files/$key.~N~'")

    # Options (registration order is kept, as it determines the --help layout).
    copy_parser.add_argument(
        "-m", "--metadata",
        metavar="KEY:VALUE",
        nargs="+",
        default={},
        action=MetadataAction,
        help=metadata_help,
    )
    copy_parser.add_argument(
        "--replace-metadata",
        action="store_true",
        help=replace_metadata_help,
    )
    copy_parser.add_argument(
        "-H", "--header",
        metavar="KEY:VALUE",
        nargs="+",
        default={},
        action=QueryStringAction,
        help="S3 HTTP headers to send with your request",
    )
    copy_parser.add_argument(
        "--ignore-file-metadata",
        action="store_true",
        help="Do not copy file metadata",
    )
    copy_parser.add_argument(
        "-n", "--no-derive",
        action="store_true",
        help="Do not derive uploaded files",
    )
    copy_parser.add_argument(
        "--no-backup",
        action="store_true",
        help=no_backup_help,
    )

    # Dispatch to main() with the command name and this parser, so main() can
    # report usage errors through the parser.
    copy_parser.set_defaults(func=lambda args: main(args, "copy", copy_parser))


def assert_src_file_exists(src_location: str) -> bool:
    """
    Verify that the source item and the source file exist on archive.org.

    Reads the module-level ``SRC_ITEM`` and, when that item exists, stores the
    looked-up file in the module-level ``SRC_FILE``.

    Args:
        src_location: Source formatted as ``identifier/file``. Everything
            after the first "/" is used as the filename.

    Returns:
        True when both the item and the file exist.

    Raises:
        AssertionError: If the item or the file does not exist. The exception
            is raised explicitly instead of through ``assert`` statements,
            because those are removed when Python runs with ``-O`` and the
            check would then silently pass.
    """
    global SRC_FILE

    if not SRC_ITEM.exists:  # type: ignore
        raise AssertionError(f"source item for '{src_location}' does not exist")

    src_filename = src_location.split("/", 1)[-1]
    SRC_FILE = SRC_ITEM.get_file(src_filename)  # type: ignore
    if not SRC_FILE.exists:  # type: ignore
        raise AssertionError(f"source file '{src_location}' does not exist")

    return True


def main(args: argparse.Namespace,
         cmd: str,
         parser: argparse.ArgumentParser) -> tuple[Response, ia.files.File | None]:
    """
    Main entry point for 'ia copy'.

    Args:
        args: Parsed command-line arguments. ``args.header`` and
            ``args.metadata`` are updated in place while the request is built.
        cmd: Name of the invoking command. It is used in the failure message,
            and the success message is only printed when it is "copy".
        parser: Parser used to report usage errors via ``parser.error``.

    Returns:
        A tuple of the S3 response and the source file object.

    Usage errors (identical source and destination, or a missing source) are
    reported through ``parser.error``. A non-200 response prints an error to
    stderr and calls ``sys.exit(1)``.
    """
    global SRC_ITEM

    if args.source == args.destination:
        parser.error("error: The source and destination files cannot be the same!")

    # SOURCE is "identifier/file": the identifier is the text before the first
    # "/", the filename is everything after it.
    identifier = args.source.split("/")[0]
    filename = args.source.split("/", 1)[-1]
    SRC_ITEM = args.session.get_item(identifier)  # type: ignore
    src_file = SRC_ITEM.get_file(filename)  # type: ignore

    try:
        assert_src_file_exists(args.source)
    except AssertionError:
        parser.error(f"error: https://{args.session.host}/download/{args.source} "
                      "does not exist. Please check the "
                      "identifier and filepath and retry.")

    headers = args.header
    headers["x-amz-copy-source"] = f"/{quote(args.source)}"

    # Copy the old metadata verbatim if no additional metadata is supplied,
    # else combine the old and the new metadata in a sensible manner.
    use_new_metadata = args.metadata or args.replace_metadata
    headers["x-amz-metadata-directive"] = "REPLACE" if use_new_metadata else "COPY"

    # New metadata takes precedence over old metadata.
    if not args.replace_metadata:
        args.metadata = merge_dictionaries(SRC_ITEM.metadata,  # type: ignore
                                           args.metadata)

    # File metadata is copied by default but can be dropped.
    if args.ignore_file_metadata:
        file_metadata = None
    else:
        file_metadata = src_file.metadata  # type: ignore

    # Add keep-old-version by default, unless the caller already set it or
    # backups were explicitly turned off.
    if not (headers.get("x-archive-keep-old-version") or args.no_backup):
        headers["x-archive-keep-old-version"] = "1"

    request = ia.iarequest.S3Request(
        url=f"{args.session.protocol}//s3.us.archive.org/{quote(args.destination)}",
        method="PUT",
        metadata=args.metadata,
        file_metadata=file_metadata,
        headers=headers,
        queue_derive=not args.no_derive,
        access_key=args.session.access_key,
        secret_key=args.session.secret_key,
    )
    response = args.session.send(request.prepare())

    succeeded = response.status_code == 200
    if not succeeded:
        # Prefer the message extracted from the S3 XML body; fall back to the
        # raw response text when it cannot be extracted.
        try:
            msg = get_s3_xml_text(response.text)
        except Exception:
            msg = response.text
        print(f"error: failed to {cmd} '{args.source}' to '{args.destination}' - {msg}",
              file=sys.stderr)
        sys.exit(1)
    elif cmd == "copy":
        print(f"success: copied '{args.source}' to '{args.destination}'.",
              file=sys.stderr)
    return (response, src_file)