File: validate_pullrequest.py

package info (click to toggle)
cmake-format 0.6.13-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,436 kB
  • sloc: python: 16,990; makefile: 14
file content (179 lines) | stat: -rw-r--r-- 5,830 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import json
import logging
import os
import unittest
import subprocess
import sys
import tempfile


def get_repo_dir():
  """
  Return the path to the repository root
  """
  thisdir = os.path.dirname(os.path.realpath(__file__))
  parent, _ = os.path.split(thisdir)
  parent, _ = os.path.split(parent)
  return parent


class TestContribution(unittest.TestCase):
  """
  Verify that contributed code meets requirements
  """

  def __init__(self, *args, **kwargs):
    super(TestContribution, self).__init__(*args, **kwargs)
    self.merge_target = None
    self.merge_feature = None
    self.repodir = None

  def setUp(self):
    if os.environ.get("TRAVIS_PULL_REQUEST", "false") == "false":
      self.skipTest("Not a pull request")
      return

    self.repodir = get_repo_dir()

  def test_single_commit(self):
    """
    Ensure that the pull request is a single commit off of the target
    branch. If it is, then the merge base will be the parent of the feature
    commit.
    """
    repodir = get_repo_dir()
    feature_sha1 = os.environ.get("TRAVIS_PULL_REQUEST_SHA")
    feature_parent = subprocess.check_output(
        ["git", "show", "-s", "--pretty=%P", feature_sha1], cwd=repodir
    ).decode("utf-8").strip()
    self.assertEqual(1, len(feature_parent.split()))

    target_branch = os.environ.get("TRAVIS_BRANCH")
    subprocess.check_call(
        ["git", "fetch", "origin", target_branch],
        cwd=self.repodir)
    merge_base = subprocess.check_output(
        ["git", "merge-base", "FETCH_HEAD", feature_sha1],
        cwd=self.repodir).decode("utf-8").strip()
    self.assertEqual(feature_parent, merge_base)

  def test_merge_base_is_not_too_old(self):
    """
    When we merge a pull request we'll get something like this::

           x -----------
          /             \
   ... - A - o - o - o - B

    where x is the signed commit of the change to pull in. If the commit was
    created "a long time ago" (in a git-sense) then this makes the graph
    difficult to read as there will be lots of very long merge-lines. Therefore
    we require that pull requests be "close-enough" to the current HEAD.
    """
    repodir = get_repo_dir()
    feature_sha1 = os.environ.get("TRAVIS_PULL_REQUEST_SHA")
    target_branch = os.environ.get("TRAVIS_BRANCH")
    subprocess.check_call(
        ["git", "fetch", "origin", target_branch],
        cwd=self.repodir)
    merge_base = subprocess.check_output(
        ["git", "merge-base", "FETCH_HEAD", feature_sha1],
        cwd=self.repodir).decode("utf-8").strip()
    distance_str = subprocess.check_output(
        ["git", "rev-list", "--count", "--first-parent",
         "{}..{}".format(merge_base, "FETCH_HEAD")], cwd=repodir
    ).decode("utf-8")
    distance = int(distance_str)
    self.assertLess(distance, 10)

  def test_commit_is_signed(self):
    """
    Ensure that the feature commit is signed
    """
    repodir = get_repo_dir()
    feature_sha1 = os.environ.get("TRAVIS_PULL_REQUEST_SHA")
    signer_encoded = subprocess.check_output(
        ["git", "show", "-s", r'--pretty="%GK"', feature_sha1],
        cwd=repodir).decode("utf-8").strip().strip('"')

    # TODO(josh): deduplicate with validate_database
    # First, get a list of keys already in the local keyring
    known_fingerprints = set()
    proc = subprocess.Popen(
        ["gpg", "--fingerprint", "--with-colons"],
        stdout=subprocess.PIPE)
    for line in proc.stdout:
      parts = line.decode("utf-8").split(":")
      if parts[0] == "fpr":
        fingerprint = parts[-2]
        known_fingerprints.add(fingerprint[-16:])
    proc.wait()

    if signer_encoded not in known_fingerprints:
      # TODO(josh): use SKS pool instead of specific server
      result = subprocess.check_call(
          ["gpg", "--keyserver", "keyserver.ubuntu.com", "--recv-keys",
           signer_encoded])
      self.assertEqual(
          result, 0, msg="Failed to fetch signer key from keyserver")

    with tempfile.NamedTemporaryFile(delete=False) as stderr:
      result = subprocess.call(
          ["git", "verify-commit", feature_sha1],
          cwd=repodir, stderr=stderr)
      stderrpath = stderr.name
    with open(stderrpath, "r") as infile:
      stderrtext = infile.read()
    os.unlink(stderrpath)
    self.assertEqual(
        0, result,
        "git was unable to verify commit {}\n{}"
        .format(feature_sha1, stderrtext))

  def test_commit_signer_has_signed_ca(self):
    """
    Verify that whoever signed the commit has also signed the contributor
    agreement / copyright assignment.
    """
    if sys.version_info < (3, 0, 0):
      self.skipTest("no pgpy on this python version")
    import pgpy  # pylint: disable=import-error

    repodir = get_repo_dir()
    feature_sha1 = os.environ.get("TRAVIS_PULL_REQUEST_SHA")
    signer_encoded = subprocess.check_output(
        ["git", "show", "-s", r'--pretty="%GK"', feature_sha1],
        cwd=repodir).decode("utf-8").strip().strip('"')

    with open(os.path.join(
        repodir, "cmakelang/contrib/signature_db.json")) as infile:
      sigdb = json.load(infile)

    for sigentry in sigdb:
      clearsign = format_signature(sigentry["signature"])
      sig = pgpy.PGPSignature()
      sig.parse(clearsign)
      if sig.signer == signer_encoded:
        return

    self.fail("Could not find signer {} in signature database"
              .format(signer_encoded))


def format_signature(armored_packet):
  """
  Append the common that were stripped out of the signature when stored in the
  database.
  """
  return "\n".join(
      ["-----BEGIN PGP SIGNATURE-----", ""]
      + armored_packet
      + ["-----END PGP SIGNATURE-----"])


if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  unittest.main()