import asyncio
import functools
import random
import re
import socket
from datetime import timedelta
from unittest import SkipTest, mock

from statsd import StatsClient
from statsd import TCPStatsClient
from statsd import UnixSocketStatsClient


ADDR = (socket.gethostbyname('localhost'), 8125)
UNIX_SOCKET = 'tmp.socket'


# proto specific methods to get the socket method to send data
send_method = {
    'udp': lambda x: x.sendto,
    'tcp': lambda x: x.sendall,
    'unix': lambda x: x.sendall,
}


# proto specific methods to create the expected value
make_val = {
    'udp': lambda x, addr: mock.call(str.encode(x), addr),
    'tcp': lambda x, addr: mock.call(str.encode(x + '\n')),
    'unix': lambda x, addr: mock.call(str.encode(x + '\n')),
}


def eq_(a, b, msg=None):
    assert a == b, msg


def _udp_client(prefix=None, addr=None, port=None, ipv6=False):
    if not addr:
        addr = ADDR[0]
    if not port:
        port = ADDR[1]
    sc = StatsClient(host=addr, port=port, prefix=prefix, ipv6=ipv6)
    sc._sock = mock.Mock()
    return sc


def _tcp_client(prefix=None, addr=None, port=None, timeout=None, ipv6=False):
    if not addr:
        addr = ADDR[0]
    if not port:
        port = ADDR[1]
    sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout,
                        ipv6=ipv6)
    sc._sock = mock.Mock()
    return sc


def _unix_socket_client(prefix=None, socket_path=None):
    if not socket_path:
        socket_path = UNIX_SOCKET

    sc = UnixSocketStatsClient(socket_path=socket_path, prefix=prefix)
    sc._sock = mock.Mock()
    return sc


def _timer_check(sock, count, proto, start, end):
    send = send_method[proto](sock)
    eq_(send.call_count, count)
    value = send.call_args[0][0].decode('ascii')
    exp = re.compile(r'^%s:\d+|%s$' % (start, end))
    assert exp.match(value)


def _sock_check(sock, count, proto, val=None, addr=None):
    send = send_method[proto](sock)
    eq_(send.call_count, count)
    if not addr:
        addr = ADDR
    if val is not None:
        eq_(
            send.call_args,
            make_val[proto](val, addr),
        )


class assert_raises(object):
    """A context manager that asserts a given exception was raised.

    >>> with assert_raises(TypeError):
    ...     raise TypeError

    >>> with assert_raises(TypeError):
    ...     raise ValueError
    AssertionError: ValueError not in ['TypeError']

    >>> with assert_raises(TypeError):
    ...     pass
    AssertionError: No exception raised.

    Or you can specify any of a number of exceptions:

    >>> with assert_raises(TypeError, ValueError):
    ...     raise ValueError

    >>> with assert_raises(TypeError, ValueError):
    ...     raise KeyError
    AssertionError: KeyError not in ['TypeError', 'ValueError']

    You can also get the exception back later:

    >>> with assert_raises(TypeError) as cm:
    ...     raise TypeError('bad type!')
    >>> cm.exception
    TypeError('bad type!')
    >>> cm.exc_type
    TypeError
    >>> cm.traceback
    <traceback @ 0x3323ef0>

    Lowercase name because that it's a class is an implementation detail.

    """

    def __init__(self, *exc_cls):
        self.exc_cls = exc_cls

    def __enter__(self):
        # For access to the exception later.
        return self

    def __exit__(self, typ, value, tb):
        assert typ, 'No exception raised.'
        assert typ in self.exc_cls, '%s not in %s' % (
            typ.__name__, [e.__name__ for e in self.exc_cls])
        self.exc_type = typ
        self.exception = value
        self.traceback = tb

        # Swallow expected exceptions.
        return True


def _test_incr(cl, proto):
    cl.incr('foo')
    _sock_check(cl._sock, 1, proto, val='foo:1|c')

    cl.incr('foo', 10)
    _sock_check(cl._sock, 2, proto, val='foo:10|c')

    cl.incr('foo', 1.2)
    _sock_check(cl._sock, 3, proto, val='foo:1.2|c')

    cl.incr('foo', 10, rate=0.5)
    _sock_check(cl._sock, 4, proto, val='foo:10|c|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_udp():
    """StatsClient.incr works."""
    cl = _udp_client()
    _test_incr(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_tcp():
    """TCPStatsClient.incr works."""
    cl = _tcp_client()
    _test_incr(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_unix_socket():
    """TCPStatsClient.incr works."""
    cl = _unix_socket_client()
    _test_incr(cl, 'unix')


def _test_decr(cl, proto):
    cl.decr('foo')
    _sock_check(cl._sock, 1, proto, 'foo:-1|c')

    cl.decr('foo', 10)
    _sock_check(cl._sock, 2, proto, 'foo:-10|c')

    cl.decr('foo', 1.2)
    _sock_check(cl._sock, 3, proto, 'foo:-1.2|c')

    cl.decr('foo', 1, rate=0.5)
    _sock_check(cl._sock, 4, proto, 'foo:-1|c|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_udp():
    """StatsClient.decr works."""
    cl = _udp_client()
    _test_decr(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_tcp():
    """TCPStatsClient.decr works."""
    cl = _tcp_client()
    _test_decr(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_unix_socket():
    """TCPStatsClient.decr works."""
    cl = _unix_socket_client()
    _test_decr(cl, 'unix')


def _test_gauge(cl, proto):
    cl.gauge('foo', 30)
    _sock_check(cl._sock, 1, proto, 'foo:30|g')

    cl.gauge('foo', 1.2)
    _sock_check(cl._sock, 2, proto, 'foo:1.2|g')

    cl.gauge('foo', 70, rate=0.5)
    _sock_check(cl._sock, 3, proto, 'foo:70|g|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_udp():
    """StatsClient.gauge works."""
    cl = _udp_client()
    _test_gauge(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_tcp():
    """TCPStatsClient.gauge works."""
    cl = _tcp_client()
    _test_gauge(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_unix_socket():
    """TCPStatsClient.decr works."""
    cl = _unix_socket_client()
    _test_gauge(cl, 'unix')


def _test_ipv6(cl, proto, addr):
    cl.gauge('foo', 30)
    _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr)


def test_ipv6_udp():
    """StatsClient can use to IPv6 address."""
    addr = ('::1', 8125, 0, 0)
    cl = _udp_client(addr=addr[0], ipv6=True)
    _test_ipv6(cl, 'udp', addr)


def test_ipv6_tcp():
    """TCPStatsClient can use to IPv6 address."""
    addr = ('::1', 8125, 0, 0)
    cl = _tcp_client(addr=addr[0], ipv6=True)
    _test_ipv6(cl, 'tcp', addr)


def _test_resolution(cl, proto, addr):
    cl.incr('foo')
    _sock_check(cl._sock, 1, proto, 'foo:1|c', addr=addr)


def test_ipv6_resolution_udp():
    raise SkipTest('IPv6 resolution is broken on Travis')
    cl = _udp_client(addr='localhost', ipv6=True)
    _test_resolution(cl, 'udp', ('::1', 8125, 0, 0))


def test_ipv6_resolution_tcp():
    cl = _tcp_client(addr='localhost', ipv6=True)
    _test_resolution(cl, 'tcp', ('::1', 8125, 0, 0))


def test_ipv4_resolution_udp():
    cl = _udp_client(addr='localhost')
    _test_resolution(cl, 'udp', ('127.0.0.1', 8125))


def test_ipv4_resolution_tcp():
    cl = _tcp_client(addr='localhost')
    _test_resolution(cl, 'tcp', ('127.0.0.1', 8125))


def _test_gauge_delta(cl, proto):
    tests = (
        (12, '+12'),
        (-13, '-13'),
        (1.2, '+1.2'),
        (-1.3, '-1.3'),
    )

    def _check(num, result):
        cl._sock.reset_mock()
        cl.gauge('foo', num, delta=True)
        _sock_check(cl._sock, 1, proto, 'foo:%s|g' % result)

    for num, result in tests:
        _check(num, result)


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_delta_udp():
    """StatsClient.gauge works with delta values."""
    cl = _udp_client()
    _test_gauge_delta(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_delta_tcp():
    """TCPStatsClient.gauge works with delta values."""
    cl = _tcp_client()
    _test_gauge_delta(cl, 'tcp')


def _test_gauge_absolute_negative(cl, proto):
    cl.gauge('foo', -5, delta=False)
    _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_absolute_negative_udp():
    """StatsClient.gauge works with absolute negative value."""
    cl = _udp_client()
    _test_gauge_delta(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_absolute_negative_tcp():
    """TCPStatsClient.gauge works with absolute negative value."""
    cl = _tcp_client()
    _test_gauge_delta(cl, 'tcp')


def _test_gauge_absolute_negative_rate(cl, proto, mock_random):
    mock_random.return_value = -1
    cl.gauge('foo', -1, rate=0.5, delta=False)
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')

    mock_random.return_value = 2
    cl.gauge('foo', -2, rate=0.5, delta=False)
    # Should not have changed.
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')


@mock.patch.object(random, 'random')
def test_gauge_absolute_negative_rate_udp(mock_random):
    """StatsClient.gauge works with absolute negative value and rate."""
    cl = _udp_client()
    _test_gauge_absolute_negative_rate(cl, 'udp', mock_random)


@mock.patch.object(random, 'random')
def test_gauge_absolute_negative_rate_tcp(mock_random):
    """TCPStatsClient.gauge works with absolute negative value and rate."""
    cl = _tcp_client()
    _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random)


def _test_set(cl, proto):
    cl.set('foo', 10)
    _sock_check(cl._sock, 1, proto, 'foo:10|s')

    cl.set('foo', 2.3)
    _sock_check(cl._sock, 2, proto, 'foo:2.3|s')

    cl.set('foo', 'bar')
    _sock_check(cl._sock, 3, proto, 'foo:bar|s')

    cl.set('foo', 2.3, 0.5)
    _sock_check(cl._sock, 4, proto, 'foo:2.3|s|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_set_udp():
    """StatsClient.set works."""
    cl = _udp_client()
    _test_set(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_set_tcp():
    """TCPStatsClient.set works."""
    cl = _tcp_client()
    _test_set(cl, 'tcp')


def _test_timing(cl, proto):
    cl.timing('foo', 100)
    _sock_check(cl._sock, 1, proto, 'foo:100.000000|ms')

    cl.timing('foo', 350)
    _sock_check(cl._sock, 2, proto, 'foo:350.000000|ms')

    cl.timing('foo', 100, rate=0.5)
    _sock_check(cl._sock, 3, proto, 'foo:100.000000|ms|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_udp():
    """StatsClient.timing works."""
    cl = _udp_client()
    _test_timing(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_tcp():
    """TCPStatsClient.timing works."""
    cl = _tcp_client()
    _test_timing(cl, 'tcp')


def test_timing_supports_timedelta():
    cl = _udp_client()
    proto = 'udp'

    cl.timing('foo', timedelta(seconds=1.5))
    _sock_check(cl._sock, 1, proto, 'foo:1500.000000|ms')

    cl.timing('foo', timedelta(days=1.5))
    _sock_check(cl._sock, 2, proto, 'foo:129600000.000000|ms')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_unix_socket():
    """UnixSocketStatsClient.timing works."""
    cl = _unix_socket_client()
    _test_timing(cl, 'unix')


def _test_prepare(cl, proto):
    tests = (
        ('foo:1|c', ('foo', '1|c', 1)),
        ('bar:50|ms|@0.5', ('bar', '50|ms', 0.5)),
        ('baz:23|g', ('baz', '23|g', 1)),
    )

    def _check(o, s, v, r):
        with mock.patch.object(random, 'random', lambda: -1):
            eq_(o, cl._prepare(s, v, r))

    for o, (s, v, r) in tests:
        _check(o, s, v, r)


@mock.patch.object(random, 'random', lambda: -1)
def test_prepare_udp():
    """Test StatsClient._prepare method."""
    cl = _udp_client()
    _test_prepare(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prepare_tcp():
    """Test TCPStatsClient._prepare method."""
    cl = _tcp_client()
    _test_prepare(cl, 'tcp')


def _test_prefix(cl, proto):
    cl.incr('bar')
    _sock_check(cl._sock, 1, proto, 'foo.bar:1|c')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_udp():
    """StatsClient.incr works."""
    cl = _udp_client(prefix='foo')
    _test_prefix(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_tcp():
    """TCPStatsClient.incr works."""
    cl = _tcp_client(prefix='foo')
    _test_prefix(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_unix_socket():
    """UnixSocketStatsClient.incr works."""
    cl = _unix_socket_client(prefix='foo')
    _test_prefix(cl, 'unix')


def _test_timer_manager(cl, proto):
    with cl.timer('foo'):
        pass

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_manager_udp():
    """StatsClient.timer can be used as manager."""
    cl = _udp_client()
    _test_timer_manager(cl, 'udp')


def test_timer_manager_tcp():
    """TCPStatsClient.timer can be used as manager."""
    cl = _tcp_client()
    _test_timer_manager(cl, 'tcp')


def _test_timer_decorator(cl, proto):
    @cl.timer('foo')
    def foo(a, b):
        return [a, b]

    @cl.timer('bar')
    def bar(a, b):
        return [b, a]

    # make sure it works with more than one decorator, called multiple
    # times, and that parameters are handled correctly
    eq_([4, 2], foo(4, 2))
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')

    eq_([2, 4], bar(4, 2))
    _timer_check(cl._sock, 2, proto, 'bar', 'ms')

    eq_([6, 5], bar(5, 6))
    _timer_check(cl._sock, 3, proto, 'bar', 'ms')


def test_timer_decorator_udp():
    """StatsClient.timer is a thread-safe decorator (UDP)."""
    cl = _udp_client()
    _test_timer_decorator(cl, 'udp')


def test_timer_decorator_tcp():
    """StatsClient.timer is a thread-safe decorator (TCP)."""
    cl = _tcp_client()
    _test_timer_decorator(cl, 'tcp')


def _test_timer_capture(cl, proto):
    with cl.timer('woo') as result:
        eq_(result.ms, None)
    assert isinstance(result.ms, float)


def test_timer_capture_udp():
    """You can capture the output of StatsClient.timer (UDP)."""
    cl = _udp_client()
    _test_timer_capture(cl, 'udp')


def test_timer_capture_tcp():
    """You can capture the output of StatsClient.timer (TCP)."""
    cl = _tcp_client()
    _test_timer_capture(cl, 'tcp')


def _test_timer_context_rate(cl, proto):
    with cl.timer('foo', rate=0.5):
        pass

    _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_context_rate_udp():
    """StatsClient.timer can be used as manager with rate."""
    cl = _udp_client()
    _test_timer_context_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_context_rate_tcp():
    """TCPStatsClient.timer can be used as manager with rate."""
    cl = _tcp_client()
    _test_timer_context_rate(cl, 'tcp')


def test_timer_decorator_partial_function():
    """TCPStatsClient.timer can be used as decorator on a partial function."""
    cl = _tcp_client()

    foo = functools.partial(lambda x: x * x, 2)
    func = cl.timer('foo')(foo)

    eq_(4, func())

    _timer_check(cl._sock, 1, 'tcp', 'foo', 'ms|@0.1')


def _test_timer_decorator_rate(cl, proto):
    @cl.timer('foo', rate=0.1)
    def foo(a, b):
        return [b, a]

    @cl.timer('bar', rate=0.2)
    def bar(a, b=2, c=3):
        return [c, b, a]

    eq_([2, 4], foo(4, 2))
    _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.1')

    eq_([3, 2, 5], bar(5))
    _timer_check(cl._sock, 2, proto, 'bar', 'ms|@0.2')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_decorator_rate_udp():
    """StatsClient.timer can be used as decorator with rate."""
    cl = _udp_client()
    _test_timer_decorator_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_decorator_rate_tcp():
    """TCPStatsClient.timer can be used as decorator with rate."""
    cl = _tcp_client()
    _test_timer_decorator_rate(cl, 'tcp')


def _test_timer_context_exceptions(cl, proto):
    with assert_raises(socket.timeout):
        with cl.timer('foo'):
            raise socket.timeout()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_context_exceptions_udp():
    cl = _udp_client()
    _test_timer_context_exceptions(cl, 'udp')


def test_timer_context_exceptions_tcp():
    cl = _tcp_client()
    _test_timer_context_exceptions(cl, 'tcp')


def _test_timer_decorator_exceptions(cl, proto):
    @cl.timer('foo')
    def foo():
        raise ValueError()

    with assert_raises(ValueError):
        foo()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_coroutine_timer_decorator():
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    already = False
    cl = _udp_client()

    def _send(*_):
        nonlocal already
        assert already is True, '_send called before coroutine completed'

    cl._send = _send

    @cl.timer('bar')
    async def inner():
        nonlocal already
        await asyncio.sleep(0)
        already = True
        return None

    event_loop.run_until_complete(inner())
    event_loop.close()


def test_timer_decorator_exceptions_udp():
    cl = _udp_client()
    _test_timer_decorator_exceptions(cl, 'udp')


def test_timer_decorator_exceptions_tcp():
    cl = _tcp_client()
    _test_timer_decorator_exceptions(cl, 'tcp')


def _test_timer_object(cl, proto):
    t = cl.timer('foo').start()
    t.stop()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_object_udp():
    """StatsClient.timer works."""
    cl = _udp_client()
    _test_timer_object(cl, 'udp')


def test_timer_object_tcp():
    """TCPStatsClient.timer works."""
    cl = _tcp_client()
    _test_timer_object(cl, 'tcp')


def _test_timer_object_no_send(cl, proto):
    t = cl.timer('foo').start()
    t.stop(send=False)
    _sock_check(cl._sock, 0, proto)

    t.send()
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_object_no_send_udp():
    """Stop StatsClient.timer without sending."""
    cl = _udp_client()
    _test_timer_object_no_send(cl, 'udp')


def test_timer_object_no_send_tcp():
    """Stop TCPStatsClient.timer without sending."""
    cl = _tcp_client()
    _test_timer_object_no_send(cl, 'tcp')


def _test_timer_object_rate(cl, proto):
    t = cl.timer('foo', rate=0.5)
    t.start()
    t.stop()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_object_rate_udp():
    """StatsClient.timer works with rate."""
    cl = _udp_client()
    _test_timer_object_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_object_rate_tcp():
    """TCPStatsClient.timer works with rate."""
    cl = _tcp_client()
    _test_timer_object_rate(cl, 'tcp')


def _test_timer_object_no_send_twice(cl):
    t = cl.timer('foo').start()
    t.stop()

    with assert_raises(RuntimeError):
        t.send()


def test_timer_object_no_send_twice_udp():
    """StatsClient.timer raises RuntimeError if send is called twice."""
    cl = _udp_client()
    _test_timer_object_no_send_twice(cl)


def test_timer_object_no_send_twice_tcp():
    """TCPStatsClient.timer raises RuntimeError if send is called twice."""
    cl = _tcp_client()
    _test_timer_object_no_send_twice(cl)


def _test_timer_send_without_stop(cl):
    with cl.timer('foo') as t:
        assert t.ms is None
        with assert_raises(RuntimeError):
            t.send()

    t = cl.timer('bar').start()
    assert t.ms is None
    with assert_raises(RuntimeError):
        t.send()


def test_timer_send_without_stop_udp():
    """StatsClient.timer raises error if send is called before stop."""
    cl = _udp_client()
    _test_timer_send_without_stop(cl)


def test_timer_send_without_stop_tcp():
    """TCPStatsClient.timer raises error if send is called before stop."""
    cl = _tcp_client()
    _test_timer_send_without_stop(cl)


def _test_timer_object_stop_without_start(cl):
    with assert_raises(RuntimeError):
        cl.timer('foo').stop()


def test_timer_object_stop_without_start_udp():
    """StatsClient.timer raises error if stop is called before start."""
    cl = _udp_client()
    _test_timer_object_stop_without_start(cl)


def test_timer_object_stop_without_start_tcp():
    """TCPStatsClient.timer raises error if stop is called before start."""
    cl = _tcp_client()
    _test_timer_object_stop_without_start(cl)


def _test_pipeline(cl, proto):
    pipe = cl.pipeline()
    pipe.incr('foo')
    pipe.decr('bar')
    pipe.timing('baz', 320)
    pipe.send()
    _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:320.000000|ms')


def test_pipeline_udp():
    """StatsClient.pipeline works."""
    cl = _udp_client()
    _test_pipeline(cl, 'udp')


def test_pipeline_tcp():
    """TCPStatsClient.pipeline works."""
    cl = _tcp_client()
    _test_pipeline(cl, 'tcp')


def _test_pipeline_null(cl, proto):
    pipe = cl.pipeline()
    pipe.send()
    _sock_check(cl._sock, 0, proto)


def test_pipeline_null_udp():
    """Ensure we don't error on an empty pipeline (UDP)."""
    cl = _udp_client()
    _test_pipeline_null(cl, 'udp')


def test_pipeline_null_tcp():
    """Ensure we don't error on an empty pipeline (TCP)."""
    cl = _tcp_client()
    _test_pipeline_null(cl, 'tcp')


def _test_pipeline_manager(cl, proto):
    with cl.pipeline() as pipe:
        pipe.incr('foo')
        pipe.decr('bar')
        pipe.gauge('baz', 15)
    _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:15|g')


def test_pipeline_manager_udp():
    """StatsClient.pipeline can be used as manager."""
    cl = _udp_client()
    _test_pipeline_manager(cl, 'udp')


def test_pipeline_manager_tcp():
    """TCPStatsClient.pipeline can be used as manager."""
    cl = _tcp_client()
    _test_pipeline_manager(cl, 'tcp')


def _test_pipeline_timer_manager(cl, proto):
    with cl.pipeline() as pipe:
        with pipe.timer('foo'):
            pass
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_manager_udp():
    """Timer manager can be retrieve from UDP Pipeline manager."""
    cl = _udp_client()
    _test_pipeline_timer_manager(cl, 'udp')


def test_pipeline_timer_manager_tcp():
    """Timer manager can be retrieve from TCP Pipeline manager."""
    cl = _tcp_client()
    _test_pipeline_timer_manager(cl, 'tcp')


def _test_pipeline_timer_decorator(cl, proto):
    with cl.pipeline() as pipe:
        @pipe.timer('foo')
        def foo():
            pass
        foo()
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_decorator_udp():
    """UDP Pipeline manager can be used as decorator."""
    cl = _udp_client()
    _test_pipeline_timer_decorator(cl, 'udp')


def test_pipeline_timer_decorator_tcp():
    """TCP Pipeline manager can be used as decorator."""
    cl = _tcp_client()
    _test_pipeline_timer_decorator(cl, 'tcp')


def _test_pipeline_timer_object(cl, proto):
    with cl.pipeline() as pipe:
        t = pipe.timer('foo').start()
        t.stop()
        _sock_check(cl._sock, 0, proto)
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_object_udp():
    """Timer from UDP Pipeline manager works."""
    cl = _udp_client()
    _test_pipeline_timer_object(cl, 'udp')


def test_pipeline_timer_object_tcp():
    """Timer from TCP Pipeline manager works."""
    cl = _tcp_client()
    _test_pipeline_timer_object(cl, 'tcp')


def _test_pipeline_empty(cl):
    with cl.pipeline() as pipe:
        pipe.incr('foo')
        eq_(1, len(pipe._stats))
    eq_(0, len(pipe._stats))


def test_pipeline_empty_udp():
    """Pipelines should be empty after a send() call (UDP)."""
    cl = _udp_client()
    _test_pipeline_empty(cl)


def test_pipeline_empty_tcp():
    """Pipelines should be empty after a send() call (TCP)."""
    cl = _tcp_client()
    _test_pipeline_empty(cl)


def _test_pipeline_negative_absolute_gauge(cl, proto):
    with cl.pipeline() as pipe:
        pipe.gauge('foo', -10, delta=False)
        pipe.incr('bar')
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-10|g\nbar:1|c')


def test_pipeline_negative_absolute_gauge_udp():
    """Negative absolute gauges use an internal pipeline (UDP)."""
    cl = _udp_client()
    _test_pipeline_negative_absolute_gauge(cl, 'udp')


def test_pipeline_negative_absolute_gauge_tcp():
    """Negative absolute gauges use an internal pipeline (TCP)."""
    cl = _tcp_client()
    _test_pipeline_negative_absolute_gauge(cl, 'tcp')


def _test_big_numbers(cl, proto):
    num = 1234568901234
    tests = (
        # Explicitly create strings so we avoid the bug we're trying to test.
        ('gauge', 'foo:1234568901234|g'),
        ('incr', 'foo:1234568901234|c'),
        ('timing', 'foo:1234568901234.000000|ms'),
    )

    def _check(method, result):
        cl._sock.reset_mock()
        getattr(cl, method)('foo', num)
        _sock_check(cl._sock, 1, proto, result)

    for method, result in tests:
        _check(method, result)


def test_big_numbers_udp():
    """Test big numbers with UDP client."""
    cl = _udp_client()
    _test_big_numbers(cl, 'udp')


def test_big_numbers_tcp():
    """Test big numbers with TCP client."""
    cl = _tcp_client()
    _test_big_numbers(cl, 'tcp')


def _test_rate_no_send(cl, proto):
    cl.incr('foo', rate=0.5)
    _sock_check(cl._sock, 0, proto)


@mock.patch.object(random, 'random', lambda: 2)
def test_rate_no_send_udp():
    """Rate below random value prevents sending with StatsClient.incr."""
    cl = _udp_client()
    _test_rate_no_send(cl, 'udp')


@mock.patch.object(random, 'random', lambda: 2)
def test_rate_no_send_tcp():
    """Rate below random value prevents sending with TCPStatsClient.incr."""
    cl = _tcp_client()
    _test_rate_no_send(cl, 'tcp')


def test_socket_error():
    """Socket error on StatsClient should be ignored."""
    cl = _udp_client()
    cl._sock.sendto.side_effect = socket.timeout()
    cl.incr('foo')
    _sock_check(cl._sock, 1, 'udp', 'foo:1|c')


def test_pipeline_packet_size():
    """Pipelines shouldn't send packets larger than 512 bytes (UDP only)."""
    sc = _udp_client()
    pipe = sc.pipeline()
    for x in range(32):
        # 32 * 16 = 512, so this will need 2 packets.
        pipe.incr('sixteen_char_str')
    pipe.send()
    eq_(2, sc._sock.sendto.call_count)
    assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512
    assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512


@mock.patch.object(socket, 'socket')
def test_tcp_raises_exception_to_user(mock_socket):
    """Socket errors in TCPStatsClient should be raised to user."""
    addr = ('127.0.0.1', 1234)
    cl = _tcp_client(addr=addr[0], port=addr[1])
    cl.incr('foo')
    eq_(1, cl._sock.sendall.call_count)
    cl._sock.sendall.side_effect = socket.error
    with assert_raises(socket.error):
        cl.incr('foo')


@mock.patch.object(socket, 'socket')
def test_tcp_timeout(mock_socket):
    """Timeout on TCPStatsClient should be set on socket."""
    test_timeout = 321
    cl = TCPStatsClient(timeout=test_timeout)
    cl.incr('foo')
    cl._sock.settimeout.assert_called_once_with(test_timeout)


@mock.patch.object(socket, 'socket')
def test_unix_socket_timeout(mock_socket):
    """Timeout on UnixSocketStatsClient should be set on socket."""
    test_timeout = 321
    cl = UnixSocketStatsClient(UNIX_SOCKET, timeout=test_timeout)
    cl.incr('foo')
    cl._sock.settimeout.assert_called_once_with(test_timeout)
