1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
"""
Lambda function for Python Driver testing
Creates the client that is cached for all requests, subscribes to
relevant events, and forces the connection pool to get populated.
"""
from __future__ import annotations
import json
import os
import warnings
from bson import has_c as has_bson_c
from pymongo import MongoClient
from pymongo import has_c as has_pymongo_c
from pymongo.monitoring import (
CommandListener,
ConnectionPoolListener,
ServerHeartbeatListener,
)
# Ensure there are no warnings raised in normal operation.
warnings.simplefilter("error")
open_connections = 0
heartbeat_count = 0
streaming_heartbeat_count = 0
total_heartbeat_duration = 0
total_commands = 0
total_command_duration = 0
# Ensure we are using C extensions
assert has_bson_c()
assert has_pymongo_c()
class CommandHandler(CommandListener):
def started(self, event):
print("command started", event)
def succeeded(self, event):
global total_commands, total_command_duration
total_commands += 1
total_command_duration += event.duration_micros / 1e6
print("command succeeded", event)
def failed(self, event):
global total_commands, total_command_duration
total_commands += 1
total_command_duration += event.duration_micros / 1e6
print("command failed", event)
class ServerHeartbeatHandler(ServerHeartbeatListener):
def started(self, event):
print("server heartbeat started", event)
def succeeded(self, event):
global heartbeat_count, total_heartbeat_duration, streaming_heartbeat_count
heartbeat_count += 1
total_heartbeat_duration += event.duration
if event.awaited:
streaming_heartbeat_count += 1
print("server heartbeat succeeded", event)
def failed(self, event):
global heartbeat_count, total_heartbeat_duration
heartbeat_count += 1
total_heartbeat_duration += event.duration
print("server heartbeat failed", event)
class ConnectionHandler(ConnectionPoolListener):
def connection_created(self, event):
global open_connections
open_connections += 1
print("connection created")
def connection_ready(self, event):
pass
def connection_closed(self, event):
global open_connections
open_connections -= 1
print("connection closed")
def connection_check_out_started(self, event):
pass
def connection_check_out_failed(self, event):
pass
def connection_checked_out(self, event):
pass
def connection_checked_in(self, event):
pass
def pool_created(self, event):
pass
def pool_ready(self, event):
pass
def pool_cleared(self, event):
pass
def pool_closed(self, event):
pass
listeners = [CommandHandler(), ServerHeartbeatHandler(), ConnectionHandler()]
print("Creating client")
client = MongoClient(os.environ["MONGODB_URI"], event_listeners=listeners)
# Populate the connection pool.
print("Connecting")
client.lambdaTest.list_collections()
print("Connected")
# Create the response to send back.
def create_response():
return dict(
averageCommandDuration=total_command_duration / total_commands,
averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count
if heartbeat_count
else 0,
openConnections=open_connections,
heartbeatCount=heartbeat_count,
)
# Reset the numbers.
def reset():
global \
open_connections, \
heartbeat_count, \
total_heartbeat_duration, \
total_commands, \
total_command_duration
open_connections = 0
heartbeat_count = 0
total_heartbeat_duration = 0
total_commands = 0
total_command_duration = 0
def lambda_handler(event, context):
"""
The handler function itself performs an insert/delete and returns the
id of the document in play.
"""
print("initializing")
db = client.lambdaTest
collection = db.test
result = collection.insert_one({"n": 1})
collection.delete_one({"_id": result.inserted_id})
# Create the response and then reset the numbers.
response = json.dumps(create_response())
reset()
print("finished!")
assert (
streaming_heartbeat_count == 0
), f"streaming_heartbeat_count was {streaming_heartbeat_count} not 0"
return dict(statusCode=200, body=response)
|