File: __init__.py

package info (click to toggle)
kopanocore 8.7.0-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 15,400 kB
  • sloc: cpp: 175,422; python: 24,623; perl: 7,319; php: 6,056; sh: 2,172; makefile: 1,294; xml: 45; ansic: 1
file content (122 lines) | stat: -rw-r--r-- 3,969 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/python3
# SPDX-License-Identifier: AGPL-3.0-or-later
from .version import __version__

import sys
import os
import time
import kopano
import grp
from kopano import Config, log_exc
from contextlib import closing

if sys.hexversion >= 0x03000000:
    import bsddb3 as bsddb
else: # pragma: no cover
    import bsddb

"""
kopano-spamd - ICS driven spam learning daemon for Kopano / SpamAssasin
"""

CONFIG = {
    'spam_dir': Config.string(default="/var/lib/kopano/spamd/spam"),
    'ham_dir': Config.string(default="/var/lib/kopano/spamd/ham"),
    'spam_db': Config.string(default="/var/lib/kopano/spamd/spam.db"),
    'sa_group': Config.string(default="amavis"),
    'run_as_user': Config.string(default="kopano"),
    'run_as_group': Config.string(default="kopano"),
    'learn_ham': Config.boolean(default=True),
    'header_tag': Config.string(default="x-spam-flag")
}


class Service(kopano.Service):
    def main(self):
        server = self.server
        state = server.state # start from current state
        importer = Importer(self)
        with log_exc(self.log):
            while True:
                state = server.sync(importer, state)
                time.sleep(1)


class Importer:
    def __init__(self, service):
        self.log = service.log
        self.spamdb = service.config['spam_db']
        self.spamdir = service.config['spam_dir']
        self.hamdir = service.config['ham_dir']
        self.sagroup = service.config['sa_group']
        self.learnham = service.config['learn_ham']
        self.headertag = service.config['header_tag'].lower()

    def mark_spam(self, searchkey):
        if not isinstance(searchkey, bytes): # python3
            searchkey = searchkey.encode('ascii')
        with closing(bsddb.btopen(self.spamdb, 'c')) as db:
            db[searchkey] = ''

    def was_spam(self, searchkey):
        if not isinstance(searchkey, bytes): # python3
            searchkey = searchkey.encode('ascii')
        with closing(bsddb.btopen(self.spamdb, 'c')) as db:
            return searchkey in db

    def update(self, item, flags):
        with log_exc(self.log):
            if item.message_class != 'IPM.Note': # TODO None?
                return

            searchkey = item.searchkey
            header = item.header(self.headertag)

            if (item.folder == item.store.junk and \
                (not header or header.upper() != 'YES')):

                fn = os.path.join(self.hamdir, searchkey + '.eml')
                if os.path.isfile(fn):
                    os.unlink(fn)

                self.log.info("Learning message as SPAM, entryid: %s", item.entryid)
                self.learn(item, searchkey, True)

            elif (item.folder == item.store.inbox and \
                  self.learnham and \
                  self.was_spam(searchkey)):

                fn = os.path.join(self.spamdir, searchkey + '.eml')
                if os.path.isfile(fn):
                    os.unlink(fn)

                self.log.info("Learning message as HAM, entryid: %s", item.entryid)
                self.learn(item, searchkey, False)

    def learn(self, item, searchkey, spam):
        spameml = item.eml()
        dir_ = self.spamdir if spam else self.hamdir
        emlfilename = os.path.join(dir_, searchkey + '.eml')

        with closing(open(emlfilename, "wb")) as fh:
            fh.write(spameml)

        uid = os.getuid()
        gid = grp.getgrnam(self.sagroup).gr_gid
        os.chown(emlfilename, uid, gid)
        os.chmod(emlfilename, 0o660)

        if spam:
            self.mark_spam(searchkey)

def main():
    parser = kopano.parser('ckpsFl')  # select common cmd-line options
    options = parser.parse_args()[0]
    service = Service('spamd', config=CONFIG, options=options)
    if service.config['learn_ham'] == True and not os.path.exists(service.config['ham_dir']):
        os.makedirs(service.config['ham_dir'])
    service.start()


if __name__ == '__main__':
    main()