1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
# DistUpgradeFetcherCore.py
#
# Copyright (c) 2006 Canonical
#
# Author: Michael Vogt <michael.vogt@ubuntu.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
from string import Template
import os
import apt_pkg
import apt
import tarfile
import socket
import urlparse
import urllib2
import tempfile
import shutil
import sys
import GnuPGInterface
from gettext import gettext as _
from UpdateManager.Common.utils import country_mirror
class DistUpgradeFetcherCore(object):
    """Base class (without GUI) for the upgrade fetcher.

    ``run()`` drives the whole process: download the upgrade tool (and its
    detached signature, if the distribution entry has one) into a fresh
    temporary directory, authenticate it, unpack it, check that the
    upgrade script is there and finally exec that script.
    """

    def __init__(self, new_dist, progress):
        """Store the target distribution and the fetch progress object.

        new_dist -- object describing the release to upgrade to; this
                    class reads its ``name``, ``upgradeTool`` and
                    ``upgradeToolSig`` attributes
        progress -- progress object handed to ``apt_pkg.GetAcquire``
        """
        self.new_dist = new_dist
        self._progress = progress
        # options to pass to the release upgrader when it is run
        self.run_options = []

    def showReleaseNotes(self):
        """Hook for showing release notes; returning False aborts run().

        The base implementation shows nothing and always returns True.
        """
        return True

    def error(self, summary, message):
        """Dummy implementation for error display.

        Prints *summary* and *message* and returns False. Should be
        overridden by subclasses that want a fancier method.
        """
        print(summary)
        print(message)
        return False

    def authenticate(self):
        """Check the downloaded upgrade tool against its signature.

        Returns False if a signature is configured and gpg verification
        fails. Returns True otherwise, including when the distribution
        entry has no signature at all.
        """
        if self.new_dist.upgradeToolSig:
            f = self.tmpdir + "/" + os.path.basename(self.new_dist.upgradeTool)
            sig = (self.tmpdir + "/" +
                   os.path.basename(self.new_dist.upgradeToolSig))
            print("authenticate '%s' against '%s' " % (f, sig))
            if not self.gpgauthenticate(f, sig):
                return False

        # we may return False here by default if we want to make a sig
        # mandatory
        return True

    def gpgauthenticate(self, file, signature,
                        keyring='/etc/apt/trusted.gpg'):
        """Authenticate a file against a given detached signature.

        If no keyring is given the apt default keyring is used. gpg is run
        with ``self.tmpdir`` as its home directory.

        Returns True only when gpg exits without error and its status
        output contains "VALIDSIG"; False otherwise.
        """
        gpg = GnuPGInterface.GnuPG()
        gpg.options.extra_args = ['--no-options',
                                  '--homedir', self.tmpdir,
                                  '--no-default-keyring',
                                  '--ignore-time-conflict',
                                  '--keyring', keyring]
        proc = gpg.run(['--verify', signature, file],
                       create_fhs=['status', 'logger', 'stderr'])
        gpgres = proc.handles['status'].read()
        try:
            proc.wait()
        except IOError as err:
            # gnupg returned a problem (non-zero exit)
            print("exception from gpg: %s" % err)
            print("Debug information: ")
            # the status handle was already read to EOF above, reading it
            # again would only yield an empty string
            print(gpgres)
            print(proc.handles['stderr'].read())
            print(proc.handles['logger'].read())
            return False
        if "VALIDSIG" in gpgres:
            return True
        print("invalid result from gpg:")
        print(gpgres)
        return False

    def extractDistUpgrader(self):
        """Unpack the downloaded tarball into ``self.tmpdir``.

        Returns False when the tarball is missing, cannot be read, or
        contains a member that would be written outside of the temporary
        directory; True when everything was extracted.
        """
        # extract the tarball
        fname = os.path.join(self.tmpdir, os.path.basename(self.uri))
        print("extracting '%s'" % fname)
        if not os.path.exists(fname):
            return False

        # extract into the temporary directory explicitly instead of
        # relying on the current working directory
        dest = os.path.realpath(self.tmpdir)
        try:
            tar = tarfile.open(fname, "r")
        except (tarfile.TarError, IOError) as err:
            print("could not open '%s': %s" % (fname, err))
            return False
        try:
            for tarinfo in tar:
                # the archive comes from the network: refuse absolute
                # names, ".." components and symlink tricks that would
                # place a file outside of the temporary directory
                target = os.path.realpath(os.path.join(dest, tarinfo.name))
                if target != dest and not target.startswith(dest + os.sep):
                    print("refusing to extract '%s' outside of '%s'"
                          % (tarinfo.name, dest))
                    return False
                tar.extract(tarinfo, dest)
        except (tarfile.TarError, IOError) as err:
            print("could not extract '%s': %s" % (fname, err))
            return False
        finally:
            tar.close()
        return True

    def verifyDistUprader(self):
        """Check that the extracted upgrade script exists.

        Sets ``self.script`` to ``<tmpdir>/<new_dist.name>``. Returns True
        if that path exists, otherwise reports the problem via
        ``self.error`` and returns its result.
        """
        # FIXME: check a internal dependency file to make sure
        #        that the script will run correctly

        # see if we have a script file that we can run
        self.script = script = "%s/%s" % (self.tmpdir, self.new_dist.name)
        if not os.path.exists(script):
            return self.error(_("Could not run the upgrade tool"),
                              _("This is most likely a bug in the upgrade tool. "
                                "Please report it as a bug"))
        return True

    def _expandUri(self, uri):
        """Substitute ``$countrymirror`` in *uri* and return the result.

        If the host of the expanded uri cannot be resolved the template is
        expanded again with an empty country mirror.
        """
        uri_template = Template(uri)
        m = country_mirror()
        new_uri = uri_template.safe_substitute(countrymirror=m)
        # be paranoid and check if the given uri actually exists
        host = urlparse.urlparse(new_uri)[1]
        try:
            socket.gethostbyname(host)
        except socket.gaierror:
            sys.stderr.write("host '%s' could not be resolved\n" % host)
            new_uri = uri_template.safe_substitute(countrymirror='')
        return new_uri

    def fetchDistUpgrader(self):
        """Download the tarball with the upgrade script.

        Creates a new temporary directory (``self.tmpdir``), changes into
        it and queues the signature (if any) and the upgrade tool for
        download. Sets ``self.uri`` to the expanded upgrade tool uri.

        Returns True if the fetcher reports ``ResultContinue``; False if
        the run fails or the distribution has no upgrade tool.
        """
        self.tmpdir = tmpdir = tempfile.mkdtemp()
        os.chdir(tmpdir)
        # turn debugging on here (if required)
        #apt_pkg.Config.Set("Debug::Acquire::http","1")
        fetcher = apt_pkg.GetAcquire(self._progress)
        # NOTE(review): keep a reference to every queued item until Run()
        # returns; presumably the bindings need the Python objects alive
        # for the download to happen - confirm against python-apt
        queued = []
        if self.new_dist.upgradeToolSig is not None:
            uri = self._expandUri(self.new_dist.upgradeToolSig)
            queued.append(apt_pkg.GetPkgAcqFile(
                fetcher, uri, descr=_("Upgrade tool signature")))
        if self.new_dist.upgradeTool is None:
            return False
        self.uri = self._expandUri(self.new_dist.upgradeTool)
        queued.append(apt_pkg.GetPkgAcqFile(
            fetcher, self.uri, descr=_("Upgrade tool")))
        return fetcher.Run() == fetcher.ResultContinue

    def runDistUpgrader(self):
        """Replace the current process with the upgrade script.

        When not running as root the script is started through sudo. In
        both cases ``self.run_options`` are passed on to the script.
        """
        #print "running: %s" % script
        args = [self.script] + self.run_options
        if os.getuid() != 0:
            os.execv("/usr/bin/sudo", ["sudo"] + args)
        else:
            os.execv(self.script, args)

    def cleanup(self):
        """Leave the temporary directory and delete it."""
        # cleanup
        os.chdir("..")
        # del tmpdir
        shutil.rmtree(self.tmpdir)

    def run(self):
        """Fetch, authenticate, extract, verify and run the upgrade tool.

        Every failing step is reported through ``self.error``. Once the
        download step has created the temporary directory, every failure
        removes it again before returning.
        """
        # see if we have release notes
        if not self.showReleaseNotes():
            return
        if not self.fetchDistUpgrader():
            self.error(_("Failed to fetch"),
                       _("Fetching the upgrade failed. There may be a network "
                         "problem. "))
            self.cleanup()
            return
        # check the signature before anything from the download is unpacked
        if not self.authenticate():
            self.error(_("Authentication failed"),
                       _("Authenticating the upgrade failed. There may be a problem "
                         "with the network or with the server. "))
            self.cleanup()
            return
        if not self.extractDistUpgrader():
            self.error(_("Failed to extract"),
                       _("Extracting the upgrade failed. There may be a problem "
                         "with the network or with the server. "))
            self.cleanup()
            return
        if not self.verifyDistUprader():
            self.error(_("Verfication failed"),
                       _("Verifying the upgrade failed.  There may be a problem "
                         "with the network or with the server. "))
            self.cleanup()
            return
        self.runDistUpgrader()
if __name__ == "__main__":
    # manual smoke test: exercise the error display and the gpg check
    d = DistUpgradeFetcherCore(None, None)
    d.error("summary", "message")
    # gpgauthenticate() uses self.tmpdir as the gpg home directory, so give
    # it a scratch directory and remove it again afterwards
    d.tmpdir = tempfile.mkdtemp()
    try:
        print(d.gpgauthenticate('/tmp/Release', '/tmp/Release.gpg'))
    finally:
        shutil.rmtree(d.tmpdir)
|