File: unattended-upgrade

package info (click to toggle)
unattended-upgrades 0.2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 60 kB
  • ctags: 6
  • sloc: python: 184; makefile: 57
file content (245 lines) | stat: -rwxr-xr-x 9,161 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/usr/bin/python2.4
#
# (c) 2005 Canonical
# Author: Michael Vogt <michael.vogt@ubuntu.com>
#
# Released under the GPL

import apt_pkg, apt_inst
import sys, os, string, datetime
from optparse import OptionParser
from subprocess import Popen, PIPE

import warnings
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
import apt
import logging

class MyCache(apt.Cache):
    """apt.Cache subclass that adds a way to reset all pending markings."""

    def __init__(self):
        apt.Cache.__init__(self)

    def clear(self):
        """Re-initialise the depcache and check that no marking is left.

        After Init() the install, broken and delete counters are all
        asserted to be zero.
        """
        depcache = self._depcache
        depcache.Init()
        assert (depcache.InstCount == 0 and
                depcache.BrokenCount == 0 and
                depcache.DelCount == 0)
        

def is_allowed_origin(pkg, allowed_origins):
    """Tell whether one of pkg's candidate origins is in allowed_origins.

    pkg             -- object exposing a `candidateOrigin` sequence whose
                       items carry `origin` and `archive` attributes
    allowed_origins -- sequence of indexable entries, index 0 being the
                       origin and index 1 the archive

    Returns True as soon as one candidate origin matches an entry on both
    fields, False when no pair matches.
    """
    for candidate in pkg.candidateOrigin:
        for entry in allowed_origins:
            # the archive field is only looked at once the origin matched
            if candidate.origin != entry[0]:
                continue
            if candidate.archive == entry[1]:
                return True
    return False

def check_changes_for_sanity(cache, allowed_origins, blacklist):
    """Decide whether the markings currently held by cache are acceptable.

    Returns False when the depcache reports broken packages, when any
    package is marked for deletion, or when a package marked for install
    or upgrade is not from an allowed origin or is named in blacklist.
    Returns True otherwise.
    """
    if cache._depcache.BrokenCount != 0:
        return False
    for entry in cache:
        if entry.markedDelete:
            return False
        if not (entry.markedInstall or entry.markedUpgrade):
            # untouched package, nothing to verify
            continue
        if not is_allowed_origin(entry, allowed_origins):
            return False
        if entry.name in blacklist:
            return False
    return True

def pkgname_from_deb(debfile):
    """Return the "Package" field from the control data of a .deb file.

    debfile -- path of the .deb archive to inspect

    The archive is opened only for the duration of the control extraction
    and is always closed again, even when the extraction raises.
    """
    # FIXME: add error checking here
    deb = open(debfile)
    try:
        control = apt_inst.debExtractControl(deb)
    finally:
        # do not leak one descriptor per inspected archive
        deb.close()
    sections = apt_pkg.ParseSection(control)
    return sections["Package"]

def conffile_prompt(destFile):
    """Check whether a conffile of the package in destFile was modified.

    destFile -- path of a downloaded .deb

    The package name is read from the .deb, then the file named by the
    "Dir::State::status" configuration entry is scanned for that package.
    Each line of its "Conffiles" field is expected to look like
        /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
    optionally followed by "obsolete".  Returns True as soon as an existing,
    non-obsolete conffile has an md5sum different from the recorded one
    (presumably the situation in which dpkg would ask a question), and
    False otherwise.
    """
    logging.debug("check_conffile_prompt('%s')" % destFile)
    pkgname = pkgname_from_deb(destFile)
    status_file = apt_pkg.Config.Find("Dir::State::status")
    status = open(status_file, "r")
    try:
        parse = apt_pkg.ParseTagFile(status)
        while parse.Step() == 1:
            if parse.Section.get("Package") != pkgname:
                continue
            logging.debug("found pkg: %s" % pkgname)
            if not parse.Section.has_key("Conffiles"):
                continue
            conffiles = parse.Section.get("Conffiles")
            for line in string.split(conffiles, "\n"):
                logging.debug("conffile line: %s", line)
                fields = string.split(string.strip(line))
                if len(fields) < 2:
                    # blank or malformed line: no path/md5 pair to compare
                    continue
                path = fields[0]
                recorded_md5 = fields[1]
                if len(fields) > 2 and fields[2] == "obsolete":
                    continue
                if not os.path.exists(path):
                    continue
                conffile = open(path)
                try:
                    current_md5 = apt_pkg.md5sum(conffile.read())
                finally:
                    conffile.close()
                if current_md5 != recorded_md5:
                    return True
    finally:
        # also runs on the early "return True" above
        status.close()
    return False


if __name__ == "__main__":

    # init the logging
    logdir = apt_pkg.Config.FindDir("APT::UnattendedUpgrades::LogDir",
                                    "/var/log/unattended-upgrades/")
    logfile = logdir+apt_pkg.Config.Find("APT::UnattendedUpgrades::LogFile",
                                         "unattended-upgrades.log")
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=logfile)

    # init the options
    parser = OptionParser()
    parser.add_option("-d", "--debug",
                      action="store_true", dest="debug", default=False,
                      help="print debug messages")
    (options, args) = parser.parse_args()
    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        pass

    #dldir = "/tmp/pyapt-test"
    #try:
    #    os.mkdir(dldir)
    #    os.mkdir(dldir+"/partial")
    #except OSError:
    #    pass
    #apt_pkg.Config.Set("Dir::Cache::archives",dldir)

    # format (origin, archive), e.g. ("Ubuntu","dapper-security")
    allowed_origins = map(string.split, apt_pkg.Config.ValueList("Unattended-Upgrade::Allowed-Origins"))

    # pkgs that are (for some reason) not save to install
    blacklisted_pkgs = apt_pkg.Config.ValueList("Unattended-Upgrade::Package-Blacklist")
    logging.info("Initial blacklisted packages: %s", "".join(blacklisted_pkgs))

    logging.info("Starting unattended upgrades script")

    # display available origin
    logging.info("Allowed origins are: %s" % map(str,allowed_origins))
    
    # get a cache
    cache = MyCache()

    # find out about the packages that are upgradable (in a allowed_origin)
    pkgs_to_upgrade = []
    for pkg in cache:
        if options.debug and pkg.isUpgradable:
            logging.debug("Checking: %s (%s)" % (pkg.name,map(str, pkg.candidateOrigin)))
        if pkg.isUpgradable and \
               is_allowed_origin(pkg,allowed_origins):
            try:
                pkg.markUpgrade()
                if check_changes_for_sanity(cache, allowed_origins,
                                            blacklisted_pkgs):
                    pkgs_to_upgrade.append(pkg)
            except SystemError:
                # can't upgrade
                pass
            else:
                cache.clear()
                for pkg2 in pkgs_to_upgrade:
                    pkg2.markUpgrade()

    pkgs = "\n".join([pkg.name for pkg in pkgs_to_upgrade])
    logging.debug("pkgs that look like they should be upgraded: %s" % pkgs)
           
    # download what looks good
    if options.debug:
        fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress())
    else:
        fetcher = apt_pkg.GetAcquire()
    list = apt_pkg.GetPkgSourceList()
    list.ReadMainList()
    recs = cache._records
    pm = apt_pkg.GetPackageManager(cache._depcache)
    pm.GetArchives(fetcher,list,recs)
    res = fetcher.Run()

    # now check the downloaded debs for conffile conflicts and build
    # a blacklist
    for item in fetcher.Items:
        logging.debug("%s" % item)
        if item.Status == item.StatError:
            print "A error ocured: '%s'" % item.ErrorText
        if item.Complete == False:
            print "The URI '%s' failed to download, aborting" % item.DescURI
            sys.exit(1)
        if item.IsTrusted == False:
            blacklisted_pkgs.append(pkgname_from_deb(item.DestFile))
        if conffile_prompt(item.DestFile):
            # FIXME: skip package (means to re-run the whole marking again
            # and making sure that the package will not be pulled in by
            # some other package again!
            logging.debug("pkg '%s' has conffile prompt" % pkgname_from_deb(item.DestFile))
            blacklisted_pkgs.append(pkgname_from_deb(item.DestFile))


    # redo the selection about the packages to upgrade based on the new
    # blacklist
    logging.debug("blacklist: %s" % blacklisted_pkgs)
    # find out about the packages that are upgradable (in a allowed_origin)
    if len(blacklisted_pkgs) > 0:
        cache.clear()
        old_pkgs_to_upgrade = pkgs_to_upgrade[:]
        pkgs_to_upgrade = []
        for pkg in old_pkgs_to_upgrade:
            logging.debug("Checking (blacklist): %s" % (pkg.name))
            pkg.markUpgrade()
            if check_changes_for_sanity(cache, allowed_origins,
                                        blacklisted_pkgs):
                 pkgs_to_upgrade.append(pkg)
            else:
                logging.info("package '%s' not upgraded" % pkg.name)
                cache.clear()
                for pkg2 in pkgs_to_upgrade:
                    pkg2.markUpgrade()

    logging.debug("InstCount=%i DelCount=%i BrokenCout=%i" % (cache._depcache.InstCount, cache._depcache.DelCount, cache._depcache.BrokenCount))

    # check what we have
    if len(pkgs_to_upgrade) == 0:
        logging.info("No packages found that can be upgraded unattended")
        sys.exit(0)    

    # do the install based on the new list of pkgs
    pkgs = " ".join([pkg.name for pkg in pkgs_to_upgrade])
    logging.info("Packages that are upgraded: %s" % pkgs)

    # set debconf to NON_INTERACTIVE, redirect output
    os.putenv("DEBIAN_FRONTEND","noninteractive");
    os.putenv("APT_LISTCHANGES_FRONTEND","none");
    
    # redirect to log
    REDIRECT_INPUT = os.devnull
    fd = os.open(REDIRECT_INPUT, os.O_RDWR)
    os.dup2(fd,0)

    now = datetime.datetime.now()
    logfile_dpkg = logdir+'unattended-upgrades-dpkg_%s.log' % now.isoformat('_')
    logging.info("Writing dpkg log to '%s'" % logfile_dpkg)
    fd = os.open(logfile_dpkg,os.O_RDWR|os.O_CREAT)
    os.dup2(fd,1)
    os.dup2(fd,2)

    # create a new package-manager. the blacklist may have changed
    # the markings in the depcache
    if options.debug:
        apt_pkg.Config.Set("Debug::pkgDPkgPM","1")
    #apt_pkg.Config.Set("Debug::pkgDPkgPM","1")    
    pm = apt_pkg.GetPackageManager(cache._depcache)
    pm.GetArchives(fetcher,list,recs)
    try:
        res = pm.DoInstall()
    except SystemError,e:
        logging.error("Installing the upgrades failed!")
        logging.error("error message: '%s'" % e)
        res = False
                
    if res == pm.ResultFailed:
        logging.error("dpkg returned a error! See '%s' for details" % logfile_dpkg)
    else:
        logging.info("All upgrades installed")