File: validate_database.py

package info (click to toggle)
cmake-format 0.6.13-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,436 kB
  • sloc: python: 16,990; makefile: 14
file content (175 lines) | stat: -rw-r--r-- 5,440 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import json
import logging
import os
import unittest
import shutil
import subprocess
import sys
import tempfile
import warnings

# NOTE(josh): FileNotFoundError doesn't exist in python2, so we'll define
# it to be the parent class.
try:
  FileNotFoundError
except NameError:
  # pylint: disable=W0622
  FileNotFoundError = IOError


def get_repo_dir():
  """
  Return the path to the repository root
  """
  thisdir = os.path.dirname(os.path.realpath(__file__))
  parent, _ = os.path.split(thisdir)
  parent, _ = os.path.split(parent)
  return parent


def format_signature(armored_packet):
  """
  Append the common that were stripped out of the signature when stored in the
  database.
  """
  return "\n".join(
      ["-----BEGIN PGP SIGNATURE-----", ""]
      + armored_packet
      + ["-----END PGP SIGNATURE-----"]) + "\n"


def construct_agreement_text(template, dataitem):
  """
  Re-construct the agreement text.
  """
  template = template.replace("{{signer_name}}", dataitem["name"])
  template = template.replace("{{signer_email}}", dataitem["email"])
  return template


class TestContributorAgreements(unittest.TestCase):
  """
  Validate the signature in the contributor database.
  """

  def __init__(self, *args, **kwargs):
    super(TestContributorAgreements, self).__init__(*args, **kwargs)
    self.homedir = None
    self.rmtrees = []

  def setUp(self):
    usual_homedir = os.path.expanduser("~/.gnupg")
    pubring_path = os.path.join(usual_homedir, "pubring.kbx")
    if os.path.exists(pubring_path):
      self.homedir = usual_homedir
    else:
      self.homedir = tempfile.mkdtemp(prefix="gpgtmp_")
      self.rmtrees.append(self.homedir)

  def tearDown(self):
    for dirpath in self.rmtrees:
      for _ in range(3):
        try:
          # NOTE(josh): for some reason we see
          # `FileNotFoundError:
          #   [Errno 2] No such file or directory: 'S.gpg-agent.browser'`
          # My guess is that the file is deleted after the directory scan but
          # before the unlink.
          shutil.rmtree(dirpath)
          break
        except FileNotFoundError:
          continue
      else:
        self.fail("FileNotFoundError after several retries")

  def test_signatures(self):
    """
    Iterate over signatures and verify them.
    """
    gpg_argv = ["gpg", "--homedir", self.homedir]

    if sys.version_info < (3, 0, 0):
      self.skipTest("no pgpy on this python version")

    # TODO(josh): For some reason importing pgpy seems to cause the
    # stderr filedescriptor to leak when we subprocess below. pgpy must be
    # doing some ugly subprocess thing on it's own
    warnings.simplefilter("ignore", ResourceWarning)

    # TODO(josh): pgpy seems to import distutils and the version distributed
    # with virtualenv on this system has an outdated import of `imp` instead
    # of `importlib`.
    with warnings.catch_warnings():
      warnings.simplefilter("ignore", DeprecationWarning)
      import pgpy  # pylint: disable=import-error

    repodir = get_repo_dir()
    with open(os.path.join(
        repodir, "cmakelang/contrib/signature_db.json")) as infile:
      sigdb = json.load(infile)

    # First, get a list of keys already in the local keyring
    known_fingerprints = set()
    proc = subprocess.Popen(
        gpg_argv + ["--fingerprint", "--with-colons"],
        stdout=subprocess.PIPE)
    for line in proc.stdout:
      parts = line.decode("utf-8").split(":")
      if parts[0] == "fpr":
        fingerprint = parts[-2]
        known_fingerprints.add(fingerprint[-16:])
    proc.wait()

    # Now do a scan through the database and get a list of any keys we are
    # missing
    needkeys = []
    for sigentry in sigdb:
      clearsign = format_signature(sigentry["signature"])
      sig = pgpy.PGPSignature()
      sig.parse(clearsign)
      if sig.signer not in known_fingerprints:
        needkeys.append(sig.signer)

    if needkeys:
      # TODO(josh): use SKS pool instead of specific server
      result = subprocess.check_call(
          gpg_argv + ["--keyserver", "keyserver.ubuntu.com", "--recv-keys"]
          + needkeys)
      self.assertEqual(
          result, 0, msg="Failed to fetch all keys from keyserver")

    with open(os.path.join(
        repodir, "cmakelang/contrib/individual_ca.txt")) as infile:
      template = infile.read()

    # Then verify all the signatures
    for sigentry in sigdb:
      clearsign = format_signature(sigentry["signature"])
      document = construct_agreement_text(template, sigentry)
      with tempfile.NamedTemporaryFile(delete=False) as outfile:
        outfile.write(clearsign.encode("utf-8"))
        detached_sig = outfile.name
      with tempfile.NamedTemporaryFile(delete=False) as outfile:
        outfile.write(document.encode("utf-8"))
        document_msg = outfile.name

      with open(os.devnull, "w") as devnull:
        proc = subprocess.Popen(
            gpg_argv + ["--verify", detached_sig, document_msg],
            stderr=devnull)
        result = proc.wait()
      self.assertEqual(
          0, result,
          msg="Failed to verify signature for {}\n\n"
              "See {} and {}".format(
                  sigentry["name"], detached_sig, document_msg))
      os.unlink(detached_sig)
      os.unlink(document_msg)


if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  unittest.main()