File: event_service.py

package info (click to toggle)
python-openleadr-python 0.5.34%2Bdfsg.1-2
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,496 kB
  • sloc: python: 6,942; xml: 663; makefile: 32; sh: 18
file content (140 lines) | stat: -rw-r--r-- 6,972 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# SPDX-License-Identifier: Apache-2.0

# Copyright 2020 Contributors to OpenLEADR

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import service, handler, VTNService
import asyncio
from openleadr import utils, errors, enums
import logging
logger = logging.getLogger('openleadr')


@service('EiEvent')
class EventService(VTNService):

    def __init__(self, vtn_id, polling_method='internal'):
        super().__init__(vtn_id)
        self.polling_method = polling_method
        self.events = {}
        self.completed_event_ids = {}   # Holds the ids of completed events
        self.event_callbacks = {}
        self.event_opt_types = {}
        self.event_delivery_callbacks = {}

    @handler('oadrRequestEvent')
    async def request_event(self, payload):
        """
        The VEN requests us to send any events we have.
        """
        ven_id = payload['ven_id']
        if self.polling_method == 'internal':
            if ven_id in self.events and self.events[ven_id]:
                events = utils.order_events(self.events[ven_id])
                for event in events:
                    event_status = utils.getmember(event, 'event_descriptor.event_status')
                    # Pop the event from the events so that this is the last time it is communicated
                    if event_status == enums.EVENT_STATUS.COMPLETED:
                        if ven_id not in self.completed_event_ids:
                            self.completed_event_ids[ven_id] = []
                        event_id = utils.getmember(event, 'event_descriptor.event_id')
                        self.completed_event_ids[ven_id].append(event_id)
                        self.events[ven_id].pop(self.events[ven_id].index(event))
            else:
                events = None
        else:
            result = self.on_request_event(ven_id=payload['ven_id'])
            if asyncio.iscoroutine(result):
                result = await result
            if result is None:
                events = None
            else:
                events = utils.order_events(result)

        if events is None:
            return 'oadrResponse', {}
        else:
            # Fire the delivery callbacks, if any
            for event in events:
                event_id = utils.getmember(event, 'event_descriptor.event_id')
                if event_id in self.event_delivery_callbacks:
                    await utils.await_if_required(self.event_delivery_callbacks[event_id]())
            return 'oadrDistributeEvent', {'events': events}
        return 'oadrResponse', result

    def on_request_event(self, ven_id):
        """
        Placeholder for the on_request_event handler.
        """
        logger.warning("You should implement and register your own on_request_event handler "
                       "that returns the next event for a VEN. This handler will receive a "
                       "ven_id as its only argument, and should return None (if no events are "
                       "available), a single Event, or a list of Events.")
        return None

    @handler('oadrCreatedEvent')
    async def created_event(self, payload):
        """
        The VEN informs us that they created an EiEvent.
        """
        ven_id = payload['ven_id']
        if self.polling_method == 'internal':
            for event_response in payload['event_responses']:
                event_id = event_response['event_id']
                modification_number = event_response['modification_number']
                opt_type = event_response['opt_type']
                event = utils.find_by(self.events[ven_id],
                                      'event_descriptor.event_id', event_id,
                                      'event_descriptor.modification_number', modification_number)
                if not event:
                    if event_id not in self.completed_event_ids.get(ven_id, []):
                        logger.warning(f"""Got an oadrCreatedEvent message from ven '{ven_id}' """
                                       f"""for event '{event_id}' with modification number """
                                       f"""{modification_number} that does not exist.""")
                        raise errors.InvalidIdError
                # Remove the event from the events list if the cancellation is confirmed.
                if utils.getmember(event, 'event_descriptor.event_status') == enums.EVENT_STATUS.CANCELLED:
                    utils.pop_by(self.events[ven_id], 'event_descriptor.event_id', event_id)
                if event_response['event_id'] in self.event_callbacks:
                    event, callback = self.event_callbacks.pop(event_id)
                    if isinstance(callback, asyncio.Future):
                        if callback.done():
                            logger.warning(f"Got a second response '{opt_type}' from ven '{ven_id}' "
                                           f"to event '{event_id}', which we cannot use because the "
                                           "callback future you provided was already completed during "
                                           "the first response.")
                        else:
                            callback.set_result(opt_type)
                    else:
                        result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
                        if asyncio.iscoroutine(result):
                            result = await result
        else:
            for event_response in payload['event_responses']:
                event_id = event_response['event_id']
                opt_type = event_response['opt_type']
                result = await utils.await_if_required(self.on_created_event(ven_id=ven_id,
                                                                             event_id=event_id,
                                                                             opt_type=opt_type))
        return 'oadrResponse', {}

    def on_created_event(self, ven_id, event_id, opt_type):
        """
        Placeholder for the on_created_event handler.
        """
        logger.warning("You should implement and register you own on_created_event handler "
                       "to receive the opt status for an Event that you sent to the VEN. This "
                       "handler will receive a ven_id, event_id and opt_status. "
                       "You don't need to return anything from this handler.")
        return None