File: attest.py

package info (click to toggle)
b4 0.6.2-1
  • links: PTS
  • area: main
  • in suites: bullseye
  • size: 324 kB
  • sloc: python: 3,567; sh: 11; makefile: 6
file content (154 lines) | stat: -rw-r--r-- 5,343 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
#

import sys
import email
import email.utils
import email.message
import email.header
import b4
import argparse
import base64

logger = b4.logger


def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
    if lmsg.msg.get(b4.HDR_PATCH_HASHES):
        if not replace:
            logger.info(' attest: message already attested')
            return
        del lmsg.msg[b4.HDR_PATCH_HASHES]
        del lmsg.msg[b4.HDR_PATCH_SIG]

    logger.info(' attest: generating attestation hashes')
    if not lmsg.attestation:
        raise RuntimeError('Could not calculate patch attestation')

    headers = list()
    hparts = [
        'v=1',
        'h=sha256',
        f'i={lmsg.attestation.ib}',
        f'm={lmsg.attestation.mb}',
        f'p={lmsg.attestation.pb}',
    ]
    if lmsg.git_patch_id:
        hparts.append(f'g={lmsg.git_patch_id}')

    hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
    headers.append(f'{hhname}:{hhval}')

    logger.debug('Signing with mode=%s', mode)
    if mode == 'pgp':
        usercfg = b4.get_user_config()
        keyid = usercfg.get('signingkey')
        identity = usercfg.get('email')
        if not identity:
            raise RuntimeError('Please set user.email to use this feature')
        if not keyid:
            raise RuntimeError('Please set user.signingKey to use this feature')

        logger.debug('Using i=%s, s=0x%s', identity, keyid.rstrip('!'))
        gpgargs = ['-b', '-u', f'{keyid}']

        hparts = [
            'm=pgp',
            f'i={identity}',
            's=0x%s' % keyid.rstrip('!'),
            'b=',
        ]

        shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
        headers.append(f'{shname}:{shval}')
        payload = '\r\n'.join(headers).encode()
        ecode, out, err = b4.gpg_run_command(gpgargs, payload)
        if ecode > 0:
            logger.critical('Running gpg failed')
            logger.critical(err.decode())
            raise RuntimeError('Running gpg failed')
        bdata = base64.b64encode(out).decode()
        shval += header_splitter(bdata)
    else:
        raise NotImplementedError('Mode %s not implemented' % mode)

    hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
    shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
    lmsg.msg[b4.HDR_PATCH_SIG] = shdr


def header_splitter(longstr: str, limit: int = 77) -> str:
    splitstr = list()
    first = True
    while len(longstr) > limit:
        at = limit
        if first:
            first = False
            at -= 2
        splitstr.append(longstr[:at])
        longstr = longstr[at:]
    splitstr.append(longstr)
    return ' '.join(splitstr)


def attest_patches(cmdargs: argparse.Namespace) -> None:
    for pf in cmdargs.patchfile:
        with open(pf, 'rb') as fh:
            msg = email.message_from_bytes(fh.read())
        lmsg = b4.LoreMessage(msg)
        lmsg.load_hashes()
        if not lmsg.attestation:
            logger.debug('Nothing to attest in %s, skipped')
            continue
        logger.info('Attesting: %s', pf)
        in_header_attest(lmsg, replace=True)
        with open(pf, 'wb') as fh:
            fh.write(lmsg.msg.as_bytes())


def mutt_filter() -> None:
    if sys.stdin.isatty():
        logger.error('Error: Mutt mode expects a message on stdin')
        sys.exit(1)
    inb = sys.stdin.buffer.read()
    # Quick exit if we don't find x-patch-sig
    if inb.find(b'X-Patch-Sig:') < 0:
        sys.stdout.buffer.write(inb)
        return
    msg = email.message_from_bytes(inb)
    try:
        if msg.get('x-patch-sig'):
            lmsg = b4.LoreMessage(msg)
            lmsg.load_hashes()
            latt = lmsg.attestation
            if latt:
                if latt.validate(msg):
                    trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
                    msg.add_header('Attested-By', trailer)
                elif latt.lsig:
                    if not latt.lsig.errors:
                        failed = list()
                        if not latt.pv:
                            failed.append('patch content')
                        if not latt.mv:
                            failed.append('commit message')
                        if not latt.iv:
                            failed.append('patch metadata')
                        latt.lsig.errors.add('signature failed (%s)' % ', '.join(failed))
                    msg.add_header('Attestation-Failed', ', '.join(latt.lsig.errors))
            # Delete the x-patch-hashes and x-patch-sig headers so
            # they don't boggle up the view
            for i in reversed(range(len(msg._headers))):  # noqa
                hdrName = msg._headers[i][0].lower()  # noqa
                if hdrName in ('x-patch-hashes', 'x-patch-sig'):
                    del msg._headers[i]  # noqa
    except:  # noqa
        # Don't prevent email from being displayed even if we died horribly
        sys.stdout.buffer.write(inb)
        return

    sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy))