import sys
import os
import tempfile

import numpy as np
import matplotlib.pyplot as plt
from gnuradio import blocks
from gnuradio import gr
import osmosdr


class NoiseFloorDetect(gr.top_block):
    """Flow graph that captures samples from an osmosdr source and
    derives noise-floor statistics and a detection threshold from them.

    The graph is: osmosdr source -> throttle -> head -> complex_to_mag_squared
    -> message sink.  ``detect()`` drains the message sink's queue, so the
    values it analyses are squared magnitudes of the received samples.
    """

    # The throttle runs at this multiple of the sample rate.  Kept in one
    # place so that _build_blocks() and set_samp_rate() cannot drift apart.
    _THROTTLE_FACTOR = 10

    def __init__(self, samp_rate, freq, capture_time, device=None):
        """Store the capture parameters and build the flow graph.

        :param samp_rate: sample rate handed to the osmosdr source
        :param freq: center frequency handed to the osmosdr source
        :param capture_time: capture duration in seconds (truncated to int)
        :param device: stored on the instance only.
            NOTE(review): _build_blocks() always opens "osmosdr=0" and never
            reads this value - confirm whether it should select the device.
        """
        gr.top_block.__init__(self, "Top Block")

        self.samp_rate = samp_rate
        self.freq = freq
        self.capture_time = int(capture_time)
        self.device = device

        # Both names are kept because both are public attributes of the
        # original class; they refer to the same unbounded message queue.
        self.msgq_out = self.blocks_message_sink_0_msgq_out = gr.msg_queue(0)

        self._build_blocks()

    def _expected_sample_count(self):
        """Return the number of samples the head block lets through."""
        return int(self.samp_rate) * int(self.capture_time)

    def detect(self):
        """Run the flow graph, collect the capture and analyse it.

        :returns: dict with the keys ``min``, ``max``, ``average``,
            ``median``, ``std``, ``seconds``, ``threshold`` and ``plot_path``
            (``plot_path`` is the temporary file object holding the plot).
        :raises ValueError: if the configured capture yields no samples
            (capture time or sample rate below 1).
        """
        expected_samples = self._expected_sample_count()
        if expected_samples <= 0:
            # With nothing to capture the queue would never receive a
            # message and delete_head() below would block forever.
            raise ValueError("capture_time and samp_rate must both be >= 1")

        sample_blocks = []
        samples_count = 0

        self.start()
        try:
            # The target matches the head block's length exactly, so the
            # loop ends once everything the graph can deliver has arrived.
            while samples_count < expected_samples:
                # blocks until the message sink delivers the next chunk
                msg = self.msgq_out.delete_head()
                # the payload is a raw buffer of float32 values
                chunk = np.frombuffer(msg.to_string(), dtype='float32')
                sample_blocks.append(chunk)
                samples_count += len(chunk)
        finally:
            # Always shut the graph down so the source is released even
            # when reading from the queue fails.
            self.stop()
            self.wait()

        samples = np.concatenate(sample_blocks)

        results = self._extract_statistic_data_from_samples(samples)
        plot_results = self._plot_data(samples, self.samp_rate)
        results['plot_path'] = plot_results['plot_path']
        results['threshold'] = plot_results['threshold']
        return results

    def get_samp_rate(self):
        """Return the currently configured sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Change the sample rate and propagate it to every dependent block.

        The source rate, the throttle rate and the head block's length are
        all derived from the sample rate in _build_blocks(), so all three
        are refreshed here.
        """
        self.samp_rate = samp_rate
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)
        self.blocks_throttle_0.set_sample_rate(
            self._THROTTLE_FACTOR * self.samp_rate)
        self.blocks_head_0.set_length(self._expected_sample_count())

    def _plot_data(self, samples, samp_rate=2e6):
        """Compute the threshold and plot the samples together with it.

        The capture is split into sections of ``samp_rate`` samples (one
        second each) and the maximum of every complete section is taken.
        The threshold is the median of those maxima plus their minimum.
        A capture shorter than one section is treated as a single section.

        :param samples: 1-D array of sample values (as built by detect())
        :param samp_rate: number of samples per one-second section
        :returns: dict with ``threshold`` (float rounded to 6 decimals) and
            ``plot_path`` (temporary file object containing the PNG plot;
            keep a reference to it, the file goes away when it is closed)
        :raises ValueError: if ``samples`` is empty
        """
        samples = np.asarray(samples)
        if samples.size == 0:
            raise ValueError("cannot derive a threshold from an empty capture")

        result = {}

        section_len = int(samp_rate)
        section_count = len(samples) // section_len if section_len > 0 else 0
        if section_count:
            # one row per complete section, then the maximum of each row
            usable = samples[:section_count * section_len]
            section_maxima = usable.reshape(section_count, section_len).max(axis=1)
        else:
            section_maxima = np.array([np.amax(samples)])

        threshold = round(
            float(np.median(section_maxima) + np.amin(section_maxima)), 6)
        result['threshold'] = threshold

        plt.plot(samples)
        plt.plot([0, len(samples) - 1],
                 [threshold, threshold],
                 label="threshold " + str(threshold),
                 linewidth=1.5, linestyle="-", color="red")
        plt.legend(loc='upper right')
        plt.xlabel("Samples")
        plt.ylabel("Magnitude")

        result['plot_path'] = self._create_image_tmp_file()
        try:
            # An explicit format stops matplotlib from appending an
            # extension and writing to a different path than the temp file.
            plt.savefig(result['plot_path'].name, format="png")
        finally:
            plt.close()

        return result

    def _create_image_tmp_file(self):
        """Return an open named temporary file for the PNG plot.

        The file is removed when the returned object is closed, so callers
        must hold on to the object for as long as they need the image.
        """
        return tempfile.NamedTemporaryFile(suffix=".png")

    def _extract_statistic_data_from_samples(self, samples):
        """Return basic statistics of ``samples`` as a dict.

        Keys: ``min``, ``max``, ``average``, ``median``, ``std`` and
        ``seconds`` (capture length in seconds at the current sample rate).
        """
        measurements = {}
        measurements['min'] = np.amin(samples)
        measurements['max'] = np.amax(samples)
        measurements['average'] = np.average(samples)
        measurements['median'] = np.median(samples)
        measurements['std'] = np.std(samples)
        # float() keeps fractional seconds even with an integer sample
        # rate under Python 2 division rules
        measurements['seconds'] = len(samples) / float(self.samp_rate)
        return measurements

    def _build_blocks(self):
        """Create all blocks and wire them into the flow graph."""
        ##################################################
        # Blocks
        ##################################################
        self.osmosdr_source_0 = osmosdr.source(args="numchan=1 osmosdr=0")
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)
        self.osmosdr_source_0.set_center_freq(self.freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(0, 0)
        self.osmosdr_source_0.set_iq_balance_mode(0, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(0, 0)
        self.osmosdr_source_0.set_if_gain(20, 0)
        self.osmosdr_source_0.set_bb_gain(20, 0)
        self.osmosdr_source_0.set_antenna("", 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # lets exactly the requested number of samples through
        self.blocks_head_0 = blocks.head(
            gr.sizeof_gr_complex * 1, self._expected_sample_count())
        self.blocks_throttle_0 = blocks.throttle(
            gr.sizeof_gr_complex * 1,
            self._THROTTLE_FACTOR * self.samp_rate,
            True)
        self.blocks_message_sink_0 = blocks.message_sink(
            gr.sizeof_float * 1,
            self.blocks_message_sink_0_msgq_out,
            False)
        # alias kept so the attribute name used in the connections below
        # (and possibly by callers) still resolves to the source block
        self.blocks_file_source_0 = self.osmosdr_source_0
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_file_source_0, 0),
                     (self.blocks_throttle_0, 0))
        self.connect((self.blocks_throttle_0, 0),
                     (self.blocks_head_0, 0))
        self.connect((self.blocks_head_0, 0),
                     (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.blocks_complex_to_mag_squared_0, 0),
                     (self.blocks_message_sink_0, 0))