#!/usr/bin/env python

# These tests access the network.  "python test.py" runs a local test server
# and doesn't fetch anything over the internet: the few tests here that do
# are disabled by default, since they carry the test tag "internet".

# thanks Moof (aka Giles Antonio Radford) for some of these

import errno
import os
import socket
import subprocess
import sys
import unittest
import urllib
import urllib2

import mechanize
from mechanize import CookieJar, HTTPCookieProcessor, \
     HTTPHandler, HTTPRefreshProcessor, \
     HTTPEquivProcessor, HTTPRedirectHandler, \
     HTTPRedirectDebugProcessor, HTTPResponseDebugProcessor
from mechanize._rfc3986 import urljoin
from mechanize._util import hide_experimental_warnings, \
    reset_experimental_warnings, write_file
import mechanize._opener
import mechanize._rfc3986
import mechanize._sockettimeout
import mechanize._testcase


#from cookielib import CookieJar
#from urllib2 import build_opener, install_opener, urlopen
#from urllib2 import HTTPCookieProcessor, HTTPHandler

#from mechanize import CreateBSDDBCookieJar

## import logging
## logger = logging.getLogger("mechanize")
## logger.addHandler(logging.StreamHandler(sys.stdout))
## #logger.setLevel(logging.DEBUG)
## logger.setLevel(logging.INFO)


class TestCase(mechanize._testcase.TestCase):

    # testprogram sets self.no_proxies on each TestCase; when true, proxies
    # are set explicitly (to nothing) so that http*_proxy environment
    # variables are ignored

    def _configure_user_agent(self, ua):
        if self.no_proxies:
            ua.set_proxies({})

    def make_browser(self):
        browser = mechanize.Browser()
        self._configure_user_agent(browser)
        return browser

    def make_user_agent(self):
        ua = mechanize.UserAgent()
        self._configure_user_agent(ua)
        return ua

    def build_opener(self, handlers=(), build_opener=None):
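        # always add an explicit empty ProxyHandler so openers ignore
        # http*_proxy environment variables; copying into a new tuple also
        # avoids mutating a caller's handler sequence in place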
        handlers = tuple(handlers) + (mechanize.ProxyHandler(proxies={}),)
        if build_opener is None:
            build_opener = mechanize.build_opener
        return build_opener(*handlers)

    def setUp(self):
        mechanize._testcase.TestCase.setUp(self)
        self.test_uri = urljoin(self.uri, "test_fixtures")
        self.server = self.get_cached_fixture("server")
        if self.no_proxies:
            old_opener_m = mechanize._opener._opener
            old_opener_u = urllib2._opener
            mechanize.install_opener(mechanize.build_opener(
                    mechanize.ProxyHandler(proxies={})))
            urllib2.install_opener(urllib2.build_opener(
                    urllib2.ProxyHandler(proxies={})))
            def revert_install():
                mechanize.install_opener(old_opener_m)
                urllib2.install_opener(old_opener_u)
            self.add_teardown(revert_install)


def sanepathname2url(path):
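    """Convert a filesystem path to a path suitable for a file: URL.

    On Windows, pathname2url can return a path of the form ///C|/foo; trim
    the extra slashes so that prefixing "file://" gives file:///C|/foo.
    """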
    urlpath = urllib.pathname2url(path)
    if os.name == "nt" and urlpath.startswith("///"):
        urlpath = urlpath[2:]
    # XXX don't ask me about the mac...
    return urlpath


def read_file(filename):
    fh = open(filename)
    try:
        return fh.read()
    finally:
        fh.close()


class FtpTestCase(TestCase):

    def test_ftp(self):
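        # start the cached local FTP server fixture, write a file under its
        # root directory, then fetch that file back over an ftp:// URL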
        server = self.get_cached_fixture("ftp_server")
        browser = self.make_browser()
        path = self.make_temp_dir(dir_=server.root_path)
        file_path = os.path.join(path, "stuff")
        data = "data\nmore data"
        write_file(file_path, data)
        relative_path = os.path.join(os.path.basename(path), "stuff")
        r = browser.open("ftp://anon@localhost:%s/%s" %
                         (server.port, relative_path))
        self.assertEqual(r.read(), data)


class SocketTimeoutTest(TestCase):

    # The timeout tests in this module aren't full functional tests: to speed
    # things up, they don't actually call .settimeout on the socket.  XXX
    # allow running the tests against a slow server with a real timeout

    def _monkey_patch_socket(self):
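        # Replace socket.socket with a wrapper that counts created sockets
        # and records requested timeouts without actually applying them;
        # returns the TimeoutLog the tests use to verify timeout handling.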
        class Delegator(object):
            def __init__(self, delegate):
                self._delegate = delegate
            def __getattr__(self, name):
                return getattr(self._delegate, name)

        assertEqual = self.assertEqual

        class TimeoutLog(object):
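            # records how many sockets were created and which timeout values
            # were requested while monitoring is switched on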
            AnyValue = object()
            def __init__(self):
                self._nr_sockets = 0
                self._timeouts = []
                self.start()
            def start(self):
                self._monitoring = True
            def stop(self):
                self._monitoring = False
            def socket_created(self):
                if self._monitoring:
                    self._nr_sockets += 1
            def settimeout_called(self, timeout):
                if self._monitoring:
                    self._timeouts.append(timeout)
            def verify(self, value=AnyValue):
                if sys.version_info[:2] < (2, 6):
                    # per-connection timeout not supported in Python 2.5
                    self.verify_default()
                else:
                    assertEqual(len(self._timeouts), self._nr_sockets)
                    if value is not self.AnyValue:
                        for timeout in self._timeouts:
                            assertEqual(timeout, value)
            def verify_default(self):
                assertEqual(len(self._timeouts), 0)

        log = TimeoutLog()
        def settimeout(timeout):
            log.settimeout_called(timeout)
        orig_socket = socket.socket
        def make_socket(*args, **kwds):
            sock = Delegator(orig_socket(*args, **kwds))
            log.socket_created()
            sock.settimeout = settimeout
            return sock
        self.monkey_patch(socket, "socket", make_socket)
        return log


class SimpleTests(SocketTimeoutTest):
    # thanks Moof (aka Giles Antonio Radford)

    def setUp(self):
        super(SimpleTests, self).setUp()
        self.browser = self.make_browser()

    def test_simple(self):
        self.browser.open(self.test_uri)
        self.assertEqual(self.browser.title(), 'Python bits')
        # relative URL
        self.browser.open('/mechanize/')
        self.assertEqual(self.browser.title(), 'mechanize')

    def test_basic_auth(self):
        uri = urljoin(self.uri, "basic_auth")
        self.assertRaises(mechanize.URLError, self.browser.open, uri)
        self.browser.add_password(uri, "john", "john")
        self.browser.open(uri)
        self.assertEqual(self.browser.title(), 'Basic Auth Protected Area')

    def test_digest_auth(self):
        uri = urljoin(self.uri, "digest_auth")
        self.assertRaises(mechanize.URLError, self.browser.open, uri)
        self.browser.add_password(uri, "digestuser", "digestuser")
        self.browser.open(uri)
        self.assertEqual(self.browser.title(), 'Digest Auth Protected Area')

    def test_open_with_default_timeout(self):
        timeout_log = self._monkey_patch_socket()
        self.browser.open(self.test_uri)
        self.assertEqual(self.browser.title(), 'Python bits')
        timeout_log.verify_default()

    def test_open_with_timeout(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        self.browser.open(self.test_uri, timeout=timeout)
        self.assertEqual(self.browser.title(), 'Python bits')
        timeout_log.verify(timeout)

    def test_urlopen_with_default_timeout(self):
        timeout_log = self._monkey_patch_socket()
        response = mechanize.urlopen(self.test_uri)
        self.assert_contains(response.read(), "Python bits")
        timeout_log.verify_default()

    def test_urlopen_with_timeout(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        response = mechanize.urlopen(self.test_uri, timeout=timeout)
        self.assert_contains(response.read(), "Python bits")
        timeout_log.verify(timeout)

    def test_redirect_with_timeout(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        # 301 redirect due to missing final '/'
        req = mechanize.Request(urljoin(self.test_uri, "test_fixtures"),
                                timeout=timeout)
        r = self.browser.open(req)
        self.assert_("GeneralFAQ.html" in r.read(2048))
        timeout_log.verify(timeout)

    def test_302_and_404(self):
        # the combination of 302 and 404 (/redirected is configured to redirect
        # to a non-existent URL /nonexistent) has caused problems in the past
        # due to accidental double-wrapping of the error response
        self.assertRaises(
            mechanize.HTTPError,
            self.browser.open, urljoin(self.uri, "/redirected"),
            )

    def test_reread(self):
        # closing response shouldn't stop methods working (this happens also to
        # be true for e.g. mechanize.OpenerDirector when mechanize's own
        # handlers are in use, but is guaranteed to be true for
        # mechanize.Browser)
        r = self.browser.open(self.uri)
        data = r.read()
        r.close()
        r.seek(0)
        self.assertEqual(r.read(), data)
        self.assertEqual(self.browser.response().read(), data)

    def test_error_recovery(self):
        self.assertRaises(mechanize.URLError, self.browser.open,
                          'file:///c|thisnoexistyiufheiurgbueirgbue')
        self.browser.open(self.test_uri)
        self.assertEqual(self.browser.title(), 'Python bits')

    def test_redirect(self):
        # 301 redirect due to missing final '/'
        codes = []
        class ObservingHandler(mechanize.BaseHandler):
            def http_response(self, request, response):
                codes.append(response.code)
                return response
        self.browser.add_handler(ObservingHandler())
        r = self.browser.open(urljoin(self.uri, "test_fixtures"))
        self.assertEqual(r.code, 200)
        self.assertIn(301, codes)
        self.assertIn("GeneralFAQ.html", r.read(2048))

    def test_refresh(self):
        def refresh_request(seconds):
            uri = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
            val = urllib.quote_plus('%d; url="%s"' % (seconds, self.uri))
            return uri + ("?refresh=%s" % val)
        self.browser.set_handle_refresh(True, honor_time=False)
        r = self.browser.open(refresh_request(5))
        self.assertEqual(r.geturl(), self.uri)
        # Set a maximum refresh time of 30 seconds (these long refreshes tend
        # to be there only because the website owner wants you to see the
        # latest news, or whatever -- they're not essential to the operation of
        # the site, and not really useful or appropriate when scraping).
        refresh_uri = refresh_request(60)
        self.browser.set_handle_refresh(True, max_time=30., honor_time=True)
        r = self.browser.open(refresh_uri)
        self.assertEqual(r.geturl(), refresh_uri)
        # allow long refreshes (but don't actually wait 60 seconds)
        self.browser.set_handle_refresh(True, max_time=None, honor_time=False)
        r = self.browser.open(refresh_request(60))
        self.assertEqual(r.geturl(), self.uri)

    def test_file_url(self):
        url = "file://%s" % sanepathname2url(
            os.path.abspath(os.path.join("test", "test_functional.py")))
        r = self.browser.open(url)
        self.assert_("this string appears in this file ;-)" in r.read())

    def test_open_local_file(self):
        # Since the file: URL scheme is not well standardised, Browser has a
        # special method to open files by name, for convenience:
        path = os.path.join("test", "test_functional.py")
        response = self.browser.open_local_file(path)
        self.assertIn("this string appears in this file ;-)",
                      response.get_data())

    def test_open_novisit(self):
        def test_state(br):
            self.assertTrue(br.request is None)
            self.assertTrue(br.response() is None)
            self.assertRaises(mechanize.BrowserStateError, br.back)
        test_state(self.browser)
        uri = urljoin(self.uri, "test_fixtures")
        # note this involves a redirect, which should itself be non-visiting
        r = self.browser.open_novisit(uri)
        test_state(self.browser)
        self.assert_("GeneralFAQ.html" in r.read(2048))

        # Request argument instead of URL
        r = self.browser.open_novisit(mechanize.Request(uri))
        test_state(self.browser)
        self.assert_("GeneralFAQ.html" in r.read(2048))

    def test_non_seekable(self):
        # check everything still works without response_seek_wrapper and
        # the .seek() method on response objects
        ua = self.make_user_agent()
        ua.set_seekable_responses(False)
        ua.set_handle_equiv(False)
        response = ua.open(self.test_uri)
        self.assertFalse(hasattr(response, "seek"))
        data = response.read()
        self.assertIn("Python bits", data)


class ResponseTests(TestCase):

    def test_seek(self):
        br = self.make_browser()
        r = br.open(self.uri)
        html = r.read()
        r.seek(0)
        self.assertEqual(r.read(), html)

    def test_seekable_response_opener(self):
        build_opener = mechanize.OpenerFactory(
            mechanize.SeekableResponseOpener).build_opener
        opener = self.build_opener(build_opener=build_opener)
        r = opener.open(urljoin(self.uri, "test_fixtures/cctest2.txt"))
        data = r.read()
        r.seek(0)
        self.assertEqual(r.read(), data)
        self.assertEqual(r.get_data(), data)
        self.assertEqual(data, "Hello ClientCookie functional test suite.\n")

    def test_seek_wrapper_class_name(self):
        opener = self.make_user_agent()
        opener.set_seekable_responses(True)
        try:
            opener.open(urljoin(self.uri, "nonexistent"))
        except mechanize.HTTPError, exc:
            self.assertIn("HTTPError instance", repr(exc))
        else:
            self.fail("HTTPError not raised")

    def test_no_seek(self):
        # should be possible to turn off UserAgent's .seek() functionality
        def check_no_seek(opener):
            r = opener.open(urljoin(self.uri, "test_fixtures/cctest2.txt"))
            self.assertFalse(hasattr(r, "seek"))
            try:
                opener.open(urljoin(self.uri, "nonexistent"))
            except mechanize.HTTPError, exc:
                self.assertFalse(hasattr(exc, "seek"))
            else:
                self.fail("HTTPError not raised")

        # mechanize.UserAgent
        opener = self.make_user_agent()
        opener.set_handle_equiv(False)
        opener.set_seekable_responses(False)
        opener.set_debug_http(False)
        check_no_seek(opener)

        # mechanize.OpenerDirector
        opener = self.build_opener()
        check_no_seek(opener)

    def test_consistent_seek(self):
        # if we explicitly request that returned response objects have the
        # .seek() method, then raised HTTPError exceptions should also have the
        # .seek() method
        def check(opener, excs_also):
            r = opener.open(urljoin(self.uri, "test_fixtures/cctest2.txt"))
            data = r.read()
            r.seek(0)
            self.assertEqual(data, r.read())
            self.assertEqual(data, r.get_data())
            try:
                opener.open(urljoin(self.uri, "nonexistent"))
            except mechanize.HTTPError, exc:
                data = exc.read()
                if excs_also:
                    exc.seek(0)
                    self.assertEqual(data, exc.read())
                    self.assertEqual(data, exc.get_data())
            else:
                self.fail("HTTPError not raised")

        opener = self.make_user_agent()
        opener.set_debug_http(False)

        # Here, only the .set_handle_equiv() causes .seek() to be present, so
        # exceptions don't necessarily support the .seek() method (and do not,
        # at present).
        opener.set_handle_equiv(True)
        opener.set_seekable_responses(False)
        check(opener, excs_also=False)

        # Here, (only) the explicit .set_seekable_responses() causes .seek() to
        # be present (different mechanism from .set_handle_equiv()).  Since
        # there's an explicit request, ALL responses are seekable, even
        # exception responses (HTTPError instances).
        opener.set_handle_equiv(False)
        opener.set_seekable_responses(True)
        check(opener, excs_also=True)

    def test_set_response(self):
        br = self.make_browser()
        r = br.open(self.test_uri)
        html = r.read()
        self.assertEqual(br.title(), "Python bits")

        newhtml = """<html><body><a href="spam">click me</a></body></html>"""

        r.set_data(newhtml)
        self.assertEqual(r.read(), newhtml)
        self.assertEqual(br.response().read(), html)
        br.response().set_data(newhtml)
        self.assertEqual(br.response().read(), html)
        self.assertEqual(list(br.links())[0].url, "http://sourceforge.net/")

        br.set_response(r)
        self.assertEqual(br.response().read(), newhtml)
        self.assertEqual(list(br.links())[0].url, "spam")

    def test_new_response(self):
        br = self.make_browser()
        data = ("<html><head><title>Test</title></head>"
                "<body><p>Hello.</p></body></html>")
        response = mechanize.make_response(
            data,
            [("Content-type", "text/html")],
            "http://example.com/",
            200,
            "OK")
        br.set_response(response)
        self.assertEqual(br.response().get_data(), data)

    def hidden_test_close_pickle_load(self):
        print ("Test test_close_pickle_load is expected to fail unless Python "
               "standard library patch http://python.org/sf/1144636 has been "
               "applied")
        import pickle

        b = self.make_browser()
        r = b.open(urljoin(self.uri, "test_fixtures/cctest2.txt"))
        r.read()

        r.close()
        r.seek(0)
        self.assertEqual(r.read(),
                         "Hello ClientCookie functional test suite.\n")

        p = pickle.dumps(b, pickle.HIGHEST_PROTOCOL)
        b = pickle.loads(p)
        r = b.response()
        r.seek(0)
        self.assertEqual(r.read(),
                         "Hello ClientCookie functional test suite.\n")


class FunctionalTests(SocketTimeoutTest):

    def test_referer(self):
        br = self.make_browser()
        br.set_handle_refresh(True, honor_time=False)
        referer = urljoin(self.uri, "test_fixtures/referertest.html")
        info = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
        r = br.open(info)
        self.assertNotIn(referer, r.get_data())

        br.open(referer)
        r = br.follow_link(text="Here")
        self.assertIn(referer, r.get_data())

    def test_cookies(self):
        # this test page depends on cookies, and an http-equiv refresh
        #cj = CreateBSDDBCookieJar("/home/john/db.db")
        cj = CookieJar()
        handlers = [
            HTTPCookieProcessor(cj),
            HTTPRefreshProcessor(max_time=None, honor_time=False),
            HTTPEquivProcessor(),

            HTTPRedirectHandler(),  # needed for Refresh handling in 2.4.0
#            HTTPHandler(True),
#            HTTPRedirectDebugProcessor(),
#            HTTPResponseDebugProcessor(),
            ]

        opener = self.build_opener(handlers)
        r = opener.open(urljoin(self.uri, "/cgi-bin/cookietest.cgi"))
        data = r.read()
        self.assertIn("Your browser supports cookies!", data)
        self.assertEqual(len(cj), 2)

        # test response.seek() (added by HTTPEquivProcessor)
        r.seek(0)
        samedata = r.read()
        r.close()
        self.assertEqual(samedata, data)

    def test_robots(self):
        plain_opener = self.build_opener(
            [mechanize.HTTPRobotRulesProcessor])
        browser = self.make_browser()
        for opener in plain_opener, browser:
            opener.open(urljoin(self.uri, "robots"))
            self.assertRaises(
                mechanize.RobotExclusionError,
                opener.open, urljoin(self.uri, "norobots"))

    def _check_retrieve(self, url, filename, headers):
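        # the retrieved file should have the expected Content-Type and match
        # a freshly fetched copy of the same URL byte-for-byte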
        self.assertEqual(headers.get('Content-Type'), 'text/html')
        if self.no_proxies:
            proxies = {}
        else:
            proxies = None
        self.assertEqual(read_file(filename),
                         urllib.urlopen(url, proxies=proxies).read())

    def test_retrieve_to_named_file(self):
        url = urljoin(self.uri, "/mechanize/")
        test_filename = os.path.join(self.make_temp_dir(), "python.html")
        opener = self.build_opener()
        verif = CallbackVerifier(self)
        filename, headers = opener.retrieve(url, test_filename, verif.callback)
        self.assertEqual(filename, test_filename)
        self._check_retrieve(url, filename, headers)
        self.assertTrue(os.path.isfile(filename))

    def test_retrieve(self):
        # Not passing an explicit filename downloads to a temporary file;
        # using a Request object instead of a URL also works.
        url = urljoin(self.uri, "/mechanize/")
        opener = self.build_opener()
        verif = CallbackVerifier(self)
        request = mechanize.Request(url)
        filename, headers = opener.retrieve(request, reporthook=verif.callback)
        self.assertEqual(request.visit, False)
        self._check_retrieve(url, filename, headers)
        opener.close()
        # closing the opener removes the temporary file
        self.assertFalse(os.path.isfile(filename))

    def test_urlretrieve(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        url = urljoin(self.uri, "/mechanize/")
        verif = CallbackVerifier(self)
        filename, headers = mechanize.urlretrieve(url,
                                                  reporthook=verif.callback,
                                                  timeout=timeout)
        timeout_log.stop()
        self._check_retrieve(url, filename, headers)
        timeout_log.verify(timeout)

    def test_reload_read_incomplete(self):
        browser = self.make_browser()
        r1 = browser.open(urljoin(self.uri,
                                  "test_fixtures/mechanize_reload_test.html"))
        # if we don't do anything and go straight to another page, most of the
        # last page's response won't be .read()...
        r2 = browser.open(urljoin(self.uri, "mechanize"))
        self.assertTrue(len(r1.get_data()) < 4097)  # we only .read() a little bit
        # ...so if we then go back, .follow_link() for a link near the end (a
        # few kb in, past the point that always gets read in HTML files because
        # of HEAD parsing) will only work if it causes a .reload()...
        r3 = browser.back()
        browser.follow_link(text="near the end")
        # ... good, no LinkNotFoundError, so we did reload.
        # we have .read() the whole file
        self.assertEqual(len(r3._seek_wrapper__cache.getvalue()), 4202)

##     def test_cacheftp(self):
##         from mechanize import CacheFTPHandler, build_opener
##         o = build_opener(CacheFTPHandler())
##         r = o.open("ftp://ftp.python.org/pub/www.python.org/robots.txt")
##         data1 = r.read()
##         r.close()
##         r = o.open("ftp://ftp.python.org/pub/www.python.org/2.3.2/announce.txt")
##         data2 = r.read()
##         r.close()
##         self.assert_(data1 != data2)


class CommandFailedError(Exception):

    def __init__(self, message, rc):
        Exception.__init__(self, message)
        self.rc = rc


def get_cmd_stdout(args, **kwargs):
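    # run a command and return its stdout; on a nonzero exit status, raise
    # CommandFailedError with the command's stderr in the message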
    process = subprocess.Popen(args, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, **kwargs)
    stdout, stderr = process.communicate()
    rc = process.returncode
    if rc != 0:
        raise CommandFailedError(
            "Command failed with return code %i: %s:\n%s" %
            (rc, args, stderr), rc)
    else:
        return stdout


class ExamplesTests(TestCase):

    tags = "internet"

    def check_download_script(self, name):
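        # run one of the example download scripts in a scratch directory and
        # check that it downloads a single .tar.gz file there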
        python = sys.executable
        parent_dir = os.path.dirname(os.path.dirname(
                os.path.abspath(__file__)))
        temp_dir = self.make_temp_dir()
        get_cmd_stdout(
            [python, os.path.join(parent_dir, "examples", name)],
            cwd=temp_dir)
        [tarball] = os.listdir(temp_dir)
        self.assertTrue(tarball.endswith(".tar.gz"))

    def test_hack21(self):
        self.check_download_script("hack21.py")

    def test_pypi(self):
        self.check_download_script("pypi.py")


def add_to_path(env, name, value):
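    # append value to a path-list environment variable such as PYTHONPATH,
    # preserving any existing entries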
    old = env.get(name)
    if old is not None and old != "":
        value = old + ":" + value
    env[name] = value


class FormsExamplesTests(mechanize._testcase.GoldenTestCase):

    def check_forms_example(self, name, golden_path, fixup):
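        # run a forms example script against the local test server, normalise
        # its output with fixup, and compare against a golden output file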
        self.get_cached_fixture("server")
        python = sys.executable
        this_dir = os.path.dirname(os.path.abspath(__file__))
        parent_dir = os.path.dirname(this_dir)
        forms_examples_dir = os.path.join(parent_dir, "examples", "forms")
        output_dir = self.make_temp_dir()
        env = os.environ.copy()
        add_to_path(env, "PYTHONPATH", parent_dir)
        output = get_cmd_stdout([python, name, self.uri],
                                env=env,
                                cwd=forms_examples_dir)
        output = fixup(output)
        write_file(os.path.join(output_dir, "output"), output)
        self.assert_golden(output_dir,
                           os.path.join(this_dir, golden_path))

    def test_simple(self):
        def fixup(output):
            return output.replace("POST %s" % self.uri.rstrip("/"),
                                  "POST http://127.0.0.1:8000")
        self.check_forms_example(
            "simple.py",
            os.path.join("functional_tests_golden",
                         "FormsExamplesTests.test_simple"),
            fixup)

    def test_example(self):
        def fixup(output):
            lines = [l for l in output.splitlines(True) if
                     not l.startswith("Vary:") and
                     not l.startswith("Server:") and
                     not l.startswith("Transfer-Encoding:") and
                     not l.startswith("Content-Length:")]
            output = "".join(lines)
            return output.replace(self.uri.rstrip("/"),
                                  "http://127.0.0.1:8000")
        self.check_forms_example(
            "example.py",
            os.path.join("functional_tests_golden",
                         "FormsExamplesTests.test_example"),
            fixup)


class CookieJarTests(TestCase):

    def _test_cookiejar(self, make_cookiejar, commit):
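        # A fresh cookiejar gets stocked by the first request and sends the
        # cookies back on the second; after commit, a reloaded jar should
        # still hold the persistent cookie but not the session cookie.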
        cookiejar = make_cookiejar()
        br = self.make_browser()
        #br.set_debug_http(True)
        br.set_cookiejar(cookiejar)
        br.set_handle_refresh(False)
        url = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
        # no cookie is sent on the first request, though the server's
        # responses stock the jar...
        html = br.open(url).read()
        self.assertNotIn("Your browser supports cookies!", html)
        self.assertEqual(len(cookiejar), 2)
        # ...so the second request does send cookies
        html = br.open(url).read()
        self.assertIn("Your browser supports cookies!", html)
        self.assertIn("Received session cookie", html)
        commit(cookiejar)

        # should still have the cookie when we load afresh
        cookiejar = make_cookiejar()
        br.set_cookiejar(cookiejar)
        html = br.open(url).read()
        self.assertIn("Your browser supports cookies!", html)
        self.assertNotIn("Received session cookie", html)

    def test_mozilla_cookiejar(self):
        filename = os.path.join(self.make_temp_dir(), "cookies.txt")
        def make_cookiejar():
            cj = mechanize.MozillaCookieJar(filename=filename)
            try:
                cj.revert()
            except IOError, exc:
                if exc.errno != errno.ENOENT:
                    raise
            return cj
        def commit(cj):
            cj.save()
        self._test_cookiejar(make_cookiejar, commit)

    def test_firefox3_cookiejar(self):
        try:
            mechanize.Firefox3CookieJar
        except AttributeError:
            # firefox 3 cookiejar is only supported in Python 2.5 and later;
            # also, sqlite3 must be available
            raise unittest.SkipTest()

        filename = os.path.join(self.make_temp_dir(), "cookies.sqlite")
        def make_cookiejar():
            hide_experimental_warnings()
            try:
                return mechanize.Firefox3CookieJar(filename=filename)
            finally:
                reset_experimental_warnings()
        def commit(cj):
            pass
        self._test_cookiejar(make_cookiejar, commit)


class CallbackVerifier(object):
    # reporthook for the .retrieve() / urlretrieve() tests
    def __init__(self, testcase):
        self._count = 0
        self._testcase = testcase
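    # reporthook contract: called as (number of blocks transferred so far,
    # block size, total size); blocks must arrive in order, starting from 0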
    def callback(self, block_nr, block_size, total_size):
        self._testcase.assertEqual(block_nr, self._count)
        self._count += 1


if __name__ == "__main__":
    unittest.main()
