1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
import json
import warnings
from django.contrib.postgres.fields import ArrayField
from django.db.models import Aggregate, BooleanField, JSONField, TextField, Value
from django.utils.deprecation import RemovedInDjango50Warning, RemovedInDjango51Warning
from .mixins import OrderableAggMixin
# Names exported by ``from <this module> import *``: the aggregate classes
# defined below. The deprecation helpers are deliberately not listed.
__all__ = [
    "ArrayAgg",
    "BitAnd",
    "BitOr",
    "BitXor",
    "BoolAnd",
    "BoolOr",
    "JSONBAgg",
    "StringAgg",
]
# RemovedInDjango50Warning
# Sentinel meaning "the caller did not pass ``default``". A dedicated object
# is needed because ``default=None`` is itself a meaningful, explicit choice.
NOT_PROVIDED = object()


class DeprecatedConvertValueMixin:
    """
    Preserve the legacy "no rows" return value for aggregates constructed
    without an explicit ``default``.

    Classes using this mixin must define ``deprecation_value`` and
    ``deprecation_msg``; they may also define
    ``deprecation_empty_result_set_value``.
    """

    def __init__(self, *expressions, default=NOT_PROVIDED, **extra):
        # Remember whether the caller passed ``default`` before the sentinel
        # is normalized to None for the parent constructor.
        self._default_provided = default is not NOT_PROVIDED
        if not self._default_provided:
            default = None
        super().__init__(*expressions, default=default, **extra)

    def resolve_expression(self, *args, **kwargs):
        """
        Resolve via the parent class and, when no ``default`` was passed, set
        the legacy value as ``empty_result_set_value`` on the resolved copy.
        """
        resolved = super().resolve_expression(*args, **kwargs)
        if self._default_provided:
            return resolved
        # ``deprecation_empty_result_set_value`` takes precedence when the
        # class defines it; ``deprecation_value`` is the fallback.
        fallback = self.deprecation_value
        resolved.empty_result_set_value = getattr(
            self, "deprecation_empty_result_set_value", fallback
        )
        return resolved

    def convert_value(self, value, expression, connection):
        """
        Return ``value`` unchanged unless it is None and no ``default`` was
        passed; in that case emit the deprecation warning and return
        ``deprecation_value``.
        """
        if value is not None or self._default_provided:
            return value
        warnings.warn(self.deprecation_msg, category=RemovedInDjango50Warning)
        return self.deprecation_value
class ArrayAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """
    ARRAY_AGG aggregate. Its output field is an ArrayField wrapping the
    output field of the first source expression.
    """

    allow_distinct = True
    function = "ARRAY_AGG"
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"

    # RemovedInDjango50Warning
    deprecation_msg = (
        "In Django 5.0, ArrayAgg() will return None instead of an empty list "
        "if there are no rows. Pass default=None to opt into the new behavior "
        "and silence this warning or default=[] to keep the previous behavior."
    )

    # RemovedInDjango50Warning
    @property
    def deprecation_value(self):
        # A property, so every access yields a fresh list rather than one
        # list object shared through the class attribute.
        return []

    @property
    def output_field(self):
        element_field = self.source_expressions[0].output_field
        return ArrayField(element_field)
class BitAnd(Aggregate):
    """Aggregate rendered with the SQL function BIT_AND."""

    function = "BIT_AND"
class BitOr(Aggregate):
    """Aggregate rendered with the SQL function BIT_OR."""

    function = "BIT_OR"
class BitXor(Aggregate):
    """Aggregate rendered with the SQL function BIT_XOR."""

    function = "BIT_XOR"
class BoolAnd(Aggregate):
    """Aggregate rendered with the SQL function BOOL_AND; output is a BooleanField."""

    function = "BOOL_AND"
    output_field = BooleanField()
class BoolOr(Aggregate):
    """Aggregate rendered with the SQL function BOOL_OR; output is a BooleanField."""

    function = "BOOL_OR"
    output_field = BooleanField()
class JSONBAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """
    JSONB_AGG aggregate with a JSONField output.

    The constructor also carries a deprecation shim for ``default`` passed as
    a string ``Value()`` whose output field is not a JSONField.
    """

    allow_distinct = True
    function = "JSONB_AGG"
    output_field = JSONField()
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"

    # RemovedInDjango50Warning
    deprecation_value = "[]"
    deprecation_msg = (
        "In Django 5.0, JSONBAgg() will return None instead of an empty list "
        "if there are no rows. Pass default=None to opt into the new behavior "
        "and silence this warning or default=[] to keep the previous "
        "behavior."
    )

    # RemovedInDjango50Warning
    @property
    def deprecation_empty_result_set_value(self):
        # A property, so every access yields a fresh list.
        return []

    # RemovedInDjango51Warning: When the deprecation ends, remove __init__().
    #
    # RemovedInDjango50Warning: When the deprecation ends, replace with:
    # def __init__(self, *expressions, default=None, **extra):
    def __init__(self, *expressions, default=NOT_PROVIDED, **extra):
        super().__init__(*expressions, default=default, **extra)

        # The shim only concerns string Value() defaults that do not already
        # use a JSONField output field.
        if not isinstance(default, Value):
            return
        raw = default.value
        if not isinstance(raw, str) or isinstance(default.output_field, JSONField):
            return

        # The warnings are issued directly from __init__ (not a helper) so
        # that stacklevel=2 keeps pointing at the code calling JSONBAgg().
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            # Not valid JSON: keep the Value but give it this aggregate's
            # output field. ``self.default`` is presumably the object stored
            # by the parent constructor -- confirm against Aggregate.
            message = (
                "Passing a Value() with an output_field that isn't a JSONField as "
                "JSONBAgg(default) is deprecated. Pass default="
                f"Value({raw!r}, output_field=JSONField()) instead."
            )
            warnings.warn(message, stacklevel=2, category=RemovedInDjango51Warning)
            self.default.output_field = self.output_field
            return

        # Valid encoded JSON: swap in the decoded value, then warn.
        self.default = Value(decoded, self.output_field)
        message = (
            "Passing an encoded JSON string as JSONBAgg(default) is "
            f"deprecated. Pass default={decoded!r} instead."
        )
        warnings.warn(message, stacklevel=2, category=RemovedInDjango51Warning)
class StringAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """
    STRING_AGG aggregate with a TextField output.

    ``delimiter`` is converted with ``str()`` and passed to the parent
    constructor as a ``Value`` expression after ``expression``.
    """

    allow_distinct = True
    function = "STRING_AGG"
    output_field = TextField()
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"

    # RemovedInDjango50Warning
    deprecation_value = ""
    deprecation_msg = (
        "In Django 5.0, StringAgg() will return None instead of an empty "
        "string if there are no rows. Pass default=None to opt into the new "
        'behavior and silence this warning or default="" to keep the previous behavior.'
    )

    def __init__(self, expression, delimiter, **extra):
        super().__init__(expression, Value(str(delimiter)), **extra)
|