1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
import logging
import os
import unittest
import shutil
import subprocess
import sys
import tempfile
import warnings
# NOTE(josh): FileNotFoundError doesn't exist in python2. Probe for the name
# and, when it is missing, alias it to IOError so that the rest of this module
# can refer to FileNotFoundError on either interpreter.
try:
    FileNotFoundError
except NameError:
    # Deliberately (re)defining a builtin name, hence the pylint suppression.
    # pylint: disable=W0622
    FileNotFoundError = IOError
def get_repo_dir():
    """
    Return the path to the repository root.

    The root is taken to be two directory levels above the directory that
    contains this file (symlinks in the file's path are resolved first).
    """
    containing_dir = os.path.dirname(os.path.realpath(__file__))
    # Climb two levels up from the directory holding this file.
    one_up = os.path.dirname(containing_dir)
    return os.path.dirname(one_up)
def format_signature(armored_packet):
    """
    Re-attach the common header and footer lines that were stripped out of the
    signature when it was stored in the database.

    `armored_packet` is the list of lines of the armored signature body. The
    return value is a single newline-terminated string framed by the
    BEGIN/END PGP SIGNATURE markers, with a blank line after the header.
    """
    framed = ["-----BEGIN PGP SIGNATURE-----", ""] + armored_packet
    framed.append("-----END PGP SIGNATURE-----")
    return "\n".join(framed) + "\n"
def construct_agreement_text(template, dataitem):
    """
    Re-construct the agreement text for one signer.

    Every `{{signer_name}}` placeholder in `template` is replaced with
    `dataitem["name"]` and then every `{{signer_email}}` placeholder with
    `dataitem["email"]`. The filled-in text is returned.
    """
    # Order matters: the name is substituted before the email.
    substitutions = (
        ("{{signer_name}}", dataitem["name"]),
        ("{{signer_email}}", dataitem["email"]),
    )
    text = template
    for placeholder, value in substitutions:
        text = text.replace(placeholder, value)
    return text
class TestContributorAgreements(unittest.TestCase):
    """
    Validate the signatures in the contributor database.

    For every entry in ``cmakelang/contrib/signature_db.json`` the armored
    signature and the agreement text are re-created and handed to
    ``gpg --verify``.
    """

    def __init__(self, *args, **kwargs):
        super(TestContributorAgreements, self).__init__(*args, **kwargs)
        # GnuPG home directory passed to every gpg invocation (set in setUp).
        self.homedir = None
        # Temporary directories created by setUp that tearDown must remove.
        self.rmtrees = []

    def setUp(self):
        """
        Pick the GnuPG home directory: the user's own ``~/.gnupg`` when it
        already contains a keybox, otherwise a throw-away temporary directory.
        """
        usual_homedir = os.path.expanduser("~/.gnupg")
        pubring_path = os.path.join(usual_homedir, "pubring.kbx")
        if os.path.exists(pubring_path):
            self.homedir = usual_homedir
        else:
            self.homedir = tempfile.mkdtemp(prefix="gpgtmp_")
            # Only the temporary directory is scheduled for deletion; the
            # user's real keyring must never end up in this list.
            self.rmtrees.append(self.homedir)

    def tearDown(self):
        """
        Remove the temporary directories created by setUp, retrying a few
        times because files can vanish while the tree is being deleted.
        """
        for dirpath in self.rmtrees:
            for _ in range(3):
                try:
                    # NOTE(josh): for some reason we see
                    # `FileNotFoundError:
                    # [Errno 2] No such file or directory:
                    # 'S.gpg-agent.browser'`
                    # My guess is that the file is deleted after the
                    # directory scan but before the unlink.
                    shutil.rmtree(dirpath)
                    break
                except FileNotFoundError:
                    continue
            else:
                # The inner loop never hit `break`: every attempt failed.
                self.fail("FileNotFoundError after several retries")

    def test_signatures(self):
        """
        Iterate over signatures and verify them.

        Steps: list the key ids already in the keyring, fetch any missing
        signer keys from the keyserver, then run ``gpg --verify`` on each
        (signature, agreement text) pair. Skipped on python 2.
        """
        gpg_argv = ["gpg", "--homedir", self.homedir]

        if sys.version_info < (3, 0, 0):
            self.skipTest("no pgpy on this python version")

        # TODO(josh): For some reason importing pgpy seems to cause the
        # stderr filedescriptor to leak when we subprocess below. pgpy must be
        # doing some ugly subprocess thing on its own
        warnings.simplefilter("ignore", ResourceWarning)

        # TODO(josh): pgpy seems to import distutils and the version
        # distributed with virtualenv on this system has an outdated import of
        # `imp` instead of `importlib`.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            import pgpy  # pylint: disable=import-error

        repodir = get_repo_dir()
        # The documents are encoded as UTF-8 before verification below, so
        # read the inputs as UTF-8 too instead of relying on the locale.
        with open(os.path.join(repodir, "cmakelang/contrib/signature_db.json"),
                  encoding="utf-8") as infile:
            sigdb = json.load(infile)

        # First, get a list of keys already in the local keyring
        known_fingerprints = set()
        proc = subprocess.Popen(
            gpg_argv + ["--fingerprint", "--with-colons"],
            stdout=subprocess.PIPE)
        # communicate() drains the pipe, closes it and reaps the process, so
        # no file descriptor is left open.
        listing, _ = proc.communicate()
        for line in listing.decode("utf-8").splitlines():
            parts = line.split(":")
            # In the colon listing the fingerprint of an `fpr` record is in
            # field 10. Only the trailing 16 hex digits (the long key id) are
            # kept, for comparison against `sig.signer` below.
            if parts[0] == "fpr" and len(parts) > 9:
                known_fingerprints.add(parts[9][-16:])

        # Now do a scan through the database and get a list of any keys we are
        # missing (each key id is requested only once)
        needkeys = []
        requested = set()
        for sigentry in sigdb:
            clearsign = format_signature(sigentry["signature"])
            sig = pgpy.PGPSignature()
            sig.parse(clearsign)
            signer = sig.signer
            if signer in known_fingerprints or signer in requested:
                continue
            requested.add(signer)
            needkeys.append(signer)

        if needkeys:
            # TODO(josh): use SKS pool instead of specific server
            # NOTE: call() rather than check_call() so that a non-zero exit
            # reaches the assertion (and its message) instead of raising
            # CalledProcessError first.
            result = subprocess.call(
                gpg_argv
                + ["--keyserver", "keyserver.ubuntu.com", "--recv-keys"]
                + needkeys)
            self.assertEqual(
                result, 0, msg="Failed to fetch all keys from keyserver")

        with open(os.path.join(repodir, "cmakelang/contrib/individual_ca.txt"),
                  encoding="utf-8") as infile:
            template = infile.read()

        # Then verify all the signatures
        for sigentry in sigdb:
            clearsign = format_signature(sigentry["signature"])
            document = construct_agreement_text(template, sigentry)

            # delete=False: gpg needs to open the files by name, and on a
            # failed verification they are left behind for inspection.
            with tempfile.NamedTemporaryFile(delete=False) as outfile:
                outfile.write(clearsign.encode("utf-8"))
                detached_sig = outfile.name

            with tempfile.NamedTemporaryFile(delete=False) as outfile:
                outfile.write(document.encode("utf-8"))
                document_msg = outfile.name

            # Discard gpg's stderr output; only the exit status matters.
            with open(os.devnull, "w") as devnull:
                result = subprocess.call(
                    gpg_argv + ["--verify", detached_sig, document_msg],
                    stderr=devnull)

            self.assertEqual(
                0, result,
                msg="Failed to verify signature for {}\n\n"
                    "See {} and {}".format(
                        sigentry["name"], detached_sig, document_msg))
            # Only reached when verification succeeded.
            os.unlink(detached_sig)
            os.unlink(document_msg)
if __name__ == "__main__":
    # When executed as a script: turn on debug-level logging, then hand
    # control to the unittest command-line runner.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
|