File: test.py

package info (click to toggle)
pyuvm 4.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 992 kB
  • sloc: python: 8,662; makefile: 238; vhdl: 206; sh: 7
file content (190 lines) | stat: -rw-r--r-- 5,153 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import FallingEdge

from pyuvm import *
from pyuvm import utility_classes


class CocotbProxy:
    """Proxy that connects testbench code to the DUT signals through queues.

    The proxy stores itself in the pyuvm ``ConfigDB`` under ``label`` and owns
    three queues: ``driver_queue`` (commands waiting to be driven onto the
    DUT), ``cmd_mon_queue`` (commands observed on the DUT inputs) and
    ``result_mon_queue`` (results observed on the DUT output).
    """

    def __init__(self, dut, label):
        """Store the DUT handle, publish the proxy and create its queues.

        Args:
            dut: cocotb handle of the design under test.
            label: ConfigDB field name under which this proxy is stored.
        """
        self.dut = dut
        ConfigDB().set(None, "*", label, self)
        self.driver_queue = UVMQueue(maxsize=1)
        self.cmd_mon_queue = UVMQueue(maxsize=0)
        self.result_mon_queue = UVMQueue(maxsize=0)
        self.done = cocotb.triggers.Event(name="Done")

    def send_op(self, aa, bb, op):
        """Queue the command ``(aa, bb, op)`` for the driver BFM.

        Returns:
            The awaitable produced by ``driver_queue.put``; the caller must
            ``await`` it for the command to actually be queued. (Previously
            the awaitable was dropped, so nothing was ever enqueued.)
        """
        return self.driver_queue.put((aa, bb, op))

    def get_cmd(self):
        """Return an awaitable yielding the next monitored command tuple."""
        return self.cmd_mon_queue.get()

    def get_result(self):
        """Return an awaitable yielding the next monitored result."""
        return self.result_mon_queue.get()

    async def reset(self):
        """Hold ``reset_n`` low for one clock period with all inputs at 0."""
        await FallingEdge(self.dut.clk)
        # Writes go through ``.value``; plain attribute assignment on a
        # handle is the deprecated cocotb form.
        self.dut.reset_n.value = 0
        self.dut.A.value = 0
        self.dut.B.value = 0
        self.dut.op.value = 0
        await FallingEdge(self.dut.clk)
        self.dut.reset_n.value = 1
        await FallingEdge(self.dut.clk)

    async def driver_bfm(self):
        """Drive queued commands onto the DUT, one at a time, forever.

        On each falling clock edge: when the DUT is idle (``start`` and
        ``done`` both 0) the next queued command, if any, is applied and
        ``start`` is raised; while ``start`` is high it is dropped again as
        soon as ``done`` is seen high.
        """
        # Imported locally so the handler does not depend on ``cocotb.queue``
        # having been loaded as a side effect of another import.
        from cocotb.queue import QueueEmpty

        self.dut.start.value = 0
        self.dut.A.value = 0
        self.dut.B.value = 0
        self.dut.op.value = 0
        while True:
            await FallingEdge(self.dut.clk)
            if self.dut.start.value == 0 and self.dut.done.value == 0:
                try:
                    aa, bb, op = self.driver_queue.get_nowait()
                except QueueEmpty:
                    # Nothing to drive on this edge; try again on the next.
                    continue
                self.dut.A.value = aa
                self.dut.B.value = bb
                self.dut.op.value = op
                self.dut.start.value = 1
            elif self.dut.start.value == 1:
                if self.dut.done.value == 1:
                    self.dut.start.value = 0

    async def cmd_mon_bfm(self):
        """Record ``(A, B, op)`` on every rising transition of ``start``."""
        prev_start = 0
        while True:
            await FallingEdge(self.dut.clk)
            try:
                start = int(self.dut.start.value)
            except ValueError:
                # Value not convertible to int (e.g. unresolved): treat as 0.
                start = 0
            if start == 1 and prev_start == 0:
                self.cmd_mon_queue.put_nowait(
                    (
                        int(self.dut.A.value),
                        int(self.dut.B.value),
                        int(self.dut.op.value),
                    )
                )
            prev_start = start

    async def result_mon_bfm(self):
        """Record ``result`` on every rising transition of ``done``."""
        prev_done = 0
        while True:
            await FallingEdge(self.dut.clk)
            try:
                done = int(self.dut.done.value)
            except ValueError:
                # Value not convertible to int (e.g. unresolved): treat as 0.
                done = 0

            if done == 1 and prev_done == 0:
                self.result_mon_queue.put_nowait(int(self.dut.result.value))
            prev_done = done


def run_uvm_test(test_name):
    """Fetch the UVM root object and call its ``run_test`` with ``test_name``."""
    uvm_root().run_test(test_name)


# noinspection PyArgumentList,PyAsyncCall
# @cocotb.test()
async def test_alu(dut):
    """Start the clock and proxy BFMs, then run ``CocotbAluTest`` in a thread.

    The coroutine resets the DUT, launches the driver and both monitor BFMs,
    starts ``run_uvm_test`` on a separate thread and then waits for the
    proxy's ``done`` event before letting one more clock edge pass.
    """
    # Imported locally: the module never imports ``threading`` by name, so
    # relying on it being in scope would raise NameError.
    import threading

    clock = Clock(dut.clk, 2, "us")
    cocotb.start_soon(clock.start())
    proxy = CocotbProxy(dut, "PROXY")
    await proxy.reset()
    cocotb.start_soon(proxy.driver_bfm())
    cocotb.start_soon(proxy.cmd_mon_bfm())
    cocotb.start_soon(proxy.result_mon_bfm())
    await FallingEdge(dut.clk)
    test_thread = threading.Thread(
        target=run_uvm_test, args=("CocotbAluTest",), name="run_test"
    )
    test_thread.start()
    await proxy.done.wait()
    await FallingEdge(dut.clk)


# noinspection PyArgumentList,PyAsyncCall
@cocotb.test()
async def test_queue(dut):
    """Test basic QUEUE functions"""
    qq = utility_classes.UVMQueue(maxsize=1)

    # Awaited flavours: put/get round trip, then peek followed by get.
    await qq.put(5)
    got = await qq.get()
    assert got == 5
    await qq.put("x")
    peeked = await qq.peek()
    assert "x" == peeked
    got = await qq.get()
    # The item read after a peek must be the peeked item; this value was
    # previously fetched but never checked.
    assert got == peeked

    # Non-blocking flavours: same sequence with the *_nowait methods.
    qq.put_nowait(5)
    got = qq.get_nowait()
    assert got == 5
    qq.put_nowait("x")
    peeked = qq.peek_nowait()
    assert "x" == peeked
    got = qq.get_nowait()
    assert got == peeked


async def delay_put(qq, delay, data):
    """Put every element of ``data`` into ``qq``, in order.

    ``delay`` is accepted for signature compatibility but is not used.
    """
    for item in data:
        await qq.put(item)


async def delay_get(qq, delay):
    """Drain ``qq`` until a ``None`` item arrives and return everything read.

    The terminating ``None`` is included as the last element of the returned
    list. ``delay`` is accepted for signature compatibility but is not used.
    """
    received = []
    while True:
        item = await qq.get()
        received.append(item)
        if item is None:
            return received


async def delay_peek(qq, delay):
    """Await ``qq.peek()`` and return its result.

    ``delay`` is accepted for signature compatibility but is not used.
    """
    front = await qq.peek()
    return front


@cocotb.test()
async def wait_on_queue(dut):
    """Test put and get with waits"""
    clock = Clock(dut.clk, 2, "us")  # make the simulator wait
    cocotb.start_soon(clock.start())
    qq = utility_classes.UVMQueue(maxsize=1)
    # The trailing None is the sentinel that makes delay_get stop reading.
    send_data = [0.01, "two", 3, None]
    cocotb.start_soon(delay_put(qq, 0.01, send_data))
    got_data = await delay_peek(qq, 0.01)
    assert got_data == 0.01
    got_data = await delay_get(qq, 0.01)
    # delay_get returns every item it read, sentinel included, so the result
    # must match what was sent; this was previously fetched but not checked.
    assert got_data == send_data


@cocotb.test()
async def nowait_tests(dut):
    """Test the various nowait flavors"""
    qq = utility_classes.UVMQueue(maxsize=1)
    await qq.put(5)

    # The queue is full: a second non-blocking put has to be rejected.
    try:
        qq.put_nowait(6)
    except cocotb.queue.QueueFull:
        pass
    else:
        assert False

    # One item is present: peek sees it and leaves it in place.
    try:
        value = qq.peek_nowait()
    except cocotb.queue.QueueEmpty:
        assert False
    else:
        assert value == 5

    # get returns that same item and removes it.
    try:
        value = qq.get_nowait()
    except cocotb.queue.QueueEmpty:
        assert False
    else:
        assert value == 5

    # The queue is now empty: both peek and get have to raise.
    try:
        qq.peek_nowait()
    except cocotb.queue.QueueEmpty:
        pass
    else:
        assert False

    try:
        qq.get_nowait()
    except cocotb.queue.QueueEmpty:
        pass
    else:
        assert False