File: calendar.py

package info (click to toggle)
tryton-modules-calendar-classification 1.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 176 kB
  • ctags: 25
  • sloc: python: 225; makefile: 2
file content (157 lines) | stat: -rw-r--r-- 6,258 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#This file is part of Tryton.  The COPYRIGHT file at the top level of
#this repository contains the full copyright notices and license terms.
from trytond.model import ModelSQL
from trytond.tools import reduce_ids
import vobject


# Extension of the 'calendar.event' model that applies the event
# classification to every entry point visible here:
#   * search() filters out 'confidential' events unless the user is the
#     calendar owner or one of the calendar's write users;
#   * create(), read(), write() and delete() raise the 'access_error' user
#     error when one of the records is not returned by that filtered search;
#   * read() blanks the details of 'private' events that the user cannot
#     write, leaving only a generic summary built from the transparency.
# (Documented with comments rather than a class docstring so that the class
# __doc__ stays exactly as it was.)
class Event(ModelSQL):
    _name = 'calendar.event'

    def __init__(self):
        super(Event, self).__init__()
        # The two time-transparency values are registered as error messages
        # so that _clean_private can fetch their label through
        # raise_user_error(..., raise_exception=False).
        self._error_messages.update({
            'transparent': 'Free',
            'opaque': 'Busy',
            })

    def search(self, cursor, user, domain, offset=0, limit=None, order=None,
            context=None, count=False, query_string=False):
        '''
        Search events, hiding confidential ones from unrelated users.

        The caller's domain is AND-ed with a clause that keeps an event when
        it is not 'confidential', or when the user is the owner of its
        calendar or one of its write users.  The clause is only added when
        ``user`` is truthy.

        All parameters and the return value are those of the parent search.
        '''
        if user:
            # Shallow copy so the list object received from the caller is
            # not the one embedded in the extended domain.
            domain = domain[:]
            domain = [domain,
                    ['OR',
                        [
                            ('classification', '=', 'confidential'),
                            ['OR',
                                ('calendar.owner', '=', user),
                                ('calendar.write_users', '=', user),
                            ],
                        ],
                        ('classification', '!=', 'confidential'),
                    ]]
        return super(Event, self).search(cursor, user, domain, offset=offset,
                limit=limit, order=order, context=context, count=count,
                query_string=query_string)

    def _check_classification_access(self, cursor, user, ids, context=None):
        '''
        Ensure every given event is visible to the user.

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of event ids
        :param context: the context
        :raise: the 'access_error' user error when the number of distinct
            ids differs from the number of events returned by search()
        '''
        if len(set(ids)) != self.search(cursor, user,
                [('id', 'in', ids)], context=context, count=True):
            self.raise_user_error(cursor, 'access_error',
                    self._description, context=context)

    def create(self, cursor, user, values, context=None):
        '''
        Create an event and check that its creator is allowed to see it.

        :return: the id of the new event
        :raise: the 'access_error' user error when the new event is not
            returned by search() for this user
        '''
        new_id = super(Event, self).create(cursor, user, values,
                context=context)
        self._check_classification_access(cursor, user, [new_id],
                context=context)
        return new_id

    def _clean_private(self, cursor, user, record, transp, context=None):
        '''
        Clean private record

        The record is modified in place: the summary is replaced by the
        label of the time transparency, the detail fields are blanked and,
        when a vevent is present, the same information is removed from the
        serialized calendar data.

        :param cursor: the database cursor
        :param user: the user id
        :param record: a dictionary with record values
        :param transp: the time transparency
        :param context: the context
        '''
        summary = self.raise_user_error(cursor, transp, raise_exception=False,
                context=context)
        if 'summary' in record:
            record['summary'] = summary

        # Parsed calendar component; stays None when the record carries no
        # (or an empty) vevent, in which case nothing is serialized back.
        vevent = None
        if record.get('vevent'):
            vevent = vobject.readOne(record['vevent'])
            if hasattr(vevent, 'summary'):
                vevent.summary.value = summary

        # (record field, blank value, name of the matching vobject child).
        # The vobject name differs from the field name for attendees and
        # alarms: they are stored as ATTENDEE lines and VALARM components,
        # so looking them up under the field name would never match and
        # would leave them in the data sent to the user.
        for field, value, vname in (
                ('description', '', 'description'),
                ('categories', [], 'categories'),
                ('location', False, 'location'),
                ('status', '', 'status'),
                ('organizer', '', 'organizer'),
                ('attendees', [], 'attendee'),
                ('alarms', [], 'valarm')):
            if field in record:
                record[field] = value
            if field + '.rec_name' in record:
                record[field + '.rec_name'] = ''
            if vevent is not None and hasattr(vevent, vname):
                # Deleting the attribute drops every child of that name.
                delattr(vevent, vname)
        if vevent is not None:
            record['vevent'] = vevent.serialize()

    def read(self, cursor, user, ids, fields_names=None, context=None):
        '''
        Read events, masking the private ones the user cannot write.

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of ids or an id
        :param fields_names: the names of the fields to read; None is
            forwarded unchanged to the parent read (all fields)
        :param context: the context
        :return: a list of dictionaries, or a single dictionary when ids is
            an id
        :raise: the 'access_error' user error when one of the events is not
            returned by search() for this user
        '''
        rule_obj = self.pool.get('ir.rule')
        int_id = False
        if isinstance(ids, (int, long)):
            int_id = True
            ids = [ids]
        self._check_classification_access(cursor, user, ids, context=context)

        # Find which of the requested events pass the 'write' record rules;
        # private events outside this set get cleaned below.
        writable_ids = []
        domain1, domain2 = rule_obj.domain_get(cursor, user, self._name,
                mode='write', context=context)
        if domain1:
            for i in range(0, len(ids), cursor.IN_MAX):
                sub_ids = ids[i:i + cursor.IN_MAX]
                red_sql, red_ids = reduce_ids('id', sub_ids)
                cursor.execute('SELECT id FROM "' + self._table + '" ' \
                        'WHERE ' + red_sql + ' AND (' + domain1 + ')',
                        red_ids + domain2)
                writable_ids.extend(x[0] for x in cursor.fetchall())
        else:
            # No write rule applies: every requested event is writable.
            writable_ids = ids
        writable_ids = set(writable_ids)

        # The masking below needs these fields; the ones the caller did not
        # ask for are added to the request and removed from the result.
        to_remove = set()
        if fields_names is not None:
            fields_names = fields_names[:]
            for field in ('classification', 'calendar', 'transp'):
                if field not in fields_names:
                    fields_names.append(field)
                    to_remove.add(field)
        # else: None means "all fields" for the parent read, so the helper
        # fields are already included and must be neither added (which would
        # restrict the read to them) nor stripped from the result.
        res = super(Event, self).read(cursor, user, ids,
                fields_names=fields_names, context=context)
        for record in res:
            if record['classification'] == 'private' \
                    and record['id'] not in writable_ids:
                self._clean_private(cursor, user, record, record['transp'],
                        context=context)
            for field in to_remove:
                del record[field]
        if int_id:
            return res[0]
        return res

    def write(self, cursor, user, ids, values, context=None):
        '''
        Write on events the user can see.

        The visibility is checked before the write and again after it, so
        that the new values cannot move an event out of the user's reach.

        :raise: the 'access_error' user error when either check fails
        '''
        if isinstance(ids, (int, long)):
            ids = [ids]
        self._check_classification_access(cursor, user, ids, context=context)
        res = super(Event, self).write(cursor, user, ids, values,
                context=context)
        self._check_classification_access(cursor, user, ids, context=context)
        return res

    def delete(self, cursor, user, ids, context=None):
        '''
        Delete events the user can see.

        :raise: the 'access_error' user error when one of the events is not
            returned by search() for this user
        '''
        if isinstance(ids, (int, long)):
            ids = [ids]
        self._check_classification_access(cursor, user, ids, context=context)
        return super(Event, self).delete(cursor, user, ids, context=context)

# Instantiate the model once at import time.
# NOTE(review): presumably this is what registers the class with the Tryton
# pool -- confirm against trytond's Model implementation.
Event()