# Call this via "python[3] <script name>".
import argparse
import json
import os
import zlib
import base64
import random
import time
try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # Python 2
except ImportError:
from http.server import BaseHTTPRequestHandler, HTTPServer # Python 3
# Keep track of data received at each path.
# Maps request path -> list of decoded POST bodies, in arrival order.
data = {}

# Server-wide counters and behaviour switches, read by MyHandler on every
# request.  Every key that do_POST reads has a default here so the handler
# also works when this module is imported rather than run as a script.
# NOTE(review): 'fail_after' is stored but MyHandler never reads it.
metadata = {
    'posts': 0,                        # number of POST requests seen so far
    'fail_after': 0,
    'fail_every': -1,                  # -1 disables "fail every n-th POST"
    'fail_with': 500,                  # status used by the fail_every path
    'fail_with_400_after': -1,         # -1 disables
    'fail_with_401_or_403_after': -1,  # -1 disables
    'fail_with_delay_secs': 0,         # 0 means "do not sleep before failing"
    'decompress': False,
    'userpwd': '',                     # empty string disables the auth check
}


class MyHandler(BaseHTTPRequestHandler):
    r"""
    Request handler that records POSTed bodies and can simulate failures.

    POST'd data is kept in the data global dict.
    Keys are the path, values are the raw received data.
    Two post requests to <host>:<port>/post/endpoint means data looks like...
    {"/post/endpoint": ["{\"msgnum\":\"00001\"}", "{\"msgnum\":\"00001\"}"]}
    GET requests return all data posted to that endpoint as a json list.
    Note that rsyslog usually sends escaped json data, so some parsing may be needed.
    A get request for <host>:<port>/post/endpoint responds with...
    ["{\"msgnum\":\"00001\"}", "{\"msgnum\":\"00001\"}"]

    Failure simulation and authentication are driven by the module-level
    ``metadata`` dict.
    """

    def _send_plain(self, status, body):
        """Send ``status`` with no extra headers, followed by the bytes ``body``."""
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, payload):
        """Send ``payload`` serialized as JSON with a 200 status."""
        res = json.dumps(payload).encode('utf8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(res)))
        self.end_headers()
        self.wfile.write(res)

    def _delay_if_configured(self):
        """Sleep for metadata['fail_with_delay_secs'] seconds when it is non-zero."""
        delay = metadata['fail_with_delay_secs']
        if delay:
            print("sleeping for: {0}".format(delay))
            time.sleep(delay)

    def validate_auth(self):
        """
        Check the request's basic-auth credentials against metadata['userpwd'].

        Header format for basic authentication:
        'Authorization: Basic <base 64 encoded uid:pwd>'
        The scheme token itself is not inspected; only the decoded
        credentials are compared.

        Returns True when the credentials match.  Otherwise a 401 response
        has already been written and False is returned.
        """
        auth_header = self.headers.get('Authorization')
        if auth_header is None:
            self._send_plain(401, b'missing "Authorization" header')
            return False
        try:
            _, b64userpwd = auth_header.split()
            userpwd = base64.b64decode(b64userpwd).decode('utf-8')
        except (ValueError, TypeError):
            # Wrong number of tokens, invalid base64 (binascii.Error is a
            # ValueError on Python 3, TypeError on Python 2) or non-UTF-8
            # credentials: reject instead of crashing the handler.
            self._send_plain(401, b'malformed "Authorization" header')
            return False
        if userpwd != metadata['userpwd']:
            # bytes has no .format() on Python 3, so format as text first.
            self._send_plain(401, u'invalid auth: {0}'.format(userpwd).encode('utf-8'))
            return False
        return True

    def do_POST(self):
        """
        Record the request body under its path, or simulate a failure.

        The POST counter is incremented first, so rejected and failed
        requests count too.  Checks run in this order: authentication,
        fail_with_400_after, fail_with_401_or_403_after, fail_every.  Only a
        request that passes all of them is stored and answered with
        200 {"msg": "ok"}.
        """
        metadata['posts'] += 1
        posts = metadata['posts']

        if metadata['userpwd'] and not self.validate_auth():
            return

        fail_400_after = metadata['fail_with_400_after']
        if fail_400_after != -1 and posts > fail_400_after:
            self._delay_if_configured()
            self._send_plain(400, b'BAD REQUEST')
            return

        fail_auth_after = metadata['fail_with_401_or_403_after']
        if fail_auth_after != -1 and posts > fail_auth_after:
            self._send_plain(random.choice([401, 403]), b'BAD REQUEST')
            return

        fail_every = metadata['fail_every']
        # 0 is excluded as well as -1: a modulo by zero would raise.
        if posts > 1 and fail_every not in (-1, 0) and posts % fail_every == 0:
            self._delay_if_configured()
            self._send_plain(metadata['fail_with'] or 500, b'INTERNAL ERROR')
            return

        # .get() tolerates a missing header on both Python 2 and Python 3.
        content_length = int(self.headers.get('Content-Length') or 0)
        raw_data = self.rfile.read(content_length)
        if metadata['decompress']:
            # wbits=31 (16 + 15) makes zlib expect a gzip header and trailer.
            post_data = zlib.decompress(raw_data, 31)
        else:
            post_data = raw_data

        data.setdefault(self.path, []).append(post_data.decode('utf-8'))
        self._send_json({'msg': 'ok'})

    def do_GET(self):
        """Return every body POSTed to this path as a JSON list (empty if none)."""
        self._send_json(data.get(self.path, []))
if __name__ == '__main__':
    # Script entry point: parse the command line, copy the switches into the
    # module-level metadata dict that MyHandler reads, then serve forever.
    parser = argparse.ArgumentParser(
        description='omhttp test server: records POSTed data per path and can simulate failures')
    parser.add_argument('-p', '--port', action='store', type=int, default=8080, help='port')
    parser.add_argument('--port-file', action='store', type=str, default='', help='file to store listen port number')
    parser.add_argument('-i', '--interface', action='store', type=str, default='localhost',
                        help='interface (host name or address) to listen on')
    parser.add_argument('--fail-after', action='store', type=int, default=0, help='start failing after n posts')
    parser.add_argument('--fail-every', action='store', type=int, default=-1, help='fail every n posts')
    parser.add_argument('--fail-with', action='store', type=int, default=500, help='on failure, fail with this code')
    parser.add_argument('--fail-with-400-after', action='store', type=int, default=-1, help='fail with 400 after n posts')
    parser.add_argument('--fail-with-401-or-403-after', action='store', type=int, default=-1, help='fail with 401 or 403 after n posts')
    parser.add_argument('--fail-with-delay-secs', action='store', type=int, default=0, help='fail with n secs of delay')
    parser.add_argument('--decompress', action='store_true', default=False, help='decompress posted data')
    parser.add_argument('--userpwd', action='store', default='', help='only accept this user:password combination')
    args = parser.parse_args()

    # 'posts' is deliberately left alone; it is the running request counter.
    metadata.update({
        'fail_after': args.fail_after,
        'fail_every': args.fail_every,
        'fail_with': args.fail_with,
        'fail_with_400_after': args.fail_with_400_after,
        'fail_with_401_or_403_after': args.fail_with_401_or_403_after,
        'fail_with_delay_secs': args.fail_with_delay_secs,
        'decompress': args.decompress,
        'userpwd': args.userpwd,
    })

    server = HTTPServer((args.interface, args.port), MyHandler)
    # Report the port the server actually bound rather than the requested one.
    lstn_port = server.server_address[1]
    pid = os.getpid()
    print('starting omhttp test server at {interface}:{port} with pid {pid}'
          .format(interface=args.interface, port=lstn_port, pid=pid))

    if args.port_file != '':
        # The context manager closes the file even if the write fails.
        with open(args.port_file, "w") as f:
            f.write(str(lstn_port))

    server.serve_forever()
|