1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (C) 2009 Canonical
#
# Authors:
# Michael Vogt
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; version 3.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import os
from gi.repository import GObject
GObject.threads_init()
from gi.repository import GObject
import time
import threading
import logging
from softwarecenter.distro import get_distro
from launchpadlib.launchpad import Launchpad
from launchpadlib.credentials import RequestTokenAuthorizationEngine
from launchpadlib.uris import EDGE_SERVICE_ROOT
from softwarecenter.paths import SOFTWARE_CENTER_CACHE_DIR
# py3 compat
try:
from queue import Queue
Queue # pyflakes
except ImportError:
from Queue import Queue
from login import LoginBackend
# LP to use: the launchpad service root that every login in this module
# is made against
SERVICE_ROOT = EDGE_SERVICE_ROOT

# internal
# The various states that the login can be in. The worker thread, the
# authorization engine and GLaunchpad talk to each other by writing and
# reading one of these values on the worker's "login_state" attribute.

# initial state of a freshly created LaunchpadlibWorker
# (NOTE: the value really is spelled "unkown"; it is a runtime string)
LOGIN_STATE_UNKNOWN = "unkown"
# set by the authorization engine when it needs credentials; GLaunchpad
# answers it by emitting "need-username-password"
LOGIN_STATE_ASK_USER_AND_PASS = "ask-user-and-pass"
# set by GLaunchpad.enter_username_password() once the credentials are
# stored on the worker; ends the wait in input_username()
LOGIN_STATE_HAS_USER_AND_PASS = "has-user-pass"
# set by the worker after Launchpad.login_with() returned; GLaunchpad
# answers it by emitting "login-successful"
LOGIN_STATE_SUCCESS = "success"
# set by the authorization engine's success() callback
LOGIN_STATE_SUCCESS_PENDING = "success-pending"
# set by the authorization engine's authentication_failure() callback;
# GLaunchpad answers it by emitting "login-failed"
LOGIN_STATE_AUTH_FAILURE = "auth-fail"
# set by GLaunchpad.cancel_login()
LOGIN_STATE_USER_CANCEL = "user-cancel"
class UserCancelException(Exception):
    """Raised by the authorization engine when the user cancelled the login."""
class LaunchpadlibWorker(threading.Thread):
    """The launchpadlib worker thread - it does not touch the UI
    and only communicates via the following:

    "login_state" - the current LOGIN_STATE_* value

    To run a launchpad call on this thread use "queue_request()".
    When no longer needed, call "shutdown()".
    """

    def __init__(self):
        # init parent
        threading.Thread.__init__(self)
        # the current login state, this is used across multiple threads
        self.login_state = LOGIN_STATE_UNKNOWN
        # the username/pw to use
        self.login_username = ""
        self.login_password = ""
        # the Launchpad object, available once _lp_login() succeeded
        self._launchpad = None
        # (func, args, result_callback) tuples waiting to be run
        self._pending_requests = Queue()
        self._shutdown = False
        self._logger = logging.getLogger("softwarecenter.backend")

    def run(self):
        """
        Main thread run interface, logs into launchpad and then
        serves queued requests until shutdown() is requested
        """
        self._logger.debug("lp worker thread run")
        # login
        self._lp_login()
        # loop
        self._wait_for_commands()

    def shutdown(self):
        """Request shutdown; the thread exits once its queue is empty"""
        self._shutdown = True

    def queue_request(self, func, args, result_callback):
        """Queue "func(*args)" to be run on the worker thread.

        "result_callback" is called with the return value of func.
        Note that the callback runs on the worker thread as well.
        """
        # FIXME: add support to pass strings instead of callable
        self._pending_requests.put((func, args, result_callback))

    def _wait_for_commands(self):
        """internal helper that waits for commands

        Runs every queued request and returns once shutdown() was
        requested and no request is pending anymore. A request (or
        result callback) that raises is logged and skipped so that a
        single failure does not take the whole worker thread down.
        """
        while True:
            while not self._pending_requests.empty():
                self._logger.debug("found pending request")
                (func, args, result_callback) = self._pending_requests.get()
                try:
                    # run func on this thread and provide the result
                    # to the callback
                    result_callback(func(*args))
                except Exception:
                    # thread boundary: log instead of dying silently
                    self._logger.exception("pending request failed")
                finally:
                    # always balance the get() above, even on failure
                    self._pending_requests.task_done()
            # wait a bit
            time.sleep(0.1)
            if self._shutdown and self._pending_requests.empty():
                return

    def _lp_login(self, access_level=None):
        """ internal LP login code

        Logs into launchpad (asking for credentials through
        AuthorizeRequestTokenFromThread) and sets login_state to
        LOGIN_STATE_SUCCESS when done. If the login fails, the cached
        token is removed and the login is tried again. Returns without
        changing login_state if the user cancels, or if shutdown() was
        requested after a failed attempt.

        "access_level" is the list passed to launchpadlib as
        allow_access_levels, it defaults to ['READ_PRIVATE'].
        """
        if access_level is None:
            access_level = ['READ_PRIVATE']
        # use cachedir
        cachedir = SOFTWARE_CENTER_CACHE_DIR
        # retry in a loop (not via recursion) so that repeated failures
        # can not exhaust the stack
        while True:
            self._logger.debug("lp_login")
            if not os.path.exists(cachedir):
                os.makedirs(cachedir)
            # login into LP with GUI
            try:
                self._launchpad = Launchpad.login_with(
                    'software-center', SERVICE_ROOT, cachedir,
                    allow_access_levels=access_level,
                    authorizer_class=AuthorizeRequestTokenFromThread)
                self.display_name = self._launchpad.me.display_name
            except UserCancelException:
                return
            except Exception:
                self._logger.exception("Launchpad.login_with()")
                # remove token on failure, it may be e.g. expired
                self._remove_cached_credentials(cachedir)
                if self._shutdown:
                    # nobody is interested in the login anymore
                    return
                continue
            self.login_state = LOGIN_STATE_SUCCESS
            self._logger.debug("/done %s" % self._launchpad)
            return

    def _remove_cached_credentials(self, cachedir):
        """ remove the cached 'software-center' token below cachedir """
        # FIXME: store the token in a different place and to avoid
        #        having to use _get_paths()
        (_service_root, _launchpadlib_dir, _cache_path,
         service_root_dir) = Launchpad._get_paths(SERVICE_ROOT, cachedir)
        credentials_path = os.path.join(service_root_dir, 'credentials')
        consumer_credentials_path = os.path.join(credentials_path,
                                                 'software-center')
        if os.path.exists(consumer_credentials_path):
            os.remove(consumer_credentials_path)
class AuthorizeRequestTokenFromThread(RequestTokenAuthorizationEngine):
    """ Internal helper that updates the login_state of
        the module global lp_worker_thread object and takes the
        username/password from it
    """

    # we need this to give the engine a place to store the state
    # for the UI
    def __new__(cls, *args, **kwargs):
        engine = object.__new__(cls)
        # keep the state here (the lp_worker_thread global to this module)
        engine.lp_worker = lp_worker_thread
        return engine

    def __init__(self, *args, **kwargs):
        super(AuthorizeRequestTokenFromThread, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger("softwarecenter.backend")

    def input_username(self, cached_username, suggested_message):
        """Block until credentials are available and return the username.

        Raises UserCancelException if the login is already canceled
        when this gets called.
        """
        worker = self.lp_worker
        self._logger.debug("input_username: %s" % worker.login_state)
        # go into ASK state unless we are asking/failed/canceled already
        states_to_keep = (LOGIN_STATE_ASK_USER_AND_PASS,
                          LOGIN_STATE_AUTH_FAILURE,
                          LOGIN_STATE_USER_CANCEL)
        if worker.login_state not in states_to_keep:
            worker.login_state = LOGIN_STATE_ASK_USER_AND_PASS
        # the user canceled: abort the login
        if worker.login_state == LOGIN_STATE_USER_CANCEL:
            raise UserCancelException
        # wait for username to become available (or for a cancel)
        states_to_stop_on = (LOGIN_STATE_HAS_USER_AND_PASS,
                             LOGIN_STATE_USER_CANCEL)
        while worker.login_state not in states_to_stop_on:
            time.sleep(0.2)
        # note: returning None here make lplib open a registration page
        #       in the browser
        return worker.login_username

    def input_password(self, suggested_message):
        """Return the password stored on the worker."""
        password = self.lp_worker.login_password
        self._logger.debug("Input password size %s" % len(password))
        return password

    def input_access_level(self, available_levels, suggested_message,
                           only_one_option=None):
        """Collect the desired level of access from the end-user."""
        self._logger.debug("input_access_level")
        return "WRITE_PUBLIC"

    def startup(self, suggested_messages):
        """Only logs that the engine was started."""
        self._logger.debug("startup")

    def authentication_failure(self, suggested_message):
        """The user entered invalid credentials."""
        self._logger.debug("auth failure")
        # ignore auth failures if the user canceled
        if self.lp_worker.login_state != LOGIN_STATE_USER_CANCEL:
            self.lp_worker.login_state = LOGIN_STATE_AUTH_FAILURE

    def success(self, suggested_message):
        """The token was successfully authorized."""
        self._logger.debug("success")
        self.lp_worker.login_state = LOGIN_STATE_SUCCESS_PENDING
class GLaunchpad(LoginBackend):
    """ A launchpad connection that uses GObject signals
        for communication and async tasks

        All work is delegated to the module global lp_worker_thread.
    """

    NEW_ACCOUNT_URL = "https://login.launchpad.net/+standalone-login"
    FORGOT_PASSWORD_URL = "https://login.launchpad.net/+standalone-login"

    def __init__(self):
        LoginBackend.__init__(self)
        self.distro = get_distro()
        # passed along with "login-successful"; not set anywhere in
        # this class
        self.oauth_token = None

    def connect_to_server(self):
        """ Connects to launchpad and emits one of:
            - need-username-password (use enter_username_password() then)
            - login-successful
            - login-failed
        """
        GObject.timeout_add(200, self._wait_for_login)
        # a thread can only be started once (start() raises RuntimeError
        # otherwise), so only start the worker if it never ran before
        if lp_worker_thread.ident is None:
            lp_worker_thread.start()

    def shutdown(self):
        """ shutdown the server connection thread """
        lp_worker_thread.shutdown()

    def enter_username_password(self, user, password):
        """
        provide username and password, usually used when the
        need-username-password signal was sent
        """
        lp_worker_thread.login_username = user
        lp_worker_thread.login_password = password
        # set the state last, the worker reads the credentials as soon
        # as it sees this state
        lp_worker_thread.login_state = LOGIN_STATE_HAS_USER_AND_PASS

    def login(self, username=None, password=None):
        """ hand the given credentials to the worker or, if they are
            not (both) given, connect to the server
        """
        if username and password:
            self.enter_username_password(username, password)
        else:
            self.connect_to_server()

    def cancel_login(self):
        """ mark the login as canceled by the user """
        lp_worker_thread.login_state = LOGIN_STATE_USER_CANCEL

    def get_subscribed_archives(self):
        """ return list of sources.list entries

            The launchpad call is made on the calling thread; the
            worker must be logged in already.
        """
        urls = lp_worker_thread._launchpad.me.getArchiveSubscriptionURLs()
        return self._format_archive_subscription_urls_as_deb_lines(urls)

    def _format_archive_subscription_urls_as_deb_lines(self, urls):
        """ turn each url into a "deb <url> <codename> main" line """
        # the codename is the same for every line, look it up only once
        codename = self.distro.get_codename()
        return ["deb %s %s main" % (url, codename) for url in urls]

    def get_subscribed_archives_async(self, callback):
        """ get the available subscribed archives and run 'callback' when
            they become available

            'callback' gets the list of sources.list entries and is run
            by the worker thread. Nothing is returned. The worker must
            be logged in already.
        """
        def _result_cb(urls):
            # format as deb lines
            callback(self._format_archive_subscription_urls_as_deb_lines(urls))
        #func = "me.getArchiveSubscriptionURLs"
        func = lp_worker_thread._launchpad.me.getArchiveSubscriptionURLs
        lp_worker_thread.queue_request(func, (), _result_cb)

    def _wait_for_login(self):
        """ timeout callback that turns the worker login_state into
            signals; returns False (stop polling) once the login
            succeeded or was canceled and True (poll again) otherwise
        """
        # NOTE(review): while the state stays at AUTH_FAILURE or
        # ASK_USER_AND_PASS the matching signal is emitted again on every
        # poll - confirm that the signal handlers expect that
        state = lp_worker_thread.login_state
        if state == LOGIN_STATE_AUTH_FAILURE:
            self.emit("login-failed")
        elif state == LOGIN_STATE_ASK_USER_AND_PASS:
            self.emit("need-username-password")
        elif state == LOGIN_STATE_SUCCESS:
            self.emit("login-successful", self.oauth_token)
            return False
        elif state == LOGIN_STATE_USER_CANCEL:
            return False
        return True
# IMPORTANT: create one (module) global LP worker thread here, GLaunchpad
#            and AuthorizeRequestTokenFromThread refer to it by this name
lp_worker_thread = LaunchpadlibWorker()
# daemon threads make it crash on cancel
# NOTE(review): the comment above warns about daemon threads, yet the
#               thread IS marked as daemon here - confirm which is intended
lp_worker_thread.daemon = True
# test code
def _login_success(lp, oauth_token=None):
    """Test handler for "login-successful".

    Prints the backend and its subscribed archives, first fetched
    directly and then through the async API (that result is printed
    by _result_callback).

    "oauth_token" is the extra argument that GLaunchpad passes when
    it emits the signal; it is accepted but not used.
    """
    print("success %s" % lp)
    print(lp.get_subscribed_archives())
    # the async call returns nothing, its result goes to the callback
    lp.get_subscribed_archives_async(_result_callback)
def _login_failed(lp):
    """Test handler for "login-failed": print the backend that failed."""
    message = "fail %s" % lp
    print(message)
def _result_callback(result_list):
    """Test callback for the async archive query: print what was received."""
    message = "_result_callback %s" % result_list
    print(message)
def _login_need_user_and_password(lp):
    """Test handler for "need-username-password".

    Asks for user and password on the terminal and hands the stripped
    answers to lp.enter_username_password().
    """
    import sys

    def _ask(prompt):
        # show the prompt without a newline, then read one line
        sys.stdout.write(prompt)
        sys.stdout.flush()
        return sys.stdin.readline().strip()

    user = _ask("user: ")
    password = _ask("pass: ")
    lp.enter_username_password(user, password)
if __name__ == "__main__":
    # manual test: log into launchpad from the terminal
    logging.basicConfig(level=logging.DEBUG)

    lp = GLaunchpad()
    # wire each signal to its test handler
    for signal_name, handler in (
            ("login-successful", _login_success),
            ("login-failed", _login_failed),
            ("need-username-password", _login_need_user_and_password)):
        lp.connect(signal_name, handler)
    lp.connect_to_server()

    # wait (until interrupted with ctrl-c)
    try:
        main_loop = GObject.MainLoop()
        main_loop.run()
    except KeyboardInterrupt:
        lp_worker_thread.shutdown()
|