# noise_detect.py
import sys
import os
import tempfile

import numpy as np
import matplotlib.pyplot as plt

from gnuradio import blocks
from gnuradio import gr
import osmosdr

class NoiseFloorDetect(gr.top_block):
    """Flowgraph that captures SDR samples and derives noise-floor statistics.

    The flowgraph built by ``_build_blocks`` is::

        osmosdr source -> throttle -> head -> complex_to_mag_squared
                       -> message sink (-> ``self.msgq_out``)

    ``detect()`` runs the flowgraph, collects the squared-magnitude samples
    from the message queue and returns statistics, a threshold and a plot.
    """

    # The throttle block runs at this multiple of the configured sample rate
    # (both when the flowgraph is built and when the rate is changed later).
    _THROTTLE_FACTOR = 10

    def __init__(self,
                 samp_rate,
                 freq,
                 capture_time,
                 device=None
    ):
        """Store the capture settings and build the flowgraph.

        Args:
            samp_rate: Sample rate passed to the osmosdr source.
            freq: Center frequency passed to the osmosdr source.
            capture_time: Capture duration in seconds; truncated to an int.
            device: Stored on the instance as ``self.device``.
                NOTE(review): it is not used by ``_build_blocks``, which
                hard-codes the source arguments -- confirm whether it should
                select the device.
        """
        gr.top_block.__init__(self, "Top Block")

        self.samp_rate = samp_rate
        self.freq = freq
        self.capture_time = int(capture_time)
        self.device = device

        # Both attribute names refer to the same queue; the message sink
        # created in _build_blocks writes into it and detect() reads from it.
        self.msgq_out = self.blocks_message_sink_0_msgq_out = gr.msg_queue(0)
        self._build_blocks()

    def detect(self):
        """Run the capture and return the noise-floor measurements.

        Starts the flowgraph, reads messages from ``self.msgq_out`` until at
        least ``capture_time * samp_rate`` samples have been collected, then
        stops the flowgraph and evaluates the samples.

        Returns:
            dict with the keys ``min``, ``max``, ``average``, ``median``,
            ``std`` and ``seconds`` (see
            ``_extract_statistic_data_from_samples``) plus ``threshold`` and
            ``plot_path`` (see ``_plot_data``; ``plot_path`` is the temporary
            file object, not a path string).
        """
        wanted_samples = self.capture_time * self.samp_rate
        samples_count = 0
        sample_blocks = []

        self.start()
        try:
            while True:
                # pull the next message from the queue
                msg = self.msgq_out.delete_head()

                # Each message payload is a packed run of float32 values.
                # np.frombuffer replaces the deprecated binary mode of
                # np.fromstring.
                chunk = np.frombuffer(msg.to_string(), dtype='float32')
                sample_blocks.append(chunk)

                samples_count += len(chunk)
                if samples_count >= wanted_samples:
                    break
        finally:
            # Always shut the flowgraph down, even if reading failed, so the
            # source and scheduler threads are not left running.
            self.stop()
            self.wait()

        samples = np.concatenate(sample_blocks)
        results = self._extract_statistic_data_from_samples(samples)
        plot_results = self._plot_data(samples, self.samp_rate)

        results['plot_path'] = plot_results['plot_path']
        results['threshold'] = plot_results['threshold']

        return results

    def get_samp_rate(self):
        """Return the currently configured sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Change the sample rate and propagate it to the flowgraph blocks.

        The source, the throttle (at ``_THROTTLE_FACTOR`` times the rate, as
        in ``_build_blocks``) and the head block length are all updated so
        that ``detect()`` keeps waiting for a sample count the head block
        will actually let through.
        """
        self.samp_rate = samp_rate
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)
        self.blocks_throttle_0.set_sample_rate(
            self._THROTTLE_FACTOR * self.samp_rate)
        self.blocks_head_0.set_length(
            int(self.samp_rate) * int(self.capture_time))

    def _plot_data(self, samples, samp_rate=2e6):
        """Compute the detection threshold and plot it over the samples.

        The samples are split into sections of ``int(samp_rate)`` samples
        (one second each) and the maximum of every complete section is taken.
        The threshold is the median of those maxima plus their minimum,
        rounded to 6 decimals. If there is not even one complete section,
        all samples are treated as a single section.

        Args:
            samples: 1-D array of sample values.
            samp_rate: Samples per second, used as the section length.

        Returns:
            dict with ``threshold`` (float) and ``plot_path`` (the temporary
            file object the plot image was written to).

        Raises:
            ValueError: If ``samples`` is empty.
        """
        result = {}

        samples = np.asarray(samples)
        section_len = int(samp_rate)
        section_count = len(samples) // section_len if section_len > 0 else 0

        if section_count > 0:
            # Drop the trailing partial section, then take per-row maxima.
            complete = samples[:section_count * section_len]
            max_sections = complete.reshape(section_count,
                                            section_len).max(axis=1)
        else:
            # Less than one full section captured: use everything we have.
            max_sections = np.array([np.amax(samples)])

        section_median = np.median(max_sections)
        section_min = np.amin(max_sections)
        threshold = round(float(section_median + section_min), 6)
        result['threshold'] = threshold

        plt.plot(samples)
        plt.plot([0, len(samples) - 1],
                 [threshold, threshold],
                 label="threshold " + str(threshold),
                 linewidth=1.5, linestyle="-", color="red")

        plt.legend(loc='upper right')

        plt.xlabel("Samples")
        plt.ylabel("Magnitude")

        result['plot_path'] = self._create_image_tmp_file()
        try:
            plt.savefig(result['plot_path'].name)
        finally:
            # Release the figure even if writing the image failed.
            plt.close()

        return result

    def _create_image_tmp_file(self):
        """Return a new named temporary file for the plot image.

        The ``.png`` suffix lets matplotlib infer the image format from the
        file name and write to exactly this path instead of a sibling path
        with an appended extension.
        """
        return tempfile.NamedTemporaryFile(suffix='.png')

    def _extract_statistic_data_from_samples(self, samples):
        """Return basic statistics of ``samples`` as a dict.

        Keys: ``min``, ``max``, ``average``, ``median``, ``std`` and
        ``seconds`` (number of samples divided by ``self.samp_rate``).
        """
        measurements = {}
        measurements['min'] = np.amin(samples)
        measurements['max'] = np.amax(samples)
        measurements['average'] = np.average(samples)
        measurements['median'] = np.median(samples)
        measurements['std'] = np.std(samples)
        # float() avoids integer floor division under Python 2.
        measurements['seconds'] = len(samples) / float(self.samp_rate)

        return measurements

    def _build_blocks(self):
        """Create the flowgraph blocks and connect them."""
        # NOTE(review): the device arguments are hard-coded; self.device is
        # not consulted here.
        self.osmosdr_source_0 = osmosdr.source(
            args="numchan=" + str(1) + " " + "osmosdr=0")
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)
        self.osmosdr_source_0.set_center_freq(self.freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(0, 0)
        self.osmosdr_source_0.set_iq_balance_mode(0, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(0, 0)
        self.osmosdr_source_0.set_if_gain(20, 0)
        self.osmosdr_source_0.set_bb_gain(20, 0)
        self.osmosdr_source_0.set_antenna("", 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # The head block limits the capture to samp_rate * capture_time items.
        self.blocks_head_0 = blocks.head(
            gr.sizeof_gr_complex*1,
            int(self.samp_rate)*int(self.capture_time))
        self.blocks_throttle_0 = blocks.throttle(
            gr.sizeof_gr_complex*1,
            self._THROTTLE_FACTOR*self.samp_rate,
            True)
        self.blocks_message_sink_0 = blocks.message_sink(
            gr.sizeof_float*1,
            self.blocks_message_sink_0_msgq_out,
            False)
        # Alias kept so the connections below keep their original names.
        self.blocks_file_source_0 = self.osmosdr_source_0
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_file_source_0, 0), (self.blocks_throttle_0, 0))
        self.connect((self.blocks_throttle_0, 0), (self.blocks_head_0, 0))
        self.connect((self.blocks_head_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.blocks_complex_to_mag_squared_0, 0),
                     (self.blocks_message_sink_0, 0))