File: test_awslambda.py

package info (click to toggle)
python-boto 2.49.0-4.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 9,888 kB
  • sloc: python: 86,396; makefile: 112
file content (117 lines) | stat: -rw-r--r-- 4,303 bytes parent folder | download | duplicates (12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright (c) 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import tempfile
import shutil
import os
import socket

from boto.compat import json
from boto.awslambda.layer1 import AWSLambdaConnection
from tests.unit import AWSMockServiceTestCase
from tests.compat import mock


class TestAWSLambda(AWSMockServiceTestCase):
    connection_class = AWSLambdaConnection
    
    def default_body(self):
        return b'{}'

    def test_upload_function_binary(self):
        self.set_http_response(status_code=201)
        function_data = b'This is my file'
        self.service_connection.upload_function(
            function_name='my-function',
            function_zip=function_data,
            role='myrole',
            handler='myhandler',
            mode='event',
            runtime='nodejs'
        )
        self.assertEqual(self.actual_request.body, function_data)
        self.assertEqual(
            self.actual_request.headers['Content-Length'],
            str(len(function_data))
        )
        self.assertEqual(
            self.actual_request.path,
            '/2014-11-13/functions/my-function?Handler=myhandler&Mode'
            '=event&Role=myrole&Runtime=nodejs'
        )

    def test_upload_function_file(self):
        self.set_http_response(status_code=201)
        rootdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, rootdir)

        filename = 'test_file'
        function_data = b'This is my file'
        full_path = os.path.join(rootdir, filename)

        with open(full_path, 'wb') as f:
            f.write(function_data)

        with open(full_path, 'rb') as f:
            self.service_connection.upload_function(
                function_name='my-function',
                function_zip=f,
                role='myrole',
                handler='myhandler',
                mode='event',
                runtime='nodejs'
            )
            self.assertEqual(self.actual_request.body.read(),
                             function_data)
            self.assertEqual(
                self.actual_request.headers['Content-Length'],
                str(len(function_data))
            )
            self.assertEqual(
                self.actual_request.path,
                '/2014-11-13/functions/my-function?Handler=myhandler&Mode'
                '=event&Role=myrole&Runtime=nodejs'
            )

    def test_upload_function_unseekable_file_no_tell(self):
        sock = socket.socket()
        with self.assertRaises(TypeError):
            self.service_connection.upload_function(
                function_name='my-function',
                function_zip=sock,
                role='myrole',
                handler='myhandler',
                mode='event',
                runtime='nodejs'
            )

    def test_upload_function_unseekable_file_cannot_tell(self):
        mock_file = mock.Mock()
        mock_file.tell.side_effect = IOError
        with self.assertRaises(TypeError):
            self.service_connection.upload_function(
                function_name='my-function',
                function_zip=mock_file,
                role='myrole',
                handler='myhandler',
                mode='event',
                runtime='nodejs'
            )