File: modelworkflow.py

package info (click to toggle)
tryton-server 1.6.1-2%2Bsqueeze2
  • links: PTS, VCS
  • area: main
  • in suites: squeeze-lts
  • size: 2,440 kB
  • ctags: 3,017
  • sloc: python: 24,380; xml: 3,771; sql: 506; sh: 126; makefile: 74
file content (145 lines) | stat: -rw-r--r-- 4,906 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#This file is part of Tryton.  The COPYRIGHT file at the top level of
#this repository contains the full copyright notices and license terms.

from trytond.model import ModelStorage


class ModelWorkflow(ModelStorage):
    '''
    Define a model with a workflow in Tryton.
    '''

    def __init__(self):
        super(ModelWorkflow, self).__init__()
        self._rpc.update({
            'workflow_trigger_validate': True,
        })

    def create(self, cursor, user, values, context=None):
        res = super(ModelWorkflow, self).create(cursor, user, values,
                context=context)
        self.workflow_trigger_create(cursor, user, res, context=context)
        return res

    def write(self, cursor, user, ids, values, context=None):
        res = super(ModelWorkflow, self).write(cursor, user, ids, values,
                context=context)
        self.workflow_trigger_write(cursor, user, ids, context=context)
        return res

    def delete(self, cursor, user, ids, context=None):
        instance_obj = self.pool.get('workflow.instance')

        if isinstance(ids, (int, long)):
            ids = [ids]

        if instance_obj.search(cursor, 0, [
            ('res_id', 'in', ids),
            ('res_type', '=', self._name),
            ('state', '!=', 'complete'),
            ], context=context):
            self.raise_user_error(cursor, 'delete_workflow_record',
                    context=context)
        res = super(ModelWorkflow, self).delete(cursor, user, ids,
                context=context)
        self.workflow_trigger_delete(cursor, user, ids, context=context)
        return res

    def workflow_trigger_create(self, cursor, user, ids, context=None):
        '''
        Trigger create event

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of id or an id
        :param context: the context
        '''
        workflow_obj = self.pool.get('workflow')
        instance_obj = self.pool.get('workflow.instance')

        if isinstance(ids, (int, long)):
            ids = [ids]

        workflow_ids = workflow_obj.search(cursor, 0, [
            ('model', '=', self._name),
            ('on_create', '=', True),
            ], context=context)
        for res_id in ids:
            for wkf_id in workflow_ids:
                instance_obj.create(cursor, user, {
                    'res_type': self._name,
                    'res_id': res_id,
                    'workflow': wkf_id,
                    'state': 'active',
                    }, context=context)


    def workflow_trigger_write(self, cursor, user, ids, context=None):
        '''
        Trigger write event

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of id or an id
        :param context: the context
        '''
        instance_obj = self.pool.get('workflow.instance')

        if isinstance(ids, (int, long)):
            ids = [ids]

        instance_ids = instance_obj.search(cursor, 0, [
            ('res_id', 'in', ids),
            ('res_type', '=', self._name),
            ('state', '=', 'active'),
            ], context=context)
        for instance in instance_obj.browse(cursor, 0, instance_ids,
                context=context):
            instance_obj.update(cursor, user, instance, context=context)

    def workflow_trigger_validate(self, cursor, user, ids, signal,
            context=None):
        '''
        Trigger validate event

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of id or an id
        :param context: the context
        '''
        instance_obj = self.pool.get('workflow.instance')

        if isinstance(ids, (int, long)):
            ids = [ids]

        instance_ids = instance_obj.search(cursor, 0, [
            ('res_id', 'in', ids),
            ('res_type', '=', self._name),
            ('state', '=', 'active'),
            ], context=context)
        for instance in instance_obj.browse(cursor, 0, instance_ids,
                context=context):
            instance_obj.validate(cursor, user, instance, signal=signal,
                    context=context)

    def workflow_trigger_delete(self, cursor, user, ids, context=None):
        '''
        Trigger delete event

        :param cursor: the database cursor
        :param user: the user id
        :param ids: a list of id or an id
        :param context: the context
        '''
        if self._name == 'workflow.instance':
            return
        instance_obj = self.pool.get('workflow.instance')

        if isinstance(ids, (int, long)):
            ids = [ids]

        instance_ids = instance_obj.search(cursor, 0, [
            ('res_id', 'in', ids),
            ('res_type', '=', self._name),
            ], context=context)
        instance_obj.delete(cursor, 0, instance_ids, context=context)