1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
import os
import unittest
import multiprocess
from molotov.shared.counter import Counter, Counters
# pre-forked variable
_DATA = Counters("test")
def run_worker(value):
_DATA["test"] += value
_DATA["test"] -= value
_DATA["test"] += value
class TestCounters(unittest.TestCase):
def test_operators(self):
c1 = Counter("ok")
c2 = Counter("ok")
c1.value = 4
c2.value = 5
self.assertTrue(c1 <= c2)
self.assertTrue(c1 < c2)
self.assertTrue(c1 >= 2)
self.assertTrue(c1 > 2)
self.assertTrue(c1 == 4)
self.assertTrue(c1 != 5)
c2.value = 4
c2 += Counter("ok")
self.assertTrue(c1 == c2)
repr(c1)
str(c1)
def _t():
c = Counter("ok")
c += 6.2
self.assertRaises(NotImplementedError, _t)
def _c():
Counter("ok") != 6.3 # noqa
self.assertRaises(TypeError, _c)
def test_interface(self):
data = Counters("one", "two")
self.assertTrue("one" in data)
self.assertEqual(len(data.keys()), 2)
for key in data:
data[key] = 0
self.assertTrue(data[key], 0)
for key, value in data.items():
data[key] = value
self.assertTrue(data[key], value)
data.values()
repr(data)
str(data)
self.assertRaises(KeyError, data.__setitem__, "meh", 1)
self.assertRaises(TypeError, data.__setitem__, "one", "1")
def test_mapping(self):
# making sure it works like a defaultdict(int)
data = Counters("one", "two")
self.assertTrue(data["one"].value == 0)
data["one"] += 10
data["one"] -= 1
data["two"] = 4
self.assertTrue(data["one"].value, 9)
self.assertTrue(data["two"].value, 4)
@unittest.skipIf(os.name == "nt", "win32")
def test_multiprocess(self):
# now let's try with several processes
pool = multiprocess.Pool(10)
try:
inputs = [1] * 3000
pool.map(run_worker, inputs)
self.assertEqual(_DATA["test"].value, 3000)
finally:
pool.close()
|