File: routes.py

package info (click to toggle)
tryton-modules-inbound-email 7.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 436 kB
  • sloc: python: 804; xml: 219; makefile: 11; sh: 3
file content (38 lines) | stat: -rw-r--r-- 1,115 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.

import json

from trytond.protocols.wrappers import (
    HTTPStatus, Response, abort, with_pool, with_transaction)
from trytond.wsgi import app


@app.route(
    '/<database_name>/inbound_email/inbox/<identifier>', methods={'POST'})
@with_pool
@with_transaction()
def inbound_email(request, pool, identifier):
    """Handle a webhook POST addressed to the inbox matching *identifier*.

    The request payload is handed to ``Email.from_webhook`` together with
    the inbox and the ``type`` query argument (``'raw'`` when absent).
    Submitted form fields, if any, are serialised to JSON bytes; otherwise
    the request body is forwarded as is.  Every resulting email is passed
    to ``inbox.process`` and the whole batch is then saved.

    Aborts with 404 when the inbox lookup does not unpack to exactly one
    record, and with 400 when ``from_webhook`` returns nothing.  Returns an
    empty 204 response otherwise.
    """
    Inbox = pool.get('inbound.email.inbox')
    Email = pool.get('inbound.email')

    # Unpacking into a single target raises ValueError unless the search
    # yields exactly one record.
    try:
        [inbox] = Inbox.search([('identifier', '=', identifier)])
    except ValueError:
        abort(HTTPStatus.NOT_FOUND)

    payload_type = request.args.get('type', 'raw')

    # Form submissions are forwarded as a JSON document, anything else as
    # the request body.
    payload = (
        json.dumps(request.form.to_dict()).encode()
        if request.form else request.data)

    received = Email.from_webhook(inbox, payload, payload_type)
    if not received:
        abort(HTTPStatus.BAD_REQUEST)

    for message in received:
        inbox.process(message)
    Email.save(received)

    return Response(status=HTTPStatus.NO_CONTENT)