File: spies.py

package info (click to toggle)
python-kgb 7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 532 kB
  • sloc: python: 4,466; makefile: 3
file content (1151 lines) | stat: -rw-r--r-- 43,873 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
from __future__ import absolute_import, unicode_literals

import copy
import inspect
import types

from kgb.calls import SpyCall
from kgb.errors import (ExistingSpyError,
                        IncompatibleFunctionError,
                        InternalKGBError)
from kgb.pycompat import iterkeys, pyver
from kgb.signature import FunctionSig, _UNSET_ARG
from kgb.utils import is_attr_defined_on_ancestor


class FunctionSpy(object):
    """A spy infiltrating a function.

    A FunctionSpy takes the place of another function. It will record any
    calls made to the function for later inspection.

    By default, a FunctionSpy will allow the call to go through to the
    original function. This can be disabled by passing call_original=False
    when initiating the spy. If disabled, the original function will never be
    called.

    This can also be passed a call_fake parameter pointing to another
    function to call instead of the original. If passed, this will take
    precedence over call_original.
    """

    #: The spy represents a standard function.
    TYPE_FUNCTION = FunctionSig.TYPE_FUNCTION

    #: The spy represents a bound method.
    #:
    #: Bound methods are functions on an instance of a class, or classmethods.
    TYPE_BOUND_METHOD = FunctionSig.TYPE_BOUND_METHOD

    #: The spy represents an unbound method.
    #:
    #: Unbound methods are standard methods on a class.
    TYPE_UNBOUND_METHOD = FunctionSig.TYPE_UNBOUND_METHOD

    _PROXY_METHODS = [
        'call_original', 'called_with', 'last_called_with',
        'raised', 'last_raised', 'returned', 'last_returned',
        'raised_with_message', 'last_raised_with_message',
        'reset_calls', 'unspy',
    ]

    _FUNC_ATTR_DEFAULTS = {
        'calls': [],
        'called': False,
        'last_call': None,
    }

    _spy_map = {}

    def __init__(self, agency, func, call_fake=None, call_original=True,
                 op=None, owner=_UNSET_ARG, func_name=None):
        """Initialize the spy.

        This will begin spying on the provided function or method, injecting
        new code into the function to help record how it was called and
        what it returned, and adding methods and state onto the function
        for callers to access in order to get those results.

        Version Added:
            7.0:
            Added support for specifying an explicit function name using
            ``func_name=``.

        Version Added:
            5.0:
            Added support for specifying an instance in ``owner`` when spying
            on bound methods using decorators that return plain functions.

        Args:
            agency (kgb.agency.SpyAgency):
                The spy agency that manages this spy.

            func (callable):
                The function or method to spy on.

            call_fake (callable, optional):
                The optional function to call when this function is invoked.

                This cannot be specified if ``op`` is provided.

            call_original (bool, optional):
                Whether to call the original function when the spy is
                invoked. If ``False``, no function will be called.

                This is ignored if ``call_fake`` or ``op`` are provided.

            op (kgb.spies.BaseOperation, optional):
                An operation to perform.

                This cannot be specified if ``call_fake`` is provided.

            owner (type or object, optional):
                The owner of the function or method.

                If spying on an unbound method, this **must** be set to the
                class that owns it.

                If spying on a bound method that identifies as a plain
                function (which may happen if the method is decorated and
                dynamically returns a new function on access), this should
                be the instance of the object you're spying on.

            func_name (str, optional):
                An explicit name for the function. This will be used instead
                of the function's specified name, and is usually a sign of a
                bad decorator.

                Version Added:
                    7.0
        """
        # Start off by grabbing the current frame. This will be needed for
        # some errors.
        self.init_frame = inspect.currentframe()

        # Check the parameters passed to make sure that invalid data wasn't
        # provided.
        if op is not None and call_fake is not None:
            raise ValueError('op and call_fake cannot both be provided.')

        if hasattr(func, 'spy'):
            raise ExistingSpyError(func)

        if (not callable(func) or
            not hasattr(func, FunctionSig.FUNC_NAME_ATTR) or
            not (hasattr(func, FunctionSig.METHOD_SELF_ATTR) or
                 hasattr(func, FunctionSig.FUNC_GLOBALS_ATTR))):
            raise ValueError('%r cannot be spied on. It does not appear to '
                             'be a valid function or method.'
                             % func)

        # Construct a signature for the function and begin closely inspecting
        # the parameters, making sure everything will be compatible so we
        # don't have unexpected breakages when setting up or calling spies.
        sig = FunctionSig(func=func,
                          owner=owner,
                          func_name=func_name)
        self._sig = sig

        # If the caller passed an explicit owner, check to see if it's at all
        # valid. Note that it may have been handled above (for unbound
        # methods).
        if owner is not _UNSET_ARG and owner is not self.owner:
            if self.func_type == self.TYPE_FUNCTION:
                raise ValueError(
                    'This function has no owner, but an owner was passed '
                    'to spy_on().')
            else:
                if not hasattr(owner, self.func_name):
                    raise ValueError('The owner passed does not contain the '
                                     'spied method.')
                elif (self.func_type == self.TYPE_BOUND_METHOD or
                      (pyver[0] == 2 and
                       self.func_type == self.TYPE_UNBOUND_METHOD)):
                    raise ValueError(
                        'The owner passed does not match the actual owner of '
                        'the bound method.')

        # We cannot currently spy on unbound methods that result in slippery
        # functions, so check for that and bail early.
        if (sig.is_slippery and
            self.func_type == self.TYPE_UNBOUND_METHOD):
            raise ValueError('Unable to spy on unbound slippery methods '
                             '(those that return a new function on each '
                             'attribute access). Please spy on an instance '
                             'instead.')

        # If call_fake was provided, check that it's valid and has a
        # compatible function signature.
        if op is not None:
            # We've already checked this above, but check it again.
            assert call_fake is None

            call_fake = op.setup(self)
            assert call_fake is not None

        if call_fake is not None:
            if not callable(call_fake):
                raise ValueError('%r cannot be used for call_fake. It does '
                                 'not appear to be a valid function or method.'
                                 % call_fake)

            call_fake_sig = FunctionSig(call_fake,
                                        func_name=func_name)

            if not sig.is_compatible_with(call_fake_sig):
                raise IncompatibleFunctionError(
                    func=func,
                    func_sig=sig,
                    incompatible_func=call_fake,
                    incompatible_func_sig=call_fake_sig)

        # Now that we're done validating, we can start setting state and
        # patching things.
        self.agency = agency
        self.orig_func = func
        self._real_func = sig.real_func
        self._call_orig_func = self._clone_function(self.orig_func)

        if self._get_owner_needs_patching():
            # We need to store the original attribute value for the function,
            # as defined in the class that owns it. That may be the provided
            # or calculated owner, or a parent of it.
            #
            # This is needed because the function provided may not actually be
            # what's defined on the class. What's defined might be a decorator
            # that returns a function, and it might not even be the same
            # function each time it's accessed.
            self._owner_func_attr_value = \
                self.owner.__dict__.get(self.func_name)

            # Now we can patch the owner to prevent conflicts between spies.
            self._patch_owner()
        else:
            self._owner_func_attr_value = self.orig_func

        # Determine what we're going to invoke when the spy is called.
        if call_fake:
            self.func = call_fake
        elif call_original:
            self.func = self.orig_func
        else:
            self.func = None

        # Build our proxy function. This is the spy itself, the function that
        # will actually be invoked when the spied-on function is called.
        self._build_proxy_func(func)

        # If we're calling the original function above, we need to replace what
        # we're calling with something that acts like the original function.
        # Otherwise, we'll just call the forwarding_call above in an infinite
        # loop.
        if self.func is self.orig_func:
            self.func = self._clone_function(self.func,
                                             code=self._old_code)

    @property
    def func_type(self):
        """The type of function being spied on.

        This will be one of :py:attr:`TYPE_FUNCTION`,
        :py:attr:`TYPE_UNBOUND_METHOD`, or :py:attr:`TYPE_BOUND_METHOD`.

        Type:
            int
        """
        return self._sig.func_type

    @property
    def func_name(self):
        """The name of the function being spied on.

        Type:
            str
        """
        return self._sig.func_name

    @property
    def owner(self):
        """The owner of the method, if a bound or unbound method.

        This will be ``None`` if there is no owner.

        Type:
            type
        """
        return self._sig.owner

    @property
    def called(self):
        """Whether or not the spy was ever called."""
        try:
            return self._real_func.called
        except AttributeError:
            return False

    @property
    def calls(self):
        """The list of calls made to the function.

        Each is an instance of :py:class:`SpyCall`.
        """
        try:
            return self._real_func.calls
        except AttributeError:
            return []

    @property
    def last_call(self):
        """The last call made to this function.

        If a spy hasn't been called yet, this will be ``None``.
        """
        try:
            return self._real_func.last_call
        except AttributeError:
            return None

    def unspy(self, unregister=True):
        """Remove the spy from the function, restoring the original.

        The spy will, by default, be removed from the registry's
        list of spies. This can be disabled by passing ``unregister=False``,
        but don't do that. That's for internal use.

        Args:
            unregister (bool, optional):
                Whether to unregister the spy from the associated agency.
        """
        real_func = self._real_func
        owner = self.owner

        assert hasattr(real_func, 'spy')

        del FunctionSpy._spy_map[id(self)]
        del real_func.spy

        for attr_name in iterkeys(self._FUNC_ATTR_DEFAULTS):
            delattr(real_func, attr_name)

        for func_name in self._PROXY_METHODS:
            delattr(real_func, func_name)

        setattr(real_func, FunctionSig.FUNC_CODE_ATTR, self._old_code)

        if owner is not None:
            self._set_method(owner, self.func_name,
                             self._owner_func_attr_value)

        if unregister:
            self.agency.spies.remove(self)

    def call_original(self, *args, **kwargs):
        """Call the original function being spied on.

        The function will behave as normal, and will not trigger any spied
        behavior or call tracking.

        Args:
            *args (tuple):
                The positional arguments to pass to the function.

            **kwargs (dict):
                The keyword arguments to pass to the function.

        Returns:
            object:
            The return value of the function.

        Raises:
            Exception:
                Any exceptions raised by the function.
        """
        if self.func_type == self.TYPE_BOUND_METHOD:
            return self._call_orig_func(self.owner, *args, **kwargs)
        else:
            if self.func_type == self.TYPE_UNBOUND_METHOD:
                if not args or not isinstance(args[0], self.owner):
                    raise TypeError(
                        'The first argument to %s.call_original() must be '
                        'an instance of %s.%s, since this is an unbound '
                        'method.'
                        % (self._call_orig_func.__name__,
                           self.owner.__module__,
                           self.owner.__name__))

            return self._call_orig_func(*args, **kwargs)

    def called_with(self, *args, **kwargs):
        """Return whether the spy was ever called with the given arguments.

        This will check each and every recorded call to see if the arguments
        and keyword arguments match up. If at least one call does match, this
        will return ``True``.

        Not every argument and keyword argument made in the call must be
        provided to this method. These can be a subset of the positional and
        keyword arguments in the call, but cannot contain any arguments not
        made in the call.

        Args:
            *args (tuple):
                The positional arguments made in the call, or a subset of
                those arguments (starting with the first argument).

            **kwargs (dict):
                The keyword arguments made in the call, or a subset of those
                arguments.

        Returns:
            bool:
            ``True`` if there's at least one call matching these arguments.
            ``False`` if no call matches.
        """
        return any(
            call.called_with(*args, **kwargs)
            for call in self.calls
        )

    def last_called_with(self, *args, **kwargs):
        """Return whether the spy was last called with the given arguments.

        Not every argument and keyword argument made in the call must be
        provided to this method. These can be a subset of the positional and
        keyword arguments in the call, but cannot contain any arguments not
        made in the call.

        Args:
            *args (tuple):
                The positional arguments made in the call, or a subset of
                those arguments (starting with the first argument).

            **kwargs (dict):
                The keyword arguments made in the call, or a subset of those
                arguments.

        Returns:
            bool:
            ``True`` if the last call's arguments match the provided arguments.
            ``False`` if they do not.
        """
        call = self.last_call

        return call is not None and call.called_with(*args, **kwargs)

    def returned(self, value):
        """Return whether the spy was ever called and returned the given value.

        This will check each and every recorded call to see if any of them
        returned the given value.  If at least one call did, this will return
        ``True``.

        Args:
            value (object):
                The expected returned value from the call.

        Returns:
            bool:
            ``True`` if there's at least one call that returned this value.
            ``False`` if no call returned the value.
        """
        return any(
            call.returned(value)
            for call in self.calls
        )

    def last_returned(self, value):
        """Return whether the spy's last call returned the given value.

        Args:
            value (object):
                The expected returned value from the call.

        Returns:
            bool:
            ``True`` if the last call returned this value. ``False`` if it
            did not.
        """
        call = self.last_call

        return call is not None and call.returned(value)

    def raised(self, exception_cls):
        """Return whether the spy was ever called and raised this exception.

        This will check each and every recorded call to see if any of them
        raised an exception of a given type. If at least one call does match,
        this will return ``True``.

        Args:
            exception_cls (type):
                The expected type of exception raised by a call.

        Returns:
            bool:
            ``True`` if there's at least one call raising the given exception
            type. ``False`` if no call matches.
        """
        return any(
            call.raised(exception_cls)
            for call in self.calls
        )

    def last_raised(self, exception_cls):
        """Return whether the spy's last call raised this exception.

        Args:
            exception_cls (type):
                The expected type of exception raised by a call.

        Returns:
            bool:
            ``True`` if the last call raised the given exception type.
            ``False`` if it did not.
        """
        call = self.last_call

        return call is not None and call.raised(exception_cls)

    def raised_with_message(self, exception_cls, message):
        """Return whether the spy's calls ever raised this exception/message.

        This will check each and every recorded call to see if any of them
        raised an exception of a given type with the given message. If at least
        one call does match, this will return ``True``.

        Args:
            exception_cls (type):
                The expected type of exception raised by a call.

            message (unicode):
                The expected message from the exception.

        Returns:
            bool:
            ``True`` if there's at least one call raising the given exception
            type and message. ``False`` if no call matches.
        """
        return any(
            call.raised_with_message(exception_cls, message)
            for call in self.calls
        )

    def last_raised_with_message(self, exception_cls, message):
        """Return whether the spy's last call raised this exception/message.

        Args:
            exception_cls (type):
                The expected type of exception raised by a call.

            message (unicode):
                The expected message from the exception.

        Returns:
            bool:
            ``True`` if the last call raised the given exception type and
            message. ``False`` if it did not.
        """
        call = self.last_call

        return (call is not None and
                call.raised_with_message(exception_cls, message))

    def reset_calls(self):
        """Reset the list of calls recorded by this spy."""
        self._real_func.calls = []
        self._real_func.called = False
        self._real_func.last_call = None

    def __call__(self, *args, **kwargs):
        """Call the original function or fake function for the spy.

        This will be called automatically when calling the spied function,
        recording the call and the results from the call.

        Args:
            *args (tuple):
                Positional arguments passed to the function.

            **kwargs (dict):
                All dictionary arguments either passed to the function or
                default values for unspecified keyword arguments in the
                function signature.

        Returns:
            object:
            The result of the function call.
        """
        record_args = args

        if self.func_type in (self.TYPE_BOUND_METHOD,
                              self.TYPE_UNBOUND_METHOD):
            record_args = record_args[1:]

        sig = self._sig
        real_func = self._real_func
        func = self.func

        call = SpyCall(self, record_args, kwargs)
        real_func.calls.append(call)
        real_func.called = True
        real_func.last_call = call

        if func is None:
            result = None
        else:
            try:
                if sig.has_getter:
                    # This isn't a standard function. It's a descriptor with
                    # a __get__() method. We need to fetch the value it
                    # returns.
                    result = sig.defined_func.__get__(self.owner)

                    if sig.is_slippery:
                        # Since we know this represents a slippery function,
                        # we need to take the function from the descriptor's
                        # result and call it.
                        result = result(*args, **kwargs)
                else:
                    # This is a typical function/method. We can call it
                    # directly.
                    result = func(*args, **kwargs)
            except Exception as e:
                call.exception = e
                raise

            call.return_value = result

        return result

    def __repr__(self):
        """Return a string representation of the spy.

        This is mainly used for debugging information. It will show some
        details on the spied function and call log.

        Returns:
            unicode:
            The resulting string representation.
        """
        func_type = self.func_type

        if func_type == self.TYPE_FUNCTION:
            func_type_str = 'function'
            qualname = self.func_name
        else:
            owner = self.owner

            if func_type == self.TYPE_BOUND_METHOD:
                # It's important we use __class__ instead of type(), because
                # we may be dealing with an old-style class.
                owner_cls = self.owner.__class__

                if owner_cls is type:
                    class_name = owner.__name__
                    func_type_str = 'classmethod'
                else:
                    class_name = owner_cls.__name__
                    func_type_str = 'bound method'
            elif func_type == self.TYPE_UNBOUND_METHOD:
                class_name = owner.__name__
                func_type_str = 'unbound method'

            qualname = '%s.%s of %r' % (class_name, self.func_name, owner)

        call_count = len(self.calls)

        if call_count == 1:
            calls_str = 'call'
        else:
            calls_str = 'calls'

        return '<Spy for %s %s (%d %s)>' % (func_type_str, qualname,
                                            len(self.calls), calls_str)

    def _get_owner_needs_patching(self):
        """Return whether the owner (if any) needs to be patched.

        Owners need patching if they're an instance, if the function is
        slippery, or if the function is defined on an ancestor of the class
        and not the class itself.

        See :py:meth:`_patch_owner` for what patching entails.

        Returns:
            bool:
            ``True`` if the owner needs patching. ``False`` if it does not.
        """
        owner = self.owner

        return (owner is not None and
                (not inspect.isclass(owner) or
                 self._sig.is_slippery or
                 is_attr_defined_on_ancestor(owner, self.func_name)))

    def _patch_owner(self):
        """Patch the owner.

        This will create a new method in place of an existing one on the
        owner, in order to ensure that the owner has its own unique copy
        for spying purposes.

        Patching the owner will avoid collisions between spies in the event
        that the method being spied on is defined by a parent of the owner,
        rather than the owner itself.

        See :py:meth:`_get_owner_needs_patching` the conditions under which
        patching will occur.
        """
        # Construct a replacement function for this method, and
        # re-assign it to the owner. We do this in order to prevent
        # two spies on the same method on two separate instances
        # of the class, or two subclasses of a common class owning the
        # method from conflicting with each other.
        real_func = self._clone_function(self._real_func)
        owner = self.owner

        if self.func_type == self.TYPE_BOUND_METHOD:
            method_type_args = [real_func, owner]

            if pyver[0] >= 3:
                method_type_args.append(owner)

            self._set_method(owner, self.func_name,
                             types.MethodType(real_func, self.owner))
        else:
            self._set_method(owner, self.func_name, real_func)

        self._real_func = real_func

    def _build_proxy_func(self, func):
        """Build the proxy function used to forward calls to this spy.

        This will construct a new function compatible with the signature of
        the provided function, which will call this spy whenever it's called.
        The bytecode of the provided function will be set to that of the
        generated proxy function. See the comment within this function for
        details on how this works.

        Args:
            func (callable):
                The function to proxy.
        """
        # Prior to kgb 2.0, we attempted to optimistically replace
        # methods on a class with a FunctionSpy, forwarding on calls to the
        # fake or original function. This was the design since kgb 1.0, but
        # wasn't sufficient. We realized in the first release that this
        # wouldn't work for standard functions, and so we had two designs:
        # One for methods, one for standard functions.
        #
        # In kgb 2.0, in an effort to standardize behavior, we moved fully
        # to the method originally used for standard functions (largely due
        # to the fact that in Python 3, unbound methods are just standard
        # functions).
        #
        # Standard functions can't be replaced. Unlike a bound function,
        # we can't reliably figure out what dictionary it lives in (it
        # could be a locals() inside another function), and even if we
        # replace that, we can't replace all the copies that have been
        # imported up to this point.
        #
        # The only option is to change what happens when we call the
        # function. That's easier said than done. We can't just replace
        # the __call__ method on it, like you could on a fake method for
        # a class.
        #
        # What we must do is replace the code backing it. This must be
        # done carefully. The "co_freevars" and "co_cellvars" fields must
        # remain the same between the old code and the new one. The
        # actual bytecode and most of the rest of the fields can be taken
        # from another function (the "forwarding_call" function defined
        # inline below).
        #
        # Unfortunately, we no longer have access to "self" (since we
        # replaced "co_freevars"). Instead, we store a global mapping
        # of codes to spies.
        #
        # We also must build the function dynamically, using exec().
        # The reason is that we want to accurately mimic the function
        # signature of the original function (in terms of specifying
        # the correct positional and keyword arguments). The way we format
        # arguments depends on the version of Python. We maintain
        # compatibility through the FunctionSig.format_arg_spec() methods
        # (which has implementations for both Python 2 and 3).
        #
        # We do use different values for the default keyword arguments,
        # which is actually okay. Within the function, these will all be
        # set to a special value (_UNSET_ARG), which is used later for
        # determining which keyword arguments were provided and which
        # were not. Anything attempting to inspect this function with
        # getargspec(), getfullargspec(), or inspect.Signature will get the
        # defaults from the original function, by way of the
        # original func.func_defaults attribute (on Python 2) or
        # __defaults__ (on Python 3).
        #
        # This forwarding function then needs to call the forwarded
        # function in exactly the same manner as it was called. That is,
        # if a keyword argument's value was passed in as a positional
        # argument, or a positional argument was specified as a keyword
        # argument in the call, then the forwarded function must be
        # called the same way, for argument tracking and signature
        # compatibility.
        #
        # In order to do this, we have to find out how forwarding_call was
        # called. This can be done by inspecting the bytecode of the
        # call in the parent frame and getting the number of positional
        # and keyword arguments used. From there, we can determine which
        # argument slots were specified and start looking for any keyword
        # arguments not set to _UNSET_ARG, passing them through to the
        # original function in the same order. Doing this requires
        # another exec() call in order to build out those arguments.
        #
        # Within the function, all imports and variables are prefixed to
        # avoid the possibility of collisions with arguments.
        #
        # Since we're only overriding the code, all other attributes (like
        # func_defaults, __doc__, etc.) will make use of those from
        # the original function.
        #
        # The result is that we've completely hijacked the original
        # function, making it call our own forwarding function instead.
        # It's a wonderful bag of tricks that are fully legal, but really
        # dirty. Somehow, it all really fits in with the idea of spies,
        # though.
        sig = self._sig
        spy_id = id(self)
        real_func = self._real_func

        forwarding_call = self._compile_forwarding_call_func(
            func=func,
            sig=sig,
            spy_id=spy_id)

        old_code, new_code = self._build_spy_code(func, forwarding_call)
        self._old_code = old_code
        setattr(real_func, FunctionSig.FUNC_CODE_ATTR, new_code)

        # Update our spy lookup map so the proxy function can easily find
        # the spy instance.
        FunctionSpy._spy_map[spy_id] = self

        # Update the attributes on the function. we'll be placing all spy
        # state and some proxy methods pointing to this spy, so that we can
        # easily access them through the function.
        real_func.spy = self
        real_func.__dict__.update(copy.deepcopy(self._FUNC_ATTR_DEFAULTS))

        for proxy_func_name in self._PROXY_METHODS:
            assert not hasattr(real_func, proxy_func_name)
            setattr(real_func, proxy_func_name, getattr(self, proxy_func_name))

    def _compile_forwarding_call_func(self, func, sig, spy_id):
        """Compile a forwarding call function for the spy.

        This will build the Python code for a function that approximates the
        function we're spying on, with the same function definition and
        closure behavior.

        Version Added:
            7.1

        Args:
            func (callable):
                The function being spied on.

            sig (kgb.signature.BaseFunctionSig):
                The function signature to use for this function.

            spy_id (int):
                The ID used for the spy registration.

        Returns:
            callable:
            The resulting forwarding function.
        """
        closure_vars = func.__code__.co_freevars
        use_closure = bool(closure_vars)

        # If the function is in a closure, we'll need to mirror the closure
        # state by using the referenced variables within _kgb_forwarding_call
        # and by defining those variables within a closure.
        #
        # Start by setting up a string that will use each closure.
        if use_closure:
            # This is an efficient way of referencing each variable without
            # side effects (at least in Python 2.7 through 3.11). Tuple
            # operations are fast and compact, and don't risk any inadvertent
            # invocation of the variables.
            use_closure_vars_str = (
                '        (%s)\n'
                % ', '.join(func.__code__.co_freevars)
            )
        else:
            # No closure, so nothing to set up.
            use_closure_vars_str = ''

        # Now define the forwarding call. This will always be nested within
        # either a closure of an if statement, letting us build a single
        # version at the right indentation level, keeping this as fast and
        # portable as possible.
        forwarding_call_str = (
            '    def _kgb_forwarding_call(%(params)s):\n'
            '        from kgb.spies import FunctionSpy as _kgb_cls\n'
            '%(use_closure_vars)s'
            '        _kgb_l = locals()\n'
            '        return _kgb_cls._spy_map[%(spy_id)s](%(call_args)s)\n'
            % {
                'call_args': sig.format_forward_call_args(),
                'params': sig.format_arg_spec(),
                'spy_id': spy_id,
                'use_closure_vars': use_closure_vars_str,
            }
        )

        if use_closure:
            # We now need to put _kgb_forwarding_call in a closure, to mirror
            # the behavior of the spied function. The closure will provide
            # the closure variables, and will return the function we can
            # later use.
            func_code_str = (
                'def _kgb_forwarding_call_closure(%(params)s):\n'
                '%(forwarding_call)s'
                '    return _kgb_forwarding_call\n'
                % {
                    'forwarding_call': forwarding_call_str,
                    'params': ', '.join(
                        '%s=None' % _var
                        for _var in closure_vars
                    )
                }
            )
        else:
            # No closure, so just define the function as-is. We will need to
            # wrap in an "if 1:" though, just to ensure indentation is fine.
            func_code_str = (
                'if 1:\n'
                '%s'
                % forwarding_call_str
            )

        # We can now build our function.
        exec_locals = {}

        try:
            eval(compile(func_code_str, '<string>', 'exec'),
                 globals(), exec_locals)
        except Exception as e:
            raise InternalKGBError(
                'Unable to compile a spy function for %(func)r: %(error)s'
                '\n\n'
                '%(code)s'
                % {
                    'code': func_code_str,
                    'error': e,
                    'func': func,
                })

        # Grab the resulting compiled function out of the locals.
        if use_closure:
            # It's in our closure, so call that and get the result.
            forwarding_call = exec_locals['_kgb_forwarding_call_closure']()
        else:
            forwarding_call = exec_locals['_kgb_forwarding_call']

        assert forwarding_call is not None

        return forwarding_call

    def _build_spy_code(self, func, forwarding_call):
        """Build a CodeType to inject into the spied function.

        This will create a function bytecode object that contains a mix of
        attributes from the original function and the forwarding call. The
        result can be injected directly into the spied function, containing
        just the right data to impersonate the function and call our own
        logic.

        Version Added:
            7.1

        Args:
            func (callable):
                The function being spied on.

            forwarding_call (callable):
                The spy forwarding call we built.

        Returns:
            tuple:
            A 2-tuple containing:

            1. The spied function's code object (:py:class:`types.CodeType`).
            1. The new spy code object (:py:class:`types.CodeType`).
        """
        old_code = getattr(func, FunctionSig.FUNC_CODE_ATTR)
        temp_code = getattr(forwarding_call, FunctionSig.FUNC_CODE_ATTR)

        assert old_code != temp_code

        if hasattr(old_code, 'replace'):
            # Python >= 3.8
            #
            # It's important we replace the code instead of building a new
            # one when possible. On Python 3.11, this will ensure that
            # state needed for exceptions (co_positions()) will be set
            # correctly.
            #
            # NOTE: Prior to kgb 7.2, we had set co_freevars and co_vellvars
            #       here. This caused crashes with Python 3.13 beta 2 (the
            #       latest release as of this writing -- June 19, 2024). We
            #       don't appear to actually need or want to set these on
            #       Python 3, so we removed this.
            replace_kwargs = {
                'co_name': old_code.co_name,
            }

            if pyver >= (3, 11):
                replace_kwargs['co_qualname'] = old_code.co_qualname

            # Python 3.13+ includes a warning when assigning a code object if
            # the generator flag doesn't match. The actual bug that caused them
            # to add this warning can be found here:
            #
            #     https://github.com/python/cpython/issues/81137
            #
            # This is a pretty isolated, unique case where the user was
            # replacing the code object for a lambda with the gi_code from a
            # generator expression, causing CPython to crash. This is purely
            # an issue with the gi_code object, and does not affect generator
            # functions (or even a lambda that returns a generator expression)
            # in the same way, so kgb's use is fine. We therefore mirror these
            # flags onto our replacement code object to prevent the warning.
            #
            # Our forwarding function won't have any of these flags set, so we
            # can just copy the positive case for all of them if they exist on
            # the function we're replacing.
            #
            # We only do this on 3.11+ because on earlier versions this causes
            # the spy to not execute.
            if pyver >= (3, 11):
                mirror_flags = old_code.co_flags & (
                    inspect.CO_GENERATOR |
                    inspect.CO_COROUTINE |
                    inspect.CO_ASYNC_GENERATOR)
                replace_kwargs['co_flags'] = temp_code.co_flags | mirror_flags

            new_code = temp_code.replace(**replace_kwargs)
        else:
            # Python <= 3.7
            #
            # We have to build this manually, using a combination of the
            # two. We won't bother with anything newer than Python 3.7.
            code_args = [temp_code.co_argcount]

            if pyver >= (3, 0):
                code_args.append(temp_code.co_kwonlyargcount)

            code_args += [
                temp_code.co_nlocals,
                temp_code.co_stacksize,
                temp_code.co_flags,
                temp_code.co_code,
                temp_code.co_consts,
                temp_code.co_names,
                temp_code.co_varnames,
                temp_code.co_filename,
                old_code.co_name,
                temp_code.co_firstlineno,
                temp_code.co_lnotab,
                old_code.co_freevars,
                old_code.co_cellvars,
            ]

            new_code = types.CodeType(*code_args)

        assert new_code != old_code
        assert new_code != temp_code

        return old_code, new_code

    def _clone_function(self, func, code=None):
        """Clone a function, optionally providing new bytecode.

        This will create a new function that contains all the state of the
        original (including annotations and any default argument values).

        Args:
            func (types.FunctionType):
                The function to clone.

            code (types.CodeType, optional):
                The new bytecode for the function. If not specified, the
                original function's bytecode will be used.

        Returns:
            types.FunctionType:
            The new function.
        """
        cloned_func = types.FunctionType(
            code or getattr(func, FunctionSig.FUNC_CODE_ATTR),
            getattr(func, FunctionSig.FUNC_GLOBALS_ATTR),
            getattr(func, FunctionSig.FUNC_NAME_ATTR),
            getattr(func, FunctionSig.FUNC_DEFAULTS_ATTR),
            getattr(func, FunctionSig.FUNC_CLOSURE_ATTR))

        if pyver[0] >= 3:
            # Python 3.x doesn't support providing any of the new
            # metadata introduced in Python 3.x to the constructor of
            # FunctionType. We have to set those manually.
            for attr in ('__annotations__', '__kwdefaults__'):
                setattr(cloned_func, attr, copy.deepcopy(getattr(func, attr)))

        return cloned_func

    def _set_method(self, owner, name, method):
        """Set a new method on an object.

        This will set the method (or delete the attribute for one if setting
        ``None``).

        If setting on a class, this will use a standard
        :py:func:`setattr`/:py:func:`delattr`.

        If setting on an instance, this will use a standard
        :py:meth:`object.__setattr__`/:py:meth:`object.__delattr__` (in order
        to avoid triggering a subclass-defined version of
        :py:meth:`~object.__setattr__`/:py:meth:`~object.__delattr__`, which
        might lose or override our spy).

        Args:
            owner (type or object):
                The class or instance to set the method on.

            name (unicode):
                The name of the attribute to set for the method.

            method (types.MethodType):
                The method to set (or ``None`` to delete).
        """
        if inspect.isclass(owner):
            if method is None:
                delattr(owner, name)
            else:
                setattr(owner, name, method)
        elif method is None:
            try:
                object.__delattr__(owner, name)
            except TypeError as e:
                if str(e) == "can't apply this __delattr__ to instance object":
                    # This is likely Python 2.6, or early 2.7, where we can't
                    # run object.__delattr__ on old-style classes. We have to
                    # fall back to modifying __dict__. It's not ideal but
                    # doable.
                    del owner.__dict__[name]
        else:
            try:
                object.__setattr__(owner, name, method)
            except TypeError as e:
                if str(e) == "can't apply this __setattr__ to instance object":
                    # Similarly as above, we have to default to dict
                    # manipulation on this version of Python.
                    owner.__dict__[name] = method