File: app.py

package info (click to toggle)
pymongo 4.15.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 23,376 kB
  • sloc: python: 107,945; ansic: 4,601; javascript: 137; makefile: 38; sh: 10
file content (168 lines) | stat: -rw-r--r-- 4,458 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
Lambda function for Python Driver testing

Creates the client that is cached for all requests, subscribes to
relevant events, and forces the connection pool to get populated.
"""
from __future__ import annotations

import json
import os
import warnings

from bson import has_c as has_bson_c
from pymongo import MongoClient
from pymongo import has_c as has_pymongo_c
from pymongo.monitoring import (
    CommandListener,
    ConnectionPoolListener,
    ServerHeartbeatListener,
)

# Promote every warning to an exception so that normal operation is
# guaranteed to be warning-free.
warnings.simplefilter("error")

# Running totals; the event listeners in this module update them.
open_connections = 0
heartbeat_count = streaming_heartbeat_count = 0
total_heartbeat_duration = 0
total_commands = total_command_duration = 0

# Refuse to run unless both the bson and pymongo C extensions are in use.
assert has_bson_c()
assert has_pymongo_c()


class CommandHandler(CommandListener):
    """Command listener that logs every event and tallies finished commands.

    Both successful and failed commands are added to the module-level
    ``total_commands`` / ``total_command_duration`` counters.
    """

    @staticmethod
    def _tally(event):
        """Count one finished command and accumulate its duration."""
        global total_commands, total_command_duration
        total_commands += 1
        # NOTE(review): duration_micros is presumably microseconds, making
        # the running total seconds -- confirm against the pymongo docs.
        total_command_duration += event.duration_micros / 1e6

    def started(self, event):
        """Log the start of a command; nothing is counted yet."""
        print("command started", event)

    def succeeded(self, event):
        """Tally a successful command, then log it."""
        self._tally(event)
        print("command succeeded", event)

    def failed(self, event):
        """Tally a failed command, then log it."""
        self._tally(event)
        print("command failed", event)


class ServerHeartbeatHandler(ServerHeartbeatListener):
    """Heartbeat listener that logs every event and tallies finished heartbeats.

    Succeeded and failed heartbeats both feed the module-level
    ``heartbeat_count`` / ``total_heartbeat_duration`` counters; only
    succeeded heartbeats whose ``awaited`` flag is truthy bump
    ``streaming_heartbeat_count``.
    """

    @staticmethod
    def _tally(event):
        """Count one finished heartbeat and accumulate its duration."""
        global heartbeat_count, total_heartbeat_duration
        heartbeat_count += 1
        total_heartbeat_duration += event.duration

    def started(self, event):
        """Log the start of a heartbeat; nothing is counted yet."""
        print("server heartbeat started", event)

    def succeeded(self, event):
        """Tally a successful heartbeat, note awaited ones, then log it."""
        global streaming_heartbeat_count
        self._tally(event)
        if event.awaited:
            streaming_heartbeat_count += 1
        print("server heartbeat succeeded", event)

    def failed(self, event):
        """Tally a failed heartbeat, then log it."""
        self._tally(event)
        print("server heartbeat  failed", event)


class ConnectionHandler(ConnectionPoolListener):
    """Pool listener that tracks the number of currently open connections.

    Only connection creation and closure change the module-level
    ``open_connections`` counter; every other pool event is accepted and
    ignored.
    """

    # --- events that are counted -------------------------------------

    def connection_created(self, event):
        """Add one to the open-connection count and log the event."""
        global open_connections
        open_connections += 1
        print("connection created")

    def connection_closed(self, event):
        """Subtract one from the open-connection count and log the event."""
        global open_connections
        open_connections -= 1
        print("connection closed")

    # --- events that are intentionally ignored -----------------------

    def connection_ready(self, event):
        """Ignored."""

    def connection_check_out_started(self, event):
        """Ignored."""

    def connection_check_out_failed(self, event):
        """Ignored."""

    def connection_checked_out(self, event):
        """Ignored."""

    def connection_checked_in(self, event):
        """Ignored."""

    def pool_created(self, event):
        """Ignored."""

    def pool_ready(self, event):
        """Ignored."""

    def pool_cleared(self, event):
        """Ignored."""

    def pool_closed(self, event):
        """Ignored."""


# One handler per event family; all three are attached to the client
# that is created once at import time and shared by every invocation.
listeners = [
    CommandHandler(),
    ServerHeartbeatHandler(),
    ConnectionHandler(),
]
print("Creating client")
client = MongoClient(os.environ["MONGODB_URI"], event_listeners=listeners)


# Issue a first operation up front so the connection pool gets populated
# before any request is handled.
print("Connecting")
client["lambdaTest"].list_collections()
print("Connected")


# Create the response to send back.
def create_response():
    """Build the metrics payload from the module-level counters.

    Returns:
        A dict with the keys ``averageCommandDuration``,
        ``averageHeartbeatDuration``, ``openConnections`` and
        ``heartbeatCount``.  Each average is the accumulated duration
        divided by the matching event count, or ``0`` when no event of
        that kind has been counted.
    """
    # Guard both divisions the same way: a zero count (for example right
    # after reset()) must yield 0 rather than raise ZeroDivisionError.
    average_command_duration = (
        total_command_duration / total_commands if total_commands else 0
    )
    average_heartbeat_duration = (
        total_heartbeat_duration / heartbeat_count if heartbeat_count else 0
    )
    return dict(
        averageCommandDuration=average_command_duration,
        averageHeartbeatDuration=average_heartbeat_duration,
        openConnections=open_connections,
        heartbeatCount=heartbeat_count,
    )


# Reset the numbers.
def reset():
    """Zero the per-invocation counters.

    Clears ``open_connections``, ``heartbeat_count``,
    ``total_heartbeat_duration``, ``total_commands`` and
    ``total_command_duration``.  ``streaming_heartbeat_count`` is not
    touched here.
    """
    global open_connections, heartbeat_count, total_heartbeat_duration
    global total_commands, total_command_duration
    open_connections = heartbeat_count = total_heartbeat_duration = 0
    total_commands = total_command_duration = 0

def lambda_handler(event, context):
    """Run one insert/delete round trip and report the collected metrics.

    Inserts ``{"n": 1}`` into ``lambdaTest.test`` using the shared client,
    deletes that document again by its ``_id``, serialises the output of
    ``create_response()`` to JSON and then calls ``reset()``.

    Args:
        event: Unused by this handler.
        context: Unused by this handler.

    Returns:
        A dict with ``statusCode`` 200 and ``body`` set to the JSON-encoded
        metrics.

    Raises:
        AssertionError: If ``streaming_heartbeat_count`` is non-zero.
    """
    print("initializing")
    collection = client.lambdaTest.test
    inserted = collection.insert_one({"n": 1})
    collection.delete_one({"_id": inserted.inserted_id})

    # Snapshot the metrics first; the counters are zeroed right after.
    body = json.dumps(create_response())
    reset()
    print("finished!")

    # Checked after the reset, so this relies on the streaming counter
    # surviving the reset() call.
    assert (
        streaming_heartbeat_count == 0
    ), f"streaming_heartbeat_count was {streaming_heartbeat_count} not 0"

    return {"statusCode": 200, "body": body}