File: test-auth-server.py

package info (click to toggle)
gpodder 3.11.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,272 kB
  • sloc: python: 19,713; sh: 574; xml: 122; makefile: 112
file content (205 lines) | stat: -rwxr-xr-x 7,432 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Simple HTTP web server for testing HTTP Authentication (see bug 1539)
# from our crappy-but-does-the-job department
# Thomas Perl <thp.io/about>; 2012-01-20

import base64
import datetime
import hashlib
import http.server
import re
import sys
import threading
import time

# Credentials the server accepts for HTTP Basic Authentication
USERNAME = 'user@example.com'    # Username used for HTTP Authentication
PASSWORD = 'secret'              # Password used for HTTP Authentication

# Hostname, port of the main HTTP server, and port of the second ("redirect")
# server instance
HOST, PORT, RPORT = 'localhost', 8000, 8001

# When the script contents change, the feed's episodes each get a new GUID.
# The file is read inside a context manager so the handle is closed right away
# instead of being left for the garbage collector.
with open(__file__, mode='rb') as _script_file:
    GUID = hashlib.sha1(_script_file.read()).hexdigest()

# Base URL of the main server (no trailing slash)
URL = 'http://%s:%s' % (HOST, PORT)

FEEDNAME = sys.argv[0]        # The title of the RSS feed
REDIRECT = 'redirect.rss'     # The path for a redirection to the feed
REDIRECT_TO_BAD_HOST = 'redirect_bad'     # The path redirecting to a non-existent host
FEEDFILE = 'feed.rss'         # The "filename" of the feed on the server
EPISODES = 'episode'          # Base name for the episode files
TIMEOUT = 'timeout'           # The path to never return
EPISODES_EXT = '.mp3'         # Extension for the episode files
EPISODES_MIME = 'audio/mpeg'  # Mime type for the episode files
EP_COUNT = 7                  # Number of episodes in the feed
SIZE = 500000                 # Size (in bytes) of the episode downloads


def mkpubdates(items):
    """Yield `items` fake pubDate strings, one per consecutive day.

    The series starts `items + 3` days before the current local time, so
    all generated dates lie in the recent past. Each value is the
    `ctime()` representation of the corresponding datetime.
    """
    first = datetime.datetime.now() - datetime.timedelta(days=items + 3)
    for offset in range(items):
        yield (first + datetime.timedelta(days=offset)).ctime()


def mkrss(items=EP_COUNT):
    """Generate a dummy RSS feed with a given number of items.

    The feed contains `items` regular episodes (dated via mkpubdates())
    followed by four special episodes used to exercise error handling:
    a missing file, a server timeout, a redirect to a bad host and an
    enclosure URL containing a space.

    Every item gets its own GUID; the special episodes previously all
    shared the "timeout" GUID, which made them indistinguishable by GUID.

    Returns the feed as a str.
    """
    # Values shared by all templates below
    fields = {
        'GUID': GUID,
        'URL': URL,
        'FEEDNAME': FEEDNAME,
        'EPISODES': EPISODES,
        'EPISODES_EXT': EPISODES_EXT,
        'EPISODES_MIME': EPISODES_MIME,
        'SIZE': SIZE,
    }

    items_xml = '\n'.join("""
    <item>
        <title>Episode %(INDEX)s</title>
        <guid>tag:test.gpodder.org,2012:%(GUID)s,%(URL)s,%(INDEX)s</guid>
        <pubDate>%(PUBDATE)s</pubDate>
        <enclosure
          url="%(URL)s/%(EPISODES)s%(INDEX)s%(EPISODES_EXT)s"
          type="%(EPISODES_MIME)s"
          length="%(SIZE)s"/>
    </item>
    """ % dict(fields, INDEX=index, PUBDATE=pubdate)
        for index, pubdate in enumerate(mkpubdates(items)))

    special_item = """
    <item>
        <title>%(TITLE)s</title>
        <guid>tag:test.gpodder.org,2012:%(TAG)s</guid>
        <pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
        <enclosure
          url="%(URL)s/%(PATH)s"
          type="%(EPISODES_MIME)s"
          length="%(SIZE)s"/>
    </item>"""

    # (title, unique GUID suffix, path below URL) for each special episode
    specials = (
        ('Missing Episode', 'missing', 'not_there' + EPISODES_EXT),
        ('Server Timeout Episode', 'timeout', TIMEOUT),
        ('Bad Host Episode', 'badhost', REDIRECT_TO_BAD_HOST),
        ('Space in url Episode', 'space',
         EPISODES + ' with space' + EPISODES_EXT),
    )
    for title, tag, path in specials:
        items_xml += special_item % dict(fields, TITLE=title, TAG=tag,
                                         PATH=path)

    return """
    <rss>
    <channel><title>%(FEEDNAME)s</title><link>%(URL)s</link>
    %(ITEMS)s
    </channel>
    </rss>
    """ % dict(fields, ITEMS=items_xml)


def mkdata(size=SIZE):
    """Return `size` bytes of dummy data.

    The payload cycles through the byte values 32..126 in ascending
    order, starting at 32.
    """
    pattern = bytes(range(32, 127))
    # Tile the pattern often enough to cover `size`, then cut to length
    return (pattern * (size // len(pattern) + 1))[:size]


class AuthRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the test feed, protected by HTTP Basic auth.

    GET requests are dispatched on the request path:

    * REDIRECT_PATH answers with a 302 to the feed URL (no auth needed)
    * paths starting with REDIRECT_TO_BAD_HOST_PATH answer with a 302 to
      notthere.gpodder.io (no auth needed)
    * TIMEOUT_PATH sleeps for 80 seconds and sends no response at all
    * everything else requires valid credentials (401 otherwise); the
      feed and episode paths are then served with 200, any other path
      gets a 404
    """

    FEEDFILE_PATH = '/%s' % FEEDFILE
    EPISODES_PATH = '/%s' % EPISODES
    REDIRECT_PATH = '/%s' % REDIRECT
    REDIRECT_TO_BAD_HOST_PATH = '/%s' % REDIRECT_TO_BAD_HOST
    TIMEOUT_PATH = '/%s' % TIMEOUT

    def _has_valid_credentials(self):
        """Return True if the request carries the expected Basic credentials.

        A missing or malformed Authorization header (invalid base64,
        payload that cannot be decoded as text, no ':' separator) counts
        as "not authorized" instead of raising, so that the client still
        receives a 401 response.
        """
        auth_header = self.headers.get('authorization', '')
        m = re.match(r'^Basic (.*)$', auth_header)
        if m is None:
            return False

        try:
            auth_data = base64.b64decode(m.group(1)).decode().split(':', 1)
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError are both
            # subclasses of ValueError
            print('Malformed Authorization header.')
            return False

        if len(auth_data) != 2:
            return False

        username, password = auth_data
        print('Got username:', username)
        print('Got password:', password)
        if (username, password) != (USERNAME, PASSWORD):
            return False

        print('Valid credentials provided.')
        return True

    def _send_redirect(self, location):
        """Send a 302 response pointing the client to `location`."""
        self.send_response(302)
        self.send_header('Location', location)
        self.end_headers()

    def do_GET(self):
        """Handle a GET request (see the class docstring for the routing)."""
        authorized = self._has_valid_credentials()
        is_feed = False
        is_episode = False

        if self.path == self.FEEDFILE_PATH:
            print('Feed request.')
            is_feed = True
        elif self.path.startswith(self.EPISODES_PATH):
            print('Episode request.')
            is_episode = True
        elif self.path == self.REDIRECT_PATH:
            print('Redirect request.')
            self._send_redirect('%s/%s' % (URL, FEEDFILE))
            return
        elif self.path.startswith(self.REDIRECT_TO_BAD_HOST_PATH):
            print('Redirect request => bad host.')
            self._send_redirect('//notthere.gpodder.io/%s' % (FEEDFILE))
            return
        elif self.path == self.TIMEOUT_PATH:
            # will need to restart the server or wait 80s before next request
            time.sleep(80)
            return

        if not authorized:
            print('Not authorized - sending WWW-Authenticate header.')
            self.send_response(401)
            self.send_header('WWW-Authenticate',
                             'Basic realm="%s"' % sys.argv[0])
            self.end_headers()
            return
        if not is_feed and not is_episode:
            print('Not there episode - sending 404.')
            self.send_response(404)
            self.end_headers()
            return

        self.send_response(200)
        # Use the same mime type for episodes as advertised in the feed
        self.send_header('Content-type',
                         'application/xml' if is_feed else EPISODES_MIME)
        self.end_headers()
        self.wfile.write(mkrss().encode('utf-8') if is_feed else mkdata())


def run(httpd):
    """Serve requests on `httpd` one at a time, forever.

    Only returns by way of an exception raised from handle_request().
    """
    handle_one = httpd.handle_request
    while True:
        handle_one()


if __name__ == '__main__':
    # Main server; bound before the banner is printed
    httpd = http.server.HTTPServer((HOST, PORT), AuthRequestHandler)
    print("""
    Feed URL: %(URL)s/%(FEEDFILE)s
    Redirect URL: http://%(HOST)s:%(RPORT)d/%(REDIRECT)s
    Timeout URL: %(URL)s/%(TIMEOUT)s
    Username: %(USERNAME)s
    Password: %(PASSWORD)s
    """ % globals())
    # Second server instance on RPORT, using the same handler
    httpdr = http.server.HTTPServer((HOST, RPORT), AuthRequestHandler)

    # One daemon thread per server, started in order
    workers = []
    for label, server in (('http', httpd), ('http redirect', httpdr)):
        worker = threading.Thread(name=label, target=run, args=(server,),
                                  daemon=True)
        worker.start()
        workers.append(worker)

    # Block until interrupted; Ctrl-C ends the script quietly
    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        pass