from collections import defaultdict
from decimal import Decimal
import re
from secrets import token_urlsafe
from textwrap import wrap
from timeit import timeit
from types import MappingProxyType

from sqlalchemy import bindparam
from sqlalchemy import column


def test_case(fn=None, *, number=None):
    """Mark a method as a benchmark test case.

    Usable both bare (``@test_case``) and parametrized
    (``@test_case(number=50_000)``).

    :param fn: the function to mark; ``None`` when the decorator is
     invoked with keyword arguments only.
    :param number: optional per-method iteration count, stored on the
     function as ``__number__``.
    :return: the marked function, or a decorator when ``fn`` is ``None``.
    """

    def decorate(target):
        # the runner discovers test cases through this marker attribute
        target.__test_case__ = True
        if number is not None:
            target.__number__ = number
        return target

    return decorate if fn is None else decorate(fn)


class Case:
    """Base test case. Mark test cases with ``test_case``.

    Each public subclass (name not starting with ``_``) is registered in
    ``Case._CASES``. A subclass declares the implementations to compare in
    ``IMPLEMENTATIONS`` and the methods to time by marking them with the
    ``__test_case__`` attribute.
    """

    # Keys are the impl name, values are callable to load it
    IMPLEMENTATIONS = {}
    # Default number of timeit iterations; scaled by ``factor`` in run_case
    NUMBER = 1_000_000

    # Registry of every public Case subclass, in definition order
    _CASES = []

    def __init__(self, impl):
        """Store the implementation under test and build the fixtures."""
        self.impl = impl
        self.init_objects()

    def __init_subclass__(cls):
        # Subclasses whose name starts with "_" are treated as private
        # helpers and are not offered as runnable cases.
        if not cls.__name__.startswith("_"):
            Case._CASES.append(cls)

    def init_objects(self):
        """Hook for subclasses to create the objects used by test cases."""
        pass

    @classmethod
    def _load(cls, fn):
        """Call the loader ``fn``; return ``None`` if it raises.

        A missing optional implementation (e.g. an extension that is not
        compiled) must not abort the run, so any error is only reported.
        """
        try:
            return fn()
        except Exception as e:
            print(f"Error loading {fn}: {e}")
            return None

    @classmethod
    def import_object(cls):
        """Return ``(name, object)`` pairs for the loadable implementations."""
        impl = []
        for name, fn in cls.IMPLEMENTATIONS.items():
            obj = cls._load(fn)
            # Compare against None explicitly: ``_load`` signals failure
            # with None, a loaded object must not be dropped for being falsy.
            if obj is not None:
                impl.append((name, obj))
        return impl

    @classmethod
    def _divide_results(cls, results, num, div, name):
        "utility method to create ratios of two implementation"
        if div in results and num in results:
            results[name] = {
                m: results[num][m] / results[div][m] for m in results[div]
            }

    @classmethod
    def update_results(cls, results):
        """Hook for subclasses to add derived columns (ratios) in place."""
        pass

    @classmethod
    def run_case(cls, factor, filter_):
        """Time every test case of this class for each implementation.

        :param factor: multiplier applied to ``NUMBER`` (or to the method's
         own ``__number__``) to get the timeit iteration count; the count
         is never lower than 1.
        :param filter_: optional regular expression; only methods whose
         name matches (``re.search``) are run.
        :return: mapping ``{impl name: {method name: seconds}}``, extended
         by ``update_results``. Failing methods are recorded as ``nan``.
        """
        objects = cls.import_object()
        number = max(1, int(cls.NUMBER * factor))

        # Walk the MRO from the most derived class down.
        # dict keeps the definition order, dir is instead sorted.
        # ``dict.fromkeys`` drops repeated names, so a test case that is
        # overridden in a subclass is timed once instead of once per class
        # that defines it.
        methods = list(
            dict.fromkeys(
                m
                for klass in cls.mro()
                if klass not in (object, Case)
                for m, fn in klass.__dict__.items()
                if hasattr(fn, "__test_case__")
            )
        )

        if filter_:
            methods = [m for m in methods if re.search(filter_, m)]

        results = defaultdict(dict)
        for name, impl in objects:
            print(f"Running {name} ", end="", flush=True)
            impl_case = cls(impl)
            fails = []
            for m in methods:
                call = getattr(impl_case, m)
                try:
                    t_num = number
                    # a per-method iteration count overrides the class one
                    fn_num = getattr(call, "__number__", None)
                    if fn_num is not None:
                        t_num = max(1, int(fn_num * factor))
                    value = timeit(call, number=t_num)
                    print(".", end="", flush=True)
                except Exception as e:
                    # An implementation may not support every operation;
                    # keep going and report the failure after the run.
                    fails.append(f"{name}::{m} error: {e}")
                    print("x", end="", flush=True)
                    value = float("nan")

                results[name][m] = value
            print(" Done")
            for f in fails:
                print("\t", f)

        cls.update_results(results)
        return results


class ImmutableDict(Case):
    """Benchmarks the ``immutabledict`` implementations.

    Test cases operate on a small (2 keys) and a large (50 keys)
    instance, ``self.d1`` and ``self.d2``.
    """

    @staticmethod
    def python():
        """Load the pure python ``immutabledict``."""
        from sqlalchemy.util._py_collections import immutabledict

        return immutabledict

    @staticmethod
    def c():
        """Load ``immutabledict`` from ``sqlalchemy.cimmutabledict``."""
        from sqlalchemy.cimmutabledict import immutabledict

        return immutabledict

    @staticmethod
    def cython():
        """Load ``immutabledict`` from the ``cyextension`` package."""
        from sqlalchemy.cyextension.immutabledict import immutabledict

        return immutabledict

    # ``__func__`` unwraps the staticmethod objects, which are not directly
    # callable inside the class body on every Python version.
    IMPLEMENTATIONS = {
        "python": python.__func__,
        "c": c.__func__,
        "cython": cython.__func__,
    }

    def init_objects(self):
        """Create the plain dicts and the immutabledict instances."""
        self.small = {"a": 5, "b": 4}
        self.large = {f"k{i}": f"v{i}" for i in range(50)}
        self.d1 = self.impl({"x": 5, "y": 4})
        self.d2 = self.impl({f"key{i}": f"value{i}" for i in range(50)})

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations."""
        cls._divide_results(results, "c", "python", "c / py")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "c", "cy / c")

    @test_case
    def init_empty(self):
        self.impl()

    @test_case
    def init(self):
        self.impl(self.small)

    @test_case
    def init_large(self):
        self.impl(self.large)

    @test_case
    def len(self):
        len(self.d1) + len(self.d2)

    @test_case
    def getitem(self):
        self.d1["x"]
        self.d2["key42"]

    @test_case
    def union(self):
        self.d1.union(self.small)

    @test_case
    def union_large(self):
        self.d2.union(self.large)

    @test_case
    def merge_with(self):
        self.d1.merge_with(self.small)

    @test_case
    def merge_with_large(self):
        self.d2.merge_with(self.large)

    @test_case
    def get(self):
        self.d1.get("x")
        self.d2.get("key42")

    @test_case
    def get_miss(self):
        self.d1.get("xxx")
        self.d2.get("xxx")

    @test_case
    def keys(self):
        self.d1.keys()
        self.d2.keys()

    @test_case
    def items(self):
        self.d1.items()
        self.d2.items()

    @test_case
    def values(self):
        self.d1.values()
        self.d2.values()

    @test_case
    def iter(self):
        list(self.d1)
        list(self.d2)

    @test_case
    def in_case(self):
        # Both lookups must be hits: "key42" only exists in the large
        # dict, so it is looked up in d2 (as done by getitem / get).
        "x" in self.d1
        "key42" in self.d2

    @test_case
    def in_miss(self):
        # One miss on the small dict and one on the large dict, matching
        # the structure of get_miss.
        "xx" in self.d1
        "xx" in self.d2

    @test_case
    def eq(self):
        self.d1 == self.d1
        self.d2 == self.d2

    @test_case
    def eq_dict(self):
        self.d1 == dict(self.d1)
        self.d2 == dict(self.d2)

    @test_case
    def eq_other(self):
        self.d1 == self.d2
        self.d1 == "foo"

    @test_case
    def ne(self):
        self.d1 != self.d1
        self.d2 != self.d2

    @test_case
    def ne_dict(self):
        self.d1 != dict(self.d1)
        self.d2 != dict(self.d2)

    @test_case
    def ne_other(self):
        self.d1 != self.d2
        self.d1 != "foo"


class Processor(Case):
    """Benchmarks the result-processor functions of each implementation.

    ``self.impl`` is a module-like object exposing ``int_to_boolean``,
    ``to_str``, ``to_float``, ``str_to_datetime``, ``str_to_time``,
    ``str_to_date`` and ``to_decimal_processor_factory``.
    """

    @staticmethod
    def python():
        """Load the ``sqlalchemy.engine.processors`` module."""
        from sqlalchemy.engine import processors

        return processors

    @staticmethod
    def c():
        """Load ``sqlalchemy.cprocessors`` and add the decimal factory."""
        from sqlalchemy import cprocessors as mod

        # Set a ``to_decimal_processor_factory`` attribute on the module,
        # built on its ``DecimalResultProcessor``, so that init_objects can
        # call the same name on every implementation. ``"%%.%df" % s``
        # produces a format such as ``"%.10f"``.
        mod.to_decimal_processor_factory = (
            lambda t, s: mod.DecimalResultProcessor(t, "%%.%df" % s).process
        )

        return mod

    @staticmethod
    def cython():
        """Load the cyextension processors and add the decimal factory."""
        from sqlalchemy.cyextension import processors as mod

        # Same attribute assignment performed by ``c`` above.
        mod.to_decimal_processor_factory = (
            lambda t, s: mod.DecimalResultProcessor(t, "%%.%df" % s).process
        )

        return mod

    # ``__func__`` extracts the plain function from each staticmethod
    IMPLEMENTATIONS = {
        "python": python.__func__,
        "c": c.__func__,
        "cython": cython.__func__,
    }
    # timeit iterations per test case, before applying the run factor
    NUMBER = 500_000

    def init_objects(self):
        """Create the decimal processor and the sample payloads."""
        self.to_dec = self.impl.to_decimal_processor_factory(Decimal, 10)

        # NOTE: ``bytes`` and ``text`` are not referenced by any test case
        # defined in this class.
        self.bytes = token_urlsafe(2048).encode()
        self.text = token_urlsafe(2048)

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations."""
        cls._divide_results(results, "c", "python", "c / py")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "c", "cy / c")

    @test_case
    def int_to_boolean(self):
        self.impl.int_to_boolean(None)
        self.impl.int_to_boolean(10)
        self.impl.int_to_boolean(1)
        self.impl.int_to_boolean(-10)
        self.impl.int_to_boolean(0)

    @test_case
    def to_str(self):
        self.impl.to_str(None)
        self.impl.to_str(123)
        self.impl.to_str(True)
        # an arbitrary object: the benchmark instance itself
        self.impl.to_str(self)

    @test_case
    def to_float(self):
        self.impl.to_float(None)
        self.impl.to_float(123)
        self.impl.to_float(True)
        self.impl.to_float(42)
        self.impl.to_float(0)
        self.impl.to_float(42.0)

    @test_case
    def str_to_datetime(self):
        self.impl.str_to_datetime(None)
        self.impl.str_to_datetime("2020-01-01 20:10:34")
        self.impl.str_to_datetime("2030-11-21 01:04:34.123456")

    @test_case
    def str_to_time(self):
        self.impl.str_to_time(None)
        self.impl.str_to_time("20:10:34")
        self.impl.str_to_time("01:04:34.123456")

    @test_case
    def str_to_date(self):
        self.impl.str_to_date(None)
        self.impl.str_to_date("2020-01-01")

    @test_case
    def to_decimal(self):
        # the ``is None`` comparison result is discarded; only the calls
        # are of interest
        self.to_dec(None) is None
        self.to_dec(123.44)
        self.to_dec(99)
        self.to_dec(99)


class DistillParam(Case):
    """Benchmarks ``_distill_params_20`` and ``_distill_raw_params``.

    Each function is called with ``None``, empty sequences, a list / tuple
    of tuples, a list / tuple of dicts, a plain dict and a read-only
    mapping.
    """

    NUMBER = 2_000_000

    @staticmethod
    def python():
        """Load the pure python ``sqlalchemy.engine._py_util`` module."""
        from sqlalchemy.engine import _py_util

        return _py_util

    @staticmethod
    def cython():
        """Load the ``sqlalchemy.cyextension.util`` module."""
        from sqlalchemy.cyextension import util as mod

        return mod

    IMPLEMENTATIONS = {
        "python": python.__func__,
        "cython": cython.__func__,
    }

    def init_objects(self):
        """Create one fixture per parameter shape passed to the functions."""
        self.tup_tup = tuple(tuple(range(10)) for _ in range(100))
        self.list_tup = list(self.tup_tup)
        self.dict = {f"c{i}": i for i in range(100)}
        self.mapping = MappingProxyType(self.dict)
        self.tup_dic = (self.dict, self.dict)
        self.list_dic = [self.dict, self.dict]

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations.

        The ratios involving "c" are skipped by ``_divide_results`` unless
        a "c" entry is present in ``results``; none is declared in
        ``IMPLEMENTATIONS`` here.
        """
        cls._divide_results(results, "c", "python", "c / py")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "c", "cy / c")

    @test_case
    def none_20(self):
        self.impl._distill_params_20(None)

    @test_case
    def empty_sequence_20(self):
        self.impl._distill_params_20(())
        self.impl._distill_params_20([])

    @test_case
    def list_20(self):
        self.impl._distill_params_20(self.list_tup)

    @test_case
    def tuple_20(self):
        self.impl._distill_params_20(self.tup_tup)

    @test_case
    def list_dict_20(self):
        # list of dicts; passing ``list_tup`` here would only repeat list_20
        self.impl._distill_params_20(self.list_dic)

    @test_case
    def tuple_dict_20(self):
        # tuple of dicts, as the name states
        self.impl._distill_params_20(self.tup_dic)

    @test_case
    def dict_20(self):
        # single plain dict
        self.impl._distill_params_20(self.dict)

    @test_case
    def mapping_20(self):
        self.impl._distill_params_20(self.mapping)

    @test_case
    def raw_none(self):
        self.impl._distill_raw_params(None)

    @test_case
    def raw_empty_sequence(self):
        self.impl._distill_raw_params(())
        self.impl._distill_raw_params([])

    @test_case
    def raw_list(self):
        self.impl._distill_raw_params(self.list_tup)

    @test_case
    def raw_tuple(self):
        self.impl._distill_raw_params(self.tup_tup)

    @test_case
    def raw_list_dict(self):
        # list of dicts; passing ``list_tup`` here would only repeat raw_list
        self.impl._distill_raw_params(self.list_dic)

    @test_case
    def raw_tuple_dict(self):
        # tuple of dicts, as the name states
        self.impl._distill_raw_params(self.tup_dic)

    @test_case
    def raw_dict(self):
        # single plain dict
        self.impl._distill_raw_params(self.dict)

    @test_case
    def raw_mapping(self):
        self.impl._distill_raw_params(self.mapping)


class IdentitySet(Case):
    """Benchmarks the ``IdentitySet`` implementations against ``set``.

    ``NUMBER`` is low because every test case loops internally; the loop
    counts are part of what is measured and differ per operation.
    """

    @staticmethod
    def set_fn():
        """Return the builtin ``set``, used as the baseline."""
        return set

    @staticmethod
    def python():
        """Load the pure python ``IdentitySet``."""
        from sqlalchemy.util._py_collections import IdentitySet

        return IdentitySet

    @staticmethod
    def cython():
        """Load ``IdentitySet`` from the cyextension collections module."""
        from sqlalchemy.cyextension import collections

        return collections.IdentitySet

    IMPLEMENTATIONS = {
        "set": set_fn.__func__,
        "python": python.__func__,
        "cython": cython.__func__,
    }
    NUMBER = 10

    def init_objects(self):
        """Create a small and a large value list and a set for each."""
        self.val1 = list(range(10))
        # random URL-safe token split in chunks of at most 4 characters
        self.val2 = list(wrap(token_urlsafe(4 * 2048), 4))

        self.imp_1 = self.impl(self.val1)
        self.imp_2 = self.impl(self.val2)

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations."""
        cls._divide_results(results, "python", "set", "py / set")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "set", "cy / set")

    @test_case
    def init_empty(self):
        i = self.impl
        for _ in range(10000):
            i()

    @test_case
    def init(self):
        i, v = self.impl, self.val2
        for _ in range(500):
            i(v)

    @test_case
    def init_from_impl(self):
        for _ in range(500):
            self.impl(self.imp_2)

    @test_case
    def add(self):
        ii = self.impl()
        for _ in range(10):
            for i in range(1000):
                ii.add(str(i))

    @test_case
    def contains(self):
        ii = self.impl(self.val2)
        for _ in range(500):
            for x in self.val1 + self.val2:
                x in ii

    @test_case
    def remove(self):
        v = [str(i) for i in range(7500)]
        ii = self.impl(v)
        for x in v[:5000]:
            ii.remove(x)

    @test_case
    def discard(self):
        v = [str(i) for i in range(7500)]
        ii = self.impl(v)
        for x in v[:5000]:
            ii.discard(x)

    @test_case
    def pop(self):
        for x in range(1000):
            ii = self.impl(self.val1)
            # pop once per element, emptying the set
            for x in self.val1:
                ii.pop()

    @test_case
    def clear(self):
        i, v = self.impl, self.val1
        for _ in range(5000):
            ii = i(v)
            ii.clear()

    @test_case
    def eq(self):
        for x in range(1000):
            self.imp_1 == self.imp_1
            self.imp_1 == self.imp_2
            self.imp_1 == self.val2

    @test_case
    def ne(self):
        for x in range(1000):
            self.imp_1 != self.imp_1
            self.imp_1 != self.imp_2
            self.imp_1 != self.val2

    @test_case
    def issubset(self):
        for _ in range(250):
            self.imp_1.issubset(self.imp_1)
            self.imp_1.issubset(self.imp_2)
            self.imp_1.issubset(self.val1)
            self.imp_1.issubset(self.val2)

    @test_case
    def le(self):
        for x in range(1000):
            self.imp_1 <= self.imp_1
            self.imp_1 <= self.imp_2
            self.imp_2 <= self.imp_1
            self.imp_2 <= self.imp_2

    @test_case
    def lt(self):
        for x in range(2500):
            self.imp_1 < self.imp_1
            self.imp_1 < self.imp_2
            self.imp_2 < self.imp_1
            self.imp_2 < self.imp_2

    @test_case
    def issuperset(self):
        # every call must be issuperset: this is the mirror of issubset
        for _ in range(250):
            self.imp_1.issuperset(self.imp_1)
            self.imp_1.issuperset(self.imp_2)
            self.imp_1.issuperset(self.val1)
            self.imp_1.issuperset(self.val2)

    @test_case
    def ge(self):
        for x in range(1000):
            self.imp_1 >= self.imp_1
            self.imp_1 >= self.imp_2
            self.imp_2 >= self.imp_1
            self.imp_2 >= self.imp_2

    @test_case
    def gt(self):
        # same four operand pairs used by lt / le / ge
        for x in range(2500):
            self.imp_1 > self.imp_1
            self.imp_1 > self.imp_2
            self.imp_2 > self.imp_1
            self.imp_2 > self.imp_2

    @test_case
    def union(self):
        for _ in range(250):
            self.imp_1.union(self.imp_2)

    @test_case
    def or_test(self):
        for _ in range(250):
            self.imp_1 | self.imp_2

    @test_case
    def update(self):
        ii = self.impl(self.val1)
        for _ in range(250):
            ii.update(self.imp_2)

    @test_case
    def ior(self):
        ii = self.impl(self.val1)
        for _ in range(250):
            ii |= self.imp_2

    @test_case
    def difference(self):
        for _ in range(250):
            self.imp_1.difference(self.imp_2)
            self.imp_1.difference(self.val2)

    @test_case
    def sub(self):
        for _ in range(500):
            self.imp_1 - self.imp_2

    @test_case
    def difference_update(self):
        ii = self.impl(self.val1)
        for _ in range(250):
            ii.difference_update(self.imp_2)
            ii.difference_update(self.val2)

    @test_case
    def isub(self):
        ii = self.impl(self.val1)
        for _ in range(500):
            ii -= self.imp_2

    @test_case
    def intersection(self):
        for _ in range(250):
            self.imp_1.intersection(self.imp_2)
            self.imp_1.intersection(self.val2)

    @test_case
    def and_test(self):
        for _ in range(500):
            self.imp_1 & self.imp_2

    @test_case
    def intersection_up(self):
        ii = self.impl(self.val1)
        for _ in range(250):
            ii.intersection_update(self.imp_2)
            ii.intersection_update(self.val2)

    @test_case
    def iand(self):
        ii = self.impl(self.val1)
        for _ in range(500):
            ii &= self.imp_2

    @test_case
    def symmetric_diff(self):
        for _ in range(125):
            self.imp_1.symmetric_difference(self.imp_2)
            self.imp_1.symmetric_difference(self.val2)

    @test_case
    def xor(self):
        for _ in range(250):
            self.imp_1 ^ self.imp_2

    @test_case
    def symmetric_diff_up(self):
        ii = self.impl(self.val1)
        for _ in range(125):
            ii.symmetric_difference_update(self.imp_2)
            ii.symmetric_difference_update(self.val2)

    @test_case
    def ixor(self):
        ii = self.impl(self.val1)
        for _ in range(250):
            ii ^= self.imp_2

    @test_case
    def copy(self):
        for _ in range(250):
            self.imp_1.copy()
            self.imp_2.copy()

    @test_case
    def len(self):
        for x in range(5000):
            len(self.imp_1)
            len(self.imp_2)

    @test_case
    def iter(self):
        for _ in range(2000):
            list(self.imp_1)
            list(self.imp_2)

    @test_case
    def repr(self):
        for _ in range(250):
            str(self.imp_1)
            str(self.imp_2)


class OrderedSet(IdentitySet):
    """Benchmarks the ``OrderedSet`` implementations.

    Inherits every test case of ``IdentitySet`` and adds the operations
    below. The builtin ``set`` has no ``+``, indexing or ``insert``, so
    for the "set" implementation these extra cases raise and are reported
    as failures by ``Case.run_case``.
    """

    @staticmethod
    def set_fn():
        """Return the builtin ``set``, used as the baseline."""
        return set

    @staticmethod
    def python():
        """Load the pure python ``OrderedSet``."""
        from sqlalchemy.util._py_collections import OrderedSet

        return OrderedSet

    @staticmethod
    def cython():
        """Load ``OrderedSet`` from the cyextension collections module."""
        from sqlalchemy.cyextension import collections

        return collections.OrderedSet

    @staticmethod
    def ordered_lib():
        """Load ``OrderedSet`` from the third party ``orderedset`` package.

        If the import fails the implementation is skipped by
        ``Case._load``.
        """
        from orderedset import OrderedSet

        return OrderedSet

    IMPLEMENTATIONS = {
        "set": set_fn.__func__,
        "python": python.__func__,
        "cython": cython.__func__,
        "ordsetlib": ordered_lib.__func__,
    }

    @classmethod
    def update_results(cls, results):
        """Add the ``IdentitySet`` ratios plus the ``ordsetlib`` ones."""
        super().update_results(results)
        cls._divide_results(results, "ordsetlib", "set", "ordlib/set")
        cls._divide_results(results, "cython", "ordsetlib", "cy / ordlib")

    @test_case
    def add_op(self):
        ii = self.impl(self.val1)
        v2 = self.impl(self.val2)
        for _ in range(1000):
            ii + v2

    @test_case
    def getitem(self):
        ii = self.impl(self.val1)
        for _ in range(1000):
            # index access is the operation being measured
            for i in range(len(self.val1)):
                ii[i]

    @test_case
    def insert(self):
        ii = self.impl(self.val1)
        for _ in range(5):
            for i in range(1000):
                # ``-i % 2`` alternates between position 0 and 1; the
                # inserted value is always 1
                ii.insert(-i % 2, 1)


class TupleGetter(Case):
    """Benchmarks the ``tuplegetter`` implementations.

    Measures both calling pre-built getters and building a getter and
    calling it once.
    """

    NUMBER = 2_000_000

    @staticmethod
    def python():
        """Load the pure python ``tuplegetter``."""
        from sqlalchemy.engine._py_row import tuplegetter

        return tuplegetter

    @staticmethod
    def c():
        """Load ``tuplegetter`` from ``sqlalchemy.cresultproxy``."""
        from sqlalchemy import cresultproxy

        return cresultproxy.tuplegetter

    @staticmethod
    def cython():
        """Load ``tuplegetter`` from the cyextension resultproxy module."""
        from sqlalchemy.cyextension import resultproxy

        return resultproxy.tuplegetter

    IMPLEMENTATIONS = {
        "python": python.__func__,
        "c": c.__func__,
        "cython": cython.__func__,
    }

    def init_objects(self):
        """Create the source tuple and three pre-built getters."""
        self.impl_tg = self.impl

        self.tuple = tuple(range(1000))
        # single index
        self.tg_inst = self.impl_tg(42)
        # several indexes, not in order
        self.tg_inst_m = self.impl_tg(42, 420, 99, 9, 1)
        # five consecutive indexes (70..74)
        self.tg_inst_seq = self.impl_tg(*range(70, 75))

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations."""
        cls._divide_results(results, "c", "python", "c / py")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "c", "cy / c")

    @test_case
    def tuplegetter_one(self):
        self.tg_inst(self.tuple)

    @test_case
    def tuplegetter_many(self):
        self.tg_inst_m(self.tuple)

    @test_case
    def tuplegetter_seq(self):
        self.tg_inst_seq(self.tuple)

    # The ``new`` variants also include the cost of creating the getter.

    @test_case
    def tuplegetter_new_one(self):
        self.impl_tg(42)(self.tuple)

    @test_case
    def tuplegetter_new_many(self):
        self.impl_tg(42, 420, 99, 9, 1)(self.tuple)

    @test_case
    def tuplegetter_new_seq(self):
        self.impl_tg(40, 41, 42, 43, 44)(self.tuple)


class BaseRow(Case):
    """Benchmarks the ``BaseRow`` implementations.

    Rows are built from ``SimpleResultMetaData`` instances with 3 keys
    (``self.row``) and with 52 keys, one per ascii letter
    (``self.row_long``).
    """

    @staticmethod
    def python():
        """Load the pure python ``BaseRow``."""
        from sqlalchemy.engine._py_row import BaseRow

        return BaseRow

    @staticmethod
    def c():
        """Load ``BaseRow`` from ``sqlalchemy.cresultproxy``.

        Currently not referenced: its entry in ``IMPLEMENTATIONS`` is
        commented out.
        """
        from sqlalchemy.cresultproxy import BaseRow

        return BaseRow

    @staticmethod
    def cython():
        """Load ``BaseRow`` from the cyextension resultproxy module."""
        from sqlalchemy.cyextension import resultproxy

        return resultproxy.BaseRow

    IMPLEMENTATIONS = {
        "python": python.__func__,
        # "c": c.__func__,
        "cython": cython.__func__,
    }

    def init_objects(self):
        """Create the metadata, constructor arguments and row fixtures."""
        from sqlalchemy.engine.result import SimpleResultMetaData
        from string import ascii_letters

        # short row: 3 keys
        self.parent = SimpleResultMetaData(("a", "b", "c"))
        self.row_args = (
            self.parent,
            self.parent._processors,
            self.parent._key_to_index,
            (1, 2, 3),
        )
        # long row: one key per ascii letter
        self.parent_long = SimpleResultMetaData(tuple(ascii_letters))
        self.row_long_args = (
            self.parent_long,
            self.parent_long._processors,
            self.parent_long._key_to_index,
            tuple(range(len(ascii_letters))),
        )
        self.row = self.impl(*self.row_args)
        self.row_long = self.impl(*self.row_long_args)
        assert isinstance(self.row, self.impl), type(self.row)

        # trivial subclass, used to measure construction via a subclass
        class Row(self.impl):
            pass

        self.Row = Row
        self.row_sub = Row(*self.row_args)

        # pickled-state fixtures consumed by row_loads
        self.row_state = self.row.__getstate__()
        self.row_long_state = self.row_long.__getstate__()

        assert len(ascii_letters) == 52
        # metadata with a mix of processors: 50 entries for 52 keys
        self.parent_proc = SimpleResultMetaData(
            tuple(ascii_letters),
            _processors=[None, int, float, None, str] * 10,  # cut the last 2
        )
        self.row_proc_args = (
            self.parent_proc,
            self.parent_proc._processors,
            self.parent_proc._key_to_index,
            tuple(range(len(ascii_letters))),
        )

        # metadata where every processor is None
        self.parent_proc_none = SimpleResultMetaData(
            tuple(ascii_letters), _processors=[None] * 52
        )
        self.row_proc_none_args = (
            self.parent_proc_none,
            # NOTE: usually the code calls _effective_processors that returns
            # None for this case of all None.
            self.parent_proc_none._processors,
            self.parent_proc_none._key_to_index,
            tuple(range(len(ascii_letters))),
        )

    @classmethod
    def update_results(cls, results):
        """Add the ratio columns between the available implementations.

        The ratios involving "c" are skipped by ``_divide_results`` while
        the "c" implementation is commented out.
        """
        cls._divide_results(results, "c", "python", "c / py")
        cls._divide_results(results, "cython", "python", "cy / py")
        cls._divide_results(results, "cython", "c", "cy / c")

    @test_case
    def base_row_new(self):
        self.impl(*self.row_args)
        self.impl(*self.row_long_args)

    @test_case
    def row_new(self):
        self.Row(*self.row_args)
        self.Row(*self.row_long_args)

    @test_case
    def base_row_new_proc(self):
        self.impl(*self.row_proc_args)

    @test_case
    def row_new_proc(self):
        self.Row(*self.row_proc_args)

    @test_case
    def brow_new_proc_none(self):
        self.impl(*self.row_proc_none_args)

    @test_case
    def row_new_proc_none(self):
        self.Row(*self.row_proc_none_args)

    @test_case
    def row_dumps(self):
        self.row.__getstate__()
        self.row_long.__getstate__()

    @test_case
    def row_loads(self):
        # create an uninitialized instance and restore the saved state
        self.impl.__new__(self.impl).__setstate__(self.row_state)
        self.impl.__new__(self.impl).__setstate__(self.row_long_state)

    @test_case
    def row_values_impl(self):
        self.row._values_impl()
        self.row_long._values_impl()

    @test_case
    def row_iter(self):
        list(self.row)
        list(self.row_long)

    @test_case
    def row_len(self):
        len(self.row)
        len(self.row_long)

    @test_case
    def row_hash(self):
        hash(self.row)
        hash(self.row_long)

    @test_case
    def getitem(self):
        self.row[0]
        self.row[1]
        self.row[-1]
        self.row_long[0]
        self.row_long[1]
        self.row_long[-1]

    @test_case
    def getitem_slice(self):
        self.row[0:1]
        self.row[1:-1]
        self.row_long[0:1]
        self.row_long[1:-1]

    @test_case
    def get_by_key(self):
        self.row._get_by_key_impl_mapping("a")
        self.row._get_by_key_impl_mapping("b")
        self.row_long._get_by_key_impl_mapping("s")
        self.row_long._get_by_key_impl_mapping("a")

    @test_case
    def getattr(self):
        self.row.a
        self.row.b
        self.row_long.x
        self.row_long.y

    # The ``recreate`` cases rebuild every fixture on each iteration, so
    # the lookups run on freshly created rows; they use a lower iteration
    # count than the class default.

    @test_case(number=50_000)
    def get_by_key_recreate(self):
        self.init_objects()
        row = self.row
        for _ in range(25):
            row._get_by_key_impl_mapping("a")
        l_row = self.row_long
        for _ in range(25):
            l_row._get_by_key_impl_mapping("f")
            l_row._get_by_key_impl_mapping("o")
            l_row._get_by_key_impl_mapping("r")
            l_row._get_by_key_impl_mapping("t")
            l_row._get_by_key_impl_mapping("y")
            l_row._get_by_key_impl_mapping("t")
            l_row._get_by_key_impl_mapping("w")
            l_row._get_by_key_impl_mapping("o")

    @test_case(number=50_000)
    def getattr_recreate(self):
        self.init_objects()
        row = self.row
        for _ in range(25):
            row.a
        l_row = self.row_long
        for _ in range(25):
            l_row.f
            l_row.o
            l_row.r
            l_row.t
            l_row.y
            l_row.t
            l_row.w
            l_row.o


class CacheAnonMap(Case):
    """Benchmarks the ``cache_anon_map`` implementations.

    Two map instances are used: one on which ``get_anon`` was already
    called for both sample objects and one created empty.
    """

    @staticmethod
    def python():
        """Load the pure python ``cache_anon_map``."""
        from sqlalchemy.sql._py_util import cache_anon_map

        return cache_anon_map

    @staticmethod
    def cython():
        """Load ``cache_anon_map`` from the cyextension util module."""
        from sqlalchemy.cyextension.util import cache_anon_map

        return cache_anon_map

    IMPLEMENTATIONS = {"python": python.__func__, "cython": cython.__func__}

    NUMBER = 1000000

    def init_objects(self):
        """Create the sample SQL elements and the two map instances."""
        self.object_1 = column("x")
        self.object_2 = bindparam("y")

        self.impl_w_non_present = self.impl()
        self.impl_w_present = iwp = self.impl()
        # pre-populate the "present" map with both objects
        iwp.get_anon(self.object_1)
        iwp.get_anon(self.object_2)

    @classmethod
    def update_results(cls, results):
        """Add the cython / python ratio column."""
        cls._divide_results(results, "cython", "python", "cy / py")

    # NOTE(review): the same ``impl_w_non_present`` instance is reused for
    # every timeit iteration and by both "non_present" cases. If
    # ``get_anon`` stores the object in the map (presumably it does), only
    # the very first call is a real miss -- confirm against the
    # implementation.

    @test_case
    def test_get_anon_non_present(self):
        self.impl_w_non_present.get_anon(self.object_1)

    @test_case
    def test_get_anon_present(self):
        self.impl_w_present.get_anon(self.object_1)

    @test_case
    def test_has_key_non_present(self):
        id(self.object_1) in self.impl_w_non_present

    @test_case
    def test_has_key_present(self):
        id(self.object_1) in self.impl_w_present


class PrefixAnonMap(Case):
    """Benchmarks the ``prefix_anon_map`` implementations.

    The maps are exercised through ``_anonymous_label.apply_map``.
    """

    @staticmethod
    def python():
        """Load the pure python ``prefix_anon_map``."""
        from sqlalchemy.sql._py_util import prefix_anon_map

        return prefix_anon_map

    @staticmethod
    def cython():
        """Load ``prefix_anon_map`` from the cyextension util module."""
        from sqlalchemy.cyextension.util import prefix_anon_map

        return prefix_anon_map

    IMPLEMENTATIONS = {"python": python.__func__, "cython": cython.__func__}

    NUMBER = 1000000

    def init_objects(self):
        """Create the anonymous label and the two map instances."""
        from sqlalchemy.sql.elements import _anonymous_label

        self.name = _anonymous_label.safe_construct(58243, "some_column_name")

        self.impl_w_non_present = self.impl()
        self.impl_w_present = iwp = self.impl()
        # apply the label once so that this map has already seen it
        self.name.apply_map(iwp)

    @classmethod
    def update_results(cls, results):
        """Add the cython / python ratio column."""
        cls._divide_results(results, "cython", "python", "cy / py")

    # NOTE(review): ``impl_w_non_present`` is reused across timeit
    # iterations. If ``apply_map`` records the label in the map (presumably
    # it does), only the first call is a real miss -- confirm against the
    # implementation.

    @test_case
    def test_apply_non_present(self):
        self.name.apply_map(self.impl_w_non_present)

    @test_case
    def test_apply_present(self):
        self.name.apply_map(self.impl_w_present)


def tabulate(results, inverse):
    """Print the timings as a text table, one row per method.

    :param results: mapping keyed by column name (implementations and
     ratios); only its keys and their order are used.
    :param inverse: mapping ``{method name: {column name: value}}``.
    """
    width = 11
    names = list(results)
    columns = len(names)

    # first column is 20 characters wide, every other one ``width``
    header_tmpl = "{:<20}|" + columns * (" {:<%s} |" % width)
    row_tmpl = "{:<20}|" + columns * " {} |"
    cell_tmpl = "{:<%s.9f}" % width

    print(header_tmpl.format("", *names))

    for meth, values in inverse.items():
        cells = []
        for name in names:
            # truncate so that large values do not break the alignment
            cells.append(cell_tmpl.format(values[name])[:width])
        print(row_tmpl.format(meth, *cells))


def main():
    """Command line entry point: run the selected cases and report.

    For every selected case the timings are printed as a table and, with
    ``--csv``, also written to ``<CaseName>.csv`` in the current
    directory.
    """
    import argparse

    all_cases = Case._CASES
    case_names = [klass.__name__ for klass in all_cases]

    parser = argparse.ArgumentParser(
        description="Compare implementation between them"
    )
    parser.add_argument(
        "case",
        nargs="+",
        choices=["all", *case_names],
        help="Case to run",
    )
    parser.add_argument("--filter", help="filter the test for this regexp")
    parser.add_argument(
        "--factor", type=float, default=1, help="scale number passed to timeit"
    )
    parser.add_argument("--csv", action="store_true", help="save to csv")

    options = parser.parse_args()

    if "all" in options.case:
        selected = all_cases
    else:
        requested = options.case
        # keep the definition order of the cases, not the command line one
        selected = [
            klass for klass in all_cases if klass.__name__ in requested
        ]

    for case in selected:
        print("Running case", case.__name__)
        result = case.run_case(options.factor, options.filter)

        # pivot {column: {method: value}} into {method: {column: value}}
        inverse = defaultdict(dict)
        for column_name, timings in result.items():
            for meth, value in timings.items():
                inverse[meth][column_name] = value

        tabulate(result, inverse)

        if not options.csv:
            continue

        import csv

        file_name = f"{case.__name__}.csv"
        with open(file_name, "w", newline="") as f:
            # the first, unnamed, column holds the method name
            writer = csv.DictWriter(f, ["", *result])
            writer.writeheader()
            for meth, row in inverse.items():
                writer.writerow({"": meth, **row})
        print("Wrote file", file_name)


# Run the benchmarks only when executed as a script, not on import.
if __name__ == "__main__":
    main()
