import collections
import datetime
import importlib.metadata
import typing
from unittest import mock

import pytest
from bottle import MultiDict as BotMultiDict
from django.utils.datastructures import MultiValueDict as DjMultiDict
from marshmallow import (
    EXCLUDE,
    INCLUDE,
    RAISE,
    Schema,
    missing,
    pre_load,
    validates_schema,
)
from packaging.version import Version
from werkzeug.datastructures import MultiDict as WerkMultiDict

from webargs import ValidationError, fields
from webargs.core import Parser, get_mimetype, is_json
from webargs.multidictproxy import MultiDictProxy

MARSHMALLOW_VERSION = Version(importlib.metadata.version("marshmallow"))


class MockHTTPError(Exception):
    def __init__(self, status_code, headers):
        self.status_code = status_code
        self.headers = headers
        super().__init__(self, "HTTP Error occurred")


class MockRequestParser(Parser):
    """A minimal parser implementation that parses mock requests."""

    def load_querystring(self, req, schema):
        return self._makeproxy(req.query, schema)

    def load_form(self, req, schema):
        return MultiDictProxy(req.form, schema)

    def load_json(self, req, schema):
        return req.json

    def load_cookies(self, req, schema):
        return req.cookies


@pytest.fixture(scope="function")
def web_request():
    req = mock.Mock()
    req.query = {}
    yield req
    req.query = {}


@pytest.fixture
def parser():
    return MockRequestParser()


# Parser tests


@mock.patch("webargs.core.Parser.load_json")
def test_load_json_called_by_parse_default(load_json, web_request):
    schema = Schema.from_dict({"foo": fields.Raw()})()
    load_json.return_value = {"foo": 1}
    p = Parser()
    p.parse(schema, web_request)
    load_json.assert_called_with(web_request, schema)


@pytest.mark.parametrize(
    "location", ["querystring", "form", "headers", "cookies", "files"]
)
def test_load_nondefault_called_by_parse_with_location(location, web_request):
    with (
        mock.patch(f"webargs.core.Parser.load_{location}") as mock_loadfunc,
        mock.patch("webargs.core.Parser.load_json") as load_json,
    ):
        mock_loadfunc.return_value = {}
        load_json.return_value = {}
        p = Parser()

        # ensure that without location=..., the loader is not called (json is
        # called)
        p.parse({"foo": fields.Raw()}, web_request)
        assert mock_loadfunc.call_count == 0
        assert load_json.call_count == 1

        # but when location=... is given, the loader *is* called and json is
        # not called
        p.parse({"foo": fields.Raw()}, web_request, location=location)
        assert mock_loadfunc.call_count == 1
        # it was already 1, should not go up
        assert load_json.call_count == 1


@pytest.mark.asyncio
@pytest.mark.parametrize("method", ["parse", "async_parse"])
async def test_parse(parser, web_request, method):
    web_request.json = {"username": 42, "password": 42}
    argmap = {"username": fields.Raw(), "password": fields.Raw()}
    if method == "async_parse":
        ret = await parser.async_parse(argmap, web_request)
    else:
        ret = parser.parse(argmap, web_request)
    assert {"username": 42, "password": 42} == ret


@pytest.mark.parametrize(
    "set_location",
    [
        "schema_instance",
        "parse_call",
        "parser_default",
        "parser_class_default",
    ],
)
def test_parse_with_unknown_behavior_specified(parser, web_request, set_location):
    web_request.json = {"username": 42, "password": 42, "fjords": 42}

    class CustomSchema(Schema):
        username = fields.Raw()
        password = fields.Raw()

    def parse_with_desired_behavior(value):
        if set_location == "schema_instance":
            if value is not None:
                # pass 'unknown=None' to parse() in order to indicate that the
                # schema setting should be respected
                return parser.parse(
                    CustomSchema(unknown=value), web_request, unknown=None
                )
            else:
                return parser.parse(CustomSchema(), web_request)
        elif set_location == "parse_call":
            return parser.parse(CustomSchema(), web_request, unknown=value)
        elif set_location == "parser_default":
            parser.unknown = value
            return parser.parse(CustomSchema(), web_request)
        elif set_location == "parser_class_default":

            class CustomParser(MockRequestParser):
                DEFAULT_UNKNOWN_BY_LOCATION = {"json": value}

            return CustomParser().parse(CustomSchema(), web_request)
        else:
            raise NotImplementedError

    # with no unknown setting or unknown=RAISE, it blows up
    with pytest.raises(ValidationError, match="Unknown field."):
        parse_with_desired_behavior(None)
    with pytest.raises(ValidationError, match="Unknown field."):
        parse_with_desired_behavior(RAISE)

    # with unknown=EXCLUDE the data is omitted
    ret = parse_with_desired_behavior(EXCLUDE)
    assert {"username": 42, "password": 42} == ret
    # with unknown=INCLUDE it is added even though it isn't part of the schema
    ret = parse_with_desired_behavior(INCLUDE)
    assert {"username": 42, "password": 42, "fjords": 42} == ret


def test_parse_with_explicit_unknown_overrides_schema(parser, web_request):
    web_request.json = {"username": 42, "password": 42, "fjords": 42}

    class CustomSchema(Schema):
        username = fields.Raw()
        password = fields.Raw()

    # setting RAISE in the parse call overrides schema setting
    with pytest.raises(ValidationError, match="Unknown field."):
        parser.parse(CustomSchema(unknown=EXCLUDE), web_request, unknown=RAISE)
    with pytest.raises(ValidationError, match="Unknown field."):
        parser.parse(CustomSchema(unknown=INCLUDE), web_request, unknown=RAISE)

    # and the reverse -- setting EXCLUDE or INCLUDE in the parse call overrides
    # a schema with RAISE already set
    ret = parser.parse(CustomSchema(unknown=RAISE), web_request, unknown=EXCLUDE)
    assert {"username": 42, "password": 42} == ret
    ret = parser.parse(CustomSchema(unknown=RAISE), web_request, unknown=INCLUDE)
    assert {"username": 42, "password": 42, "fjords": 42} == ret


@pytest.mark.parametrize("clear_method", ["custom_class", "instance_setting", "both"])
def test_parse_with_default_unknown_cleared_uses_schema_value(
    parser, web_request, clear_method
):
    web_request.json = {"username": 42, "password": 42, "fjords": 42}

    class CustomSchema(Schema):
        username = fields.Raw()
        password = fields.Raw()

    if clear_method == "custom_class":

        class CustomParser(MockRequestParser):
            DEFAULT_UNKNOWN_BY_LOCATION = {}

        parser = CustomParser()
    elif clear_method == "instance_setting":
        parser = MockRequestParser(unknown=None)
    elif clear_method == "both":
        # setting things in multiple ways should not result in errors
        class CustomParser(MockRequestParser):
            DEFAULT_UNKNOWN_BY_LOCATION = {}

        parser = CustomParser(unknown=None)
    else:
        raise NotImplementedError

    with pytest.raises(ValidationError, match="Unknown field."):
        parser.parse(CustomSchema(), web_request)
    with pytest.raises(ValidationError, match="Unknown field."):
        parser.parse(CustomSchema(unknown=RAISE), web_request)

    ret = parser.parse(CustomSchema(unknown=EXCLUDE), web_request)
    assert {"username": 42, "password": 42} == ret
    ret = parser.parse(CustomSchema(unknown=INCLUDE), web_request)
    assert {"username": 42, "password": 42, "fjords": 42} == ret


@pytest.mark.asyncio
@pytest.mark.parametrize("method", ["parse", "async_parse"])
async def test_parse_required_arg_raises_validation_error(parser, web_request, method):
    web_request.json = {}
    args = {"foo": fields.Raw(required=True)}
    with pytest.raises(ValidationError, match="Missing data for required field."):
        if method == "parse":
            parser.parse(args, web_request)
        else:
            await parser.async_parse(args, web_request)


def test_arg_not_required_excluded_in_parsed_output(parser, web_request):
    web_request.json = {"first": "Steve"}
    args = {"first": fields.Str(), "last": fields.Str()}
    result = parser.parse(args, web_request)
    assert result == {"first": "Steve"}


def test_arg_allow_none(parser, web_request):
    web_request.json = {"first": "Steve", "last": None}
    args = {"first": fields.Str(), "last": fields.Str(allow_none=True)}
    result = parser.parse(args, web_request)
    assert result == {"first": "Steve", "last": None}


def test_parse_required_arg(parser, web_request):
    web_request.json = {"foo": 42}
    result = parser.parse({"foo": fields.Raw(required=True)}, web_request)
    assert result == {"foo": 42}


def test_parse_required_list(parser, web_request):
    web_request.json = {"bar": []}
    args = {"foo": fields.List(fields.Raw(), required=True)}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request)
    assert (
        excinfo.value.messages["json"]["foo"][0] == "Missing data for required field."
    )


# Regression test for https://github.com/marshmallow-code/webargs/issues/107
def test_parse_list_allow_none(parser, web_request):
    web_request.json = {"foo": None}
    args = {"foo": fields.List(fields.Raw(allow_none=True), allow_none=True)}
    assert parser.parse(args, web_request) == {"foo": None}


def test_parse_list_dont_allow_none(parser, web_request):
    web_request.json = {"foo": None}
    args = {"foo": fields.List(fields.Raw(), allow_none=False)}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request)
    assert excinfo.value.messages["json"]["foo"][0] == "Field may not be null."


def test_parse_empty_list(parser, web_request):
    web_request.json = {"things": []}
    args = {"things": fields.List(fields.Raw())}
    assert parser.parse(args, web_request) == {"things": []}


def test_parse_missing_list(parser, web_request):
    web_request.json = {}
    args = {"things": fields.List(fields.Raw())}
    assert parser.parse(args, web_request) == {}


def test_default_location():
    assert Parser.DEFAULT_LOCATION == "json"


def test_missing_with_default(parser, web_request):
    web_request.json = {}
    args = {"val": fields.Raw(load_default="pizza")}
    result = parser.parse(args, web_request)
    assert result["val"] == "pizza"


def test_default_can_be_none(parser, web_request):
    web_request.json = {}
    args = {"val": fields.Raw(load_default=None, allow_none=True)}
    result = parser.parse(args, web_request)
    assert result["val"] is None


# Regression test for issue #11
def test_arg_with_default_and_location(parser, web_request):
    web_request.json = {}
    args = {
        "p": fields.Int(
            load_default=1,
            validate=lambda p: p > 0,
            metadata={
                "error": "La page demandée n'existe pas",
                "location": "querystring",
            },
        )
    }
    assert parser.parse(args, web_request) == {"p": 1}


def test_value_error_raised_if_parse_called_with_invalid_location(parser, web_request):
    field = fields.Raw()
    with pytest.raises(ValueError, match="Invalid location argument: invalidlocation"):
        parser.parse({"foo": field}, web_request, location="invalidlocation")


@mock.patch("webargs.core.Parser.handle_error")
def test_handle_error_called_when_parsing_raises_error(handle_error, web_request):
    # handle_error must raise an error to be valid
    handle_error.side_effect = ValidationError("parsing failed")

    def always_fail(*args, **kwargs):
        raise ValidationError("error occurred")

    p = Parser()
    assert handle_error.call_count == 0
    with pytest.raises(ValidationError):
        p.parse({"foo": fields.Raw()}, web_request, validate=always_fail)
    assert handle_error.call_count == 1
    with pytest.raises(ValidationError):
        p.parse({"foo": fields.Raw()}, web_request, validate=always_fail)
    assert handle_error.call_count == 2


@pytest.mark.asyncio
async def test_handle_error_called_when_async_parsing_raises_error(web_request):
    with mock.patch("webargs.core.Parser.handle_error") as handle_error:
        # handle_error must raise an error to be valid
        handle_error.side_effect = ValidationError("parsing failed")

        def always_fail(*args, **kwargs):
            raise ValidationError("error occurred")

        p = Parser()
        assert handle_error.call_count == 0
        with pytest.raises(ValidationError):
            await p.async_parse(
                {"foo": fields.Raw()}, web_request, validate=always_fail
            )
        assert handle_error.call_count == 1
        with pytest.raises(ValidationError):
            await p.async_parse(
                {"foo": fields.Raw()}, web_request, validate=always_fail
            )
        assert handle_error.call_count == 2


def test_handle_error_reraises_errors(web_request):
    p = Parser()
    with pytest.raises(ValidationError):
        p.handle_error(
            ValidationError("error raised"),
            web_request,
            Schema(),
            error_status_code=422,
            error_headers={},
        )


@mock.patch("webargs.core.Parser.load_headers")
def test_location_as_init_argument(load_headers, web_request):
    p = Parser(location="headers")
    load_headers.return_value = {}
    p.parse({"foo": fields.Raw()}, web_request)
    assert load_headers.called


def test_custom_error_handler(web_request):
    class CustomError(Exception):
        pass

    def error_handler(error, req, schema, *, error_status_code, error_headers):
        assert isinstance(schema, Schema)
        raise CustomError(error)

    def failing_validate_func(args):
        raise ValidationError("parsing failed")

    class MySchema(Schema):
        foo = fields.Int()

    myschema = MySchema()
    web_request.json = {"foo": "hello world"}

    p = Parser(error_handler=error_handler)
    with pytest.raises(CustomError):
        p.parse(myschema, web_request, validate=failing_validate_func)


def test_custom_error_handler_decorator(web_request):
    class CustomError(Exception):
        pass

    mock_schema = mock.Mock(spec=Schema)
    mock_schema.strict = True
    mock_schema.load.side_effect = ValidationError("parsing json failed")
    parser = Parser()

    @parser.error_handler
    def handle_error(error, req, schema, *, error_status_code, error_headers):
        assert isinstance(schema, Schema)
        raise CustomError(error)

    with pytest.raises(CustomError):
        parser.parse(mock_schema, web_request)


@pytest.mark.asyncio
@pytest.mark.parametrize("async_handler", [True, False])
async def test_custom_error_handler_decorator_in_async_parse(
    web_request, async_handler
):
    class CustomError(Exception):
        pass

    mock_schema = mock.Mock(spec=Schema)
    mock_schema.strict = True
    mock_schema.load.side_effect = ValidationError("parsing json failed")
    parser = Parser()

    if async_handler:

        @parser.error_handler
        async def handle_error(error, req, schema, *, error_status_code, error_headers):
            assert isinstance(schema, Schema)
            raise CustomError(error)

    else:

        @parser.error_handler
        def handle_error(error, req, schema, *, error_status_code, error_headers):
            assert isinstance(schema, Schema)
            raise CustomError(error)

    with pytest.raises(CustomError):
        await parser.async_parse(mock_schema, web_request)


def test_custom_error_handler_must_reraise(web_request):
    class CustomError(Exception):
        pass

    mock_schema = mock.Mock(spec=Schema)
    mock_schema.strict = True
    mock_schema.load.side_effect = ValidationError("parsing json failed")
    parser = Parser()

    @parser.error_handler
    def handle_error(error, req, schema, *, error_status_code, error_headers):
        pass

    # because the handler above does not raise a new error, the parser should
    # raise a ValueError -- indicating a programming error
    with pytest.raises(ValueError):
        parser.parse(mock_schema, web_request)


def test_custom_location_loader(web_request):
    web_request.data = {"foo": 42}

    parser = Parser()

    @parser.location_loader("data")
    def load_data(req, schema):
        return req.data

    result = parser.parse({"foo": fields.Int()}, web_request, location="data")
    assert result["foo"] == 42


def test_custom_location_loader_with_data_key(web_request):
    web_request.data = {"X-Foo": 42}
    parser = Parser()

    @parser.location_loader("data")
    def load_data(req, schema):
        return req.data

    result = parser.parse(
        {"x_foo": fields.Int(data_key="X-Foo")}, web_request, location="data"
    )
    assert result["x_foo"] == 42


def test_full_input_validation(parser, web_request):
    web_request.json = {"foo": 41, "bar": 42}

    def validator(args):
        if args["foo"] <= args["bar"]:
            raise ValidationError("foo must be > bar")

    args = {"foo": fields.Int(), "bar": fields.Int()}
    with pytest.raises(ValidationError):
        # Test that `validate` receives dictionary of args
        parser.parse(args, web_request, validate=validator)


def test_full_input_validation_with_multiple_validators(web_request, parser):
    def validate1(args):
        if args["a"] > args["b"]:
            raise ValidationError("b must be > a")

    def validate2(args):
        if args["b"] > args["a"]:
            raise ValidationError("a must be > b")

    args = {"a": fields.Int(), "b": fields.Int()}
    web_request.json = {"a": 2, "b": 1}
    validators = [validate1, validate2]
    with pytest.raises(ValidationError, match="b must be > a"):
        parser.parse(args, web_request, validate=validators)

    web_request.json = {"a": 1, "b": 2}
    with pytest.raises(ValidationError, match="a must be > b"):
        parser.parse(args, web_request, validate=validators)


def test_required_with_custom_error(parser, web_request):
    web_request.json = {}
    args = {
        "foo": fields.Str(required=True, error_messages={"required": "We need foo"})
    }
    with pytest.raises(ValidationError) as excinfo:
        # Test that `validate` receives dictionary of args
        parser.parse(args, web_request)

    assert "We need foo" in excinfo.value.messages["json"]["foo"]


@pytest.mark.filterwarnings("ignore:Returning `False` from a validator is deprecated")
@pytest.mark.skipif(
    MARSHMALLOW_VERSION.major >= 4,
    reason="marshmallow 4+ does not support validators returning False",
)
def test_required_with_custom_error_and_validation_error(parser, web_request):
    web_request.json = {"foo": ""}
    args = {
        "foo": fields.Str(
            required="We need foo",
            validate=lambda s: len(s) > 1,
            error_messages={"validator_failed": "foo required length is 3"},
        )
    }
    with pytest.raises(ValidationError) as excinfo:
        # Test that `validate` receives dictionary of args
        parser.parse(args, web_request)

    assert "foo required length is 3" in excinfo.value.args[0]["foo"]


def test_full_input_validator_receives_nonascii_input(web_request):
    def validate(val):
        return False

    text = "øœ∑∆∑"
    web_request.json = {"text": text}
    parser = MockRequestParser()
    args = {"text": fields.Str()}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request, validate=validate)
    assert excinfo.value.messages == {"json": ["Invalid value."]}


def test_invalid_argument_for_validate(web_request, parser):
    with pytest.raises(ValueError) as excinfo:
        parser.parse({}, web_request, validate="notcallable")
    assert "not a callable or list of callables." in excinfo.value.args[0]


def create_bottle_multi_dict():
    d = BotMultiDict()
    d["foos"] = "a"
    d["foos"] = "b"
    return d


multidicts = [
    WerkMultiDict([("foos", "a"), ("foos", "b")]),
    create_bottle_multi_dict(),
    DjMultiDict({"foos": ["a", "b"]}),
]


@pytest.mark.parametrize("input_dict", multidicts)
def test_multidict_proxy(input_dict):
    class ListSchema(Schema):
        foos = fields.List(fields.Str())

    class StrSchema(Schema):
        foos = fields.Str()

    # this MultiDictProxy is aware that "foos" is a list field and will
    # therefore produce a list with __getitem__
    list_wrapped_multidict = MultiDictProxy(input_dict, ListSchema())

    # this MultiDictProxy is under the impression that "foos" is just a string
    # and it should return "a" or "b"
    # the decision between "a" and "b" in this case belongs to the framework
    str_wrapped_multidict = MultiDictProxy(input_dict, StrSchema())

    assert list_wrapped_multidict["foos"] == ["a", "b"]
    assert str_wrapped_multidict["foos"] in ("a", "b")


def test_parse_with_data_key(web_request):
    web_request.json = {"Content-Type": "application/json"}

    parser = MockRequestParser()
    args = {"content_type": fields.Raw(data_key="Content-Type")}
    parsed = parser.parse(args, web_request)
    assert parsed == {"content_type": "application/json"}


def test_parse_with_data_key_retains_field_name_in_error(web_request):
    web_request.json = {"Content-Type": 12345}

    parser = MockRequestParser()
    args = {"content_type": fields.Str(data_key="Content-Type")}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request)
    assert "json" in excinfo.value.messages
    assert "Content-Type" in excinfo.value.messages["json"]
    assert excinfo.value.messages["json"]["Content-Type"] == ["Not a valid string."]


def test_parse_nested_with_data_key(web_request):
    parser = MockRequestParser()
    web_request.json = {"nested_arg": {"wrong": "OK"}}
    args = {"nested_arg": fields.Nested({"right": fields.Raw(data_key="wrong")})}

    parsed = parser.parse(args, web_request)
    assert parsed == {"nested_arg": {"right": "OK"}}


def test_parse_nested_with_missing_key_and_data_key(web_request):
    parser = MockRequestParser()

    web_request.json = {"nested_arg": {}}
    args = {
        "nested_arg": fields.Nested(
            {"found": fields.Raw(load_default=None, allow_none=True, data_key="miss")}
        )
    }

    parsed = parser.parse(args, web_request)
    assert parsed == {"nested_arg": {"found": None}}


def test_parse_nested_with_default(web_request):
    parser = MockRequestParser()

    web_request.json = {"nested_arg": {}}
    args = {"nested_arg": fields.Nested({"miss": fields.Raw(load_default="<foo>")})}

    parsed = parser.parse(args, web_request)
    assert parsed == {"nested_arg": {"miss": "<foo>"}}


def test_nested_many(web_request, parser):
    web_request.json = {"pets": [{"name": "Pips"}, {"name": "Zula"}]}
    args = {"pets": fields.Nested({"name": fields.Str()}, required=True, many=True)}
    parsed = parser.parse(args, web_request)
    assert parsed == {"pets": [{"name": "Pips"}, {"name": "Zula"}]}
    web_request.json = {}
    with pytest.raises(ValidationError):
        parser.parse(args, web_request)


def test_use_args(web_request, parser):
    user_args = {"username": fields.Str(), "password": fields.Str()}
    web_request.json = {"username": "foo", "password": "bar"}

    @parser.use_args(user_args, web_request)
    def viewfunc(args):
        return args

    assert viewfunc() == {"username": "foo", "password": "bar"}


@pytest.mark.asyncio
async def test_use_args_on_async(web_request, parser):
    user_args = {"username": fields.Str(), "password": fields.Str()}
    web_request.json = {"username": "foo", "password": "bar"}

    @parser.use_args(user_args, web_request)
    async def viewfunc(args):
        return args

    data = await viewfunc()
    assert data == {"username": "foo", "password": "bar"}


def test_use_args_stacked(web_request, parser):
    query_args = {"page": fields.Int()}
    json_args = {"username": fields.Str()}
    web_request.json = {"username": "foo"}
    web_request.query = {"page": 42}

    @parser.use_args(query_args, web_request, location="query")
    @parser.use_args(json_args, web_request)
    def viewfunc(query_parsed, json_parsed):
        return {"json": json_parsed, "query": query_parsed}

    assert viewfunc() == {"json": {"username": "foo"}, "query": {"page": 42}}


def test_use_args_forbids_invalid_usages(parser):
    with pytest.raises(
        ValueError, match="arg_name and as_kwargs are mutually exclusive"
    ):
        parser.use_args({}, web_request, arg_name="foo", as_kwargs=True)


def test_use_kwargs_stacked(web_request, parser):
    query_args = {
        "page": fields.Int(error_messages={"invalid": "{input} not a valid integer"})
    }
    json_args = {"username": fields.Str()}
    web_request.json = {"username": "foo"}
    web_request.query = {"page": 42}

    @parser.use_kwargs(query_args, web_request, location="query")
    @parser.use_kwargs(json_args, web_request)
    def viewfunc(page, username):
        return {"json": {"username": username}, "query": {"page": page}}

    assert viewfunc() == {"json": {"username": "foo"}, "query": {"page": 42}}


@pytest.mark.parametrize("decorator_name", ["use_args", "use_kwargs"])
def test_decorators_dont_change_docstring(parser, decorator_name):
    decorator = getattr(parser, decorator_name)

    @decorator({"val": fields.Int()})
    def viewfunc(*args, **kwargs):
        """View docstring"""
        pass

    assert viewfunc.__doc__ == "View docstring"


def test_list_allowed_missing(web_request, parser):
    args = {"name": fields.List(fields.Str())}
    web_request.json = {}
    result = parser.parse(args, web_request)
    assert result == {}


def test_int_list_allowed_missing(web_request, parser):
    args = {"name": fields.List(fields.Int())}
    web_request.json = {}
    result = parser.parse(args, web_request)
    assert result == {}


def test_multiple_arg_required_with_int_conversion(web_request, parser):
    args = {"ids": fields.List(fields.Int(), required=True)}
    web_request.json = {}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request)
    assert excinfo.value.messages == {
        "json": {"ids": ["Missing data for required field."]}
    }


def test_parse_with_callable(web_request, parser):
    web_request.json = {"foo": 42}

    class MySchema(Schema):
        foo = fields.Raw()

    def make_schema(req):
        assert req is web_request
        return MySchema()

    result = parser.parse(make_schema, web_request)

    assert result == {"foo": 42}


def test_use_args_callable(web_request, parser):
    class HelloSchema(Schema):
        name = fields.Str()

    web_request.json = {"name": "foo"}

    def make_schema(req):
        assert req is web_request
        return HelloSchema()

    @parser.use_args(make_schema, web_request)
    def viewfunc(args):
        return args

    assert viewfunc() == {"name": "foo"}


class TestPassingSchema:
    class UserSchema(Schema):
        id = fields.Int(dump_only=True)
        email = fields.Email()
        password = fields.Str(load_only=True)

    def test_passing_schema_to_parse(self, parser, web_request):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        result = parser.parse(self.UserSchema(), web_request)

        assert result == {"email": "foo@bar.com", "password": "bar"}

    def test_use_args_can_be_passed_a_schema(self, web_request, parser):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        @parser.use_args(self.UserSchema(), web_request)
        def viewfunc(args):
            return args

        assert viewfunc() == {"email": "foo@bar.com", "password": "bar"}

    def test_passing_schema_factory_to_parse(self, parser, web_request):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        def factory(req):
            assert req is web_request
            return self.UserSchema()

        result = parser.parse(factory, web_request)

        assert result == {"email": "foo@bar.com", "password": "bar"}

    def test_use_args_can_be_passed_a_schema_factory(self, web_request, parser):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        def factory(req):
            assert req is web_request
            return self.UserSchema()

        @parser.use_args(factory, web_request)
        def viewfunc(args):
            return args

        assert viewfunc() == {"email": "foo@bar.com", "password": "bar"}

    def test_use_kwargs_can_be_passed_a_schema(self, web_request, parser):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        @parser.use_kwargs(self.UserSchema(), web_request)
        def viewfunc(email, password):
            return {"email": email, "password": password}

        assert viewfunc() == {"email": "foo@bar.com", "password": "bar"}

    def test_use_kwargs_can_be_passed_a_schema_factory(self, web_request, parser):
        web_request.json = {"email": "foo@bar.com", "password": "bar"}

        def factory(req):
            assert req is web_request
            return self.UserSchema()

        @parser.use_kwargs(factory, web_request)
        def viewfunc(email, password):
            return {"email": email, "password": password}

        assert viewfunc() == {"email": "foo@bar.com", "password": "bar"}

    def test_use_kwargs_stacked(self, web_request, parser):
        web_request.json = {"email": "foo@bar.com", "password": "bar", "page": 42}

        @parser.use_kwargs({"page": fields.Int()}, web_request, unknown=EXCLUDE)
        @parser.use_kwargs(self.UserSchema(), web_request, unknown=EXCLUDE)
        def viewfunc(email, password, page):
            return {"email": email, "password": password, "page": page}

        assert viewfunc() == {"email": "foo@bar.com", "password": "bar", "page": 42}

    # Regression test for https://github.com/marshmallow-code/webargs/issues/146
    def test_parse_does_not_add_missing_values_to_schema_validator(
        self, web_request, parser
    ):
        class UserSchema(Schema):
            name = fields.Str()
            location = fields.Raw(required=False)

            @validates_schema(pass_original=True)
            def validate_schema(self, data, original_data, **kwargs):
                assert "location" not in original_data
                return True

        web_request.json = {"name": "Eric Cartman"}
        res = parser.parse(UserSchema, web_request)
        assert res == {"name": "Eric Cartman"}


def test_use_args_with_custom_location_in_parser(web_request, parser):
    custom_args = {"foo": fields.Str()}
    web_request.json = {}
    parser.location = "custom"

    @parser.location_loader("custom")
    def load_custom(schema, req):
        return {"foo": "bar"}

    @parser.use_args(custom_args, web_request)
    def viewfunc(args):
        return args

    assert viewfunc() == {"foo": "bar"}


def test_use_kwargs(web_request, parser):
    user_args = {"username": fields.Str(), "password": fields.Str()}
    web_request.json = {"username": "foo", "password": "bar"}

    @parser.use_kwargs(user_args, web_request)
    def viewfunc(username, password):
        return {"username": username, "password": password}

    assert viewfunc() == {"username": "foo", "password": "bar"}


def test_use_kwargs_with_arg_missing(web_request, parser):
    user_args = {"username": fields.Str(required=True), "password": fields.Str()}
    web_request.json = {"username": "foo"}

    @parser.use_kwargs(user_args, web_request)
    def viewfunc(username, **kwargs):
        assert "password" not in kwargs
        return {"username": username}

    assert viewfunc() == {"username": "foo"}


def test_delimited_list_empty_string(web_request, parser):
    web_request.json = {"dates": ""}
    schema_cls = Schema.from_dict({"dates": fields.DelimitedList(fields.Str())})
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["dates"] == []

    data = schema.dump(parsed)
    assert data["dates"] == ""


def test_delimited_list_default_delimiter(web_request, parser):
    web_request.json = {"ids": "1,2,3"}
    schema_cls = Schema.from_dict({"ids": fields.DelimitedList(fields.Int())})
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == [1, 2, 3]

    data = schema.dump(parsed)
    assert data["ids"] == "1,2,3"


def test_delimited_tuple_default_delimiter(web_request, parser):
    """
    Test load and dump from DelimitedTuple, including the use of a datetime
    type (similar to a DelimitedList test below) which confirms that we aren't
    relying on __str__, but are properly de/serializing the included fields
    """
    web_request.json = {"ids": "1,2,2020-05-04"}
    schema_cls = Schema.from_dict(
        {
            "ids": fields.DelimitedTuple(
                (fields.Int, fields.Int, fields.DateTime(format="%Y-%m-%d"))
            )
        }
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == (1, 2, datetime.datetime(2020, 5, 4))

    data = schema.dump(parsed)
    assert data["ids"] == "1,2,2020-05-04"


def test_delimited_tuple_incorrect_arity(web_request, parser):
    web_request.json = {"ids": "1,2"}
    schema_cls = Schema.from_dict(
        {"ids": fields.DelimitedTuple((fields.Int, fields.Int, fields.Int))}
    )
    schema = schema_cls()

    with pytest.raises(ValidationError):
        parser.parse(schema, web_request)


def test_delimited_list_with_datetime(web_request, parser):
    """
    Test that DelimitedList(DateTime(format=...)) correctly parses and dumps
    dates to and from strings -- indicates that we're doing proper
    serialization of values in dump() and not just relying on __str__ producing
    correct results
    """
    web_request.json = {"dates": "2018-11-01,2018-11-02"}
    schema_cls = Schema.from_dict(
        {"dates": fields.DelimitedList(fields.DateTime(format="%Y-%m-%d"))}
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["dates"] == [
        datetime.datetime(2018, 11, 1),
        datetime.datetime(2018, 11, 2),
    ]

    data = schema.dump(parsed)
    assert data["dates"] == "2018-11-01,2018-11-02"


def test_delimited_list_custom_delimiter(web_request, parser):
    web_request.json = {"ids": "1|2|3"}
    schema_cls = Schema.from_dict(
        {"ids": fields.DelimitedList(fields.Int(), delimiter="|")}
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == [1, 2, 3]

    data = schema.dump(parsed)
    assert data["ids"] == "1|2|3"


def test_delimited_tuple_custom_delimiter(web_request, parser):
    web_request.json = {"ids": "1|2"}
    schema_cls = Schema.from_dict(
        {"ids": fields.DelimitedTuple((fields.Int, fields.Int), delimiter="|")}
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == (1, 2)

    data = schema.dump(parsed)
    assert data["ids"] == "1|2"


def test_delimited_list_load_list_errors(web_request, parser):
    web_request.json = {"ids": [1, 2, 3]}
    schema_cls = Schema.from_dict({"ids": fields.DelimitedList(fields.Int())})
    schema = schema_cls()

    with pytest.raises(ValidationError) as excinfo:
        parser.parse(schema, web_request)
    exc = excinfo.value
    assert isinstance(exc, ValidationError)
    errors = exc.args[0]
    assert errors["ids"] == ["Not a valid delimited list."]


def test_delimited_tuple_load_list_errors(web_request, parser):
    web_request.json = {"ids": [1, 2]}
    schema_cls = Schema.from_dict(
        {"ids": fields.DelimitedTuple((fields.Int, fields.Int))}
    )
    schema = schema_cls()

    with pytest.raises(ValidationError) as excinfo:
        parser.parse(schema, web_request)
    exc = excinfo.value
    assert isinstance(exc, ValidationError)
    errors = exc.args[0]
    assert errors["ids"] == ["Not a valid delimited tuple."]


# Regresion test for https://github.com/marshmallow-code/webargs/issues/149
def test_delimited_list_passed_invalid_type(web_request, parser):
    web_request.json = {"ids": 1}
    schema_cls = Schema.from_dict({"ids": fields.DelimitedList(fields.Int())})
    schema = schema_cls()

    with pytest.raises(ValidationError) as excinfo:
        parser.parse(schema, web_request)
    assert excinfo.value.messages == {"json": {"ids": ["Not a valid delimited list."]}}


def test_delimited_tuple_passed_invalid_type(web_request, parser):
    web_request.json = {"ids": 1}
    schema_cls = Schema.from_dict({"ids": fields.DelimitedTuple((fields.Int,))})
    schema = schema_cls()

    with pytest.raises(ValidationError) as excinfo:
        parser.parse(schema, web_request)
    assert excinfo.value.messages == {"json": {"ids": ["Not a valid delimited tuple."]}}


def test_delimited_list_custom_empty_value(web_request, parser):
    class ZeroList(fields.DelimitedList):
        empty_value = 0

    web_request.json = {"ids": "1,,3"}
    schema_cls = Schema.from_dict({"ids": ZeroList(fields.Int())})
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == [1, 0, 3]


def test_delimited_tuple_custom_empty_value(web_request, parser):
    class ZeroTuple(fields.DelimitedTuple):
        empty_value = 0

    web_request.json = {"ids": "1,,3"}
    schema_cls = Schema.from_dict(
        {"ids": ZeroTuple((fields.Int, fields.Int, fields.Int))}
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == (1, 0, 3)


def test_delimited_list_using_missing_for_empty(web_request, parser):
    # this is "future" because we plan to make this the default for webargs v9.0
    class FutureList(fields.DelimitedList):
        empty_value = missing

    web_request.json = {"ids": "foo,,bar"}
    schema_cls = Schema.from_dict(
        {"ids": FutureList(fields.String(load_default="nil"))}
    )
    schema = schema_cls()

    parsed = parser.parse(schema, web_request)
    assert parsed["ids"] == ["foo", "nil", "bar"]


def test_missing_list_argument_not_in_parsed_result(web_request, parser):
    # arg missing in request
    web_request.json = {}
    args = {"ids": fields.List(fields.Int())}

    result = parser.parse(args, web_request)
    assert "ids" not in result


def test_type_conversion_with_multiple_required(web_request, parser):
    web_request.json = {}
    args = {"ids": fields.List(fields.Int(), required=True)}
    msg = "Missing data for required field."
    with pytest.raises(ValidationError, match=msg):
        parser.parse(args, web_request)


@pytest.mark.parametrize("input_dict", multidicts)
@pytest.mark.parametrize(
    "setting",
    [
        "is_multiple_true",
        "is_multiple_false",
        "is_multiple_notset",
        "list_field",
        "tuple_field",
        "added_to_known",
    ],
)
def test_is_multiple_detection(web_request, parser, input_dict, setting):
    # this custom class "multiplexes" in that it can be given a single value or
    # list of values -- a single value is treated as a string, and a list of
    # values is treated as a list of strings
    class CustomMultiplexingField(fields.String):
        def _deserialize(self, value, attr, data, **kwargs):
            if isinstance(value, str):
                return super()._deserialize(value, attr, data, **kwargs)
            return [
                self._deserialize(v, attr, data, **kwargs)
                for v in value
                if isinstance(v, str)
            ]

        def _serialize(self, value, attr, **kwargs):
            if isinstance(value, str):
                return super()._serialize(value, attr, **kwargs)
            return [
                self._serialize(v, attr, **kwargs) for v in value if isinstance(v, str)
            ]

    class CustomMultipleField(CustomMultiplexingField):
        is_multiple = True

    class CustomNonMultipleField(CustomMultiplexingField):
        is_multiple = False

    # the request's query params are the input multidict
    web_request.query = input_dict

    # case 1: is_multiple=True
    if setting == "is_multiple_true":
        # the multidict should unpack to a list of strings
        #
        # order is not necessarily guaranteed by the multidict implementations, but
        # both values must be present
        args = {"foos": CustomMultipleField()}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in (["a", "b"], ["b", "a"])
    # case 2: is_multiple=False
    elif setting == "is_multiple_false":
        # the multidict should unpack to a string
        #
        # either value may be returned, depending on the multidict implementation,
        # but not both
        args = {"foos": CustomNonMultipleField()}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in ("a", "b")
    # case 3: is_multiple is not set
    elif setting == "is_multiple_notset":
        # this should be the same as is_multiple=False
        args = {"foos": CustomMultiplexingField()}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in ("a", "b")
    # case 4: the field is a List (special case)
    elif setting == "list_field":
        # this should behave like the is_multiple=True case
        args = {"foos": fields.List(fields.Str())}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in (["a", "b"], ["b", "a"])
    # case 5: the field is a Tuple (special case)
    elif setting == "tuple_field":
        # this should behave like the is_multiple=True case and produce a tuple
        args = {"foos": fields.Tuple((fields.Str, fields.Str))}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in (("a", "b"), ("b", "a"))
    # case 6: the field is custom, but added to the known fields of the parser
    elif setting == "added_to_known":
        # if it's included in the known multifields and is_multiple is not set, behave
        # like is_multiple=True
        parser.KNOWN_MULTI_FIELDS.append(CustomMultiplexingField)
        args = {"foos": CustomMultiplexingField()}
        result = parser.parse(args, web_request, location="query")
        assert result["foos"] in (["a", "b"], ["b", "a"])
    else:
        raise NotImplementedError


def test_validation_errors_in_validator_are_passed_to_handle_error(parser, web_request):
    def validate(value):
        raise ValidationError("Something went wrong.")

    args = {"name": fields.Raw(validate=validate, metadata={"location": "json"})}
    web_request.json = {"name": "invalid"}
    with pytest.raises(ValidationError) as excinfo:
        parser.parse(args, web_request)
    exc = excinfo.value
    assert isinstance(exc, ValidationError)
    errors = exc.args[0]
    assert errors["name"] == ["Something went wrong."]


def test_parse_basic(web_request, parser):
    web_request.json = {"foo": "42"}
    args = {"foo": fields.Int()}
    result = parser.parse(args, web_request)
    assert result == {"foo": 42}


def test_parse_raises_validation_error_if_data_invalid(web_request, parser):
    args = {"email": fields.Email()}
    web_request.json = {"email": "invalid"}
    with pytest.raises(ValidationError):
        parser.parse(args, web_request)


def test_nested_field_from_dict():
    # webargs.fields.Nested implements dict handling
    argmap = {"nest": fields.Nested({"foo": fields.Raw()})}
    schema_cls = Schema.from_dict(argmap)
    assert issubclass(schema_cls, Schema)
    schema = schema_cls()
    assert "nest" in schema.fields
    assert type(schema.fields["nest"]) is fields.Nested
    assert "foo" in schema.fields["nest"].schema.fields


def test_is_json():
    assert is_json(None) is False
    assert is_json("application/json") is True
    assert is_json("application/xml") is False
    assert is_json("application/vnd.api+json") is True


def test_get_mimetype():
    assert get_mimetype("application/json") == "application/json"
    assert get_mimetype("application/json;charset=utf8") == "application/json"


class MockRequestParserWithErrorHandler(MockRequestParser):
    def handle_error(self, error, req, schema, *, error_status_code, error_headers):
        assert isinstance(error, ValidationError)
        assert isinstance(schema, Schema)
        raise MockHTTPError(error_status_code, error_headers)


def test_parse_with_error_status_code_and_headers(web_request):
    def always_fail(_):
        raise ValidationError("oops")

    parser = MockRequestParserWithErrorHandler()
    web_request.json = {"foo": 42}
    args = {"foo": fields.Raw(validate=always_fail)}
    with pytest.raises(MockHTTPError) as excinfo:
        parser.parse(
            args, web_request, error_status_code=418, error_headers={"X-Foo": "bar"}
        )
    error = excinfo.value
    assert error.status_code == 418
    assert error.headers == {"X-Foo": "bar"}


@mock.patch("webargs.core.Parser.load_json")
def test_custom_schema_class(load_json, web_request):
    class CustomSchema(Schema):
        @pre_load
        def pre_load(self, data, **kwargs):
            data["value"] += " world"
            return data

    load_json.return_value = {"value": "hello"}
    argmap = {"value": fields.Str()}
    p = Parser(schema_class=CustomSchema)
    ret = p.parse(argmap, web_request)
    assert ret == {"value": "hello world"}


@mock.patch("webargs.core.Parser.load_json")
def test_custom_default_schema_class(load_json, web_request):
    class CustomSchema(Schema):
        @pre_load
        def pre_load(self, data, **kwargs):
            data["value"] += " world"
            return data

    class CustomParser(Parser):
        DEFAULT_SCHEMA_CLASS = CustomSchema

    load_json.return_value = {"value": "hello"}
    argmap = {"value": fields.Str()}
    p = CustomParser()
    ret = p.parse(argmap, web_request)
    assert ret == {"value": "hello world"}


def test_parser_pre_load(web_request):
    class CustomParser(MockRequestParser):
        # pre-load hook to strip whitespace from query params
        def pre_load(self, data, *, schema, req, location):
            if location == "query":
                return {k: v.strip() for k, v in data.items()}
            return data

    parser = CustomParser()

    # mock data for both query and json
    web_request.query = web_request.json = {"value": " hello "}
    argmap = {"value": fields.Str()}

    # data gets through for 'json' just fine
    ret = parser.parse(argmap, web_request)
    assert ret == {"value": " hello "}

    # but for 'query', the pre_load hook changes things
    ret = parser.parse(argmap, web_request, location="query")
    assert ret == {"value": "hello"}


# this test is meant to be a run of the WhitspaceStrippingFlaskParser we give
# in the docs/advanced.rst examples for how to use pre_load
# this helps ensure that the example code is correct
# rather than a FlaskParser, we're working with the mock parser, but it's
# otherwise the same
def test_whitespace_stripping_parser_example(web_request):
    def _strip_whitespace(value):
        if isinstance(value, str):
            value = value.strip()
        elif isinstance(value, typing.Mapping):
            return {k: _strip_whitespace(value[k]) for k in value}
        elif isinstance(value, (list, tuple)):
            return type(value)(map(_strip_whitespace, value))
        return value

    class WhitspaceStrippingParser(MockRequestParser):
        def pre_load(self, location_data, *, schema, req, location):
            if location in ("query", "form"):
                ret = _strip_whitespace(location_data)
                return ret
            return location_data

    parser = WhitspaceStrippingParser()

    # mock data for query, form, and json
    web_request.form = web_request.query = web_request.json = {"value": " hello "}
    argmap = {"value": fields.Str()}

    # data gets through for 'json' just fine
    ret = parser.parse(argmap, web_request)
    assert ret == {"value": " hello "}

    # but for 'query' and 'form', the pre_load hook changes things
    for loc in ("query", "form"):
        ret = parser.parse(argmap, web_request, location=loc)
        assert ret == {"value": "hello"}

    # check that it applies in the case where the field is a list type
    # applied to an argument (logic for `tuple` is effectively the same)
    web_request.form = web_request.query = web_request.json = {
        "ids": [" 1", "3", " 4"],
        "values": [" foo  ", " bar"],
    }
    schema = Schema.from_dict(
        {"ids": fields.List(fields.Int), "values": fields.List(fields.Str)}
    )
    for loc in ("query", "form"):
        ret = parser.parse(schema, web_request, location=loc)
        assert ret == {"ids": [1, 3, 4], "values": ["foo", "bar"]}

    # json loading should also work even though the pre_load hook above
    # doesn't strip whitespace from JSON data
    #   - values=[" foo  ", ...]  will have whitespace preserved
    #   - ids=[" 1", ...]  will still parse okay because "  1" is valid for fields.Int
    ret = parser.parse(schema, web_request, location="json")
    assert ret == {"ids": [1, 3, 4], "values": [" foo  ", " bar"]}


def test_parse_allows_non_dict_argmap_mapping(parser, web_request):
    web_request.json = {"username": "dadams", "password": 42}
    # UserDict is dict-like in all meaningful ways, but not a subclass of `dict`
    # it will therefore need to be converted when used
    argmap = collections.UserDict(
        {"username": fields.String(), "password": fields.Raw()}
    )

    ret = parser.parse(argmap, web_request)
    assert ret == {"username": "dadams", "password": 42}


def test_use_args_allows_non_dict_argmap_mapping(parser, web_request):
    web_request.json = {"username": "dadams", "password": 42}
    # UserDict is dict-like in all meaningful ways, but not a subclass of `dict`
    # it will therefore need to be converted when used
    argmap = collections.UserDict(
        {"username": fields.String(), "password": fields.Raw()}
    )

    @parser.use_args(argmap, web_request)
    def viewfunc(args):
        return args

    assert viewfunc() == {"username": "dadams", "password": 42}


def test_parse_rejects_unknown_argmap_type(parser, web_request):
    web_request.json = {"username": "dadams", "password": 42}

    class MyType:
        pass

    with pytest.raises(TypeError, match="argmap was of unexpected type"):
        parser.parse(MyType(), web_request)


def test_parser_opt_out_positional_args(web_request):
    class OptOutParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

    parser = MockRequestParser()
    opt_out_parser = OptOutParser()
    web_request.json = {"foo": "bar"}

    # first, test the behavior of a base parser for comparison
    #
    # no specific arg name, default parser, everything works
    # works for 'args', 'json_args', or any other name
    @parser.use_args({"foo": fields.Raw()}, web_request)
    def viewfunc1(args):
        return args

    @parser.use_args({"foo": fields.Raw()}, web_request)
    def viewfunc2(json_args):
        return json_args

    assert viewfunc1() == {"foo": "bar"}
    assert viewfunc2() == {"foo": "bar"}

    # second, test the behavior of a parser which sets USE_ARGS_POSITIONAL=False
    #
    # `json_args` as the arg name works as a positional or keyword-only
    # but `args` as the arg name does not
    @opt_out_parser.use_args({"foo": fields.Raw()}, web_request)
    def viewfunc3(json_args):
        return json_args

    @opt_out_parser.use_args({"foo": fields.Raw()}, web_request)
    def viewfunc4(*, json_args):
        return json_args

    @opt_out_parser.use_args({"foo": fields.Raw()}, web_request)
    def viewfunc5(args):
        return args

    assert viewfunc3() == {"foo": "bar"}
    assert viewfunc4() == {"foo": "bar"}

    with pytest.raises(TypeError):
        assert viewfunc5()


def test_use_args_implicit_arg_names(web_request):
    class OptOutParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

    parser = OptOutParser()
    web_request.json = {"foo": "bar"}
    web_request.query = {"bar": "baz"}

    @parser.use_args({"foo": fields.Raw()}, web_request)
    @parser.use_args({"bar": fields.Raw()}, web_request, location="query")
    def viewfunc(*, json_args, query_args):
        return (json_args, query_args)

    assert viewfunc() == ({"foo": "bar"}, {"bar": "baz"})


@pytest.mark.parametrize("use_positional_setting", (True, False))
def test_use_args_explicit_arg_names(web_request, use_positional_setting):
    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = use_positional_setting

    parser = MyParser()
    web_request.json = {"foo": "bar"}
    web_request.query = {"bar": "baz"}

    @parser.use_args({"foo": fields.Raw()}, web_request, arg_name="j")
    @parser.use_args({"bar": fields.Raw()}, web_request, location="query", arg_name="q")
    def viewfunc(*, j, q):
        return (j, q)

    assert viewfunc() == ({"foo": "bar"}, {"bar": "baz"})


def test_use_args_errors_on_explicit_arg_name_conflict(web_request):
    parser = MockRequestParser()
    web_request.json = {"foo": "bar"}
    web_request.query = {"bar": "baz"}

    with pytest.raises(ValueError, match="Attempted to pass `arg_name='q'`"):

        @parser.use_args({"foo": fields.Raw()}, web_request, arg_name="q")
        @parser.use_args(
            {"bar": fields.Raw()}, web_request, location="query", arg_name="q"
        )
        def viewfunc(*, j, q):
            return (j, q)


def test_use_args_errors_on_implicit_arg_name_conflict(web_request):
    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

    parser = MyParser()
    web_request.json = {"foo": "bar"}

    with pytest.raises(ValueError, match="Attempted to pass `arg_name='json_args'`"):

        @parser.use_args({"foo": fields.Raw()}, web_request)
        @parser.use_args({"foo": fields.Raw()}, web_request)
        def viewfunc(*, j, q):
            return (j, q)


def test_use_args_with_arg_name_supports_multi_stacked_decorators(web_request):
    # this test case specifically explores the use-case in which a view function is
    # decorated with one `use_args` call, and then "permuted" by decorating it with
    # other `use_args` calls
    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

    parser = MyParser()

    # create two body variants of the same route, from a single function
    # they share the same query params
    with_body_foo = parser.use_args({"foo": fields.Raw()}, web_request)
    with_body_bar = parser.use_args({"bar": fields.Raw()}, web_request)

    @parser.use_args({"snork": fields.Raw()}, web_request, location="query")
    def mypartial(*, json_args, query_args):
        return (json_args, query_args)

    route_foo = with_body_foo(mypartial)
    route_bar = with_body_bar(mypartial)

    # first, test that these behave as expected
    web_request.json = {"foo": "bar"}
    web_request.query = {"snork": 2}
    assert route_foo() == ({"foo": "bar"}, {"snork": 2})

    web_request.json = {"bar": "baz"}
    assert route_bar() == ({"bar": "baz"}, {"snork": 2})

    # now, inspect their internal state
    assert mypartial.__webargs_argnames__ == ("query_args",)
    assert route_foo.__webargs_argnames__ == ("query_args", "json_args")
    assert route_bar.__webargs_argnames__ == ("query_args", "json_args")


def test_default_arg_name_pattern_is_customizable(web_request):
    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

        def get_default_arg_name(self, location, schema):
            if location == "json":
                return "body"
            elif location == "query":
                return "query"
            else:
                return super().get_default_arg_name(location, schema)

    parser = MyParser()

    @parser.use_args({"frob": fields.Raw()}, web_request, location="json")
    @parser.use_args({"snork": fields.Raw()}, web_request, location="query")
    def myview(*, body, query):
        return (body, query)

    web_request.json = {"frob": "demuddler"}
    web_request.query = {"snork": 2}
    assert myview() == ({"frob": "demuddler"}, {"snork": 2})


def test_default_arg_name_pattern_still_allows_conflict_detection():
    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

        def get_default_arg_name(self, location, schema):
            return "data"

    parser = MyParser()

    with pytest.raises(ValueError, match="Attempted to pass `arg_name='data'`"):

        @parser.use_args({"frob": fields.Raw()}, web_request, location="json")
        @parser.use_args({"snork": fields.Raw()}, web_request, location="query")
        def myview(*, body, query):
            return (body, query)


def test_parse_with_dict_passes_schema_to_argname_derivation(web_request):
    default_argname_was_called = False

    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

        def get_default_arg_name(self, location, schema):
            assert isinstance(schema, Schema)
            nonlocal default_argname_was_called
            default_argname_was_called = True
            return super().get_default_arg_name(location, schema)

    parser = MyParser()

    @parser.use_args({"foo": fields.Raw()}, web_request, location="json")
    def myview(*, json_args):
        return json_args

    web_request.json = {"foo": 42}
    assert myview() == {"foo": 42}
    assert default_argname_was_called


def test_default_arg_name_pattern_can_pull_schema_attribute(web_request):
    # this test matches a documentation example exactly
    class RectangleSchema(Schema):
        _webargs_arg_name = "rectangle"
        length = fields.Integer()
        width = fields.Integer()

    class MyParser(MockRequestParser):
        USE_ARGS_POSITIONAL = False

        def get_default_arg_name(self, location, schema):
            assert schema is not None
            if hasattr(schema, "_webargs_arg_name"):
                if isinstance(schema._webargs_arg_name, str):
                    return schema._webargs_arg_name
            return super().get_default_arg_name(location, schema)

    parser = MyParser()

    @parser.use_args(RectangleSchema, web_request, location="json")
    def area(*, rectangle):
        return rectangle["length"] * rectangle["width"]

    web_request.json = {"length": 6, "width": 7}
    assert area() == 42
