File: util.py

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (306 lines) | stat: -rw-r--r-- 10,876 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import platform
import re
import sys

from functools import reduce
from itertools import chain
from google.protobuf import text_format
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message
from pathlib import Path
from typing import NewType, Any, Optional, List, Iterable

# Distinct static types for annotation hash codes and unique ids, so a type
# checker can tell them apart from arbitrary ints/strs.
HashCode = NewType("HashCode", int)
UniqueId = NewType("UniqueId", str)

# Every log record shows its timestamp, level, source filename and line number.
_LOG_FORMAT = (
    "[%(asctime)s:%(levelname)s:%(filename)s(%(lineno)d)] %(message)s")
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)


def import_compiled_proto(build_path) -> Any:
  """Imports the compiled traffic annotation proto found under |build_path|.

  The generated module lives in ${build_path}/pyproto/chrome/browser/privacy
  and is produced as a part of compiling Chrome. Because its location depends
  on |build_path|, it is imported here rather than at the top of the file.

  On success, |traffic_annotation_pb2| (the module) and |traffic_annotation|
  (its NetworkTrafficAnnotation class, used for accessing enum constants) are
  bound as globals of this module.

  Args:
    build_path: Path-like object supporting the "/" operator, pointing at the
      Chrome build directory.
  Returns:
    The imported traffic_annotation_pb2 module.
  Raises:
    ImportError: If the compiled proto can't be imported. A critical message
      is logged before the error is re-raised.
  """
  global traffic_annotation_pb2
  global traffic_annotation

  # Put the generated-proto directory first on the module search path.
  pyproto_dir = build_path / "pyproto" / "chrome" / "browser" / "privacy"
  sys.path.insert(0, str(pyproto_dir))

  try:
    import traffic_annotation_pb2
    from traffic_annotation_pb2 import (NetworkTrafficAnnotation as
                                        traffic_annotation)
  except ImportError:
    logger.critical(
      "Failed to import the compiled traffic annotation proto. Make sure "
      "you're on Linux or Windows and Chrome is built in '{}' before "
      "running this script.".format(build_path))
    raise
  return traffic_annotation_pb2


def get_current_platform(build_path: Optional[Path] = None) -> str:
  """Guesses which platform the build in |build_path| targets.

  Args:
    build_path: Optional build directory that may contain an args.gn file.
  Returns:
    The lowercased host OS name from platform.system(), unless the host is
    Linux and args.gn has a target_os line naming android, chromeos or win;
    then that target is returned, with "win" spelled out as "windows".
  """
  host_os: str = platform.system().lower()

  # Only Linux hosts are inspected for a cross-compilation target.
  if host_os != "linux" or build_path is None:
    return host_os

  try:
    args_text = (build_path / "args.gn").read_text(encoding="utf-8")
  except (ValueError, OSError) as err:
    # Maybe the file's absent, or it can't be decoded as UTF-8, or something.
    # It's probably not Android/ChromeOS in that case.
    logger.info(err)
    return host_os

  # Look for a target_os="foo" line among the GN args.
  found = re.search(r"^\s*target_os\s*=\s*\"(android|chromeos|win)\"\s*$",
                    args_text, re.MULTILINE)
  if found is None:
    return host_os

  target_os = found.group(1)
  return "windows" if target_os == "win" else target_os


def twos_complement_8bit(b: int) -> int:
  """Interprets b like a signed 8-bit integer, possibly changing its sign.

  For instance, twos_complement_8bit(204) returns -52.

  Args:
    b: An unsigned byte value, in the range [0, 255].
  Returns:
    b itself when bit 7 is clear, otherwise b - 256.
  Raises:
    ValueError: If b is negative or does not fit inside 8 bits.
  """
  # Reject both ends of the range: a negative input also has bit 7 set (in
  # Python's infinite two's-complement view) and would yield a nonsense value.
  if not 0 <= b < 256:
    raise ValueError("b must fit inside 8 bits")
  if b & (1 << 7):
    # Sign bit set: negative number, calculate its value using
    # two's-complement.
    return b - (1 << 8)
  # Positive number, do not touch.
  return b


def iterative_hash(s: str) -> HashCode:
  """Compute the hash code of the given string as in:
  net/traffic_annotation/network_traffic_annotation.h

  The string is encoded as UTF-8 and folded byte by byte: each byte is read
  as a signed 8-bit value, added to 31 times the running value, and the sum
  is reduced modulo 138003713.

  Args:
    s: str
      The seed, e.g. unique id of traffic annotation.
  Returns: int
    A hash code (0 for an empty string).
  """
  accumulator = 0
  for byte in s.encode("utf-8"):
    accumulator = (accumulator * 31 + twos_complement_8bit(byte)) % 138003713
  return HashCode(accumulator)


def compute_hash_value(text: str) -> HashCode:
  """Hashes |text| with iterative_hash; an empty |text| hashes to -1."""
  if not text:
    return HashCode(-1)
  return iterative_hash(text)


def merge_string_field(src: Message, dst: Message, field: str):
  """Merges the string field named |field| from |src| into |dst|.

  Nothing happens when the field is empty in |src|. When |dst| already has a
  value, the |src| text is placed before it, separated by a newline;
  otherwise |dst| just receives the |src| text.
  """
  incoming = getattr(src, field)
  if not incoming:
    return

  existing = getattr(dst, field)
  merged = "{}\n{}".format(incoming, existing) if existing else incoming
  setattr(dst, field, merged)


def fill_proto_with_bogus(unique_id: str, proto: Message,
                          field_numbers: List[int]):
  """Puts placeholder values into the |proto| fields named by field_numbers.

  Reflection on the proto's descriptor decides which kind of placeholder
  suits each field. The absolute value of each field number is used.

  Args:
    unique_id: Annotation id, only used in the "unimplemented" error message.
    proto: The message to modify in place.
    field_numbers: Field numbers (sign is ignored) of the fields to fill.
  Raises:
    ValueError: If a number doesn't correspond to a field of the message.
    NotImplementedError: If a field's type/label combination isn't handled.
  """
  message_descriptor = proto.DESCRIPTOR
  known_fields = message_descriptor.fields_by_number

  for raw_number in field_numbers:
    number = abs(raw_number)
    if number not in known_fields:
      raise ValueError("{} is not a valid {} field".format(
          number, message_descriptor.name))

    field = known_fields[number]
    is_repeated = field.label == FieldDescriptor.LABEL_REPEATED

    if field.type == FieldDescriptor.TYPE_STRING:
      if is_repeated:
        getattr(proto, field.name).append("[Archived]")
      else:
        setattr(proto, field.name, "[Archived]")
      continue

    if field.type == FieldDescriptor.TYPE_ENUM and not is_repeated:
      # Assume the 2nd value in the enum is reasonable, since the 1st is
      # UNSPECIFIED.
      setattr(proto, field.name, field.enum_type.values[1].number)
      continue

    if field.type == FieldDescriptor.TYPE_MESSAGE:
      # Repeated messages get one new element; a non-repeated message needs
      # nothing done.
      if is_repeated:
        getattr(proto, field.name).add()
      continue

    raise NotImplementedError(
        "Unimplemented proto field {} of type {} ({}) in {}".format(
            field.name, field.type,
            "repeated" if is_repeated else "non-repeated", unique_id))


def extract_annotation_id(line: str) -> Optional[UniqueId]:
  """Pulls the annotation id out of an '<item id=...' line.

  Returns the first non-empty double-quoted value following 'id=' in |line|,
  or None when there is no such value."""
  found = re.search('id="([^"]+)"', line)
  if found is None:
    return None
  return UniqueId(found.group(1))


def escape_for_tsv(text: str) -> str:
  """Changes double-quotes to single-quotes, and adds double-quotes around the
  text if it has newlines/tabs.

  Args:
    text: The raw cell content.
  Returns:
    The text with every double-quote replaced by a single-quote, wrapped in
    double-quotes when it contains a newline or a tab.
  """
  # str.replace() returns a new string, so the result has to be kept;
  # otherwise embedded double-quotes would clash with the wrapping quotes
  # added below.
  text = text.replace("\"", "'")
  if "\n" in text or "\t" in text:
    return "\"{}\"".format(text)
  return text


def policy_to_text(chrome_policy: Iterable[Message]) -> str:
  """Unnests the policy name/values from chrome_policy, producing a
  human-readable string.

  For example, this:
    chrome_policy {
      SyncDisabled {
        policy_options {
          mode: MANDATORY
        }
        SyncDisabled: true
      }
    }

  becomes this:
    SyncDisabled: true

  Args:
    chrome_policy: The policy messages to describe.
  Returns:
    One entry per set sub-field (policy_options excluded), joined with ", ".
    An empty string if there is nothing to print.
  """
  items = []
  # Use the protobuf serializer library to print the fields, 2 levels deep.
  for policy in chrome_policy:
    for _, value in policy.ListFields():
      for subfield, subvalue in value.ListFields():
        if subfield.name == "policy_options":
          # Skip the policy_options field.
          continue

        # text_format.PrintField needs repeated fields passed in
        # one-at-a-time, so treat a singular value as a one-element list.
        if subfield.label == FieldDescriptor.LABEL_REPEATED:
          values = subvalue
        else:
          values = [subvalue]

        # Each sub-field gets its own writer, so it becomes one entry.
        writer = text_format.TextWriter(as_utf8=True)
        for single_value in values:
          text_format.PrintField(subfield,
                                 single_value,
                                 writer,
                                 as_one_line=True,
                                 use_short_repeated_primitives=True)
        items.append(writer.getvalue().strip())
  return ", ".join(items)


def write_annotations_tsv_file(file_path: Path, annotations: List["Annotation"],
                               missing_ids: List[UniqueId]):
  """Writes a TSV file of all annotations and their contents in file_path.

  Only annotations whose type.value is "definition" get a row. Each id in
  missing_ids gets a row holding just that id followed by empty columns. All
  rows are sorted as strings and written below the title row.

  Reads the module-level |traffic_annotation| name, which is bound by
  import_compiled_proto(); that function has to be called first.

  Args:
    file_path: File to write; the report is saved as UTF-8 text.
    annotations: The annotations to serialize.
    missing_ids: Unique ids to list with every other column left empty.
  Raises:
    ValueError: If an annotation's semantics.destination is not one of the
      known Destination values.
  """
  logger.info("Saving annotations to TSV file: {}.".format(file_path))
  Destination = traffic_annotation.TrafficSemantics.Destination
  CookiesAllowed = traffic_annotation.TrafficPolicy.CookiesAllowed

  # These don't depend on the annotation, so build them once up front.
  destination_names = {
      Destination.WEBSITE: "Website",
      Destination.GOOGLE_OWNED_SERVICE: "Google",
      Destination.LOCAL: "Local",
      Destination.PROXIED_GOOGLE_OWNED_SERVICE: "Proxied to Google",
      Destination.OTHER: "Other",
  }
  code_search_link = "https://cs.chromium.org/chromium/src/"

  title = (
      "Unique ID\tLast Update\tSender\tDescription\tTrigger\tData\t"
      "Destination\tCookies Allowed\tCookies Store\tSetting\tChrome Policy\t"
      "Comments\tSource File")

  # A missing id still needs one (empty) cell per remaining column.
  column_count = title.count("\t")
  lines = [missing_id + "\t" * column_count for missing_id in missing_ids]

  for annotation in annotations:
    if annotation.type.value != "definition":
      continue

    proto = annotation.proto
    semantics = proto.semantics
    policy = proto.policy

    # Destination: a free-form "other" description wins over the fixed names.
    if (semantics.destination == Destination.OTHER
        and semantics.destination_other):
      destination_text = "Other: {}".format(semantics.destination_other)
    elif semantics.destination in destination_names:
      destination_text = destination_names[semantics.destination]
    else:
      raise ValueError(
          "Invalid value for the semantics.destination field: {}".format(
              semantics.destination))

    # Chrome policies, or the justification for not having any.
    if annotation.has_policy():
      policies_text = policy_to_text(
          chain(policy.chrome_policy, policy.chrome_device_policy))
    else:
      policies_text = policy.policy_exception_justification

    # One entry per column, in the same order as the title row.
    columns = [
        proto.unique_id,
        # Placeholder for Last Update Date, will be updated in the scripts.
        "",
        # Semantics.
        semantics.sender,
        escape_for_tsv(semantics.description),
        escape_for_tsv(semantics.trigger),
        escape_for_tsv(semantics.data),
        destination_text,
        # Policy.
        "Yes" if policy.cookies_allowed == CookiesAllowed.YES else "No",
        escape_for_tsv(policy.cookies_store),
        escape_for_tsv(policy.setting),
        escape_for_tsv(policies_text),
        # Comments.
        escape_for_tsv(proto.comments),
        # Source.
        "{}{}?l={}".format(code_search_link, annotation.file.as_posix(),
                           annotation.line),
    ]
    lines.append("\t".join(columns))

  lines.sort()
  report = "\n".join([title] + lines) + "\n"

  file_path.write_text(report, encoding="utf-8")