File: test.py

package info (click to toggle)
golang-github-lunny-nodb 0.0~git20160621.0.fc1ef06-5
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 424 kB
  • sloc: python: 259; sh: 50; makefile: 3
file content (120 lines) | stat: -rw-r--r-- 2,664 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#coding: utf-8

import random, string

import redis
import ledis

from redis_import import copy, scan, set_ttl

rds = redis.Redis()
lds = ledis.Ledis(port=6380)


def random_word(words, length):
    return ''.join(random.choice(words) for i in range(length))


def get_words():
    word_file = "/usr/share/dict/words"
    words = open(word_file).read().splitlines()
    return words[:1000]


def get_mapping(words, length=1000):
    d = {}
    for word in words:
        d[word] = random.randint(1, length)
    return d


def random_string(client, words, length=1000):
    d = get_mapping(words, length)
    client.mset(d)


def random_hash(client, words, length=1000):
    d = get_mapping(words, length)
    client.hmset("hashName", d)


def random_list(client, words, length=1000):
    client.lpush("listName", *words)


def random_zset(client, words, length=1000):
    d = get_mapping(words, length)
    client.zadd("zsetName", **d)


def test():
    words = get_words()
    print "Flush all redis data before insert new."
    rds.flushall()

    random_string(rds, words)
    print "random_string done"

    random_hash(rds, words)
    print "random_hash done"
    
    random_list(rds, words)
    print "random_list done"

    random_zset(rds, words)
    print "random_zset done"


    lds.lclear("listName")
    lds.hclear("hashName")
    lds.zclear("zsetName")
    copy(rds, lds, convert=True)

    # for all keys
    keys = scan(rds, 1000)
    for key in keys:
        if rds.type(key) == "string" and not lds.exists(key):
            print key
            print "String data not consistent"

    # for list
    l1 = rds.lrange("listName", 0, -1)
    l2 = lds.lrange("listName", 0, -1)
    assert l1 == l2
   
    #for hash
    for key in keys:
        if rds.type(key) == "hash":
            assert rds.hgetall(key) == lds.hgetall(key)
            assert sorted(rds.hkeys(key)) == sorted(lds.hkeys(key))
            assert sorted(rds.hvals(key)) == sorted(lds.hvals(key))

    # for zset
    z1 = rds.zrange("zsetName", 0, -1, withscores=True)
    z2 = lds.zrange("zsetName", 0, -1, withscores=True)
    assert z1 == z2


def ledis_ttl(ledis_client, key, k_type):
    ttls = {
        "string": lds.ttl,
        "list": lds.lttl,
        "hash": lds.httl,
        "zset": lds.zttl,
    }
    return ttls[k_type](key)


def test_ttl():
    keys, total = scan(rds, 1000)
    for key in keys:
        k_type = rds.type(key)
        rds.expire(key, (60 * 60 * 24))
        set_ttl(rds, lds, key, k_type)
        if rds.ttl(key):
            assert ledis_ttl(lds, key, k_type) > 0

if __name__ == "__main__":
    test()
    test_ttl()
    print "Test passed."