File: customauthenticator.py

package info (click to toggle)
python-pyu2f 0.1.5-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 264 kB
  • sloc: python: 1,421; makefile: 3
file content (243 lines) | stat: -rw-r--r-- 8,415 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Class to offload the end to end flow of U2F signing."""

import base64
import hashlib
import json
import os
import struct
import subprocess
import sys

from pyu2f import errors
from pyu2f import model
from pyu2f.convenience import baseauthenticator

# Name of the environment variable that holds the signing plugin command.
SK_SIGNING_PLUGIN_ENV_VAR = 'SK_SIGNING_PLUGIN'
# Value sent to the plugin as the "timeoutSeconds" field of each request.
U2F_SIGNATURE_TIMEOUT_SECONDS = 5

# Result codes the plugin reports in the "code" field of its reply.
SK_SIGNING_PLUGIN_NO_ERROR = 0
SK_SIGNING_PLUGIN_TOUCH_REQUIRED = 0x6985
SK_SIGNING_PLUGIN_WRONG_DATA = 0x6A80


class CustomAuthenticator(baseauthenticator.BaseAuthenticator):
  """Offloads U2F signing to a pluggable command-line tool.

  Offloads U2F signing to a signing plugin which takes the form of a
  command-line tool. The command-line tool is configurable via the
  SK_SIGNING_PLUGIN environment variable.

  The signing plugin should implement the following interface:

  Communication occurs over stdin/stdout, and messages are both sent and
  received in the form:

  [4 bytes - payload size (little-endian)][variable bytes - json payload]

  Signing Request JSON
  {
    "type": "sign_helper_request",
    "signData": [{
        "keyHandle": <url-safe base64-encoded key handle>,
        "appIdHash": <url-safe base64-encoded SHA-256 hash of application ID>,
        "challengeHash": <url-safe base64-encoded SHA-256 hash of ClientData>,
        "version": U2F protocol version (usually "U2F_V2")
        },...],
    "timeoutSeconds": <security key touch timeout>,
    "localAlways": true
  }

  Signing Response JSON
  {
    "type": "sign_helper_reply",
    "code": <result code>,
    "errorDetail": <text description of error>,
    "responseData": {
      "appIdHash": <url-safe base64-encoded SHA-256 hash of application ID>,
      "challengeHash": <url-safe base64-encoded SHA-256 hash of ClientData>,
      "keyHandle": <url-safe base64-encoded key handle>,
      "version": <U2F protocol version>,
      "signatureData": <url-safe base64-encoded signature>
    }
  }

  Possible response error codes are:

    NoError            = 0
    UnknownError       = -127
    TouchRequired      = 0x6985
    WrongData          = 0x6a80
  """

  def __init__(self, origin):
    """Stores the origin used when building ClientData for signing.

    Args:
      origin: Origin value handed to model.ClientData for every challenge.
    """
    self.origin = origin

  def Authenticate(self, app_id, challenge_data,
                   print_callback=sys.stderr.write):
    """See base class.

    Sends the challenges to the plugin named by the SK_SIGNING_PLUGIN
    environment variable and converts its reply into the authenticator
    response.

    Args:
      app_id: Application ID the challenges were issued for.
      challenge_data: Iterable of dicts with 'key' and 'challenge' entries.
      print_callback: Callable invoked with a str to prompt the user.

    Returns:
      A dict with 'clientData', 'signatureData', 'applicationId' and
      'keyHandle' entries.

    Raises:
      errors.PluginError: If the environment variable is not set, or if the
        plugin reply lacks a required field or does not correspond to any
        challenge that was sent. Errors raised while calling the plugin
        propagate unchanged.
    """

    # Ensure environment variable is present
    plugin_cmd = os.environ.get(SK_SIGNING_PLUGIN_ENV_VAR)
    if plugin_cmd is None:
      raise errors.PluginError('{} env var is not set'
                               .format(SK_SIGNING_PLUGIN_ENV_VAR))

    # Prepare input to signer
    client_data_map, signing_input = self._BuildPluginRequest(
        app_id, challenge_data, self.origin)

    # Call plugin. The variable's value is used as a single argv element, so
    # it names the executable itself and cannot carry extra arguments.
    print_callback('Please insert and touch your security key\n')
    response = self._CallPlugin([plugin_cmd], signing_input)

    # Handle response. The reply comes from an external process, so report a
    # malformed or unexpected one as a plugin failure instead of letting a
    # bare KeyError/TypeError escape.
    try:
      key_challenge_pair = (response['keyHandle'], response['challengeHash'])
      client_data_json = client_data_map[key_challenge_pair]
    except (KeyError, TypeError):
      raise errors.PluginError(
          'Plugin response does not match any requested '
          'keyHandle/challengeHash pair')
    if 'signatureData' not in response:
      raise errors.PluginError('Plugin response is missing signatureData')

    client_data = client_data_json.encode()
    return self._BuildAuthenticatorResponse(app_id, client_data, response)

  def IsAvailable(self):
    """See base class.

    Returns:
      True when the signing plugin environment variable is set (to any
      value, including the empty string), False otherwise.
    """
    return SK_SIGNING_PLUGIN_ENV_VAR in os.environ

  def _BuildPluginRequest(self, app_id, challenge_data, origin):
    """Assembles the signing request that is sent to the plugin.

    Args:
      app_id: Application ID; its SHA-256 hash is sent as 'appIdHash'.
      challenge_data: Iterable of dicts, each with a 'key' entry (an object
        exposing key_handle and version) and a 'challenge' entry.
      origin: Origin placed in the ClientData built for every challenge.

    Returns:
      A (client_data_map, request_json) tuple. client_data_map maps each
      (encoded key handle, encoded challenge hash) pair to the ClientData
      JSON it was derived from; request_json is the serialized request.
    """
    app_hash = self._Base64Encode(self._SHA256(app_id))
    sign_data = []
    client_data_by_pair = {}

    for item in challenge_data:
      registered_key = item['key']
      handle = self._Base64Encode(registered_key.key_handle)

      # The plugin only receives the hash of the ClientData; the JSON itself
      # is kept locally so it can be returned once the plugin picks a key.
      client_data = model.ClientData(
          model.ClientData.TYP_AUTHENTICATION, item['challenge'], origin)
      client_data_json = client_data.GetJson()
      challenge_hash = self._Base64Encode(self._SHA256(client_data_json))

      sign_data.append({
          'appIdHash': app_hash,
          'challengeHash': challenge_hash,
          'keyHandle': handle,
          'version': registered_key.version,
      })
      client_data_by_pair[(handle, challenge_hash)] = client_data_json

    request_json = json.dumps({
        'type': 'sign_helper_request',
        'signData': sign_data,
        'timeoutSeconds': U2F_SIGNATURE_TIMEOUT_SECONDS,
        'localAlways': True
    })
    return client_data_by_pair, request_json

  def _BuildAuthenticatorResponse(self, app_id, client_data, plugin_response):
    """Builds the response to return to the caller."""
    encoded_client_data = self._Base64Encode(client_data)
    signature_data = str(plugin_response['signatureData'])
    key_handle = str(plugin_response['keyHandle'])

    response = {
        'clientData': encoded_client_data,
        'signatureData': signature_data,
        'applicationId': app_id,
        'keyHandle': key_handle,
    }
    return response

  def _CallPlugin(self, cmd, input_json):
    """Calls the plugin and validates the response."""
    # Calculate length of input
    input_length = len(input_json)
    length_bytes_le = struct.pack('<I', input_length)
    request = length_bytes_le + input_json.encode()

    # Call plugin
    sign_process = subprocess.Popen(cmd,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)

    stdout = sign_process.communicate(request)[0]
    exit_status = sign_process.wait()

    # Parse and validate response size
    response_len_le = stdout[:4]
    response_len = struct.unpack('<I', response_len_le)[0]
    response = stdout[4:]
    if response_len != len(response):
      raise errors.PluginError(
          'Plugin response length {} does not match data {} (exit_status={})'
          .format(response_len, len(response), exit_status))

    # Ensure valid json
    try:
      json_response = json.loads(response.decode())
    except ValueError:
      raise errors.PluginError('Plugin returned invalid output (exit_status={})'
                               .format(exit_status))

    # Ensure response type
    if json_response.get('type') != 'sign_helper_reply':
      raise errors.PluginError('Plugin returned invalid response type '
                               '(exit_status={})'
                               .format(exit_status))

    # Parse response codes
    result_code = json_response.get('code')
    if result_code is None:
      raise errors.PluginError('Plugin missing result code (exit_status={})'
                               .format(exit_status))

    # Handle errors
    if result_code == SK_SIGNING_PLUGIN_TOUCH_REQUIRED:
      raise errors.U2FError(errors.U2FError.TIMEOUT)
    elif result_code == SK_SIGNING_PLUGIN_WRONG_DATA:
      raise errors.U2FError(errors.U2FError.DEVICE_INELIGIBLE)
    elif result_code != SK_SIGNING_PLUGIN_NO_ERROR:
      raise errors.PluginError(
          'Plugin failed with error {} - {} (exit_status={})'
          .format(result_code,
                  json_response.get('errorDetail'),
                  exit_status))

    # Ensure response data is present
    response_data = json_response.get('responseData')
    if response_data is None:
      raise errors.PluginErrors(
          'Plugin returned output with missing responseData (exit_status={})'
          .format(exit_status))

    return response_data

  def _SHA256(self, string):
    """Helper method to perform SHA256."""
    md = hashlib.sha256()
    md.update(string.encode())
    return md.digest()

  def _Base64Encode(self, bytes_data):
      """Helper method to base64 encode, strip padding, and return str
      result."""
      return base64.urlsafe_b64encode(bytes_data).decode().rstrip('=')