1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
import posixpath
from ftplib import FTP, error_perm
from posixpath import dirname
from typing import IO
def ftp_makedirs_cwd(ftp: FTP, path: str, first_call: bool = True) -> None:
    """Set the current directory of the FTP connection given in the ``ftp``
    argument (as a ftplib.FTP object), creating all parent directories if they
    don't exist. The ftplib.FTP object must be already connected and logged in.

    :param ftp: connected and logged-in FTP client.
    :param path: POSIX-style directory path to change into.
    :param first_call: internal recursion flag; ``True`` for the outermost
        call, which is the only one that changes into the directory it has
        just created. Callers should leave it at its default.
    :raises ftplib.error_perm: if the server refuses ``CWD`` on a path that
        has no parent left to create (e.g. ``"/"``), or refuses ``MKD``.

    NOTE(review): a recursive call whose ``CWD`` succeeds leaves the
    connection in that ancestor directory, so the following ``MKD`` commands
    are only unambiguous for absolute paths -- confirm callers never pass a
    relative path whose parents partially exist.
    """
    try:
        # Fast path: the directory already exists.
        ftp.cwd(path)
    except error_perm:
        parent = dirname(path)
        if parent == path:
            # dirname() has reached its fixed point ("/", "//" or ""), so
            # there is nothing left to create. Re-raise the server error
            # instead of recursing on the same path until RecursionError.
            raise
        # Make sure every ancestor exists, then create this level.
        ftp_makedirs_cwd(ftp, parent, False)
        ftp.mkd(path)
        if first_call:
            ftp.cwd(path)
def ftp_store_file(
    *,
    path: str,
    file: IO[bytes],
    host: str,
    port: int,
    username: str,
    password: str,
    use_active_mode: bool = False,
    overwrite: bool = True,
) -> None:
    """Upload ``file`` to ``path`` on an FTP server.

    Opens a connection to ``host``:``port``, logs in with ``username`` and
    ``password``, changes into the directory part of ``path`` (creating it
    through ``ftp_makedirs_cwd`` when needed) and transfers the file under
    the final path component.

    :param path: POSIX-style remote path; split into directory and file name.
    :param file: binary file object; rewound to offset 0 before the transfer
        and closed once the transfer has completed.
    :param use_active_mode: when true, passive mode is switched off.
    :param overwrite: ``True`` sends ``STOR`` (replace the remote file),
        ``False`` sends ``APPE`` (append to it).
    """
    with FTP() as connection:
        connection.connect(host, port)
        connection.login(username, password)
        if use_active_mode:
            connection.set_pasv(False)

        # Always upload from the beginning, whatever the caller's position.
        file.seek(0)

        remote_dir, remote_name = posixpath.split(path)
        ftp_makedirs_cwd(connection, remote_dir)

        if overwrite:
            verb = "STOR"
        else:
            verb = "APPE"
        connection.storbinary(f"{verb} {remote_name}", file)
        # Only reached on success: an earlier failure leaves ``file`` open.
        file.close()
|