Newer
Older
# -*- coding: utf-8 -*-
import os
import osmosdr
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio import gr, blocks
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from lib.disk import Disk
class FileSink(gr.top_block):
"""
FileSink class to create I/Q sample binary dump files.
"""
def __init__(self,
dump_file,
freq,
samp_rate,
capture_time,
device,
parent=None
):
gr.top_block.__init__(self, "file_sink_block")
self.capture_device = \
osmosdr.source(args="numchan=" + str(1) + " " + "" )
self._build_blocks()
def needed_disc_space(self):
"""
Calculate needed disc space for dump.
"""
pass
def capture(self):
"""
Capture samples to dump file.
"""
tb = top_block()
tb.start()
tb.stop()
tb.wait()
def get_file_path(self):
return self.file_path
def get_samp_rate(self):
return samp_rate
def get_capture_time(self):
return self.capture_time
def get_capture_device(self):
return self.get_capture_device
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def _dump_file_writable(self):
"""
Check permissions for specified file dump path.
"""
Disk.path_writeble(self.file_path)
def _build_blocks(self):
# blocks
# configure capture device
self.device_source = self.capture_device
self.device_source.set_sample_rate(self.samp_rate)
self.device_source.set_center_freq(self.freq, 0)
self.device_source.set_freq_corr(0, 0)
self.device_source.set_dc_offset_mode(0, 0)
self.device_source.set_iq_balance_mode(0, 0)
self.device_source.set_gain_mode(False, 0)
self.device_source.set_gain(0, 0)
self.device_source.set_if_gain(20, 0)
self.device_source.set_bb_gain(20, 0)
self.device_source.set_antenna("", 0)
self.device_source.set_bandwidth(0, 0)
# configure additional blocks
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, \
int(self.samp_rate)*self.capture_time)
self.blocks_file_meta_sink_0 = blocks.file_meta_sink(
gr.sizeof_gr_complex*1, \
self.file_path, \
self.samp_rate, \
1, \
blocks.GR_FILE_FLOAT, \
True, 1000000, "", False)
self.blocks_file_meta_sink_0.set_unbuffered(False)
# connect blocks
self.connect((self.blocks_head_0, 0), (self.blocks_file_meta_sink_0, 0))
self.connect((self.device_source, 0), (self.blocks_head_0, 0))