File: test_commands.py

package info (click to toggle)
nutcracker 0.5.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 3,192 kB
  • sloc: ansic: 15,221; sh: 5,284; python: 1,230; php: 300; makefile: 130
file content (103 lines) | stat: -rw-r--r-- 2,337 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
#coding: utf-8

from .common import *

def test_linsert():
    """LINSERT BEFORE puts the new element directly ahead of the pivot."""
    conn = getconn()

    # Build the two-element list one RPUSH at a time, in order.
    for item in ('Hello', 'World'):
        conn.rpush('mylist', item)
    conn.linsert('mylist', 'BEFORE', 'World', 'There')

    expected = [b'Hello', b'There', b'World']
    assert_equal(expected, conn.lrange('mylist', 0, -1))

def test_exists():
    """EXISTS gives 1 for a key that was set and 0 for one that never was."""
    conn = getconn()
    conn.set('exists1', 'foo')

    # (expected reply, key) pairs, checked in the same order as before.
    for expected, key in ((1, 'exists1'), (0, 'doesnotexist')):
        assert_equal(expected, conn.exists(key))

def test_lpush_lrange():
    """Push ten values with one LPUSH and read them back with LRANGE.

    The list is asserted to be empty before the push. LPUSH inserts its
    arguments at the head one after another, so the full range is expected
    to come back in reverse argument order.
    """
    r = getconn()

    vals = [b'vvv-%d' % i for i in range(10)]
    assert [] == r.lrange('mylist', 0, -1)

    r.lpush('mylist', *vals)
    rst = r.lrange('mylist', 0, -1)

    assert 10 == len(rst)
    # A length check alone would let lost, duplicated or corrupted elements
    # slip through, so verify the content and ordering as well.
    assert vals[::-1] == rst

def test_hscan():
    """HSCAN on a ten-field hash: full scan, then an exact-name MATCH."""
    conn = getconn()

    fields = {}
    for i in range(10):
        fields[b'kkk-%d' % i] = b'vvv-%d' % i
    conn.hmset('a', fields)

    # Unfiltered scan: expected to finish in one call and return every field.
    cursor, found = conn.hscan('a')
    assert str(cursor) == '0'
    assert found == fields

    # A pattern without wildcards should select just that one field.
    cursor, found = conn.hscan('a', match='kkk-5')
    assert str(cursor) == '0'
    assert found == {b'kkk-5': b'vvv-5'}

def test_hscan_large():
    """HSCAN over a 1000-field hash, paged by cursor and filtered by MATCH."""
    conn = getconn()

    prefix = b'x' * 100
    fields = {}
    for i in range(1000):
        fields[prefix + (b'kkk-%d' % i)] = b'vvv-%d' % i
    conn.hmset('a', fields)

    # Page through the hash until the cursor comes back as 0, merging pages.
    collected = {}
    cursor = '0'
    while True:
        cursor, page = conn.hscan('a', cursor, count=10)
        collected.update(page)
        if str(cursor) == '0':
            break

    assert collected == fields

    # '*kkk-5*' matches kkk-5, kkk-50..59 and kkk-500..599: 1 + 10 + 100.
    cursor, matched = conn.hscan('a', '0', match='*kkk-5*', count=1000)
    assert len(matched) == 111

    if str(cursor) != '0':
        # The first call already returned every match but left a non-zero
        # cursor; the follow-up call must end the scan with nothing more.
        cursor, matched = conn.hscan('a', cursor, match='*kkk-5*', count=1000)
        assert str(cursor) == '0'
        assert len(matched) == 0

def test_zscan():
    """ZSCAN on a three-member sorted set, with and without MATCH."""
    conn = getconn()

    scores = {'a': 1, 'b': 2, 'c': 3}
    conn.zadd('a', scores)

    # Unfiltered scan: cursor 0 and every (member, score) pair.
    cursor, pairs = conn.zscan('a')
    assert_equal(0, cursor)
    everything = {(b'a', 1), (b'b', 2), (b'c', 3)}
    assert_equal(everything, set(pairs))

    # Exact-name pattern: only member 'a' with its score.
    cursor, pairs = conn.zscan('a', match='a')
    assert_equal(0, cursor)
    only_a = {(b'a', 1)}
    assert_equal(only_a, set(pairs))

def test_sscan():
    """SSCAN on a three-member set, with and without MATCH."""
    conn = getconn()
    conn.sadd('a', 1, 2, 3)

    # Unfiltered scan: cursor 0 and all three members, returned as bytes.
    cursor, found = conn.sscan('a')
    assert_equal(0, cursor)
    assert_equal({b'1', b'2', b'3'}, set(found))

    # Exact-name pattern: only member '1'; cursor is compared as text here.
    cursor, found = conn.sscan('a', match='1')
    cursor_text = str(cursor)
    assert_equal('0', cursor_text)
    assert_equal({b'1'}, set(found))