1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
|
import datetime
from itertools import count
import os
import threading
import time
import urllib.parse
import pytest
import cherrypy
from cherrypy.lib import httputil
from cherrypy.test import helper
# Directory of this test module, joined onto the current working directory
# so it still resolves when ``__file__`` is a relative path.  Used below as
# ``tools.staticdir.root`` for the apps that serve the ``static`` directory.
curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
# Raw bytes of a small GIF89a image.  The ``a_gif`` handler returns them and
# ``testLastModified`` compares the response body against them.
gif_bytes = (
    b'GIF89a\x01\x00\x01\x00\x82\x00\x01\x99"\x1e\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x02\x03\x02\x08\t\x00;'
)
class CacheTest(helper.CPWebCase):
    """Exercise the caching, expires and gzip tools via small test apps.

    ``setup_server`` mounts four applications ('/', '/expires',
    '/varying_headers' and '/gzip_static_cache'); each test method issues
    requests against them and checks bodies, statuses and headers.
    """

    @staticmethod
    def setup_server():
        """Define the handler classes used by the tests and mount them."""

        @cherrypy.config(**{'tools.caching.on': True})
        class Root:
            """Handlers with caching on; bodies reveal how often they ran."""

            def __init__(self):
                self.counter = 0
                self.control_counter = 0
                # Held by long_process for the whole time it sleeps.
                self.longlock = threading.Lock()

            @cherrypy.expose
            def index(self):
                """Return 'visit #N', N counting real handler invocations."""
                self.counter += 1
                msg = 'visit #%s' % self.counter
                return msg

            @cherrypy.expose
            def control(self):
                """Like ``index`` but with its own, independent counter."""
                self.control_counter += 1
                return 'visit #%s' % self.control_counter

            @cherrypy.expose
            def a_gif(self):
                """Return ``gif_bytes`` with a Last-Modified header set."""
                headers = cherrypy.response.headers
                headers['Last-Modified'] = httputil.HTTPDate()
                return gif_bytes

            @cherrypy.expose
            def long_process(self, seconds='1'):
                """Sleep for ``seconds`` while holding ``longlock``."""
                # The context manager releases the lock only if it was
                # actually acquired, unlike acquire() inside try/finally.
                with self.longlock:
                    time.sleep(float(seconds))
                return 'success!'

            @cherrypy.expose
            def clear_cache(self, path):
                """Clear the cache store entry for request base + ``path``."""
                cherrypy._cache.store[cherrypy.request.base + path].clear()

        @cherrypy.config(**{
            'tools.caching.on': True,
            'tools.response_headers.on': True,
            'tools.response_headers.headers': [
                ('Vary', 'Our-Varying-Header')
            ],
        })
        class VaryHeaderCachingServer(object):
            """Cached app whose responses carry a custom Vary header."""

            def __init__(self):
                self.counter = count(1)

            @cherrypy.expose
            def index(self):
                """Return 'visit #N', N advancing on every real invocation."""
                return 'visit #%s' % next(self.counter)

        @cherrypy.config(**{
            'tools.expires.on': True,
            'tools.expires.secs': 60,
            'tools.staticdir.on': True,
            'tools.staticdir.dir': 'static',
            'tools.staticdir.root': curdir,
        })
        class UnCached(object):
            """App with the expires tool on (60 s) plus a static directory."""

            @cherrypy.expose
            @cherrypy.config(**{'tools.expires.secs': 0})
            def force(self):
                """Switch this app's expires tool to ``force`` with 0 secs."""
                cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
                # This mutation is deliberate and must persist:
                # testExpiresTool expects every later request under
                # /expires to carry "cache prevention" headers.
                self._cp_config['tools.expires.force'] = True
                self._cp_config['tools.expires.secs'] = 0
                return 'being forceful'

            @cherrypy.expose
            def dynamic(self):
                """Return a body with Etag and 'Cache-Control: private'."""
                cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
                cherrypy.response.headers['Cache-Control'] = 'private'
                return 'D-d-d-dynamic!'

            @cherrypy.expose
            def cacheable(self):
                """Return a body with only an Etag header set."""
                cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
                return "Hi, I'm cacheable."

            @cherrypy.expose
            @cherrypy.config(**{'tools.expires.secs': 86400})
            def specific(self):
                """Handler with its own ``tools.expires.secs`` of 86400."""
                cherrypy.response.headers[
                    'Etag'] = 'need_this_to_make_me_cacheable'
                return 'I am being specific'

            class Foo(object):
                """Arbitrary object used as an invalid ``secs`` value."""

            @cherrypy.expose
            @cherrypy.config(**{'tools.expires.secs': Foo()})
            def wrongtype(self):
                """Handler whose ``tools.expires.secs`` is a ``Foo``.

                testExpiresTool expects a 500 mentioning TypeError here.
                """
                cherrypy.response.headers[
                    'Etag'] = 'need_this_to_make_me_cacheable'
                return 'Woops'

        @cherrypy.config(**{
            'tools.gzip.mime_types': ['text/*', 'image/*'],
            'tools.caching.on': True,
            'tools.staticdir.on': True,
            'tools.staticdir.dir': 'static',
            'tools.staticdir.root': curdir
        })
        class GzipStaticCache(object):
            """Static-directory app with caching on and gzip MIME types."""

        cherrypy.tree.mount(Root())
        cherrypy.tree.mount(UnCached(), '/expires')
        cherrypy.tree.mount(VaryHeaderCachingServer(), '/varying_headers')
        cherrypy.tree.mount(GzipStaticCache(), '/gzip_static_cache')
        # gzip is switched on globally; testCaching relies on it.
        cherrypy.config.update({'tools.gzip.on': True})

    def testCaching(self):
        """Check GET caching, invalidation by other methods, and gzip."""
        elapsed = 0.0
        for trial in range(10):
            self.getPage('/')
            # The response should be the same every time,
            # except for the Age response header.
            self.assertBody('visit #1')
            if trial != 0:
                age = int(self.assertHeader('Age'))
                assert age >= elapsed
                elapsed = age

        # POST, PUT, DELETE should not be cached.
        self.getPage('/', method='POST')
        self.assertBody('visit #2')
        # Because gzip is turned on, the Vary header should always Vary for
        # content-encoding
        self.assertHeader('Vary', 'Accept-Encoding')
        # The previous request should have invalidated the cache,
        # so this request will recalc the response.
        self.getPage('/', method='GET')
        self.assertBody('visit #3')
        # ...but this request should get the cached copy.
        self.getPage('/', method='GET')
        self.assertBody('visit #3')
        self.getPage('/', method='DELETE')
        self.assertBody('visit #4')

        # The previous request should have invalidated the cache,
        # so this request will recalc the response.
        self.getPage('/', method='GET', headers=[('Accept-Encoding', 'gzip')])
        self.assertHeader('Content-Encoding', 'gzip')
        self.assertHeader('Vary')
        self.assertEqual(
            cherrypy.lib.encoding.decompress(self.body), b'visit #5')

        # Now check that a second request gets the gzip header and gzipped body
        # This also tests a bug in 3.0 to 3.0.2 whereby the cached, gzipped
        # response body was being gzipped a second time.
        self.getPage('/', method='GET', headers=[('Accept-Encoding', 'gzip')])
        self.assertHeader('Content-Encoding', 'gzip')
        self.assertEqual(
            cherrypy.lib.encoding.decompress(self.body), b'visit #5')

        # Now check that a third request that doesn't accept gzip
        # skips the cache (because the 'Vary' header denies it).
        self.getPage('/', method='GET')
        self.assertNoHeader('Content-Encoding')
        self.assertBody('visit #6')

    def testVaryHeader(self):
        """Check that variants selected by a Vary header coexist in cache."""
        self.getPage('/varying_headers/')
        self.assertStatus('200 OK')
        self.assertHeaderItemValue('Vary', 'Our-Varying-Header')
        self.assertBody('visit #1')

        # Now check that different 'Vary'-fields don't evict each other.
        # This test creates 2 requests with different 'Our-Varying-Header'
        # and then tests if the first one still exists.
        self.getPage('/varying_headers/',
                     headers=[('Our-Varying-Header', 'request 2')])
        self.assertStatus('200 OK')
        self.assertBody('visit #2')

        self.getPage('/varying_headers/',
                     headers=[('Our-Varying-Header', 'request 2')])
        self.assertStatus('200 OK')
        self.assertBody('visit #2')

        self.getPage('/varying_headers/')
        self.assertStatus('200 OK')
        self.assertBody('visit #1')

    def _assert_cache_prevention_headers(self):
        """Assert the last response carries the "cache prevention" headers.

        Cache-Control is only checked when the server speaks HTTP/1.1.
        """
        self.assertHeader('Pragma', 'no-cache')
        if cherrypy.server.protocol_version == 'HTTP/1.1':
            self.assertHeader('Cache-Control', 'no-cache, must-revalidate')
        self.assertHeader('Expires', 'Sun, 28 Jan 2007 00:00:00 GMT')

    def testExpiresTool(self):
        """Check expires-tool headers before and after ``/expires/force``."""
        # test setting an expires header
        self.getPage('/expires/specific')
        self.assertStatus('200 OK')
        self.assertHeader('Expires')

        # test exceptions for bad time values
        self.getPage('/expires/wrongtype')
        self.assertStatus(500)
        self.assertInBody('TypeError')

        # static content should not have "cache prevention" headers
        self.getPage('/expires/index.html')
        self.assertStatus('200 OK')
        self.assertNoHeader('Pragma')
        self.assertNoHeader('Cache-Control')
        self.assertHeader('Expires')

        # dynamic content that sets indicators should not have
        # "cache prevention" headers
        self.getPage('/expires/cacheable')
        self.assertStatus('200 OK')
        self.assertNoHeader('Pragma')
        self.assertNoHeader('Cache-Control')
        self.assertHeader('Expires')

        self.getPage('/expires/dynamic')
        self.assertBody('D-d-d-dynamic!')
        # the Cache-Control header should be untouched
        self.assertHeader('Cache-Control', 'private')
        self.assertHeader('Expires')

        # configure the tool to ignore indicators and replace existing headers
        self.getPage('/expires/force')
        self.assertStatus('200 OK')
        # This also gives us a chance to test 0 expiry with no other headers
        self._assert_cache_prevention_headers()

        # static content should now have "cache prevention" headers
        self.getPage('/expires/index.html')
        self.assertStatus('200 OK')
        self._assert_cache_prevention_headers()

        # the cacheable handler should now have "cache prevention" headers
        self.getPage('/expires/cacheable')
        self.assertStatus('200 OK')
        self._assert_cache_prevention_headers()

        self.getPage('/expires/dynamic')
        self.assertBody('D-d-d-dynamic!')
        # dynamic sets Cache-Control to private but it should be
        # overwritten here ...
        self._assert_cache_prevention_headers()

    def _assert_resp_len_and_enc_for_gzip(self, uri):
        """
        Test that after querying gzipped content it remains valid in
        cache and is available non-gzipped as well.
        """
        ACCEPT_GZIP_HEADERS = [('Accept-Encoding', 'gzip')]
        content_len = None

        for _ in range(3):
            self.getPage(uri, method='GET', headers=ACCEPT_GZIP_HEADERS)

            if content_len is not None:
                # all requests should get the same length
                self.assertHeader('Content-Length', content_len)
                self.assertHeader('Content-Encoding', 'gzip')

            # assertHeader returns the header's value, and reports a
            # missing header as a test failure rather than a KeyError.
            content_len = self.assertHeader('Content-Length')

        # check that we can still get non-gzipped version
        self.getPage(uri, method='GET')
        self.assertNoHeader('Content-Encoding')
        # non-gzipped version should have a different content length
        self.assertNoHeaderItemValue('Content-Length', content_len)

    def testGzipStaticCache(self):
        """Test that cache and gzip tools play well together when both enabled.

        Ref GitHub issue #1190.
        """
        GZIP_STATIC_CACHE_TMPL = '/gzip_static_cache/{}'
        resource_files = ('index.html', 'dirback.jpg')

        for f in resource_files:
            uri = GZIP_STATIC_CACHE_TMPL.format(f)
            self._assert_resp_len_and_enc_for_gzip(uri)

    def testLastModified(self):
        """Check Last-Modified handling for fresh, cached and 304 replies."""
        self.getPage('/a.gif')
        self.assertStatus(200)
        self.assertBody(gif_bytes)
        lm1 = self.assertHeader('Last-Modified')

        # this request should get the cached copy.
        self.getPage('/a.gif')
        self.assertStatus(200)
        self.assertBody(gif_bytes)
        self.assertHeader('Age')
        lm2 = self.assertHeader('Last-Modified')
        self.assertEqual(lm1, lm2)

        # this request should match the cached copy, but raise 304.
        self.getPage('/a.gif', headers=[('If-Modified-Since', lm1)])
        self.assertStatus(304)
        self.assertNoHeader('Last-Modified')
        if not getattr(cherrypy.server, 'using_apache', False):
            self.assertHeader('Age')

    @pytest.mark.xfail(reason='#1536')
    def disabled_test_antistampede(self):
        """Fire 100 concurrent requests at a slow, cached URL.

        All of them are expected to finish within roughly one handler
        run (``SECONDS`` plus an allowance).

        NOTE(review): the ``disabled_`` prefix presumably keeps test
        collectors from picking this method up -- confirm against #1536.
        """
        SECONDS = 4
        slow_url = '/long_process?seconds={SECONDS}'.format(SECONDS=SECONDS)
        # We MUST make an initial synchronous request in order to create the
        # AntiStampedeCache object, and populate its selecting_headers,
        # before the actual stampede.
        self.getPage(slow_url)
        self.assertBody('success!')
        path = urllib.parse.quote(slow_url, safe='')
        self.getPage('/clear_cache?path=' + path)
        self.assertStatus(200)

        start = datetime.datetime.now()

        def run():
            # NOTE(review): an assertion failing inside a worker thread is
            # not re-raised in the main thread by Thread.join().
            self.getPage(slow_url)
            # The response should be the same every time
            self.assertBody('success!')

        ts = [threading.Thread(target=run) for _ in range(100)]
        for t in ts:
            t.start()
        for t in ts:
            t.join()
        finish = datetime.datetime.now()
        # Allow for overhead, two seconds for slow hosts
        allowance = SECONDS + 2
        self.assertEqualDates(start, finish, seconds=allowance)

    def test_cache_control(self):
        """Check that no-cache / max-age=0 request headers bypass the cache.

        Each bypassing request bumps the visit counter; the plain request
        that follows must then see that new value.
        """
        self.getPage('/control')
        self.assertBody('visit #1')
        self.getPage('/control')
        self.assertBody('visit #1')

        self.getPage('/control', headers=[('Cache-Control', 'no-cache')])
        self.assertBody('visit #2')
        self.getPage('/control')
        self.assertBody('visit #2')

        self.getPage('/control', headers=[('Pragma', 'no-cache')])
        self.assertBody('visit #3')
        self.getPage('/control')
        self.assertBody('visit #3')

        time.sleep(1)
        self.getPage('/control', headers=[('Cache-Control', 'max-age=0')])
        self.assertBody('visit #4')
        self.getPage('/control')
        self.assertBody('visit #4')
|