File: fledge_http_server_util.py

package info (click to toggle)
firefox 147.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,683,320 kB
  • sloc: cpp: 7,607,359; javascript: 6,533,295; ansic: 3,775,223; python: 1,415,500; xml: 634,561; asm: 438,949; java: 186,241; sh: 62,752; makefile: 18,079; objc: 13,092; perl: 12,808; yacc: 4,583; cs: 3,846; pascal: 3,448; lex: 1,720; ruby: 1,003; php: 436; lisp: 258; awk: 247; sql: 66; sed: 54; csh: 10; exp: 6
file content (194 lines) | stat: -rw-r--r-- 7,294 bytes parent folder | download | duplicates (12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""Utility functions shared across multiple endpoints."""
from collections import namedtuple
from urllib.parse import unquote_plus, urlparse

def fail(response, body):
    """Sets up response to fail with the provided response body.

    Args:
      response: the wptserve Response that was passed to main
      body: the HTTP response body to use
    """
    response.status = (400, "Bad Request")
    response.headers.set(b"Content-Type", b"text/plain")
    response.content = body

def headers_to_ascii(headers):
  """Converts a header map with binary values to one with ASCII values.

  Takes a map of header names to list of values that are all binary strings
  and returns an otherwise identical map where keys and values have both been
  converted to ASCII strings.

  Args:
    headers: header map from binary key to binary value

  Returns header map from ASCII string key to ASCII string value
  """
  header_map = {}
  for pair in headers.items():
      values = []
      for value in pair[1]:
          values.append(value.decode("ASCII"))
      header_map[pair[0].decode("ASCII")] = values
  return header_map

def attach_origin_and_credentials_headers(request, response):
  """Attaches Access-Control-Allow-Origin and Access-Control-Allow-Credentials
  response headers to a response, if the request indicates they're needed.
  Only intended for internal use.

  Args:
    request: the wptserve Request that was passed to main
    response: the wptserve Response that was passed to main
  """
  if b"origin" in request.headers:
    response.headers.set(b"Access-Control-Allow-Origin",
                        request.headers.get(b"origin"))

  if b"credentials" in request.headers:
    response.headers.set(b"Access-Control-Allow-Credentials",
                        request.headers.get(b"credentials"))

def handle_cors_headers_fail_if_preflight(request, response):
  """Adds CORS headers if necessary. In the case of CORS preflights, generates
  a failure response. To be used when CORS preflights are not expected.

  Args:
    request: the wptserve Request that was passed to main
    response: the wptserve Response that was passed to main

  Returns True if the request is a CORS preflight, in which case the calling
  function should immediately return.
  """
  # Handle CORS preflight requests.
  if request.method == u"OPTIONS":
    fail(response, "CORS preflight unexpectedly received.")
    return True

  # Append CORS headers if needed
  attach_origin_and_credentials_headers(request, response)
  return False

def handle_cors_headers_and_preflight(request, response):
  """Applies CORS logic, either adding CORS headers to response or generating
  an entire response to preflights.

  Args:
    request: the wptserve Request that was passed to main
    response: the wptserve Response that was passed to main

  Returns True if the request is a CORS preflight, in which case the calling
  function should immediately return.
  """
  # Append CORS headers if needed
  attach_origin_and_credentials_headers(request, response)

  # Handle CORS preflight requests.
  if not request.method == u"OPTIONS":
    return False

  if not b"Access-Control-Request-Method" in request.headers:
    fail(response, "Failed to get access-control-request-method in preflight!")
    return True

  if not b"Access-Control-Request-Headers" in request.headers:
    fail(response, "Failed to get access-control-request-headers in preflight!")
    return True

  response.headers.set(b"Access-Control-Allow-Methods",
                        request.headers[b"Access-Control-Request-Method"])

  response.headers.set(b"Access-Control-Allow-Headers",
                        request.headers[b"Access-Control-Request-Headers"])

  response.status = (204, b"No Content")
  return True

def decode_trusted_scoring_signals_params(request):
  """Decodes query parameters to trusted query params handler.

  Args:
    request: the wptserve Request that was passed to main

  If successful, returns a named tuple TrustedScoringSignalsParams decoding the
  various expected query fields, as a hostname,  plus a field urlLists which is a list of
  {type: <render URL type>, urls: <render URL list>} pairs, where <render URL type> is
  one of the two render URL dictionary keys used in the response ("renderURLs" or
  "adComponentRenderURLs"). May be of length 1 or 2, depending on whether there
  are any component URLs.

  On failure, throws a ValueError with a message.
  """
  TrustedScoringSignalsParams = namedtuple(
      'TrustedScoringSignalsParams', ['hostname', 'urlLists'])

  hostname = None
  renderUrls = None
  adComponentRenderURLs = None
  urlLists = []

  # Manually parse query params. Can't use request.GET because it unescapes as well as splitting,
  # and commas mean very different things from escaped commas.
  for param in request.url_parts.query.split("&"):
      pair = param.split("=", 1)
      if len(pair) != 2:
          raise ValueError("Bad query parameter: " + param)
      # Browsers should escape query params consistently.
      if "%20" in pair[1]:
          raise ValueError("Query parameter should escape using '+': " + param)

      # Hostname can't be empty. The empty string can be a key or interest group name, though.
      if pair[0] == "hostname" and hostname == None and len(pair[1]) > 0:
          hostname = pair[1]
          continue
      if pair[0] == "renderUrls" and renderUrls == None:
          renderUrls = list(map(unquote_plus, pair[1].split(",")))
          urlLists.append({"type":"renderURLs", "urls":renderUrls})
          continue
      if pair[0] == "adComponentRenderUrls" and adComponentRenderURLs == None:
          adComponentRenderURLs = list(map(unquote_plus, pair[1].split(",")))
          urlLists.append({"type":"adComponentRenderURLs", "urls":adComponentRenderURLs})
          continue
      # Ignore the various creative scanning params; they're expected, but we
      # don't parse them here.
      if (pair[0] == 'adCreativeScanningMetadata' or
            pair[0] == 'adComponentCreativeScanningMetadata' or
            pair[0] == 'adSizes' or
            pair[0] == 'adComponentSizes' or
            pair[0] == 'adBuyer' or
            pair[0] == 'adComponentBuyer' or
            pair[0] == 'adBuyerAndSellerReportingIds'):
          continue
      raise ValueError("Unexpected query parameter: " + param)

  # "hostname" and "renderUrls" are mandatory.
  if not hostname:
      raise ValueError("hostname missing")
  if not renderUrls:
      raise ValueError("renderUrls missing")

  return TrustedScoringSignalsParams(hostname, urlLists)

def decode_render_url_signals_params(renderUrl):
  """Decodes signalsParams field encoded inside a renderURL.

  Args: renderUrl to extract signalsParams from.

  Returns an array of fields in signal params string.
  """
  signalsParams = None
  for param in urlparse(renderUrl).query.split("&"):
    pair = param.split("=", 1)
    if len(pair) != 2:
        continue
    if pair[0] == "signalsParams":
        if signalsParams != None:
            raise ValueError("renderUrl has multiple signalsParams: " + renderUrl)
        signalsParams = pair[1]

  if signalsParams is None:
    return []

  signalsParams = unquote_plus(signalsParams)
  return signalsParams.split(",")