1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import contextlib
import os
import re
import textwrap
from marionette_driver.addons import Addons
from marionette_driver.errors import MarionetteException
from marionette_driver.wait import Wait
from marionette_harness import MarionetteTestCase
from marionette_harness.runner.mixins.window_manager import WindowManagerMixin
from telemetry_harness.ping_server import PingServer
# Well-known placeholder client ID. assertIsValidUUID() rejects it because it
# is the client ID that is used when Telemetry upload is disabled.
CANARY_CLIENT_ID = "c0ffeec0-ffee-c0ff-eec0-ffeec0ffeec0"

# Lowercase, hyphen-separated 8-4-4-4-12 hexadecimal UUID.
# The pattern is anchored with \Z instead of $ because $ also matches just
# before a trailing newline, which would let "<uuid>\n" pass as a valid UUID.
UUID_PATTERN = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z"
)
class TelemetryTestCase(WindowManagerMixin, MarionetteTestCase):
    """Marionette test case base class for Telemetry tests.

    Every test gets its own ``PingServer`` (started in ``setUp`` and stopped
    in ``tearDown``) plus helpers to toggle the upload preference, install
    test addons, and wait for pings to arrive at that server.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the test case.

        The ping server is not created here; it is created and started in
        ``setUp`` so that each test runs against a fresh server.
        """
        super(TelemetryTestCase, self).__init__(*args, **kwargs)

    def setUp(self, *args, **kwargs):
        """Start the ping server and set up the test case.

        The server is created from the ``server_root`` and ``server_url``
        entries of ``self.testvars`` and is started before the parent
        ``setUp`` runs.
        """
        self.ping_server = PingServer(
            self.testvars["server_root"], self.testvars["server_url"]
        )
        self.ping_server.start()

        # unittest does not call tearDown() when setUp() raises, so stop the
        # already-running ping server here instead of leaking it.
        try:
            super(TelemetryTestCase, self).setUp(*args, **kwargs)

            # Store IDs of addons installed via self.install_addon()
            self.addon_ids = []

            with self.marionette.using_context(self.marionette.CONTEXT_CONTENT):
                self.marionette.navigate("about:about")
        except BaseException:
            self.ping_server.stop()
            raise

    def _set_upload_enabled(self, enabled):
        """Set the data upload preference to ``enabled``.

        The value is written both to the profile's persistent preferences
        and to the running browser via ``marionette.set_pref``.
        """
        pref_name = "datareporting.healthreport.uploadEnabled"
        self.marionette.instance.profile.set_persistent_preferences(
            {pref_name: enabled}
        )
        self.marionette.set_pref(pref_name, enabled)

    def disable_telemetry(self):
        """Disable the Firefox Data Collection and Use in the current browser."""
        self._set_upload_enabled(False)

    def enable_telemetry(self):
        """Enable the Firefox Data Collection and Use in the current browser."""
        self._set_upload_enabled(True)

    @contextlib.contextmanager
    def new_tab(self):
        """Perform operations in a new tab and then close the new tab.

        The body of the ``with`` statement runs in chrome context with the
        new tab selected. The tab is closed and the original window is
        re-selected afterwards, even if the body raises.
        """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            start_tab = self.marionette.current_window_handle
            new_tab = self.open_tab(focus=True)
            self.marionette.switch_to_window(new_tab)

            try:
                yield
            finally:
                # Always clean up so a failing body does not leave a stray
                # tab open and the wrong window selected.
                self.marionette.close()
                self.marionette.switch_to_window(start_tab)

    def navigate_in_new_tab(self, url):
        """Open a new tab, navigate it to ``url``, then close the tab."""
        with self.new_tab():
            with self.marionette.using_context(self.marionette.CONTEXT_CONTENT):
                self.marionette.navigate(url)

    def assertIsValidUUID(self, value):
        """Assert that ``value`` is a valid, non-canary UUID string.

        Fails if ``value`` is ``None``, empty, equal to ``CANARY_CLIENT_ID``,
        or does not match ``UUID_PATTERN`` in its entirety.
        """
        self.assertIsNotNone(value)
        self.assertNotEqual(value, "")

        # Check for client ID that is used when Telemetry upload is disabled
        self.assertNotEqual(value, CANARY_CLIENT_ID, msg="UUID is CANARY CLIENT ID")

        # fullmatch() requires the whole string to match, so a value with a
        # trailing newline is rejected as well.
        self.assertIsNotNone(
            UUID_PATTERN.fullmatch(value),
            msg="UUID does not match regular expression",
        )

    def wait_for_pings(self, action_func, ping_filter, count, ping_server=None):
        """Call the given action and wait for matching pings to come in.

        Args:
            action_func: Zero-argument callable that is expected to trigger
                the pings. Any callable works, including
                ``functools.partial`` objects.
            ping_filter: Callable taking a ping and returning a truthy value
                for pings that should be counted.
            count: Number of matching pings to wait for.
            ping_server: Server whose ``pings`` list is inspected. Defaults
                to ``self.ping_server``.

        Returns:
            The first ``count`` pings received after this call started that
            satisfy ``ping_filter``.

        The test fails (via ``self.fail``) if waiting raises, for example
        when the wait times out.
        """
        if ping_server is None:
            ping_server = self.ping_server

        # Keep track of the current number of pings
        current_num_pings = len(ping_server.pings)

        # New list to store new pings that satisfy the filter
        filtered_pings = []

        def wait_func(*args, **kwargs):
            # Ignore existing pings in ping_server.pings
            new_pings = ping_server.pings[current_num_pings:]

            # Filter pings to make sure we wait for the correct ping type.
            # Slice assignment mutates the list shared with the outer scope.
            filtered_pings[:] = [p for p in new_pings if ping_filter(p)]

            return len(filtered_pings) >= count

        # Callables such as functools.partial have no __name__; fall back to
        # their repr so logging never masks the real test with an
        # AttributeError.
        action_name = getattr(action_func, "__name__", repr(action_func))
        self.logger.info(
            "wait_for_pings running action '{action}'.".format(action=action_name)
        )

        # Call given action and wait for a ping
        action_func()

        try:
            # NOTE(review): 60 is Wait's second positional argument, which is
            # presumably the timeout in seconds - confirm against
            # marionette_driver.wait.Wait.
            Wait(self.marionette, 60).until(wait_func)
        except Exception as e:
            self.fail("Error waiting for ping: {}".format(e))

        return filtered_pings[:count]

    def wait_for_ping(self, action_func, ping_filter, ping_server=None):
        """Call wait_for_pings() with the given action_func and ping_filter and
        return the first result.
        """
        [ping] = self.wait_for_pings(
            action_func, ping_filter, 1, ping_server=ping_server
        )
        return ping

    def restart_browser(self):
        """Restarts browser while maintaining the same profile."""
        return self.marionette.restart(clean=False, in_app=True)

    def start_browser(self):
        """Start the browser by starting a new Marionette session."""
        return self.marionette.start_session()

    def quit_browser(self):
        """Quit the browser."""
        return self.marionette.quit()

    def install_addon(self):
        """Install a minimal addon (temporarily) and record its ID."""
        addon_name = "helloworld"
        self._install_addon(addon_name)

    def install_dynamic_addon(self):
        """Install a dynamic probe addon (non-temporarily) and record its ID.

        Source Code:
        https://github.com/mozilla-extensions/dynamic-probe-telemetry-extension
        """
        addon_name = "dynamic_addon/dynamic-probe-telemetry-extension-signed.xpi"
        self._install_addon(addon_name, temp=False)

    def _install_addon(self, addon_name, temp=True):
        """Install an addon and append its ID to ``self.addon_ids``.

        Args:
            addon_name: Path of the addon relative to the ``resources``
                directory that sits next to this file.
            temp: Passed through to ``Addons.install`` as ``temp``.

        The test fails (via ``self.fail``) if a ``MarionetteException`` is
        raised while preparing for or performing the installation.
        """
        resources_dir = os.path.join(os.path.dirname(__file__), "resources")
        addon_path = os.path.abspath(os.path.join(resources_dir, addon_name))

        try:
            # Ensure the Environment has init'd so the installed addon
            # triggers an "environment-change" ping.
            script = """\
            let [resolve] = arguments;
            const { TelemetryEnvironment } = ChromeUtils.importESModule(
              "resource://gre/modules/TelemetryEnvironment.sys.mjs"
            );
            TelemetryEnvironment.onInitialized().then(resolve);
            """

            with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
                self.marionette.execute_async_script(textwrap.dedent(script))

            addons = Addons(self.marionette)
            addon_id = addons.install(addon_path, temp=temp)
        except MarionetteException as e:
            self.fail("{} - Error installing addon: {} - ".format(e.cause, e))
        else:
            self.addon_ids.append(addon_id)

    def set_persistent_profile_preferences(self, preferences):
        """Wrapper for setting persistent preferences on a user profile"""
        return self.marionette.instance.profile.set_persistent_preferences(preferences)

    def set_preferences(self, preferences):
        """Wrapper for setting preferences through ``marionette.set_prefs``.

        Unlike ``set_persistent_profile_preferences`` this does not go
        through the profile object.
        """
        return self.marionette.set_prefs(preferences)

    @property
    def client_id(self):
        """Return the ID of the current client.

        The value is whatever ``ClientID.getCachedClientID()`` returns when
        evaluated in chrome context.
        """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            return self.marionette.execute_script(
                """\
                const { ClientID } = ChromeUtils.importESModule(
                  "resource://gre/modules/ClientID.sys.mjs"
                );
                return ClientID.getCachedClientID();
                """
            )

    @property
    def subsession_id(self):
        """Return the ID of the current subsession.

        Read from ``payload.info.subsessionId`` of the data returned by
        ``TelemetryController.getCurrentPingData(true)`` in chrome context.
        """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            ping_data = self.marionette.execute_script(
                """\
                const { TelemetryController } = ChromeUtils.importESModule(
                  "resource://gre/modules/TelemetryController.sys.mjs"
                );
                return TelemetryController.getCurrentPingData(true);
                """
            )
        return ping_data["payload"]["info"]["subsessionId"]

    def tearDown(self, *args, **kwargs):
        """Tear down the testcase, stop the ping server and quit the browser.

        The ping server is stopped and the browser is quit (with
        ``in_app=False, clean=True``) even if an earlier teardown step
        raises, so that a failing teardown does not leak either of them.
        """
        try:
            super(TelemetryTestCase, self).tearDown()
        finally:
            try:
                self.ping_server.stop()
            finally:
                self.marionette.quit(in_app=False, clean=True)
|