From: Simon Charette <charette.s@gmail.com>
Date: Wed, 1 Dec 2021 00:43:39 -0500
Subject: Fixed #33282 -- Fixed a crash when OR'ing subquery and aggregation
 lookups.

As a QuerySet resolves to Query the outer column references grouping logic
should be defined on the latter and proxied from Subquery for the cases where
get_group_by_cols is called on unresolved expressions.

Thanks Antonio Terceiro for the report and initial patch.

(cherry picked from commit e5a92d400acb4ca6a8e1375d1ab8121f2c7220be)
---
 django/db/models/expressions.py |  9 +++++----
 django/db/models/sql/query.py   |  8 ++++++++
 tests/aggregation/tests.py      | 35 +++++++++++++++++++++++++++++++----
 3 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/django/db/models/expressions.py b/django/db/models/expressions.py
index 08ee5fe18bff..1feff855d3c2 100644
--- a/django/db/models/expressions.py
+++ b/django/db/models/expressions.py
@@ -1131,12 +1131,13 @@ class Subquery(BaseExpression, Combinable):
         return sql, sql_params
 
     def get_group_by_cols(self, alias=None):
+        # If this expression is referenced by an alias for an explicit GROUP BY
+        # through values() a reference to this expression and not the
+        # underlying .query must be returned to ensure external column
+        # references are not grouped against as well.
         if alias:
             return [Ref(alias, self)]
-        external_cols = self.get_external_cols()
-        if any(col.possibly_multivalued for col in external_cols):
-            return [self]
-        return external_cols
+        return self.query.get_group_by_cols()
 
 
 class Exists(Subquery):
diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py
index db52a4976951..817e0bb7ffc7 100644
--- a/django/db/models/sql/query.py
+++ b/django/db/models/sql/query.py
@@ -1098,6 +1098,14 @@ class Query(BaseExpression):
             if col.alias in self.external_aliases
         ]
 
+    def get_group_by_cols(self, alias=None):
+        if alias:
+            return [Ref(alias, self)]
+        external_cols = self.get_external_cols()
+        if any(col.possibly_multivalued for col in external_cols):
+            return [self]
+        return external_cols
+
     def as_sql(self, compiler, connection):
         sql, params = self.get_compiler(connection=connection).as_sql()
         if self.subquery:
diff --git a/tests/aggregation/tests.py b/tests/aggregation/tests.py
index d3d744c3d47d..48502c4a4a25 100644
--- a/tests/aggregation/tests.py
+++ b/tests/aggregation/tests.py
@@ -7,6 +7,7 @@ from django.db import connection
 from django.db.models import (
     Avg, Case, Count, DecimalField, DurationField, Exists, F, FloatField,
     IntegerField, Max, Min, OuterRef, Subquery, Sum, Value, When,
+    Q
 )
 from django.db.models.functions import Coalesce, Greatest
 from django.test import TestCase
@@ -1265,10 +1266,17 @@ class AggregateTestCase(TestCase):
         ).values(
             'publisher'
         ).annotate(count=Count('pk')).values('count')
-        long_books_count_breakdown = Publisher.objects.values_list(
-            Subquery(long_books_count_qs, IntegerField()),
-        ).annotate(total=Count('*'))
-        self.assertEqual(dict(long_books_count_breakdown), {None: 1, 1: 4})
+        groups = [
+            Subquery(long_books_count_qs),
+            long_books_count_qs,
+            long_books_count_qs.query,
+        ]
+        for group in groups:
+            with self.subTest(group=group.__class__.__name__):
+                long_books_count_breakdown = Publisher.objects.values_list(
+                    group,
+                ).annotate(total=Count('*'))
+                self.assertEqual(dict(long_books_count_breakdown), {None: 1, 1: 4})
 
     @skipUnlessDBFeature('supports_subqueries_in_group_by')
     def test_group_by_exists_annotation(self):
@@ -1326,6 +1334,25 @@ class AggregateTestCase(TestCase):
         ).values_list('publisher_count', flat=True)
         self.assertSequenceEqual(books_breakdown, [1] * 6)
 
+    def test_filter_in_subquery_or_aggregation(self):
+        """
+        Filtering against an aggregate requires the usage of the HAVING clause.
+
+        If such a filter is unionized to a non-aggregate one the latter will
+        also need to be moved to the HAVING clause and have its grouping
+        columns used in the GROUP BY.
+
+        When this is done with a subquery the specialized logic in charge of
+        using outer reference columns to group should be used instead of the
+        subquery itself as the latter might return multiple rows.
+        """
+        authors = Author.objects.annotate(
+            Count('book'),
+        ).filter(
+            Q(book__count__gt=0) | Q(pk__in=Book.objects.values('authors'))
+        )
+        self.assertQuerysetEqual(authors, Author.objects.all(), ordered=False)
+
     def test_aggregation_random_ordering(self):
         """Random() is not included in the GROUP BY when used for ordering."""
         authors = Author.objects.annotate(contact_count=Count('book')).order_by('?')
