# -*- coding: utf-8 -*-

import os
import logging

import osmosdr
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio import gr, blocks
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes

from lib.disk import Disk


class FileSink(gr.top_block):
    """
    Flow graph that dumps I/Q samples from an osmosdr source to a file.

    The graph built by ``_build_blocks`` is:
    osmosdr source -> head (limits the item count) -> file meta sink.
    """

    def __init__(self, dump_file, freq, samp_rate, capture_time, device,
                 parent=None):
        """
        Store the capture parameters and build the flow graph.

        dump_file    -- path of the dump file (stored as ``str(dump_file)``).
        freq         -- center frequency handed to the capture device.
        samp_rate    -- sample rate handed to the capture device and to
                        the file meta sink.
        capture_time -- multiplied by ``int(samp_rate)`` to get the number
                        of items to capture (presumably seconds; confirm
                        with callers).
        device       -- NOTE(review): accepted but currently unused; the
                        osmosdr source is always created with empty
                        device arguments.
        parent       -- NOTE(review): accepted but currently unused.
        """
        gr.top_block.__init__(self, "file_sink_block")

        self.file_path = str(dump_file)
        self.samp_rate = samp_rate
        self.freq = freq
        self.capture_time = capture_time
        self.capture_device = osmosdr.source(
            args="numchan=" + str(1) + " " + "")

        self._build_blocks()

    def needed_disc_space(self):
        """
        Calculate needed disc space for dump.

        NOTE(review): not implemented yet; currently returns None.
        """
        pass

    def capture(self):
        """
        Capture samples to dump file.

        Starts the flow graph and waits for it to finish, logging before
        and after.
        """
        # Lazy logging arguments: the message is only formatted when the
        # record is actually emitted.
        logging.info("Start '%s' capture.", self.file_path)
        self.start()
        self.wait()
        logging.info("Capture '%s' done.", self.file_path)

    def get_file_path(self):
        """ Return the dump file path. """
        return self.file_path

    def get_samp_rate(self):
        """ Return the configured sample rate. """
        # Fixed: the bare name ``samp_rate`` raised NameError.
        return self.samp_rate

    def get_capture_time(self):
        """ Return the configured capture time. """
        return self.capture_time

    def get_capture_device(self):
        """ Return the osmosdr source object used for capturing. """
        # Fixed: previously returned this bound method instead of the
        # device attribute.
        return self.capture_device

    def _dump_file_writable(self):
        """
        Check permissions for specified file dump path.

        Returns whatever ``Disk.path_writeble`` returns for the dump
        path, so callers can act on the result.
        """
        return Disk.path_writeble(self.file_path)

    def _build_blocks(self):
        """
        Configure the capture device, create the head and file meta sink
        blocks, and connect them into the flow graph.
        """
        # blocks
        # configure capture device
        self.device_source = self.capture_device
        self.device_source.set_sample_rate(self.samp_rate)
        self.device_source.set_center_freq(self.freq, 0)
        self.device_source.set_freq_corr(0, 0)
        self.device_source.set_dc_offset_mode(0, 0)
        self.device_source.set_iq_balance_mode(0, 0)
        self.device_source.set_gain_mode(False, 0)
        self.device_source.set_gain(0, 0)
        self.device_source.set_if_gain(20, 0)
        self.device_source.set_bb_gain(20, 0)
        self.device_source.set_antenna("", 0)
        self.device_source.set_bandwidth(0, 0)

        # configure additional blocks
        # head lets int(samp_rate) * capture_time items through
        self.blocks_head_0 = blocks.head(
            gr.sizeof_gr_complex*1,
            int(self.samp_rate)*self.capture_time)

        # NOTE(review): positional arguments kept exactly as before;
        # check their meaning against the GNU Radio file_meta_sink docs.
        self.blocks_file_meta_sink_0 = blocks.file_meta_sink(
            gr.sizeof_gr_complex*1,
            self.file_path,
            self.samp_rate,
            1,
            blocks.GR_FILE_FLOAT,
            True, 1000000, "", False)
        self.blocks_file_meta_sink_0.set_unbuffered(False)

        # connect blocks
        self.connect((self.blocks_head_0, 0),
                     (self.blocks_file_meta_sink_0, 0))
        self.connect((self.device_source, 0), (self.blocks_head_0, 0))