#!/usr/bin/env python3
import argparse
import codecs
import datetime
import getpass
import glob
import hashlib
import os
import socket
import ssl
import sys
import time
import urllib.parse
import warnings
from ssl import CertificateError
import ansicat
import offutils
from offutils import xdg
try:
import chardet
_HAS_CHARDET = True
except ModuleNotFoundError:
_HAS_CHARDET = False
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
_HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except (ModuleNotFoundError, ImportError):
_HAS_CRYPTOGRAPHY = False
try:
with warnings.catch_warnings():
# Disable annoying warning shown to LibreSSL users
warnings.simplefilter("ignore")
import requests
_DO_HTTP = True
except (ModuleNotFoundError, ImportError):
_DO_HTTP = False
# This dict of standard ports doubles as the list of supported protocols
standard_ports = {
"gemini": 1965,
"gopher": 70,
"finger": 79,
"http": 80,
"https": 443,
"spartan": 300,
}
default_protocol = "gemini"
CRLF = "\r\n"
DEFAULT_TIMEOUT = 10
_MAX_REDIRECTS = 5
# monkey-patch Gemini support in urllib.parse
# see https://github.com/python/cpython/blob/master/Lib/urllib/parse.py
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")
urllib.parse.uses_relative.append("spartan")
urllib.parse.uses_netloc.append("spartan")
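# Illustration (hedged, not executed): with the patch above, urljoin resolves
# relative gemini references the same way it does for http:
#   urllib.parse.urljoin("gemini://example.org/dir/", "page.gmi")
#   -> "gemini://example.org/dir/page.gmi"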
class UserAbortException(Exception):
pass
def parse_mime(mime):
    options = {}
    if mime and ";" in mime:
        mime, raw_options = mime.split(";", maxsplit=1)
        for o in raw_options.split():
            spl = o.split("=", maxsplit=1)
            # Only keep well-formed "key=value" options
            if len(spl) == 2:
                options[spl[0]] = spl[1]
    return mime, options
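# Illustration (hedged): a Gemini META value splits into the bare MIME type
# and a dict of options:
#   parse_mime("text/gemini; charset=utf-8")
#   -> ("text/gemini", {"charset": "utf-8"})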
def normalize_url(url):
    # Default to gemini:// for bare hostnames, but leave absolute URLs,
    # relative paths and mailto: links untouched.
    if url and "://" not in url and "./" not in url and url[0] != "/":
        if not url.startswith("mailto:"):
            url = "gemini://" + url
    return url
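# Illustration (hedged): bare hostnames get the default gemini:// protocol,
# while absolute URLs and mailto: links are returned unchanged:
#   normalize_url("example.org/page.gmi") -> "gemini://example.org/page.gmi"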
def cache_last_modified(url):
if not url:
return None
path = get_cache_path(url)
if path and os.path.isfile(path):
return os.path.getmtime(path)
else:
return None
def is_cache_valid(url, validity=0):
    # Validity is the acceptable age of a cache, in seconds.
    # If 0, any existing cache is considered valid
    # (use validity=1 if you want to refresh everything).
    if offutils.is_local(url):
        return True
    cache = get_cache_path(url)
    if cache:
        # Paths longer than 259 characters are not supported by some OSes,
        # so such a cache is considered invalid.
        if len(cache) > 259:
            return False
        if os.path.exists(cache) and not os.path.isdir(cache):
            if validity > 0:
                last_modification = cache_last_modified(url)
                now = time.time()
                age = now - last_modification
                return age < validity
            else:
                return True
        else:
            # Cache has not been built yet
            return False
    else:
        # There’s not even a cache!
        return False
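# Illustration (hedged): with validity=3600, a cached copy is valid only if
# it was written less than an hour ago; validity=0 accepts any existing cache:
#   is_cache_valid("gemini://example.org/", validity=3600)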
def get_cache_path(url, add_index=True):
    # Sometimes, cache_path ends up being a folder! (This happens for
    # index.html/index.gmi.) In that case, we need to reconstruct it.
    # If add_index=False, we don’t append "index.gmi" to the cache_path.
    # First, we parse the URL
if not url:
return None
parsed = urllib.parse.urlparse(url)
if url[0] == "/" or url.startswith("./") or os.path.exists(url):
scheme = "file"
elif parsed.scheme:
scheme = parsed.scheme
else:
scheme = default_protocol
if scheme in ["file", "mailto", "list"]:
local = True
host = ""
port = None
# file:// is 7 char
if url.startswith("file://"):
path = url[7:]
elif scheme == "mailto":
path = parsed.path
elif url.startswith("list://"):
listdir = os.path.join(xdg("data"), "lists")
listname = url[7:].lstrip("/")
if listname in [""]:
name = "My Lists"
path = listdir
else:
name = listname
path = os.path.join(listdir, "%s.gmi" % listname)
else:
path = url
else:
local = False
        # The hostname is kept as-is here; punycode conversion
        # (idna, RFC 3490) is done at fetch time.
        host = parsed.netloc
try:
port = parsed.port or standard_ports.get(scheme, 0)
except ValueError:
port = standard_ports.get(scheme, 0)
# special gopher selector case
if scheme == "gopher":
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
path = parsed.path[2:]
else:
itemtype = "1"
path = ""
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9", "g", "I", "s", ";"):
mime = "binary"
else:
mime = "text/gopher"
else:
path = parsed.path
        if parsed.query:
            # We don’t add the query if the resulting path would be too long:
            # paths above 260 characters are not supported and crash Python.
            # Also, very long queries are usually useless for caching.
            if len(path + parsed.query) < 258:
                path += "/" + parsed.query
# Now, we have a partial path. Let’s make it full path.
if local:
cache_path = path
elif scheme and host:
cache_path = os.path.expanduser(xdg("cache") + scheme + "/" + host + path)
# There’s an OS limitation of 260 characters per path.
# We will thus cut the path enough to add the index afterward
cache_path = cache_path[:249]
        # This is a gross hack to give a name to index files. It will break
        # if the index is not index.gmi; we have no way to know the real
        # name of the file. We must also ensure that the path ends with "/",
        # otherwise the cache would create a file instead of a folder.
if scheme.startswith("http"):
index = "index.html"
elif scheme == "finger":
index = "index.txt"
elif scheme == "gopher":
index = "gophermap"
else:
index = "index.gmi"
if path == "" or os.path.isdir(cache_path):
if not cache_path.endswith("/"):
cache_path += "/"
if not url.endswith("/"):
url += "/"
if add_index and cache_path.endswith("/"):
cache_path += index
# sometimes, the index itself is a dir
# like when folder/index.gmi?param has been created
# and we try to access folder
if add_index and os.path.isdir(cache_path):
cache_path += "/" + index
else:
# URL is missing either a supported scheme or a valid host
# print("Error: %s is not a supported url"%url)
return None
    if len(cache_path) > 259:
        # Path length is limited to 260 characters on some OSes. Let’s cut
        # it and hope that there’s no major conflict here (that’s still
        # better than crashing, after all).
        cache_path = cache_path[:259]
    return cache_path
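# Illustration (hedged; the exact prefix depends on xdg("cache")):
#   get_cache_path("gemini://example.org/")
#   -> "<cache_dir>/gemini/example.org/index.gmi"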
def write_body(url, body, mime=None):
    # body is a copy of the raw gemtext
    # write_body() also creates the cache!
# DEFAULT GEMINI MIME
mime, options = parse_mime(mime)
cache_path = get_cache_path(url)
if cache_path:
if mime and mime.startswith("text/"):
mode = "w"
else:
mode = "wb"
cache_dir = os.path.dirname(cache_path)
        # If the subdirectory already exists as a file (not a folder),
        # we remove it (happens when accessing URL/subfolder before
        # URL/subfolder/file.gmi). This causes loss of data in the cache;
        # the proper solution would be to save "subfolder" as
        # "subfolder/index.gmi". If the subdirectory doesn’t exist, we walk
        # up until we find an existing parent, so that a stray file cannot
        # block the creation of folders.
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir, exist_ok=True)
        with open(cache_path, mode=mode) as f:
            f.write(body)
return cache_path
def set_error(url, err):
# If we get an error, we want to keep an existing cache
# but we need to touch it or to create an empty one
# to avoid hitting the error at each refresh
cache = get_cache_path(url)
if is_cache_valid(url):
os.utime(cache)
elif cache:
cache_dir = os.path.dirname(cache)
root_dir = cache_dir
while not os.path.exists(root_dir):
root_dir = os.path.dirname(root_dir)
if os.path.isfile(root_dir):
os.remove(root_dir)
os.makedirs(cache_dir, exist_ok=True)
if os.path.isdir(cache_dir):
with open(cache, "w") as c:
c.write(str(datetime.datetime.now()) + "\n")
c.write("ERROR while caching %s\n\n" % url)
c.write("*****\n\n")
c.write(str(type(err)) + " = " + str(err))
# cache.write("\n" + str(err.with_traceback(None)))
c.write("\n*****\n\n")
c.write("If you believe this error was temporary, type " "reload" ".\n")
c.write("The resource will be tentatively fetched during next sync.\n")
c.close()
return cache
def _fetch_http(
url,
max_size=None,
timeout=DEFAULT_TIMEOUT,
accept_bad_ssl_certificates=False,
force_large_download=False,
**kwargs,
):
if not _DO_HTTP:
return None
    def too_large_error(url, length, max_size):
        err = "Size of %s is %s Mo\n" % (url, length)
        err += "Offpunk only downloads automatically content under %s Mo\n" % (
            max_size / 1000000
        )
        err += "To retrieve this content anyway, type 'reload'."
        return set_error(url, err)
if accept_bad_ssl_certificates:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "ALL:@SECLEVEL=1"
requests.packages.urllib3.disable_warnings()
verify = False
else:
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "ALL:@SECLEVEL=2"
verify = True
header = {}
header["User-Agent"] = "Offpunk/Netcache - https://offpunk.net"
    with requests.get(
        url, verify=verify, headers=header, stream=True, timeout=timeout
    ) as response:
if "content-type" in response.headers:
mime = response.headers["content-type"]
else:
mime = None
if "content-length" in response.headers:
length = int(response.headers["content-length"])
else:
length = 0
        if not force_large_download and max_size and length > max_size:
            response.close()
            return too_large_error(url, str(length / 1000000), max_size)
elif not force_large_download and max_size and length == 0:
body = b""
downloaded = 0
for r in response.iter_content():
body += r
                # We divide max_size by two for streamed content
                # in order to catch oversized streams faster
                size = sys.getsizeof(body)
                limit = max_size / 2
                current = round(size * 100 / limit, 1)
                if current > downloaded:
                    downloaded = current
                    print(
                        " -> Receiving stream: %s%% of allowed data" % downloaded,
                        end="\r",
                    )
                if size > limit:
                    response.close()
                    return too_large_error(url, "streaming", max_size)
response.close()
else:
body = response.content
response.close()
if mime and "text/" in mime:
body = body.decode("UTF-8", "replace")
cache = write_body(url, body, mime)
return cache
def _fetch_gopher(url, timeout=DEFAULT_TIMEOUT, **kwargs):
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or 70
if len(parsed.path) >= 2:
itemtype = parsed.path[1]
selector = parsed.path[2:]
else:
itemtype = "1"
selector = ""
    addresses = socket.getaddrinfo(host, port, family=0, type=socket.SOCK_STREAM)
    # Try each resolved address until one accepts the connection
    err = None
    for address in addresses:
        s = socket.socket(address[0], address[1])
        s.settimeout(timeout)
        try:
            s.connect(address[4])
            break
        except OSError as e:
            err = e
    else:
        # None of the addresses worked: raise the last error
        raise err
if parsed.query:
request = selector + "\t" + parsed.query
elif itemtype == "7":
user_input = input("> ")
request = selector + "\t" + user_input
else:
request = selector
request += "\r\n"
s.sendall(request.encode("UTF-8"))
response1 = s.makefile("rb")
response = response1.read()
    # Transcode response into UTF-8
    if itemtype not in ("9", "g", "I", "s", ";"):
        # Try most common encodings first
        for encoding in ("UTF-8", "ISO-8859-1"):
            try:
                response = response.decode(encoding)
                break
            except UnicodeDecodeError:
                pass
        else:
            # Otherwise, try to detect the encoding
            if _HAS_CHARDET:
                detected = chardet.detect(response)
                response = response.decode(detected["encoding"])
            else:
                raise UnicodeError("Unable to decode gopher response")
if itemtype == "0":
mime = "text/gemini"
elif itemtype == "1":
mime = "text/gopher"
elif itemtype == "h":
mime = "text/html"
elif itemtype in ("9", "g", "I", "s", ";"):
mime = None
else:
# by default, we should consider Gopher
mime = "text/gopher"
cache = write_body(url, response, mime)
return cache
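# Illustration (hedged): for "gopher://example.org/0/file.txt", the item type
# is "0" and the selector "/file.txt"; the decoded body is cached as
# text/gemini.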
def _fetch_finger(url, timeout=DEFAULT_TIMEOUT, **kwargs):
    parsed = urllib.parse.urlparse(url)
    host = parsed.hostname
    port = parsed.port or standard_ports["finger"]
    query = parsed.path.lstrip("/") + "\r\n"
    with socket.create_connection((host, port)) as sock:
        sock.settimeout(timeout)
        sock.sendall(query.encode())
        response = sock.makefile("rb").read().decode("UTF-8")
        cache = write_body(url, response, "text/plain")
    return cache
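# Illustration (hedged): "finger://example.org/alice" connects to port 79 and
# sends the query "alice\r\n"; the reply is cached as text/plain.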
# Originally copied from reference spartan client by Michael Lazar
def _fetch_spartan(url, **kwargs):
cache = None
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
port = url_parts.port or standard_ports["spartan"]
path = url_parts.path or "/"
query = url_parts.query
redirect_url = None
with socket.create_connection((host, port)) as sock:
if query:
data = urllib.parse.unquote_to_bytes(query)
else:
data = b""
encoded_host = host.encode("idna")
ascii_path = urllib.parse.unquote_to_bytes(path)
encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii")
sock.send(b"%s %s %d\r\n" % (encoded_host, encoded_path, len(data)))
fp = sock.makefile("rb")
response = fp.readline(4096).decode("ascii").strip("\r\n")
parts = response.split(" ", maxsplit=1)
code, meta = int(parts[0]), parts[1]
if code == 2:
body = fp.read()
if meta.startswith("text"):
body = body.decode("UTF-8")
cache = write_body(url, body, meta)
elif code == 3:
redirect_url = url_parts._replace(path=meta).geturl()
        else:
            return set_error(url, "Spartan code %s: Error %s" % (code, meta)), url
    if redirect_url:
        cache, url = _fetch_spartan(redirect_url)
    return cache, url
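# Illustration (hedged): a spartan request is a single line such as
#   b"example.org /page.gmi 0\r\n"
# and the response status line is e.g. "2 text/gemini" on success or
# "3 /new/path" for a redirect.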
def _validate_cert(address, host, cert, accept_bad_ssl=False, automatic_choice=None):
"""
Validate a TLS certificate in TOFU mode.
If the cryptography module is installed:
- Check the certificate Common Name or SAN matches `host`
- Check the certificate's not valid before date is in the past
- Check the certificate's not valid after date is in the future
Whether the cryptography module is installed or not, check the
certificate's fingerprint against the TOFU database to see if we've
previously encountered a different certificate for this IP address and
hostname.
"""
now = datetime.datetime.now(datetime.timezone.utc)
if _HAS_CRYPTOGRAPHY:
# Using the cryptography module we can get detailed access
# to the properties of even self-signed certs, unlike in
# the standard ssl library...
c = x509.load_der_x509_certificate(cert, _BACKEND)
        # Check certificate validity dates (skipped if the user accepts
        # bad certificates)
        if not accept_bad_ssl:
            if c.not_valid_before_utc >= now:
                raise CertificateError(
                    "Certificate not valid until: {}!".format(c.not_valid_before_utc)
                )
            elif c.not_valid_after_utc <= now:
                raise CertificateError(
                    "Certificate expired as of: {}!".format(c.not_valid_after_utc)
                )
# Check certificate hostnames
names = []
common_name = c.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if common_name:
names.append(common_name[0].value)
try:
names.extend(
[
alt.value
for alt in c.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
).value
]
)
except x509.ExtensionNotFound:
pass
names = set(names)
for name in names:
try:
ssl._dnsname_match(str(name), host)
break
except CertificateError:
continue
else:
# If we didn't break out, none of the names were valid
raise CertificateError(
"Hostname does not match certificate common name or any alternative names."
)
sha = hashlib.sha256()
sha.update(cert)
fingerprint = sha.hexdigest()
# The directory of this host and IP-address, e.g.
# ~/.local/share/offpunk/certs/srht.site/46.23.81.157/
certdir = os.path.join(xdg("data"), "certs")
hostdir = os.path.join(certdir, host)
sitedir = os.path.join(hostdir, address)
    # 1. We look through the cached certificates to extract the
    # most_frequent_cert and to see if one matches the current one.
    # 2. If we have no match but one valid most_frequent_cert, we run the
    # "throw a warning" code.
    # 3. If there is no certificate directory or no valid cached certificate,
    # we do the "First-Use" routine.
most_frequent_cert = None
matching_fingerprint = False
# 1. Have we been here before? (the directory exists)
if os.path.isdir(sitedir):
max_count = 0
files = os.listdir(sitedir)
count = 0
certcache = os.path.join(xdg("config"), "cert_cache")
for cached_fingerprint in files:
filepath = os.path.join(sitedir, cached_fingerprint)
certpath = os.path.join(certcache, cached_fingerprint + ".crt")
with open(filepath, "r") as f:
count = int(f.read())
if os.path.exists(certpath):
if count > max_count:
max_count = count
most_frequent_cert = cached_fingerprint
if fingerprint == cached_fingerprint:
# Matched!
# Increase the counter for this certificate (this also updates
# the modification time of the file)
with open(filepath, "w") as f:
f.write(str(count + 1))
matching_fingerprint = True
break
# 2. Do we have some certificates but none of them is matching the current one?
if most_frequent_cert and not matching_fingerprint:
with open(os.path.join(certcache, most_frequent_cert + ".crt"), "rb") as fp:
previous_cert = fp.read()
if _HAS_CRYPTOGRAPHY:
# Load the most frequently seen certificate to see if it has
# expired
previous_cert = x509.load_der_x509_certificate(previous_cert, _BACKEND)
            previous_ttl = previous_cert.not_valid_after_utc - now
        print("****************************************")
print("[SECURITY WARNING] Unrecognised certificate!")
print(
"The certificate presented for {} ({}) has never been seen before.".format(
host, address
)
)
print("This MIGHT be a Man-in-the-Middle attack.")
print(
"A different certificate has previously been seen {} times.".format(
max_count
)
)
if _HAS_CRYPTOGRAPHY:
if previous_ttl < datetime.timedelta():
print("That certificate has expired, which reduces suspicion somewhat.")
else:
print("That certificate is still valid for: {}".format(previous_ttl))
print("****************************************")
print("Attempt to verify the new certificate fingerprint out-of-band:")
print(fingerprint)
if automatic_choice:
choice = automatic_choice
else:
choice = input("Accept this new certificate? Y/N ").strip().lower()
if choice in ("y", "yes"):
with open(os.path.join(sitedir, fingerprint), "w") as fp:
fp.write("1")
with open(os.path.join(certcache, fingerprint + ".crt"), "wb") as fp:
fp.write(cert)
else:
raise Exception("TOFU Failure!")
# 3. If no directory or no cert found in it, we cache it
if not most_frequent_cert:
if not os.path.exists(certdir): # XDG_DATA/offpunk/certs
os.makedirs(certdir)
if not os.path.exists(hostdir): # XDG_DATA/offpunk/certs/site.net
os.makedirs(hostdir)
if not os.path.exists(
sitedir
): # XDG_DATA/offpunk/certs/site.net/123.123.123.123
os.makedirs(sitedir)
with open(os.path.join(sitedir, fingerprint), "w") as fp:
fp.write("1")
certcache = os.path.join(xdg("config"), "cert_cache")
if not os.path.exists(certcache):
os.makedirs(certcache)
with open(os.path.join(certcache, fingerprint + ".crt"), "wb") as fp:
fp.write(cert)
def _get_client_certkey(site_id: str, host: str):
# returns {cert: str, key: str}
certdir = os.path.join(xdg("data"), "certs", host)
certf = os.path.join(certdir, "%s.cert" % site_id)
keyf = os.path.join(certdir, "%s.key" % site_id)
if not os.path.exists(certf) or not os.path.exists(keyf):
        if host != "":
            split = host.split(".")
            # We deliberately don’t require len(split) > 2: this allows a
            # global identity, i.e. logging in to every site with the same
            # certificate.
            return _get_client_certkey(site_id, ".".join(split[1:]))
return None
certkey = dict(cert=certf, key=keyf)
return certkey
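# Illustration (hedged): _get_client_certkey("alice", "gmi.example.org") first
# looks for certs/gmi.example.org/alice.cert (and .key), then falls back to
# certs/example.org/, then certs/org/, before returning None.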
def _get_site_ids(url: str):
newurl = normalize_url(url)
u = urllib.parse.urlparse(newurl)
if u.scheme == "gemini" and u.username is None:
certdir = os.path.join(xdg("data"), "certs")
netloc_parts = u.netloc.split(".")
site_ids = []
for i in range(len(netloc_parts), 0, -1):
lasti = ".".join(netloc_parts[-i:])
direc = os.path.join(certdir, lasti)
for certfile in glob.glob(os.path.join(direc, "*.cert")):
site_id = certfile.split("/")[-1].split(".")[-2]
site_ids.append(site_id)
return site_ids
else:
return []
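# Illustration (hedged): _get_site_ids("gemini://example.org") collects the
# basenames of every *.cert file under certs/example.org/ and certs/org/.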
def create_certificate(name: str, days: int, hostname: str):
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
sitecertdir = os.path.join(xdg("data"), "certs", hostname)
keyfile = os.path.join(sitecertdir, name + ".key")
    # create the directory if it doesn't exist
os.makedirs(sitecertdir, exist_ok=True)
with open(keyfile, "wb") as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
xname = x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
]
)
# generate the cert, valid a week ago (timekeeping is hard, let's give it a
# little margin). issuer and subject are your name
cert = (
x509.CertificateBuilder()
.subject_name(xname)
.issuer_name(xname)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=7))
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=days))
.sign(key, hashes.SHA256())
)
certfile = os.path.join(sitecertdir, name + ".cert")
with open(certfile, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
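# Illustration (hedged): create_certificate("alice", 365, "example.org") writes
# a 2048-bit RSA key to certs/example.org/alice.key and a self-signed
# certificate (valid from one week ago to 365 days from now) to alice.cert.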
def ask_certs(url: str):
certs = get_certs(url)
if len(certs) == 0:
print("There are no certificates available for this site.")
create_cert = input("Do you want to create one? (y/n) ")
if create_cert == "y":
name = input("Name for this certificate: ")
days = input("Validity in days: ")
if name != "" and days.isdigit():
site = urllib.parse.urlparse(url)
create_certificate(name, int(days), site.hostname)
new_url = "gemini://" + name +"@"+ url.split("://")[1]
return(new_url)
else:
print("The name or validity you typed are invalid")
return(url)
else:
return(url)
if len(certs) == 1:
print("The one available certificate for this site is:")
    elif len(certs) > 1:
        print("The", len(certs), "available certificates for this site are:")
if len(certs) > 0:
        counter = 0
        stri = ""
        for cert in certs:
            stri += "[%s] %s \n" % (counter + 1, cert)
            counter += 1
        stri += "\n"
        stri += "Which certificate do you want to use? > "
        ans = input(stri)
        if ans.isdigit() and 0 < int(ans) <= len(certs):
            identity = certs[int(ans) - 1]
        else:
            identity = None
        if identity:
            new_url = "gemini://" + identity + "@" + url.split("://")[1]
            return new_url
    return url  # return the same url, no "cert" attached
def get_certs(url: str):
u = urllib.parse.urlparse(normalize_url(url))
if u.scheme == "gemini":
certdir = os.path.join(xdg("data"), "certs")
netloc_parts = u.netloc.split(".")
site_ids = []
if "@" in netloc_parts[0]:
netloc_parts[0] = netloc_parts[0].split("@")[1]
        # certdir paths don’t include ports, so strip the port if present
if ":" in netloc_parts[-1]:
netloc_parts[-1] = netloc_parts[-1].split(":")[0]
for i in range(len(netloc_parts), 0, -1):
lasti = ".".join(netloc_parts[-i:])
direc = os.path.join(certdir, lasti)
for certfile in glob.glob(os.path.join(direc, "*.cert")):
site_id = certfile.split("/")[-1].split(".")[-2]
site_ids.append(site_id)
return site_ids
else:
return []
def _fetch_gemini(
url,
timeout=DEFAULT_TIMEOUT,
interactive=True,
accept_bad_ssl_certificates=False,
**kwargs,
):
cache = None
newurl = url
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
site_id = url_parts.username
port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
    # In AV-98, this was the _send_request method:
    # send a selector to a given host and port and return
    # the resolved address and a binary file with the reply.
    # Convert a unicode hostname to punycode (idna, RFC 3490)
    host = host.encode("idna").decode()
# Do DNS resolution
# DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
if ":" in host:
# This is likely a literal IPv6 address, so we can *only* ask for
# IPv6 addresses or getaddrinfo will complain
family_mask = socket.AF_INET6
elif socket.has_ipv6:
# Accept either IPv4 or IPv6 addresses
family_mask = 0
else:
# IPv4 only
family_mask = socket.AF_INET
addresses = socket.getaddrinfo(
host, port, family=family_mask, type=socket.SOCK_STREAM
)
# Sort addresses so IPv6 ones come first
addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
# Continuation of send_request
# Prepare TLS context
    protocol = (
        ssl.PROTOCOL_TLS_CLIENT if sys.version_info >= (3, 6) else ssl.PROTOCOL_TLSv1_2
    )
context = ssl.SSLContext(protocol)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# When using an identity, use the certificate and key
if site_id:
certkey = _get_client_certkey(site_id, host)
if certkey:
context.load_cert_chain(certkey["cert"], certkey["key"])
else:
print("This identity doesn't exist for this site (or is disabled).")
# Impose minimum TLS version
    # In 3.7 and above, this is easy...
    if sys.version_info >= (3, 7):
        context.minimum_version = ssl.TLSVersion.TLSv1_2
# Otherwise, it seems very hard...
# The below is less strict than it ought to be, but trying to disable
# TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures
# with recent versions of OpenSSL. What a mess...
else:
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_SSLv2
# Try to enforce sensible ciphers
try:
context.set_ciphers(
"AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH"
)
except ssl.SSLError:
# Rely on the server to only support sensible things, I guess...
pass
# Connect to remote host by any address possible
err = None
for address in addresses:
try:
s = socket.socket(address[0], address[1])
s.settimeout(timeout)
s = context.wrap_socket(s, server_hostname=host)
s.connect(address[4])
break
except OSError as e:
err = e
else:
# If we couldn't connect to *any* of the addresses, just
# bubble up the exception from the last attempt and deny
# knowledge of earlier failures.
raise err
# Do TOFU
cert = s.getpeercert(binary_form=True)
# Remember that we showed the current cert to this domain...
# TODO : accept badssl and automatic choice
_validate_cert(address[4][0], host, cert, automatic_choice="y")
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
new_host = host
# Handle IPV6 hostname
if ":" in new_host:
new_host = "[" + new_host + "]"
if port != standard_ports["gemini"]:
new_host += ":" + str(port)
url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))
if site_id:
url = urllib.parse.urlunparse(url._replace(netloc=site_id + "@" + new_host))
else:
url = url_no_username
s.sendall((url_no_username + CRLF).encode("UTF-8"))
f = s.makefile(mode="rb")
## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
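    # Example header (illustration): "20 text/gemini; charset=utf-8\r\n"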
header = f.readline(1027)
header = urllib.parse.unquote(header.decode("UTF-8"))
if not header or header[-1] != "\n":
raise RuntimeError("Received invalid header from server!")
header = header.strip()
# Validate header
status, meta = header.split(maxsplit=1)
if len(meta) > 1024 or len(status) != 2 or not status.isnumeric():
f.close()
raise RuntimeError("Received invalid header from server!")
    # Update redirect loop/maze escaping state
    # TODO FIXME: this state is reset on every call, so the redirect limit
    # below never accumulates across hops; refactoring in progress.
    previous_redirectors = set()
# Handle non-SUCCESS headers, which don't have a response body
# Inputs
if status.startswith("1"):
if interactive:
print(meta)
if status == "11":
user_input = getpass.getpass("> ")
else:
user_input = input("> ")
newurl = url.split("?")[0]
return _fetch_gemini(newurl + "?" + user_input)
else:
return None, None
# Redirects
elif status.startswith("3"):
newurl = urllib.parse.urljoin(url, meta)
if newurl == url:
raise RuntimeError("URL redirects to itself!")
elif newurl in previous_redirectors:
raise RuntimeError("Caught in redirect loop!")
elif len(previous_redirectors) == _MAX_REDIRECTS:
raise RuntimeError(
"Refusing to follow more than %d consecutive redirects!"
% _MAX_REDIRECTS
)
# TODO: redirections handling should be refactored
# elif "interactive" in options and not options["interactive"]:
# follow = self.automatic_choice
# # Never follow cross-domain redirects without asking
# elif new_gi.host.encode("idna") != gi.host.encode("idna"):
# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url)
# # Never follow cross-protocol redirects without asking
# elif new_gi.scheme != gi.scheme:
# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url)
# # Don't follow *any* redirect without asking if auto-follow is off
# elif not self.options["auto_follow_redirects"]:
# follow = input("Follow redirect to %s? (y/n) " % new_gi.url)
# # Otherwise, follow away
else:
follow = "yes"
if follow.strip().lower() not in ("y", "yes"):
raise UserAbortException()
previous_redirectors.add(url)
# if status == "31":
# # Permanent redirect
# self.permanent_redirects[gi.url] = new_gi.url
return _fetch_gemini(newurl, interactive=interactive)
# Errors
elif status.startswith("4") or status.startswith("5"):
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
if interactive:
print("You need to provide a client-certificate to access this page.")
url_with_identity = ask_certs(url)
if (url_with_identity != url):
return fetch(url_with_identity)
error = "You need to provide a client-certificate to access this page.\r\nType \"certs\" to create or re-use one"
raise RuntimeError(error)
# Invalid status
elif not status.startswith("2"):
raise RuntimeError("Server returned undefined status code %s!" % status)
# If we're here, this must be a success and there's a response body
assert status.startswith("2")
mime = meta
# Read the response body over the network
fbody = f.read()
# DEFAULT GEMINI MIME
if mime == "":
mime = "text/gemini; charset=utf-8"
shortmime, mime_options = parse_mime(mime)
if "charset" in mime_options:
try:
codecs.lookup(mime_options["charset"])
        except LookupError:
            # If the encoding is unknown, there’s a high probability that
            # it’s UTF-8 with a bad header
            mime_options["charset"] = "UTF-8"
    if shortmime.startswith("text/"):
        # Get the charset, defaulting to UTF-8 if none is declared
        encoding = mime_options.get("charset", "UTF-8")
        try:
            body = fbody.decode(encoding)
        except UnicodeError:
            raise RuntimeError(
                "Could not decode response body using %s encoding declared in header!"
                % encoding
            )
else:
body = fbody
cache = write_body(url, body, mime)
return cache, url
def fetch(
url,
offline=False,
download_image_first=True,
images_mode="readable",
validity=0,
**kwargs,
):
url = normalize_url(url)
newurl = url
path = None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
# First, we look if we have a valid cache, even if offline
# If we are offline, any cache is better than nothing
if is_cache_valid(url, validity=validity) or (
offline and is_cache_valid(url, validity=0)
):
path = get_cache_path(url)
# if the cache is a folder, we should add a "/" at the end of the URL
if not url.endswith("/") and os.path.isdir(
get_cache_path(url, add_index=False)
):
newurl = url + "/"
elif offline and is_cache_valid(url, validity=0):
path = get_cache_path(url)
elif "://" in url and not offline:
try:
scheme = url.split("://")[0]
if scheme not in standard_ports:
if print_error:
print("%s is not a supported protocol" % scheme)
path = None
elif scheme in ("http", "https"):
if _DO_HTTP:
path = _fetch_http(newurl, **kwargs)
else:
print("HTTP requires python-requests")
elif scheme == "gopher":
path = _fetch_gopher(newurl, **kwargs)
elif scheme == "finger":
path = _fetch_finger(newurl, **kwargs)
elif scheme == "gemini":
path, newurl = _fetch_gemini(url, **kwargs)
elif scheme == "spartan":
path, newurl = _fetch_spartan(url, **kwargs)
else:
print("scheme %s not implemented yet" % scheme)
except UserAbortException:
return None, newurl
except Exception as err:
cache = set_error(newurl, err)
# Print an error message
# we fail silently when sync_only
if isinstance(err, socket.gaierror):
if print_error:
print("ERROR: DNS error!")
elif isinstance(err, ConnectionRefusedError):
if print_error:
print("ERROR1: Connection refused!")
elif isinstance(err, ConnectionResetError):
if print_error:
print("ERROR2: Connection reset!")
elif isinstance(err, (TimeoutError, socket.timeout)):
if print_error:
print("""ERROR3: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient.""")
elif isinstance(err, FileExistsError):
if print_error:
print("""ERROR5: Trying to create a directory which already exists
in the cache : """)
print(err)
elif _DO_HTTP and isinstance(err, requests.exceptions.SSLError):
if print_error:
print("""ERROR6: Bad SSL certificate:\n""")
print(err)
print(
"""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n"""
)
print("""set accept_bad_ssl_certificates True""")
elif _DO_HTTP and isinstance(err, requests.exceptions.ConnectionError):
if print_error:
print("""ERROR7: Cannot connect to URL:\n""")
print(str(err))
else:
if print_error:
import traceback
print("ERROR4: " + str(type(err)) + " : " + str(err))
# print("\n" + str(err.with_traceback(None)))
print(traceback.format_exc())
return cache, newurl
# We download images contained in the document (from full mode)
if not offline and download_image_first and images_mode:
renderer = ansicat.renderer_from_file(path, newurl)
if renderer:
for image in renderer.get_images(mode=images_mode):
# Image should exist, should be an url (not a data image)
# and should not be already cached
if (
image
and not image.startswith("data:image/")
and not is_cache_valid(image)
):
width = offutils.term_width() - 1
toprint = "Downloading %s" % image
toprint = toprint[:width]
toprint += " " * (width - len(toprint))
print(toprint, end="\r")
# d_i_f and images_mode are False/None to avoid recursive downloading
# if that ever happen
fetch(
image,
offline=offline,
download_image_first=False,
images_mode=None,
validity=0,
**kwargs,
)
return path, newurl
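# Illustration (hedged): fetch() is the main entry point; it returns the local
# cache path and the possibly amended URL:
#   path, final_url = fetch("gemini://example.org/", validity=3600)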
def main():
descri = "Netcache is a command-line tool to retrieve, cache and access networked content.\n\
By default, netcache will returns a cached version of a given URL, downloading it \
only if a cache version doesn't exist. A validity duration, in seconds, can also \
be given so netcache downloads the content only if the existing cache is older than the validity."
# Parse arguments
parser = argparse.ArgumentParser(prog="netcache", description=descri)
parser.add_argument(
"--path",
action="store_true",
help="return path to the cache instead of the content of the cache",
)
parser.add_argument(
"--ids",
action="store_true",
help="return a list of id's for the gemini-site instead of the content of the cache",
)
parser.add_argument(
"--offline",
action="store_true",
help="Do not attempt to download, return cached version or error",
)
    parser.add_argument(
        "--max-size",
        type=int,
        help="Cancel download of items above that size (in bytes).",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        help="Time to wait before cancelling connection (in seconds).",
    )
    parser.add_argument(
        "--cache-validity",
        type=int,
        default=0,
        help="maximum age, in seconds, of the cached version before \
    redownloading a new version",
    )
# No argument: write help
parser.add_argument(
"url",
metavar="URL",
nargs="*",
help="download URL and returns the content or the path to a cached version",
)
# --validity : returns the date of the cached version, Null if no version
# --force-download : download and replace cache, even if valid
    args = parser.parse_args()
    for u in args.url:
        path = None
        if args.offline:
            path = get_cache_path(u)
        elif args.ids:
            ids = _get_site_ids(u)
        else:
            path, url = fetch(
                u,
                max_size=args.max_size,
                timeout=args.timeout or DEFAULT_TIMEOUT,
                validity=args.cache_validity,
            )
        if args.path:
            print(path)
        elif args.ids:
            print(ids)
        elif path:
            with open(path, "r") as f:
                print(f.read())
if __name__ == "__main__":
main()