1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import FallingEdge
from pyuvm import *
from pyuvm import utility_classes
class CocotbProxy:
def __init__(self, dut, label):
self.dut = dut
ConfigDB().set(None, "*", label, self)
self.driver_queue = UVMQueue(maxsize=1)
self.cmd_mon_queue = UVMQueue(maxsize=0)
self.result_mon_queue = UVMQueue(maxsize=0)
self.done = cocotb.triggers.Event(name="Done")
def send_op(self, aa, bb, op):
self.driver_queue.put((aa, bb, op))
def get_cmd(self):
return self.cmd_mon_queue.get()
def get_result(self):
return self.result_mon_queue.get()
async def reset(self):
await FallingEdge(self.dut.clk)
self.dut.reset_n = 0
self.dut.A = 0
self.dut.B = 0
self.dut.op = 0
await FallingEdge(self.dut.clk)
self.dut.reset_n = 1
await FallingEdge(self.dut.clk)
async def driver_bfm(self):
self.dut.start = self.dut.A = self.dut.B = 0
self.dut.op = 0
while True:
await FallingEdge(self.dut.clk)
if self.dut.start == 0 and self.dut.done == 0:
try:
(aa, bb, op) = self.driver_queue.get_nowait()
self.dut.A = aa
self.dut.B = bb
self.dut.op = op
self.dut.start = 1
except queue.Empty:
pass
elif self.dut.start == 1:
if self.dut.done.value == 1:
self.dut.start = 0
async def cmd_mon_bfm(self):
prev_start = 0
while True:
await FallingEdge(self.dut.clk)
try:
start = int(self.dut.start.value)
except ValueError:
start = 0
if start == 1 and prev_start == 0:
self.cmd_mon_queue.put_nowait(
(int(self.dut.A), int(self.dut.B), int(self.dut.op))
)
prev_start = start
async def result_mon_bfm(self):
prev_done = 0
while True:
await FallingEdge(self.dut.clk)
try:
done = int(self.dut.done)
except ValueError:
done = 0
if done == 1 and prev_done == 0:
self.result_mon_queue.put_nowait(int(self.dut.result.value))
prev_done = done
def run_uvm_test(test_name):
root = uvm_root()
root.run_test(test_name)
# noinspection PyArgumentList,PyAsyncCall
# @cocotb.test()
async def test_alu(dut):
clock = Clock(dut.clk, 2, "us")
cocotb.start_soon(clock.start())
proxy = CocotbProxy(dut, "PROXY")
await proxy.reset()
cocotb.start_soon(proxy.driver_bfm())
cocotb.start_soon(proxy.cmd_mon_bfm())
cocotb.start_soon(proxy.result_mon_bfm())
await FallingEdge(dut.clk)
test_thread = threading.Thread(
target=run_uvm_test, args=("CocotbAluTest",), name="run_test"
)
test_thread.start()
await proxy.done.wait()
await FallingEdge(dut.clk)
# noinspection PyArgumentList,PyAsyncCall
@cocotb.test()
async def test_queue(dut):
"Test basic QUEUE functions"
qq = utility_classes.UVMQueue(maxsize=1)
await qq.put(5)
got = await qq.get()
assert got == 5
await qq.put("x")
peeked = await qq.peek()
assert "x" == peeked
got = await qq.get()
qq.put_nowait(5)
got = qq.get_nowait()
assert got == 5
qq.put_nowait("x")
peeked = qq.peek_nowait()
assert "x" == peeked
got = qq.get_nowait()
assert got == peeked
async def delay_put(qq, delay, data):
for dd in data:
await qq.put(dd)
async def delay_get(qq, delay):
ret_dat = []
datum = await qq.get()
ret_dat.append(datum)
while datum is not None:
datum = await qq.get()
ret_dat.append(datum)
return ret_dat
async def delay_peek(qq, delay):
return await qq.peek()
@cocotb.test()
async def wait_on_queue(dut):
"""Test put and get with waits"""
clock = Clock(dut.clk, 2, "us") # make the simulator wait
cocotb.start_soon(clock.start())
qq = utility_classes.UVMQueue(maxsize=1)
send_data = [0.01, "two", 3, None]
cocotb.start_soon(delay_put(qq, 0.01, send_data))
got_data = await delay_peek(qq, 0.01)
assert got_data == 0.01
got_data = await delay_get(qq, 0.01)
@cocotb.test()
async def nowait_tests(dut):
"""Test the various nowait flavors"""
qq = utility_classes.UVMQueue(maxsize=1)
await qq.put(5)
try:
qq.put_nowait(6)
assert False
except cocotb.queue.QueueFull:
pass
try:
xx = qq.peek_nowait()
assert xx == 5
except cocotb.queue.QueueEmpty:
assert False
try:
xx = qq.get_nowait()
assert xx == 5
except cocotb.queue.QueueEmpty:
assert False
try:
xx = qq.peek_nowait()
assert False
except cocotb.queue.QueueEmpty:
pass
try:
xx = qq.get_nowait()
assert False
except cocotb.queue.QueueEmpty:
pass
|