import json
from datetime import date, datetime, timezone
from decimal import Decimal
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.text import MIMEText

from django.core import mail
from django.test import override_settings, tag
from django.utils.timezone import (
    get_fixed_timezone,
    override as override_current_timezone,
)

from anymail.exceptions import (
    AnymailAPIError,
    AnymailConfigurationError,
    AnymailRecipientsRefused,
    AnymailSerializationError,
    AnymailUnsupportedFeature,
)
from anymail.message import AnymailMessage, attach_inline_image_file

from .mock_requests_backend import RequestsBackendMockAPITestCase
from .utils import (
    SAMPLE_IMAGE_FILENAME,
    decode_att,
    sample_image_content,
    sample_image_path,
)


@tag("sparkpost")
@override_settings(
    EMAIL_BACKEND="anymail.backends.sparkpost.EmailBackend",
    ANYMAIL={"SPARKPOST_API_KEY": "test_api_key"},
)
class SparkPostBackendMockAPITestCase(RequestsBackendMockAPITestCase):
    """Base TestCase: SparkPost EmailBackend with a mocked transmissions API

    Provides a ready-made ``self.message`` and a helper for faking
    the accepted/rejected recipient counts in the API response.
    """

    DEFAULT_RAW_RESPONSE = b"""{
        "results": {
            "id": "12345678901234567890",
            "total_accepted_recipients": 1,
            "total_rejected_recipients": 0
        }
    }"""

    def setUp(self):
        super().setUp()
        # Shared starting point: one sender, one recipient, plain text body
        self.message = mail.EmailMultiAlternatives(
            subject="Subject",
            body="Text Body",
            from_email="from@example.com",
            to=["to@example.com"],
        )

    def set_mock_result(self, accepted=1, rejected=0, id="12345678901234567890"):
        """Install a mock response reporting the given recipient counts

        :param accepted: value for ``total_accepted_recipients``
        :param rejected: value for ``total_rejected_recipients``
        :param id: transmission id to report
        :return: the raw (bytes) response body that was installed
        """
        results = {
            "id": id,
            "total_accepted_recipients": accepted,
            "total_rejected_recipients": rejected,
        }
        payload = json.dumps({"results": results}).encode("utf-8")
        self.set_mock_response(raw=payload)
        return payload


@tag("sparkpost")
class SparkPostBackendStandardEmailTests(SparkPostBackendMockAPITestCase):
    """Test backend support for Django standard email features"""

    def test_send_mail(self):
        """Simple send_mail() call posts the expected transmissions payload"""
        mail.send_mail(
            subject="Subject here",
            message="Here is the message.",
            from_email="from@example.com",
            recipient_list=["to@example.com"],
            fail_silently=False,
        )

        self.assert_esp_called("/api/v1/transmissions/")

        # The configured API key is sent as the Authorization header
        request_headers = self.get_api_call_headers()
        self.assertEqual("test_api_key", request_headers["Authorization"])

        payload = self.get_api_call_json()
        content = payload["content"]
        self.assertEqual(content["subject"], "Subject here")
        self.assertEqual(content["text"], "Here is the message.")
        self.assertEqual(content["from"], "from@example.com")

        expected_recipients = [
            {"address": {"email": "to@example.com", "header_to": "to@example.com"}}
        ]
        self.assertEqual(payload["recipients"], expected_recipients)

    def test_name_addr(self):
        """Display-name ("Name <addr>") addresses work for sender and recipients

        Also covers how to, cc and bcc are expanded into the recipients list.
        """
        self.set_mock_result(accepted=6)
        to_list = ["Recipient #1 <to1@example.com>", "to2@example.com"]
        cc_list = ["Carbon Copy <cc1@example.com>", "cc2@example.com"]
        bcc_list = ["Blind Copy <bcc1@example.com>", "bcc2@example.com"]
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "From Name <from@example.com>",
            to_list,
            cc=cc_list,
            bcc=bcc_list,
        )
        msg.send()

        payload = self.get_api_call_json()
        self.assertEqual(payload["content"]["from"], "From Name <from@example.com>")

        # This is *not* a batch send, so every recipient sees the same To header
        # listing all To addresses. The cc and bcc addresses must be explicitly
        # listed as recipients too.
        shared_header_to = "Recipient #1 <to1@example.com>, to2@example.com"
        every_address = (
            "to1@example.com",
            "to2@example.com",
            "cc1@example.com",
            "cc2@example.com",
            "bcc1@example.com",
            "bcc2@example.com",
        )
        expected_recipients = [
            {"address": {"email": email, "header_to": shared_header_to}}
            for email in every_address
        ]
        self.assertCountEqual(payload["recipients"], expected_recipients)

        # A formatted Cc header is added for recipients to see
        # (and there's no Bcc header)
        expected_headers = {"Cc": "Carbon Copy <cc1@example.com>, cc2@example.com"}
        self.assertEqual(payload["content"]["headers"], expected_headers)

    def test_custom_headers(self):
        """Custom headers pass through; Reply-To moves to its own param"""
        # The message has two recipients (one to + one cc), so the mock result
        # must report two accepted for the counts to agree.
        self.set_mock_result(accepted=2)
        email = mail.EmailMessage(
            "Subject",
            "Body goes here",
            "from@example.com",
            ["to1@example.com"],
            cc=["cc1@example.com"],
            headers={
                "Reply-To": "another@example.com",
                "X-MyHeader": "my value",
                "Message-ID": "mycustommsgid@example.com",
            },
        )
        email.send()

        data = self.get_api_call_json()
        self.assertEqual(
            data["content"]["headers"],
            {
                # Reply-To moved to separate param (below)
                "X-MyHeader": "my value",
                "Message-ID": "mycustommsgid@example.com",
                "Cc": "cc1@example.com",  # Cc header added
            },
        )
        self.assertEqual(data["content"]["reply_to"], "another@example.com")

    def test_html_message(self):
        """A text/html alternative goes in content.html, not in attachments"""
        plain_body = "This is an important message."
        html_body = "<p>This is an <strong>important</strong> message.</p>"
        email = mail.EmailMultiAlternatives(
            "Subject", plain_body, "from@example.com", ["to@example.com"]
        )
        email.attach_alternative(html_body, "text/html")
        email.send()

        content = self.get_api_call_json()["content"]
        self.assertEqual(content["text"], plain_body)
        self.assertEqual(content["html"], html_body)
        # The html part must not also show up as an attachment:
        self.assertNotIn("attachments", content)

    def test_html_only_message(self):
        """A message whose only body is html sends content.html and no text"""
        html_body = "<p>This is an <strong>important</strong> message.</p>"
        email = mail.EmailMessage(
            "Subject", html_body, "from@example.com", ["to@example.com"]
        )
        # Switch the main body from text/plain to text/html
        email.content_subtype = "html"
        email.send()

        content = self.get_api_call_json()["content"]
        self.assertNotIn("text", content)
        self.assertEqual(content["html"], html_body)

    def test_reply_to(self):
        """reply_to addresses are joined into content.reply_to"""
        reply_addresses = ["reply@example.com", "Other <reply2@example.com>"]
        email = mail.EmailMessage(
            "Subject",
            "Body goes here",
            "from@example.com",
            ["to1@example.com"],
            reply_to=reply_addresses,
            headers={"X-Other": "Keep"},
        )
        email.send()

        content = self.get_api_call_json()["content"]
        self.assertEqual(
            content["reply_to"], "reply@example.com, Other <reply2@example.com>"
        )
        # Unrelated headers must survive:
        self.assertEqual(content["headers"], {"X-Other": "Keep"})

    def test_attachments(self):
        """Attachments are sent with their type, name and decodable data"""
        text_content = "* Item one\n* Item two\n* Item three"
        self.message.attach(
            filename="test.txt", content=text_content, mimetype="text/plain"
        )

        # No mimetype given: it should be guessed from the filename
        png_content = b"PNG\xb4 pretend this is the contents of a png file"
        self.message.attach(filename="test.png", content=png_content)

        # A pre-built MIMEBase object (which also exercises "no filename")
        pdf_content = b"PDF\xb4 pretend this is valid pdf params"
        pdf_part = MIMEBase("application", "pdf")
        pdf_part.set_payload(pdf_content)
        self.message.attach(pdf_part)

        self.message.send()
        content = self.get_api_call_json()["content"]
        attachments = content["attachments"]
        self.assertEqual(len(attachments), 3)
        text_att, png_att, pdf_att = attachments

        self.assertEqual(text_att["type"], "text/plain")
        self.assertEqual(text_att["name"], "test.txt")
        self.assertEqual(decode_att(text_att["data"]).decode("ascii"), text_content)

        self.assertEqual(png_att["type"], "image/png")  # inferred from filename
        self.assertEqual(png_att["name"], "test.png")
        self.assertEqual(decode_att(png_att["data"]), png_content)

        self.assertEqual(pdf_att["type"], "application/pdf")
        self.assertEqual(pdf_att["name"], "")  # none
        self.assertEqual(decode_att(pdf_att["data"]), pdf_content)

        # The image attachment must not be treated as embedded:
        self.assertNotIn("inline_images", content)

    def test_unicode_attachment_correctly_decoded(self):
        """An attachment with non-ASCII name and content is sent exactly once"""
        # Slight modification from the Django unicode docs:
        # http://django.readthedocs.org/en/latest/ref/unicode.html#email
        filename = "Une pièce jointe.html"
        self.message.attach(filename, "<p>\u2019</p>", mimetype="text/html")
        self.message.send()
        sent_attachments = self.get_api_call_json()["content"]["attachments"]
        self.assertEqual(len(sent_attachments), 1)

    def test_attachment_charset(self):
        """A MIMEText attachment keeps its charset in the attachment type"""
        # SparkPost allows charset param in attachment type
        body = "Une pièce jointe"
        self.message.attach(MIMEText(body, "plain", "iso8859-1"))
        self.message.send()
        sent = self.get_api_call_json()["content"]["attachments"][0]
        self.assertEqual(sent["type"], 'text/plain; charset="iso8859-1"')
        self.assertEqual(decode_att(sent["data"]), body.encode("iso8859-1"))

    def test_embedded_images(self):
        """An inline image is sent in content.inline_images, named by its cid"""
        image_path = sample_image_path(SAMPLE_IMAGE_FILENAME)
        image_data = sample_image_content(SAMPLE_IMAGE_FILENAME)

        cid = attach_inline_image_file(self.message, image_path)
        html_template = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>'
        html_content = html_template % cid
        self.message.attach_alternative(html_content, "text/html")

        self.message.send()
        content = self.get_api_call_json()["content"]
        self.assertEqual(content["html"], html_content)

        inline_images = content["inline_images"]
        self.assertEqual(len(inline_images), 1)
        inline = inline_images[0]
        self.assertEqual(inline["type"], "image/png")
        self.assertEqual(inline["name"], cid)
        self.assertEqual(decode_att(inline["data"]), image_data)
        # Neither the html nor the inline image may be treated as an attachment:
        self.assertNotIn("attachments", content)

    def test_attached_images(self):
        """Images attached normally are attachments, not inline images"""
        image_path = sample_image_path(SAMPLE_IMAGE_FILENAME)
        image_data = sample_image_content(SAMPLE_IMAGE_FILENAME)

        # option 1: attach as a file
        self.message.attach_file(image_path)

        # option 2: construct the MIMEImage and attach it directly
        self.message.attach(MIMEImage(image_data))

        self.message.send()
        content = self.get_api_call_json()["content"]
        attachments = content["attachments"]
        self.assertEqual(len(attachments), 2)
        from_file, from_mime = attachments

        self.assertEqual(from_file["type"], "image/png")
        self.assertEqual(from_file["name"], SAMPLE_IMAGE_FILENAME)
        self.assertEqual(decode_att(from_file["data"]), image_data)

        self.assertEqual(from_mime["type"], "image/png")
        self.assertEqual(from_mime["name"], "")  # unknown -- not attached as file
        self.assertEqual(decode_att(from_mime["data"]), image_data)

        # The image attachments must not be treated as embedded:
        self.assertNotIn("inline_images", content)

    def test_multiple_html_alternatives(self):
        """A second text/html alternative is rejected as unsupported"""
        for html in ("<p>First html is OK</p>", "<p>But not second html</p>"):
            self.message.attach_alternative(html, "text/html")
        with self.assertRaises(AnymailUnsupportedFeature):
            self.message.send()

    def test_amp_html_alternative(self):
        """text/x-amp-html is accepted alongside text/html, as amp_html"""
        html_body = "<p>HTML</p>"
        amp_body = "<p>And AMP HTML</p>"
        self.message.attach_alternative(html_body, "text/html")
        self.message.attach_alternative(amp_body, "text/x-amp-html")
        self.message.send()
        content = self.get_api_call_json()["content"]
        self.assertEqual(content["html"], html_body)
        self.assertEqual(content["amp_html"], amp_body)

    def test_html_alternative(self):
        """A non-html alternative part (here, JSON) is rejected as unsupported"""
        json_like_body = "{'not': 'allowed'}"
        self.message.attach_alternative(json_like_body, "application/json")
        with self.assertRaises(AnymailUnsupportedFeature):
            self.message.send()

    def test_alternatives_fail_silently(self):
        """fail_silently suppresses the unsupported-alternative error"""
        self.message.attach_alternative("{'not': 'allowed'}", "application/json")
        sent_count = self.message.send(fail_silently=True)
        # Nothing should have reached the ESP, and nothing counts as sent
        self.assert_esp_not_called("API should not be called when send fails silently")
        self.assertEqual(sent_count, 0)

    def test_suppress_empty_address_lists(self):
        """Empty cc, bcc and reply_to must not produce empty headers"""
        self.message.send()
        content = self.get_api_call_json()["content"]
        # "headers" would hold Cc; "reply_to" is its own content field
        for absent_key in ("headers", "reply_to"):
            self.assertNotIn(absent_key, content)

    def test_empty_to(self):
        """With no `to` addresses, a cc-only send gets an empty To header"""
        # A send needs at least one recipient somewhere, so use cc
        self.message.to = []
        self.message.cc = ["cc@example.com"]
        self.message.send()

        # The blank header_to gives the message an empty To header, as desired
        cc_only_recipient = {"address": {"email": "cc@example.com", "header_to": ""}}
        recipients = self.get_api_call_json()["recipients"]
        self.assertEqual(recipients, [cc_only_recipient])

    def test_api_failure(self):
        """A 400 response raises AnymailAPIError mentioning the status code"""
        self.set_mock_response(status_code=400)
        expected_message = "SparkPost API response 400"
        with self.assertRaisesMessage(AnymailAPIError, expected_message):
            self.message.send()

    def test_api_failure_fail_silently(self):
        """With fail_silently, an API failure reports zero messages sent"""
        self.set_mock_response(status_code=400)
        sent_count = self.message.send(fail_silently=True)
        self.assertEqual(sent_count, 0)

    def test_api_error_includes_details(self):
        """The ESP's own error message should appear in the AnymailAPIError"""
        esp_explanation = "Helpful explanation from your ESP"
        failure_response = b"""{
            "errors": [{
                "message": "Helpful explanation from your ESP"
            }]
        }"""
        self.set_mock_response(status_code=400, raw=failure_response)
        with self.assertRaisesMessage(AnymailAPIError, esp_explanation):
            self.message.send()


@tag("sparkpost")
class SparkPostBackendAnymailFeatureTests(SparkPostBackendMockAPITestCase):
    """Test backend support for Anymail added features"""

    def test_envelope_sender(self):
        """envelope_sender is sent as the transmission's return_path"""
        bounce_address = "bounce-handler@bounces.example.com"
        self.message.envelope_sender = bounce_address
        self.message.send()
        payload = self.get_api_call_json()
        self.assertEqual(payload["return_path"], bounce_address)

    def test_metadata(self):
        """Message metadata is passed through unchanged"""
        self.message.metadata = {"user_id": "12345", "items": "spark, post"}
        self.message.send()
        sent_metadata = self.get_api_call_json()["metadata"]
        self.assertEqual(sent_metadata, {"user_id": "12345", "items": "spark, post"})

    def test_send_at(self):
        """Each supported send_at type is rendered as options.start_time"""
        utc_plus_6 = get_fixed_timezone(6 * 60)
        utc_minus_8 = get_fixed_timezone(-8 * 60)

        def start_time_for(send_at):
            # Send self.message with this send_at; return the start_time in the call
            self.message.send_at = send_at
            self.message.send()
            return self.get_api_call_json()["options"]["start_time"]

        # SparkPost expects ISO-8601 YYYY-MM-DDTHH:MM:SS+-HH:MM
        with override_current_timezone(utc_plus_6):
            # Timezone-aware datetime keeps its own UTC offset:
            aware = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
            self.assertEqual(start_time_for(aware), "2016-03-04T05:06:07-08:00")

            # Explicit UTC:
            explicit_utc = datetime(2016, 3, 4, 5, 6, 7, tzinfo=timezone.utc)
            self.assertEqual(start_time_for(explicit_utc), "2016-03-04T05:06:07+00:00")

            # Timezone-naive datetime assumed to be Django current_timezone
            # (also checks stripping microseconds)
            naive = datetime(2022, 10, 11, 12, 13, 14, 567)
            self.assertEqual(start_time_for(naive), "2022-10-11T12:13:14+06:00")

            # Date-only treated as midnight in current timezone
            date_only = date(2022, 10, 22)
            self.assertEqual(start_time_for(date_only), "2022-10-22T00:00:00+06:00")

            # POSIX timestamp
            posix_timestamp = 1651820889  # 2022-05-06 07:08:09 UTC
            self.assertEqual(
                start_time_for(posix_timestamp), "2022-05-06T07:08:09+00:00"
            )

            # String passed unchanged (this is *not* portable between ESPs)
            preformatted = "2022-10-13T18:02:00-11:30"
            self.assertEqual(start_time_for(preformatted), "2022-10-13T18:02:00-11:30")

    def test_tags(self):
        """A single tag becomes campaign_id; multiple tags are unsupported"""
        self.message.tags = ["receipt"]
        self.message.send()
        payload = self.get_api_call_json()
        self.assertEqual(payload["campaign_id"], "receipt")

        # More than one tag can't be represented, so the send must fail
        self.message.tags = ["receipt", "repeat-user"]
        expected_error = "multiple tags"
        with self.assertRaisesMessage(AnymailUnsupportedFeature, expected_error):
            self.message.send()

    def test_tracking(self):
        """track_opens/track_clicks map to open_tracking/click_tracking options"""
        # Test one way, and then the opposite way
        for opens, clicks in ((True, False), (False, True)):
            self.message.track_opens = opens
            self.message.track_clicks = clicks
            self.message.send()
            options = self.get_api_call_json()["options"]
            self.assertEqual(options["open_tracking"], opens)
            self.assertEqual(options["click_tracking"], clicks)

    def test_template_id(self):
        """A stored-template send includes template_id and no inline content"""
        message = mail.EmailMultiAlternatives(to=["to@example.com"])
        message.from_email = None
        message.template_id = "welcome_template"
        message.send()

        content = self.get_api_call_json()["content"]
        self.assertEqual(content["template_id"], "welcome_template")
        # SparkPost disallows all content (even empty strings) with stored template:
        for content_field in ("subject", "text", "html", "from"):
            self.assertNotIn(content_field, content)

    def test_template_id_ignores_default_from_email(self):
        """DEFAULT_FROM_EMAIL is stripped when sending with a template_id"""
        # Omitting from_email (or passing None) in the constructor
        # falls back to settings.DEFAULT_FROM_EMAIL:
        message = AnymailMessage(to=["to@example.com"], template_id="welcome_template")
        self.assertIsNotNone(message.from_email)  # because it's DEFAULT_FROM_EMAIL
        message.send()
        content = self.get_api_call_json()["content"]
        self.assertNotIn("from", content)

    def test_unsupported_content_with_template_id(self):
        """Options SparkPost would silently ignore with a template_id must raise"""
        message = AnymailMessage(
            to=["to@example.com"],
            from_email="non-default-from@example.com",
            reply_to=["reply@example.com"],
            headers={"X-Custom": "header"},
            template_id="welcome_template",
        )
        message.attach(filename="test.txt", content="attachment", mimetype="text/plain")

        # The error should name every offending feature
        expected_error = (
            "template_id with attachments, extra headers and/or cc recipients,"
            " from_email, reply_to"
        )
        with self.assertRaisesMessage(AnymailUnsupportedFeature, expected_error):
            message.send()

    def test_merge_data(self):
        """merge_data becomes per-recipient substitution_data (cc duplicated)"""
        self.set_mock_result(accepted=4)  # two 'to' plus one 'cc' for each 'to'
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.cc = ["cc@example.com"]
        self.message.body = "Hi {{address.name}}. Welcome to {{group}} at {{site}}."
        self.message.merge_data = {
            "alice@example.com": {"name": "Alice", "group": "Developers"},
            "bob@example.com": {"name": "Bob"},  # and leave group undefined
            "nobody@example.com": {"name": "Not a recipient for this message"},
        }
        self.message.merge_global_data = {"group": "Users", "site": "ExampleCo"}

        self.message.send()
        payload = self.get_api_call_json()

        # Global merge data applies to the whole transmission
        self.assertEqual(
            {"group": "Users", "site": "ExampleCo"}, payload["substitution_data"]
        )

        def recipient(email, header_to, substitutions):
            # One expected entry in the API's recipients list
            return {
                "address": {"email": email, "header_to": header_to},
                "substitution_data": substitutions,
            }

        expected_recipients = [
            recipient(
                "alice@example.com",
                "alice@example.com",
                {"name": "Alice", "group": "Developers"},
            ),
            recipient("bob@example.com", "Bob <bob@example.com>", {"name": "Bob"}),
            # The cc recipient is duplicated, once per 'to' address...
            recipient(
                "cc@example.com",
                "alice@example.com",
                {"name": "Alice", "group": "Developers"},
            ),
            recipient("cc@example.com", "Bob <bob@example.com>", {"name": "Bob"}),
        ]
        self.assertEqual(expected_recipients, payload["recipients"])

    def test_merge_metadata(self):
        """merge_metadata becomes per-recipient metadata; metadata stays global"""
        self.set_mock_result(accepted=2)
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.merge_metadata = {
            "alice@example.com": {"order_id": 123},
            "bob@example.com": {"order_id": 678, "tier": "premium"},
        }
        self.message.metadata = {"notification_batch": "zx912"}

        self.message.send()
        payload = self.get_api_call_json()

        expected_alice = {
            "address": {
                "email": "alice@example.com",
                "header_to": "alice@example.com",
            },
            "metadata": {"order_id": 123},
        }
        expected_bob = {
            "address": {
                "email": "bob@example.com",
                "header_to": "Bob <bob@example.com>",
            },
            "metadata": {"order_id": 678, "tier": "premium"},
        }
        self.assertEqual([expected_alice, expected_bob], payload["recipients"])
        self.assertEqual(payload["metadata"], {"notification_batch": "zx912"})

    def test_merge_headers(self):
        """merge_headers are sent indirectly, through template substitutions"""
        self.set_mock_result(accepted=2)
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.extra_headers = {
            "X-Custom-1": "custom 1",
            "X-Custom-2": "custom 2 (default)",
        }
        self.message.merge_headers = {
            "alice@example.com": {
                "X-Custom-2": "custom 2 alice",
                "X-Custom-3": "custom 3 alice",
            },
            "bob@example.com": {"X-Custom-2": "custom 2 bob"},
        }

        self.message.send()
        payload = self.get_api_call_json()

        # Per-recipient header values travel in each recipient's substitution_data
        recipients = payload["recipients"]
        self.assertEqual(len(recipients), 2)
        alice, bob = recipients
        self.assertEqual(alice["address"]["email"], "alice@example.com")
        expected_alice_data = {
            "Header__X_Custom_2": "custom 2 alice",
            "Header__X_Custom_3": "custom 3 alice",
        }
        self.assertEqual(alice["substitution_data"], expected_alice_data)
        self.assertEqual(bob["address"]["email"], "bob@example.com")
        expected_bob_data = {"Header__X_Custom_2": "custom 2 bob"}
        self.assertEqual(bob["substitution_data"], expected_bob_data)

        # The headers themselves hold substitution placeholders:
        expected_headers = {
            "X-Custom-1": "custom 1",  # (not a merge_header, value unchanged)
            "X-Custom-2": "{{Header__X_Custom_2}}",
            "X-Custom-3": "{{Header__X_Custom_3}}",
        }
        self.assertEqual(payload["content"]["headers"], expected_headers)

        # Defaults for merge_headers go in the global substitution_data.
        # (No default specified for X-Custom-3; SparkPost will use empty string.)
        expected_defaults = {"Header__X_Custom_2": "custom 2 (default)"}
        self.assertEqual(payload["substitution_data"], expected_defaults)

    def test_default_omits_options(self):
        """By default, no ESP-specific options are sent at all.

        Anything the caller didn't specify must be left out of the API call
        entirely (*not* sent as False or empty), so that the ESP account's
        own settings apply by default.
        """
        self.message.send()
        payload = self.get_api_call_json()
        # "options" covers start_time, click_tracking, open_tracking
        unset_keys = ("campaign_id", "metadata", "options", "substitution_data")
        for key in unset_keys:
            self.assertNotIn(key, payload)
        self.assertNotIn("template_id", payload["content"])

    def test_esp_extra(self):
        """esp_extra is deep-merged into the generated API payload"""
        extra_options = {"transactional": True}
        extra_content = {
            "use_draft_template": True,
            "ab_test_id": "highlight_support_links",
        }
        self.message.esp_extra = {
            "description": "The description",
            "options": extra_options,
            "content": extra_content,
        }
        self.message.track_clicks = True
        self.message.send()

        payload = self.get_api_call_json()
        self.assertEqual(payload["description"], "The description")

        # Deep merge: esp_extra options sit alongside Anymail's click_tracking
        expected_options = {"transactional": True, "click_tracking": True}
        self.assertEqual(payload["options"], expected_options)

        # Deep merge: esp_extra content sits alongside the message's own content
        expected_content = {
            "use_draft_template": True,
            "ab_test_id": "highlight_support_links",
            "text": "Text Body",
            "subject": "Subject",
        }
        self.assertDictMatches(expected_content, payload["content"])

    def test_send_attaches_anymail_status(self):
        """Sending a message should attach an anymail_status to it"""
        raw_response = self.set_mock_result(accepted=1, rejected=0, id="9876543210")
        msg = mail.EmailMessage(
            "Subject", "Message", "from@example.com", ["to1@example.com"]
        )
        sent_count = msg.send()
        self.assertEqual(sent_count, 1)

        status = msg.anymail_status
        self.assertEqual(status.status, {"queued"})
        self.assertEqual(status.message_id, "9876543210")

        recipient_status = status.recipients["to1@example.com"]
        self.assertEqual(recipient_status.status, "queued")
        self.assertEqual(recipient_status.message_id, "9876543210")

        self.assertEqual(status.esp_response.content, raw_response)

    @override_settings(
        ANYMAIL_IGNORE_RECIPIENT_STATUS=True  # exception is tested later
    )
    def test_send_all_rejected(self):
        """When every recipient is rejected, anymail_status is 'rejected'"""
        self.set_mock_result(accepted=0, rejected=2)
        to_addresses = ["to1@example.com", "to2@example.com"]
        msg = mail.EmailMessage("Subject", "Message", "from@example.com", to_addresses)
        msg.send()

        status = msg.anymail_status
        self.assertEqual(status.status, {"rejected"})
        for address in ("to1@example.com", "to2@example.com"):
            self.assertEqual(status.recipients[address].status, "rejected")

    def test_send_some_rejected(self):
        """With a mix of accepted and rejected, anymail_status is 'unknown'"""
        self.set_mock_result(accepted=1, rejected=1)
        to_addresses = ["to1@example.com", "to2@example.com"]
        msg = mail.EmailMessage("Subject", "Message", "from@example.com", to_addresses)
        msg.send()

        status = msg.anymail_status
        self.assertEqual(status.status, {"unknown"})
        for address in ("to1@example.com", "to2@example.com"):
            self.assertEqual(status.recipients[address].status, "unknown")

    def test_send_unexpected_count(self):
        """A result count that doesn't match the recipients gives 'unknown'"""
        self.set_mock_result(accepted=3, rejected=0)  # but only 2 in the to-list
        to_addresses = ["to1@example.com", "to2@example.com"]
        msg = mail.EmailMessage("Subject", "Message", "from@example.com", to_addresses)
        msg.send()

        status = msg.anymail_status
        self.assertEqual(status.status, {"unknown"})
        for address in ("to1@example.com", "to2@example.com"):
            self.assertEqual(status.recipients[address].status, "unknown")

    # noinspection PyUnresolvedReferences
    def test_send_failed_anymail_status(self):
        """After a failed send, anymail_status still holds its initial values"""
        self.set_mock_response(status_code=400)
        sent_count = self.message.send(fail_silently=True)
        self.assertEqual(sent_count, 0)

        status = self.message.anymail_status
        self.assertIsNone(status.status)
        self.assertIsNone(status.message_id)
        self.assertEqual(status.recipients, {})
        self.assertIsNone(status.esp_response)

    # noinspection PyUnresolvedReferences
    def test_send_unparsable_response(self):
        """A successful response in an unexpected format raises an API error

        The raw response should still be available on anymail_status.
        """
        unexpected_body = b"""{"wrong": "format"}"""
        self.set_mock_response(raw=unexpected_body)
        with self.assertRaises(AnymailAPIError):
            self.message.send()

        status = self.message.anymail_status
        self.assertIsNone(status.status)
        self.assertIsNone(status.message_id)
        self.assertEqual(status.recipients, {})
        self.assertEqual(status.esp_response.content, unexpected_body)

    def test_json_serialization_errors(self):
        """Try to provide more information about non-json-serializable data"""
        self.message.tags = [Decimal("19.99")]  # yeah, don't do this
        # Only the call expected to raise belongs inside the assertRaises block
        with self.assertRaises(AnymailSerializationError) as cm:
            self.message.send()
        err = cm.exception
        self.assertIsInstance(err, TypeError)  # compatibility with json.dumps
        # our added context:
        self.assertIn("Don't know how to send this data to SparkPost", str(err))
        # original message:
        self.assertRegex(str(err), r"Decimal.*is not JSON serializable")


@tag("sparkpost")
class SparkPostBackendRecipientsRefusedTests(SparkPostBackendMockAPITestCase):
    """
    AnymailRecipientsRefused should be raised only when *all* recipients
    are rejected or invalid
    """

    def test_recipients_refused(self):
        """Every recipient rejected: sending raises AnymailRecipientsRefused"""
        self.set_mock_result(accepted=0, rejected=2)
        refused = ["invalid@localhost", "reject@example.com"]
        message = mail.EmailMessage("Subject", "Body", "from@example.com", refused)
        with self.assertRaises(AnymailRecipientsRefused):
            message.send()

    def test_fail_silently(self):
        """Every recipient rejected, fail_silently=True: sent count is zero"""
        self.set_mock_result(accepted=0, rejected=2)
        refused = ["invalid@localhost", "reject@example.com"]
        num_sent = mail.send_mail(
            "Subject", "Body", "from@example.com", refused, fail_silently=True
        )
        self.assertEqual(num_sent, 0)

    def test_mixed_response(self):
        """If *any* recipients are valid or queued, no exception is raised"""
        self.set_mock_result(accepted=2, rejected=2)
        recipients = [
            "invalid@localhost",
            "valid@example.com",
            "reject@example.com",
            "also.valid@example.com",
        ]
        message = mail.EmailMessage(
            "Subject", "Body", "from@example.com", recipients
        )
        num_sent = message.send()
        # one message sent, successfully, to 2 of 4 recipients:
        self.assertEqual(num_sent, 1)
        # We don't know which recipients were rejected,
        # so each one is expected to report "unknown"
        per_recipient = message.anymail_status.recipients
        for address in recipients:
            self.assertEqual(per_recipient[address].status, "unknown")

    @override_settings(ANYMAIL_IGNORE_RECIPIENT_STATUS=True)
    def test_settings_override(self):
        """With the ignore setting enabled, refused recipients raise no exception"""
        self.set_mock_result(accepted=0, rejected=2)
        refused = ["invalid@localhost", "reject@example.com"]
        num_sent = mail.send_mail("Subject", "Body", "from@example.com", refused)
        # refused message is included in sent count
        self.assertEqual(num_sent, 1)


@tag("sparkpost")
class SparkPostBackendConfigurationTests(SparkPostBackendMockAPITestCase):
    """Test various SparkPost client options"""

    @override_settings(
        # clear SPARKPOST_API_KEY from SparkPostBackendMockAPITestCase:
        ANYMAIL={}
    )
    def test_missing_api_key(self):
        """Sending without an API key raises a configuration error naming the settings"""
        with self.assertRaises(AnymailConfigurationError) as cm:
            mail.send_mail(
                subject="Subject",
                message="Message",
                from_email="from@example.com",
                recipient_list=["to@example.com"],
            )
        errmsg = str(cm.exception)
        # Make sure the error mentions the different places to set the key
        expected_patterns = (
            r"\bSPARKPOST_API_KEY\b",
            r"\bANYMAIL_SPARKPOST_API_KEY\b",
        )
        for pattern in expected_patterns:
            self.assertRegex(errmsg, pattern)

    @override_settings(
        ANYMAIL={
            "SPARKPOST_API_URL": "https://api.eu.sparkpost.com/api/v1",
            "SPARKPOST_API_KEY": "test_api_key",
        }
    )
    def test_sparkpost_api_url(self):
        """The API URL can be set in settings or per connection"""
        mail.send_mail(
            subject="Subject",
            message="Message",
            from_email="from@example.com",
            recipient_list=["to@example.com"],
        )
        self.assert_esp_called("https://api.eu.sparkpost.com/api/v1/transmissions/")

        # can also override on individual connection
        # (and even use non-versioned labs endpoint)
        labs_connection = mail.get_connection(
            api_url="https://api.sparkpost.com/api/labs"
        )
        mail.send_mail(
            subject="Subject",
            message="Message",
            from_email="from@example.com",
            recipient_list=["to@example.com"],
            connection=labs_connection,
        )
        self.assert_esp_called("https://api.sparkpost.com/api/labs/transmissions/")

    def test_subaccount(self):
        """A connection-level subaccount is sent as the X-MSYS-SUBACCOUNT header"""
        # A likely use case is supplying subaccount for a particular message.
        # (For all messages, just set SPARKPOST_SUBACCOUNT in ANYMAIL settings.)
        subaccount_connection = mail.get_connection(subaccount=123)
        mail.send_mail(
            subject="Subject",
            message="Message",
            from_email="from@example.com",
            recipient_list=["to@example.com"],
            connection=subaccount_connection,
        )
        with_subaccount = self.get_api_call_headers()
        self.assertEqual(with_subaccount["X-MSYS-SUBACCOUNT"], 123)

        # Make sure we're not setting the header on non-subaccount sends
        mail.send_mail(
            subject="Subject",
            message="Message",
            from_email="from@example.com",
            recipient_list=["to@example.com"],
        )
        without_subaccount = self.get_api_call_headers()
        self.assertNotIn("X-MSYS-SUBACCOUNT", without_subaccount)
