1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
|
# $Id: io.py 10146 2025-05-27 06:14:22Z milde $
# Author: David Goodger <goodger@python.org>
# Copyright: This module has been placed in the public domain.
"""
I/O classes provide a uniform API for low-level input and output. Subclasses
exist for a variety of input/output mechanisms.
"""
from __future__ import annotations
__docformat__ = 'reStructuredText'
import codecs
import locale
import os
import re
import sys
import warnings
from docutils import TransformSpec
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Any, BinaryIO, ClassVar, Final, Literal, TextIO
from docutils import nodes
from docutils.nodes import StrPath
# Guess the locale's preferred encoding.
# `_locale_encoding` ends up as `None` when no valid guess can be made.
#
# TODO: check whether this is set correctly with every OS and Python version
#       or whether front-end tools need to call `locale.setlocale()`
#       before importing this module
try:
    # Return locale encoding also in UTF-8 mode.
    # Warnings emitted by the locale functions are silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        _locale_encoding: str | None = (
            locale.getlocale()[1] or locale.getdefaultlocale()[1]
        ).lower()
except:  # NoQA: E722 (catchall)
    # Any problem determining the locale: use None
    _locale_encoding = None
if _locale_encoding is not None:
    # Discard a guess that Python has no codec for.
    try:
        codecs.lookup(_locale_encoding)
    except (LookupError, TypeError):
        _locale_encoding = None
class InputError(OSError):
    """`OSError` subclass raised for problems with an input source."""
class OutputError(OSError):
    """`OSError` subclass raised for problems with an output destination."""
def check_encoding(stream: TextIO, encoding: str) -> bool | None:
    """Tell whether `stream.encoding` and `encoding` name the same codec.

    Both names are resolved with `codecs.lookup()`, so aliases of one
    codec compare equal.

    Returns

    :None:  if `encoding` or `stream.encoding` is not a valid encoding
            argument (e.g. ``None``) or `stream.encoding` is missing,
    :True:  if both names resolve to the same codec,
    :False: if the encodings differ.
    """
    try:
        stream_codec = codecs.lookup(stream.encoding)
        wanted_codec = codecs.lookup(encoding)
    except (LookupError, AttributeError, TypeError):
        # unknown codec name, no `encoding` attribute, or a non-`str` name
        return None
    return stream_codec == wanted_codec
def error_string(err: BaseException) -> str:
    """Format exception `err` as "<class name>: <message>"."""
    kind = err.__class__.__name__
    return ': '.join((kind, str(err)))
class Input(TransformSpec):
    """
    Abstract base class for input wrappers.

    A Docutils input object must offer a `read()` method that returns
    the source, typically as a `str` instance.

    Deriving from `TransformSpec` lets input objects add "transforms" to
    the "Transformer".  (Since Docutils 0.19, input objects are no longer
    required to be `TransformSpec` instances.)
    """

    component_type: Final = 'input'

    default_source_path: ClassVar[str | None] = None

    def __init__(
        self,
        source: str | TextIO | nodes.document | None = None,
        source_path: StrPath | None = None,
        encoding: str | Literal['unicode'] | None = 'utf-8',
        error_handler: str | None = 'strict',
    ) -> None:
        self.encoding = encoding
        """Text encoding for the input source."""

        self.error_handler = error_handler
        """Text decoding error handler."""

        self.source = source
        """The source of input data."""

        # An empty/missing path falls back to the class-level default.
        self.source_path = source_path or self.default_source_path
        """A text reference to the source."""

        self.successful_encoding = None
        """The encoding that successfully decoded the source data."""

    def __repr__(self) -> str:
        return (f'{self.__class__}: source={self.source!r}, '
                f'source_path={self.source_path!r}')

    def read(self) -> str:
        """Return input as `str`. Define in subclasses."""
        raise NotImplementedError

    def decode(self, data: str | bytes) -> str:
        """
        Decode `data` if required.

        `str` instances are returned unchanged (there is nothing to decode).

        With an explicit `self.encoding`, only that encoding is tried.
        If `self.encoding` is None or empty, the encoding is taken from a
        BOM or encoding declaration in `data`; failing that, UTF-8 and then
        the locale's preferred encoding are tried.

        The client application should call ``locale.setlocale()`` at the
        beginning of processing::

            locale.setlocale(locale.LC_ALL, '')

        The encoding that worked is stored in `self.successful_encoding`.

        Raise UnicodeError if unsuccessful.

        Provisional: encoding detection will be removed in Docutils 1.0.
        """
        if self.encoding and self.encoding.lower() == 'unicode':
            assert isinstance(data, str), ('input encoding is "unicode" '
                                           'but `data` is no `str` instance')
        if isinstance(data, str):
            # already text
            return data

        if self.encoding:
            # An explicitly given encoding is trusted.
            candidates = [self.encoding]
        else:
            # The helper issues its own DeprecationWarning; suppress it
            # for this internal call.
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore',
                                        category=DeprecationWarning)
                declared = self.determine_encoding_from_data(data)
            if declared:
                # `data` announces its encoding (BOM or "magic comment").
                candidates = [declared]
            else:
                # Heuristics: UTF-8 first, since only data that *is*
                # UTF-8 decodes as UTF-8 ...
                candidates = ['utf-8']
                # ... then the locale's preferred encoding.
                fallback = (
                    locale.getencoding()
                    if sys.version_info[:2] >= (3, 11)
                    else locale.getpreferredencoding(do_setlocale=False)
                )
                if fallback and fallback.lower() != 'utf-8':
                    candidates.append(fallback)

        if not self.encoding and candidates[0] != 'utf-8':
            warnings.warn('Input encoding auto-detection will be removed and '
                          'the encoding values None and "" become invalid '
                          'in Docutils 1.0.', DeprecationWarning, stacklevel=2)

        for enc in candidates:
            try:
                decoded = str(data, enc, self.error_handler)
            except (UnicodeError, LookupError) as err:
                # remember the failure for the report after the loop
                error = err
            else:
                self.successful_encoding = enc
                return decoded

        tried = ", ".join(map(repr, candidates))
        raise UnicodeError(
            'Unable to decode input data.  Tried the following encodings: '
            f'{tried}.\n'
            f'({error_string(error)})')

    coding_slug: ClassVar[re.Pattern[bytes]] = re.compile(
        br'coding[:=]\s*([-\w.]+)'
    )
    """Encoding declaration pattern."""

    byte_order_marks: ClassVar[tuple[tuple[bytes, str], ...]] = (
        (codecs.BOM_UTF32_BE, 'utf-32'),
        (codecs.BOM_UTF32_LE, 'utf-32'),
        (codecs.BOM_UTF8, 'utf-8-sig'),
        (codecs.BOM_UTF16_BE, 'utf-16'),
        (codecs.BOM_UTF16_LE, 'utf-16'),
    )
    """Sequence of (start_bytes, encoding) tuples for encoding detection.
    The first bytes of input data are checked against the start_bytes strings.
    A match indicates the given encoding.

    Internal. Will be removed in Docutils 1.0.
    """

    def determine_encoding_from_data(self, data: bytes) -> str | None:
        """
        Try to determine the encoding of `data` by looking *in* `data`.

        A byte order mark (BOM) is checked first, then the first two lines
        are searched for an encoding declaration.  Return None if neither
        is found.

        Deprecated. Will be removed in Docutils 1.0.
        """
        warnings.warn('docutils.io.Input.determine_encoding_from_data() '
                      'will be removed in Docutils 1.0.',
                      DeprecationWarning, stacklevel=2)
        # byte order mark?
        bom_encoding = next(
            (name for start_bytes, name in self.byte_order_marks
             if data.startswith(start_bytes)),
            None)
        if bom_encoding is not None:
            return bom_encoding
        # encoding declaration in one of the first 2 lines?
        for line in data.splitlines()[:2]:
            found = self.coding_slug.search(line)
            if found:
                return found.group(1).decode('ascii')
        return None

    def isatty(self) -> bool:
        """Return True, if the input source is connected to a TTY device."""
        try:
            return self.source.isatty()
        except AttributeError:
            # the source has no `isatty()` method
            return False
class Output(TransformSpec):
    """
    Abstract base class for output wrappers.

    A Docutils output object must offer a `write()` method that
    expects and handles one argument (the output).

    Deriving from `TransformSpec` lets output objects add "transforms" to
    the "Transformer".  (Since Docutils 0.19, output objects are no longer
    required to be `TransformSpec` instances.)
    """

    component_type: Final = 'output'

    default_destination_path: ClassVar[str | None] = None

    def __init__(
        self,
        destination: TextIO | str | bytes | None = None,
        destination_path: StrPath | None = None,
        encoding: str | None = None,
        error_handler: str | None = 'strict',
    ) -> None:
        self.encoding: str | None = encoding
        """Text encoding for the output destination."""

        # An empty/missing handler means 'strict'.
        self.error_handler: str = error_handler or 'strict'
        """Text encoding error handler."""

        self.destination: TextIO | str | bytes | None = destination
        """The destination for output data."""

        # An empty/missing path falls back to the class-level default.
        self.destination_path: StrPath | None = (
            destination_path or self.default_destination_path)
        """A text reference to the destination."""

    def __repr__(self) -> str:
        return (f'{self.__class__}: destination={self.destination!r}, '
                f'destination_path={self.destination_path!r}')

    def write(self, data: str | bytes) -> str | bytes | None:
        """Write `data`. Define in subclasses."""
        raise NotImplementedError

    def encode(self, data: str | bytes) -> str | bytes:
        """
        Encode and return `data`.

        A `str` instance is encoded with `self.encoding` and
        `self.error_handler`; anything else (e.g. `bytes`) is returned
        unchanged.

        Provisional: If `self.encoding` is set to the pseudo encoding name
        "unicode", `data` must be a `str` instance and is returned unchanged.
        """
        if self.encoding and self.encoding.lower() == 'unicode':
            assert isinstance(data, str), ('output encoding is "unicode" '
                                           'but `data` is no `str` instance')
            return data
        if isinstance(data, str):
            return data.encode(self.encoding, self.error_handler)
        # Non-unicode (e.g. bytes) output.
        return data
class ErrorOutput:
    """
    Wrapper class for file-like error streams with
    failsafe de- and encoding of `str`, `bytes`, and `Exception` instances.
    """

    def __init__(
        self,
        destination: TextIO | BinaryIO | str | Literal[False] | None = None,
        encoding: str | None = None,
        encoding_errors: str = 'backslashreplace',
        decoding_errors: str = 'replace',
    ) -> None:
        """
        :Parameters:
            - `destination`: a file-like object,
              a string (path to a file, opened for writing),
              `None` (write to `sys.stderr`, default), or
              evaluating to `False` (write() requests are ignored).
            - `encoding`: `destination` text encoding. Guessed if None
              (from `destination.encoding`, the locale, or 'ascii').
            - `encoding_errors`: how to treat encoding errors.
            - `decoding_errors`: how to treat decoding errors.
        """
        if destination is None:
            destination = sys.stderr
        elif not destination:
            destination = False
        # if `destination` is a file name, open it
        elif isinstance(destination, str):
            destination = open(destination, 'w')

        self.destination: TextIO | BinaryIO | Literal[False] = destination
        """Where warning output is sent."""

        self.encoding: str = (
            encoding
            or getattr(destination, 'encoding', None)
            or _locale_encoding
            or 'ascii'
        )
        """The output character encoding."""

        self.encoding_errors: str = encoding_errors
        """Encoding error handler."""

        self.decoding_errors: str = decoding_errors
        """Decoding error handler."""

    def write(self, data: str | bytes | Exception) -> None:
        """
        Write `data` to self.destination. Ignore, if self.destination is False.

        `data` can be a `bytes`, `str`, or `Exception` instance.
        An `Exception` is converted with `str()` before writing.
        """
        if not self.destination:
            return
        if isinstance(data, Exception):
            data = str(data)
        # The destination is either opened in text or binary mode.
        # If data has the wrong type, try to convert it.
        try:
            self.destination.write(data)
        except UnicodeEncodeError:
            # Encoding data from string to bytes failed with the
            # destination's encoding and error handler.
            # Try again with our own encoding and error handler.
            self._write_reencoded(data)
        except TypeError:
            if isinstance(data, str):  # destination may expect bytes
                binary = data.encode(self.encoding, self.encoding_errors)
                self.destination.write(binary)
            elif self.destination in (sys.stderr, sys.stdout):
                # write bytes to raw stream
                self.destination.buffer.write(data)
            else:
                # destination in text mode, write str
                string = data.decode(self.encoding, self.decoding_errors)
                self.destination.write(string)

    def _write_reencoded(self, data: str) -> None:
        """Write `data` after the destination's own encoder rejected it.

        `data` is encoded with `self.encoding` and `self.encoding_errors`.
        The resulting bytes are offered to the destination itself first.
        A text-mode stream rejects `bytes` with `TypeError`; in that case
        the bytes go to the stream's underlying ``buffer`` if it has one,
        otherwise an ASCII-only, backslash-escaped `str` is written.
        """
        binary = data.encode(self.encoding, self.encoding_errors)
        try:
            self.destination.write(binary)
            return
        except TypeError:
            # text-mode stream: it only accepts `str`
            pass
        raw = getattr(self.destination, 'buffer', None)
        if raw is not None:
            # Flush pending text first so the output order is preserved.
            flush = getattr(self.destination, 'flush', None)
            if callable(flush):
                flush()
            raw.write(binary)
        else:
            escaped = data.encode('ascii', 'backslashreplace')
            self.destination.write(escaped.decode('ascii'))

    def close(self) -> None:
        """
        Close the error-output stream.

        Ignored if the destination is `sys.stderr` or `sys.stdout` or has no
        close() method.
        """
        if self.destination in (sys.stdout, sys.stderr):
            return
        try:
            self.destination.close()
        except AttributeError:
            pass

    def isatty(self) -> bool:
        """Return True, if the destination is connected to a TTY device."""
        try:
            return self.destination.isatty()
        except AttributeError:
            return False
class FileInput(Input):
    """
    Input for single, simple file-like objects.
    """

    def __init__(
        self,
        source: TextIO | None = None,
        source_path: StrPath | None = None,
        encoding: str | Literal['unicode'] | None = 'utf-8',
        error_handler: str | None = 'strict',
        autoclose: bool = True,
        mode: Literal['r', 'rb', 'br'] = 'r'
    ) -> None:
        """
        :Parameters:
            - `source`: either a file-like object (which is read directly), or
              `None` (which implies `sys.stdin` if no `source_path` given).
            - `source_path`: a path to a file, which is opened for reading.
            - `encoding`: the expected text encoding of the input file.
            - `error_handler`: the encoding error handler to use.
            - `autoclose`: close automatically after read (except when
              `sys.stdin` is the source).
            - `mode`: how the file is to be opened (see standard function
              `open`). The default is read only ('r').
              In binary mode, `encoding` and `error_handler` are not
              passed to `open()`.

        Raise `InputError` if the file at `source_path` cannot be opened
        and `UnicodeError` if `source` is opened with an encoding that
        differs from `encoding`.
        """
        super().__init__(source, source_path, encoding, error_handler)
        self.autoclose = autoclose
        self._stderr = ErrorOutput()

        if source is None:
            if source_path:
                # `open()` raises ValueError if a binary mode is combined
                # with `encoding` or `errors`, so only pass them in text
                # mode (same approach as `FileOutput.open()`).
                if 'b' in mode:
                    kwargs = {}
                else:
                    kwargs = {'encoding': self.encoding,
                              'errors': self.error_handler}
                try:
                    self.source = open(source_path, mode, **kwargs)
                except OSError as error:
                    raise InputError(error.errno, error.strerror, source_path)
            else:
                self.source = sys.stdin
        elif check_encoding(self.source, self.encoding) is False:
            # TODO: re-open, warn or raise error?
            raise UnicodeError('Encoding clash: encoding given is "%s" '
                               'but source is opened with encoding "%s".' %
                               (self.encoding, self.source.encoding))
        if not source_path:
            # fall back to the name of the file-like object, if it has one
            try:
                self.source_path = self.source.name
            except AttributeError:
                pass

    def read(self) -> str:
        """
        Read and decode a single file, return as `str`.

        If no encoding is set and the source has a ``buffer`` attribute,
        the raw bytes are read, decoded with `Input.decode()` (heuristics),
        and newlines are normalized to "\\n".  Otherwise the result of
        `self.source.read()` is returned as is.

        The source is closed afterwards if `self.autoclose` is true.
        """
        try:
            if not self.encoding and hasattr(self.source, 'buffer'):
                # read as binary data
                data = self.source.buffer.read()
                # decode with heuristics
                data = self.decode(data)
                # normalize newlines
                data = '\n'.join(data.splitlines()+[''])
            else:
                data = self.source.read()
        finally:
            if self.autoclose:
                self.close()
        return data

    def readlines(self) -> list[str]:
        """
        Return lines of a single file as list of strings.

        Line endings are kept.
        """
        return self.read().splitlines(True)

    def close(self) -> None:
        """Close the source unless it is `sys.stdin`."""
        if self.source is not sys.stdin:
            self.source.close()
class FileOutput(Output):
    """Output for single, simple file-like objects."""

    default_destination_path: Final = '<file>'

    mode: Literal['w', 'a', 'x', 'wb', 'ab', 'xb', 'bw', 'ba', 'bx'] = 'w'
    """The mode argument for `open()`."""
    # 'wb' for binary (e.g. OpenOffice) files (see also `BinaryFileOutput`).
    # (Do not use binary mode ('wb') for text files, as this prevents the
    # conversion of newlines to the system specific default.)

    def __init__(self,
                 destination: TextIO | None = None,
                 destination_path: StrPath | None = None,
                 encoding: str | None = None,
                 error_handler: str | None = 'strict',
                 autoclose: bool = True,
                 handle_io_errors: None = None,
                 mode=None,
                 ) -> None:
        """
        :Parameters:
            - `destination`: either a file-like object (which is written
              directly) or `None` (which implies `sys.stdout` if no
              `destination_path` given).
            - `destination_path`: a path to a file, which is opened and then
              written.
            - `encoding`: the text encoding of the output file.
            - `error_handler`: the encoding error handler to use.
            - `autoclose`: close automatically after write (except when
              `sys.stdout` or `sys.stderr` is the destination).
            - `handle_io_errors`: ignored, deprecated, will be removed.
            - `mode`: how the file is to be opened (see standard function
              `open`). The default is 'w', providing universal newline
              support for text files.
        """
        super().__init__(
            destination, destination_path, encoding, error_handler)
        self.opened = True
        self.autoclose = autoclose
        if handle_io_errors is not None:
            warnings.warn('io.FileOutput: init argument "handle_io_errors" '
                          'is ignored and will be removed in '
                          'Docutils 2.0.', DeprecationWarning, stacklevel=2)
        if mode is not None:
            self.mode = mode
        self._stderr = ErrorOutput()
        if destination is None:
            if destination_path:
                # the file is opened lazily by the first `write()`
                self.opened = False
            else:
                self.destination = sys.stdout
        elif (  # destination is file-type object -> check mode:
              mode and hasattr(self.destination, 'mode')
              and mode != self.destination.mode):
            print('Warning: Destination mode "%s" differs from specified '
                  'mode "%s"' % (self.destination.mode, mode),
                  file=self._stderr)
        if not destination_path:
            # fall back to the name of the file-like object, if it has one
            try:
                self.destination_path = self.destination.name
            except AttributeError:
                pass

    def open(self) -> None:
        """Open `self.destination_path` with `self.mode`.

        Encoding and error handler are only passed in text mode.
        Raise `OutputError` if the file cannot be opened.
        """
        # Specify encoding
        if 'b' not in self.mode:
            kwargs = {'encoding': self.encoding,
                      'errors': self.error_handler}
        else:
            kwargs = {}
        try:
            self.destination = open(self.destination_path, self.mode, **kwargs)
        except OSError as error:
            raise OutputError(error.errno, error.strerror,
                              self.destination_path)
        self.opened = True

    def write(self, data: str | bytes) -> str | bytes:
        """Write `data` to a single file, also return it.

        `data` can be a `str` or `bytes` instance.
        If writing `bytes` fails, an attempt is made to write to
        the low-level interface ``self.destination.buffer``.

        If `data` is a `str` instance and `self.encoding` and
        `self.destination.encoding` are set to different values, `data`
        is encoded to a `bytes` instance using `self.encoding`.

        Provisional: future versions may raise an error if `self.encoding`
        and `self.destination.encoding` are set to different values.

        Raises:

        :TypeError: if the destination rejects `data` and there is no
                    fallback (e.g. `str` data for a binary-mode
                    destination),
        :ValueError: if `bytes` cannot be written and the destination's
                     encoding differs from `self.encoding`,
        :UnicodeError: if the data cannot be encoded.

        The destination is closed afterwards if `self.autoclose` is true.
        """
        if not self.opened:
            self.open()
        if (isinstance(data, str)
            and check_encoding(self.destination, self.encoding) is False):
            if os.linesep != '\n':
                data = data.replace('\n', os.linesep)  # fix endings
            data = self.encode(data)

        try:
            self.destination.write(data)
        except TypeError as err:
            if not isinstance(data, bytes):
                # There is no fallback for non-`bytes` data: report the
                # failure instead of silently dropping the output.
                raise
            try:
                self.destination.buffer.write(data)
            except AttributeError:
                if check_encoding(self.destination,
                                  self.encoding) is False:
                    raise ValueError(
                        f'Encoding of {self.destination_path} '
                        f'({self.destination.encoding}) differs \n'
                        f'  from specified encoding ({self.encoding})')
                else:
                    raise err
        except (UnicodeError, LookupError) as err:
            raise UnicodeError(
                'Unable to encode output data. output-encoding is: '
                f'{self.encoding}.\n({error_string(err)})')
        finally:
            if self.autoclose:
                self.close()
        return data

    def close(self) -> None:
        """Close the destination unless it is `sys.stdout` or `sys.stderr`."""
        if self.destination not in (sys.stdout, sys.stderr):
            self.destination.close()
            self.opened = False
class BinaryFileOutput(FileOutput):
    """
    A `docutils.io.FileOutput` variant that writes to a binary file.

    Deprecated. Use `FileOutput` (works with `bytes` since Docutils 0.20).
    Will be removed in Docutils 0.24.
    """
    # Used by core.publish_cmdline_to_binary() which is also deprecated.

    mode = 'wb'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Emit a `DeprecationWarning`, then set up like `FileOutput`."""
        warnings.warn('"BinaryFileOutput" is obsoleted by "FileOutput"'
                      ' and will be removed in Docutils 0.24.',
                      DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
class StringInput(Input):
    """Input from a `str` or `bytes` instance."""

    source: str | bytes

    default_source_path: Final = '<string>'

    def read(self) -> str:
        """Return the source as `str` instance.

        The source is passed through `Input.decode`, which decodes it
        if required.
        """
        text = self.decode(self.source)
        return text
class StringOutput(Output):
    """Output to a `bytes` or `str` instance.

    Provisional.
    """

    destination: str | bytes

    default_destination_path: Final = '<string>'

    def write(self, data: str | bytes) -> str | bytes:
        """Store `data` in `self.destination`, and return it.

        If `self.encoding` is set to the pseudo encoding name "unicode",
        `data` must be a `str` instance and is stored/returned unchanged
        (cf. `Output.encode`).

        Otherwise, `data` can be a `bytes` or `str` instance and is
        stored/returned as a `bytes` instance
        (`str` data is encoded with `self.encode()`).

        Attention: the `output_encoding`_ setting may affect the content
        of the output (e.g. an encoding declaration in HTML or XML or the
        representation of characters as LaTeX macro vs. literal character).
        """
        result = self.encode(data)
        self.destination = result
        return result
class NullInput(Input):
    """Degenerate input: read nothing."""

    source: None

    default_source_path: Final = 'null input'

    def read(self) -> str:
        """Return an empty string; no source is consulted."""
        empty = ''
        return empty
class NullOutput(Output):
    """Degenerate output: write nothing."""

    destination: None

    default_destination_path: Final = 'null output'

    def write(self, data: str | bytes) -> None:
        """Discard `data` and return None."""
        return None
class DocTreeInput(Input):
    """
    Adapter for document tree input.

    The document tree must be passed in the ``source`` parameter.
    """

    source: nodes.document

    default_source_path: Final = 'doctree input'

    def read(self) -> nodes.document:
        """Return the document tree given as `source`, unchanged."""
        doctree = self.source
        return doctree
|