File: dab_tb.py

package info (click to toggle)
gr-dab 0.5-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,280 kB
  • sloc: python: 14,976; cpp: 6,738; ansic: 547; makefile: 17; sh: 11
file content (124 lines) | stat: -rw-r--r-- 5,031 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# _*_ coding: utf8 _*_

# Andreas Müller, 2008
# andrmuel@ee.ethz.ch
#
# this code may be freely used under GNU GPL conditions

"""
Complete DAB TX and RX with a software channel to simulate noise, frequency offset, etc.
"""

from gnuradio import gr, blks2
import gnuradio.dab as grdab
import math, random

DAB_SAMPLE_RATE=2048000

class dab_ofdm_testbench(gr.top_block):
	def __init__(self, autocorrect_sample_rate=False, input_filter=True, ber_sink=False):
		gr.top_block.__init__(self)
		self.dp = grdab.dab_parameters(1)
		self.rp = grdab.receiver_parameters(1, softbits=False, input_fft_filter=input_filter, autocorrect_sample_rate=autocorrect_sample_rate, correct_ffe=True, equalize_magnitude=False)
		self.ber_sink = ber_sink
		os.environ['GR_SCHEDULER'] = "STS" # need single threaded scheduler for use with concatenate_signals

	def setup_flowgraph(self, mode, ber_skipbytes=0):
		# parameters
		self.dp.set_mode(mode)
		self.rp.set_mode(mode)
		self.vlen = self.dp.num_carriers/4
		self.ber_skipbytes = ber_skipbytes

		# trigger signals
		frame_trigger = [1]+[0]*(self.dp.symbols_per_frame-2)
		self.frame_start = frame_trigger*(len(self.random_bytes)/(self.vlen*(self.dp.symbols_per_frame-1)))+frame_trigger[0:(len(self.random_bytes)/self.vlen)%(self.dp.symbols_per_frame-1)]

		# sources/sinks
		self.source    = gr.vector_source_b(self.random_bytes, False)
		self.trig      = gr.vector_source_b(self.frame_start, False)
		if self.ber_sink:
			self.sink = grdab.blocks.measure_ber_b()
		else:
			self.sink = gr.vector_sink_b()

		# self.noise_start      = gr.noise_source_c(gr.GR_GAUSSIAN, math.sqrt(2), random.randint(0,10000))
		# self.noise_start_head = gr.head(gr.sizeof_gr_complex, NOISE_SAMPLES_AT_START)
		# self.noise_end        = gr.noise_source_c(gr.GR_GAUSSIAN, math.sqrt(2), random.randint(0,10000))
		# self.noise_end_head   = gr.head(gr.sizeof_gr_complex, NOISE_SAMPLES_AT_END)
		
		# blocks
		self.s2v       = gr.stream_to_vector(gr.sizeof_char, self.vlen)
		self.v2s       = gr.vector_to_stream(gr.sizeof_char, self.vlen)
		if self.ber_sink:
			self.ber_skipbytes0 = gr.skiphead(gr.sizeof_char, self.ber_skipbytes)
			self.ber_skipbytes1 = gr.skiphead(gr.sizeof_char, self.ber_skipbytes+self.dp.bytes_per_frame)
		
		# more blocks (they have state, so better reinitialise them)
		self.mod       = grdab.ofdm_mod(self.dp, debug = False)
		self.rescale   = gr.multiply_const_cc(1)
		self.amp       = gr.multiply_const_cc(1)
		self.channel   = blks2.channel_model(noise_voltage=0, noise_seed=random.randint(0,10000))
		# self.cat       = grdab.concatenate_signals(gr.sizeof_gr_complex)
		self.demod     = grdab.ofdm_demod(self.dp, self.rp, debug = False, verbose = True)

		# connect it all
		if self.ber_sink:
			self.connect(self.source, self.s2v, (self.mod,0), self.rescale, self.amp, self.channel, (self.demod,0), self.v2s, self.ber_skipbytes0, self.sink)
			self.connect(self.source, self.ber_skipbytes1, (self.sink,1))
		else:
			self.connect(self.source, self.s2v, (self.mod,0), self.rescale, self.amp, self.channel, (self.demod,0), self.v2s, self.sink)
		self.connect(self.trig, (self.mod,1))

		# SNR calculation and prober
		self.probe_signal = gr.probe_avg_mag_sqrd_c(0,0.00001)
		self.probe_total  = gr.probe_avg_mag_sqrd_c(0,0.00001)
		self.connect(self.amp, self.probe_signal)
		self.connect(self.channel, self.probe_total)
        	
	def gen_random_bytes(self, num_bytes):
		self.random_bytes = [random.randint(0,255) for i in xrange(0,num_bytes)]
		
	def set_noise_energy(self, noise_energy):
		self.channel.set_noise_voltage(math.sqrt(noise_energy)/math.sqrt(2))

	def set_signal_energy(self, signal_energy):
		self.amp.set_k(math.sqrt(signal_energy))
	
	def set_carrier_frequency_offset(self, freq_offset):
		self.channel.set_frequency_offset(float(freq_offset)/self.dp.sample_rate)

	def set_sampling_frequency_offset(self, ratio):
		self.channel.set_timing_offset(ratio)	

	def set_multipath_taps(self, taps):
		self.channel.set_taps(taps)

	def set_power_correction(self, estimated_energy):
		self.estimated_signal_energy = estimated_energy
		sig_ratio = float(self.dp.num_carriers) / float(self.dp.fft_length) # not all subcarriers are occupied -> must be considered to calculate scale factor
		self.rescale.set_k((1/math.sqrt(self.estimated_signal_energy))*math.sqrt(sig_ratio))

	def rewind_sources(self):
		self.source.rewind()
		self.trig.rewind()
		# self.noise_start_head.rewind()
		# self.noise_end_head.rewind()
		
	def clear_sinks(self):
		self.sink.clear()
	
	def clear_state(self):
		if self.ber_sink:
			self.ber_skipbytes0.rewind()
			self.ber_skipbytes1.rewind()
		# self.cat.reset()
		self.demod.clear_state()

		# TODO some state is still left in the demod block - for now just make a new one
		self.disconnect(self.channel, self.demod)
		self.disconnect((self.demod,0), self.v2s)
		self.demod = grdab.ofdm_demod(self.dp, self.rp, debug = False, verbose = True)
		self.connect(self.channel, self.demod)
		self.connect((self.demod,0), self.v2s)