1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
class circbuf:
"""A circular buffer of Python values.
Examples::
>>> cb = circbuf(3)
>>> cb.is_empty()
1
>>> cb.put('first')
>>> cb.is_empty()
0
>>> cb.put('second')
>>> cb.put('third')
>>> cb.is_full()
1
>>> cb.put('fourth')
>>> cb.get()
'second'
>>> cb.get()
'third'
>>> cb.get()
'fourth'
>>> cb.is_empty()
1
inv::
# there can be from 0 to max items inclusive on the buffer
0 <= self.len <= len(self.buf)
# g is a valid index into buf
0 <= self.g < len(self.buf)
# p is also a valid index into buf
0 <= self.p < len(self.buf)
# there are len items between get and put
(self.p - self.g) % len(self.buf) == self.len % len(self.buf)
"""
def __init__(self, leng):
"""Construct an empty circular buffer.
pre::
leng > 0
post[self]::
self.is_empty() and len(self.buf) == leng
"""
self.buf = [None for x in range(leng)]
self.len = self.g = self.p = 0
def is_empty(self):
"""Returns true only if circbuf has no items.
post[]::
__return__ == (self.len == 0)
"""
return self.len == 0
def is_full(self):
"""Returns true only if circbuf has no space.
post[]::
__return__ == (self.len == len(self.buf))
"""
return self.len == len(self.buf)
def get(self):
"""Retrieves an item from a non-empty circular buffer.
pre::
not self.is_empty()
post[self.len, self.g]::
self.len == __old__.self.len - 1
__return__ == self.buf[__old__.self.g]
"""
result = self.buf[self.g]
self.g = (self.g + 1) % len(self.buf)
self.len -= 1
return result
def put(self, item):
"""Puts an item onto a circular buffer.
post[self.len, self.p, self.g, self.buf]::
# if the circbuf was full on entry, then an entry
# was bumped
implies(__old__.self.len == len(self.buf),
self.len == __old__.self.len,
self.len == __old__.self.len + 1 and \
self.g == __old__.self.g)
# item is now in the buffer
self.buf[__old__.self.p] == item
# but no other part of the buffer has changed
self.buf[:__old__.self.p] == __old__.self.buf[:__old__.self.p]
self.buf[__old__.self.p+1:] == __old__.self.buf[__old__.self.p+1:]
__return__ is None
"""
self.buf[self.p] = item
self.p = (self.p + 1) % len(self.buf)
if self.len == len(self.buf):
self.g = self.p
else:
self.len += 1
# enable contract checking
import contract
contract.checkmod(__name__)
def _test():
import doctest, circbuf
return doctest.testmod(circbuf)
if __name__ == '__main__':
_test()
|