1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
# Copyright (C) 2006-2010 Dan Pascu. See LICENSE for details.
#
"""Memory debugging helper.
Using this module:
1. at the beginning of your program import this module...
from application.debug.memory import *
2. call memory_dump() later, when you want to check the memory leaks
Note: when debugging for memory leaks is enabled by the inclusion of
this code, the garbage collector will be unable to properly free
circular references and they will show up in the garbage objects
list (this will make the application constantly grow). However
if this code is not loaded, the garbage collector will properly
detect and free the objects with circular references and the
size of the application will remain constant even in the
presence of circular references.
"""
__all__ = ['memory_dump']
import gc
import types
from collections import deque
class Node(object):
def __init__(self, object):
self.object = object
self.successors = None
self.visitable_successors = None
class Cycle(tuple):
def __init__(self, *args, **kwargs):
tuple.__init__(self)
self.collectable = all(not hasattr(obj, '__del__') for obj in self)
def __eq__(self, other):
if len(self) != len(other):
return False
for i, obj in enumerate(other):
if obj is self[0]:
if tuple.__eq__(self, (other[i:] + other[:i])):
return True
else:
return len(self) == 0
def __hash__(self):
return sum(id(node) for node in self)
def __repr__(self):
return '%s%s' % (self.__class__.__name__, tuple.__repr__(self))
def __str__(self):
index = max_priority = 0
for i, obj in enumerate(self):
if obj is getattr(self[i-1], '__dict__', None):
continue
if isinstance(obj, types.MethodType):
priority = 0
elif isinstance(obj, types.FunctionType):
priority = 2
elif type(obj).__module__ in ('__builtin__', 'builtins'):
priority = 1
elif isinstance(obj, (tuple, list, dict, set, frozenset, str, unicode)):
priority = 3
else:
priority = 4
if priority > max_priority:
index, max_priority = i, priority
cycle = deque(self[index:] + self[:index])
string = ''
firstobj = cycle[0] if cycle else None
while cycle:
obj = cycle.popleft()
string += repr(obj)
if cycle and cycle[0] is getattr(obj, '__dict__', None):
d = cycle.popleft()
try:
if cycle:
string += ' .%s' % (key for key, value in d.iteritems() if value is cycle[0]).next()
else:
string += ' .%s' % (key for key, value in d.iteritems() if value is firstobj).next()
except StopIteration:
string += ' .__dict__ -> %s' % repr(d)
string += ' -> '
string += repr(firstobj)
return string
def memory_dump(show_cycles=True, show_objects=False):
print "\nGARBAGE:"
gc.collect()
garbage = gc.garbage[:]
if show_cycles:
nodes = dict((id(obj), Node(obj)) for obj in garbage)
for obj in garbage:
nodes[id(obj)].successors = tuple(nodes[id(s)] for s in gc.get_referents(obj) if id(s) in nodes)
nodes[id(obj)].visitable_successors = deque(nodes[id(obj)].successors)
cycles = set()
remaining_nodes = nodes.copy()
while remaining_nodes:
path = [remaining_nodes.itervalues().next()]
while path:
node = path[-1]
remaining_nodes.pop(id(node.object), None)
if node.visitable_successors:
succ = node.visitable_successors.pop()
if succ in path:
cycles.add(Cycle(n.object for n in path[path.index(succ):]))
else:
path.append(succ)
else:
node.visitable_successors = deque(node.successors)
path.pop(-1)
for node in nodes.itervalues():
node.successors = node.visitable_successors = None
print "\nCOLLECTABLE CYCLES:"
for cycle in (c for c in cycles if c.collectable):
print cycle
print "\nUNCOLLECTABLE CYCLES:"
for cycle in (c for c in cycles if not c.collectable):
print cycle
if show_objects:
try:
import fcntl, struct, sys, termios
console_width = struct.unpack('HHHH', fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0)))[1]
except:
console_width = 80
print "\nGARBAGE OBJECTS:"
for x in garbage:
s = str(x)
if len(s) > console_width-2:
s = s[:console_width-5] + '...'
print "%s\n %s" % (type(x), s)
gc.enable()
gc.collect() ## Ignore collectable garbage up to this point
gc.set_debug(gc.DEBUG_LEAK)
|