Skip to content
Snippets Groups Projects
gui.py 15.3 KiB
Newer Older
Daniel Meißner's avatar
Daniel Meißner committed
# -*- coding: utf-8 -*-

import datetime
import logging
import os
import re
import sys
Daniel Meißner's avatar
Daniel Meißner committed

from PyQt5 import QtCore, QtGui, QtWidgets, uic

from lib.hardware import Hardware
Daniel Meißner's avatar
Daniel Meißner committed

from lib.gr.file_sink import FileSink
from lib.gr.noise_detect import NoiseFloorDetect

from lib.gui.update_progress_bar import UpdateProgressBar
from lib.gui.capture import Capture
from lib.gui.noise_floor_plot import NoiseFloorPlot
class SuGui(QtWidgets.QMainWindow):
    """
    GUI class used to realise the main window.
    """
Daniel Meißner's avatar
Daniel Meißner committed
    def __init__(self):
        """
        Instantiate GUI class and open window.
        GUI elements are directly loaded and converted from UI file.

        After loading the UI file the constructor connects the button,
        menu and line edit signals, builds the tables that map radio
        buttons to their numeric values, fills the environment combo box
        and creates the progress bar and capture worker objects.
        """
        super(SuGui, self).__init__()
        uic.loadUi("lib/gui/su_mainwindow.ui", self)
        self.show()
        self.setWindowTitle(Su.name_and_version())
        self._set_default_dump_file_path()
        self.lineEdit_dump_file.setCursorPosition(0)
        self.refresh_hw_button()
        self.button_stop.setEnabled(False)

        # button events
        self.button_start.clicked.connect(self.start_button)
        self.button_stop.clicked.connect(self.stop_button)
        self.button_close.clicked.connect(self.close_button)
        self.button_nf_detect.clicked.connect(self.nf_button)
        self.button_hw_refresh.clicked.connect(self.refresh_hw_button)
        self.comboBox_env.currentIndexChanged.connect(self.configure_env)
        self.rb_868.toggled.connect(self._check_samp_rate)
        self.rb_1mhz.toggled.connect(self._check_samp_rate)
        # menubar
        self.actionQuit.triggered.connect(self.close_button)
        self.actionAbout.triggered.connect(self.about_button)
        # edit lines
        self.lineEdit_dump_file.editingFinished.connect(
            self.check_dump_file_path)
        self.lineEdit_nf.editingFinished.connect(self.check_nf)

        # lookup tables: radio button -> numeric value it stands for
        self.radio_buttons_samp_rate_and_time = {
            # samp_rate
            self.rb_1mhz: 1e6,
            self.rb_2mhz: 2e6,
            self.rb_3mhz: 3e6,
            # time to measure in seconds
            self.rb_1m: 60,
            self.rb_5m: 300,
            self.rb_10m: 600,
            self.rb_1h: 3600,
            self.rb_24h: 86400
        }
        self.radio_buttons_freq = {
            self.rb_433: 433e6,
            self.rb_868: 868e6
        }

        # index 0 is the empty entry; configure_env treats it as
        # "no environment selected"
        self.comboBox_env.addItem("")
        self.comboBox_env.addItem("EnOcean v1")
        self.comboBox_env.addItem("HomeMatic")

        # add released actions to radio buttons
        for button in self.radio_buttons_samp_rate_and_time:
            button.released.connect(self._update_status_label)

        # labels
        self._update_status_label()
        self.label_file_overwrite.setText("change path")
        # worker objects for the progress bar update and the capture;
        # they are started in start_button and terminated in stop_button
        self.progress_bar_thread = UpdateProgressBar()
        self.capture_thread = Capture()
    def start_button(self):
        """
        Start button action in main window.

        Does nothing unless the preconditions checked by
        _check_dependencies_before_start_capture are met. Otherwise a file
        sink is built from the selected options, the input widgets are
        disabled and the progress bar and capture workers are started.
        """
        if not self._check_dependencies_before_start_capture():
            return

        options = self._extract_options()
        logging.debug("Capture options: %s", options)
        file_sink = FileSink(options['samp_rate'],
                             options['freq'],
                             options['capture_time'],
                             options['dump_file_path'])
        self.button_stop.setEnabled(True)
        self.button_start.setEnabled(False)

        # Connect the worker signals only once. Qt keeps duplicate
        # connections, so connecting on every click would run each slot
        # once per previous start.
        if not getattr(self, "_capture_signals_connected", False):
            self.progress_bar_thread.progress.connect(
                self.update_progress_bar, QtCore.Qt.QueuedConnection)
            self.progress_bar_thread.run_time.connect(
                self.update_run_time, QtCore.Qt.QueuedConnection)
            self.capture_thread.exit_code.connect(
                self.update_progress_bar, QtCore.Qt.QueuedConnection)
            self._capture_signals_connected = True

        self.progress_bar_thread.set_measure_time(options['capture_time'])
        self.progress_bar_thread.start()
        self._set_setEnabled(False)
        # capture thread
        self.capture_thread.set_file_sink(file_sink)
        self.capture_thread.start()

    def update_progress_bar(self, text):
        """
        Set the progress bar of the main window to the given value.

        :param text: new value; handed unchanged to
                     self.progressBar.setValue (the parameter name is
                     historical, callers in this class pass numbers)
        """
        progress_bar = self.progressBar
        progress_bar.setValue(text)
Daniel Meißner's avatar
Daniel Meißner committed

    def update_run_time(self, run_time):
        """
        Show the run time next to the selected capture time in the status
        label.

        :param run_time: value delivered by the run_time signal of the
                         progress bar worker
        """
        capture_time = self._extract_options()['capture_time']
        message = "Time: %ss of %ss" % (run_time, capture_time)
        self.label_status.setText(message)

    def stop_button(self):
        """
        Stop button action in main window.

        Terminates both workers, resets the progress bar and restores the
        widget states that were changed when the capture was started.
        """
        # kill capture worker first, then the progress bar worker
        for worker in (self.capture_thread, self.progress_bar_thread):
            worker.terminate()
        self.update_progress_bar(0)

        # enable all inputs again ...
        self._set_setEnabled(True)
        # ... but keep the frequency buttons locked while an environment
        # is selected
        environment_selected = self.comboBox_env.currentIndex() != 0
        if environment_selected:
            self._set_setEnabled(False, freq_button_only=True)

        # back to the idle button state
        self.button_stop.setEnabled(False)
        self.button_start.setEnabled(True)

Daniel Meißner's avatar
Daniel Meißner committed

    def close_button(self):
        """
        Close button action in main window.

        Logs the shutdown and asks the Qt application object to quit.
        """
        logging.info("Close application")
        app = QtCore.QCoreApplication.instance()
        # instance() yields None when no application object exists
        if app is not None:
            app.quit()

    def refresh_hw_button(self):
        """
        Refresh combo box with current connected hardware.

        The box is emptied and then filled with one "<name> (<id>)" entry
        per device reported by Hardware.
        """
        combo_box = self.comboBox_hardware
        combo_box.clear()
        devices = Hardware().get_connected_supported_devices()
        for device in devices:
            label = "".join((device['name'], " (", device['id'], ")"))
            combo_box.addItem(self.tr(label))
    def about_button(self):
        """
        Show about widget.

        The dialog text consists of the application name and version
        followed by contact and source information.
        """
        details = ("\n\n"
                   "Contact: daniel.meissner@smail.inf.h-brs.de\n"
                   "Source: https://git.fslab.de/dmeiss2s/su")
        about_text = Su.name_and_version() + details
        QtWidgets.QMessageBox.about(self, "About", about_text)
    def nf_button(self):
        """
        Noise floor detection button action in main window.

        Nothing happens when no supported hardware is connected. Otherwise
        a detection with a fixed measure time is run for the selected
        sample rate and frequency, the detected threshold is written to
        the threshold line edit and a plot window listing the remaining
        result values is shown.
        """
        if not Hardware.get_connected_supported_devices():
            return

        options = self._extract_options()

        # seconds; also shown to the user in the dialog below
        measure_time = 60

        nf_detect = NoiseFloorDetect(options['samp_rate'],
                                     options['freq'],
                                     measure_time)
        QtWidgets.QMessageBox.about(
            self, "Noise floor detection started",
            "You have to wait " + str(measure_time) + "s after clicking 'Ok'.")

        # detect() is called directly, so this method only continues once
        # the detection has returned
        result = nf_detect.detect()
        # plot results
        self.lineEdit_nf.setText(str(result['threshold']))
        # NOTE(review): plot_path presumably is a file-like object with a
        # .name attribute -- confirm against NoiseFloorDetect.detect()
        self.nf_plot = NoiseFloorPlot(result['plot_path'].name + ".png")

        # items() exists on Python 2 and 3, iteritems() only on Python 2
        summary = "".join(
            str(key) + ": " + str(value) + "\n"
            for key, value in result.items()
            if key != "plot_path"
        )
        self.nf_plot.label_data.setText(summary)
        self.nf_plot.show()

    def nf_plot_result(self, result):
        """
        Remember the result of a noise floor detection.

        :param result: object stored unchanged in self.nf_result
        """
        self.nf_result = result
        print("Returned nf detect thread.")

    def check_dump_file_path(self):
        """
        Update labels for writable state and free disk space in path.

        When Disk.free_disk_space yields None for the entered path, the
        path is reported as "not writable". Otherwise the free disk space
        is shown; it is coloured red when it is smaller than the space
        needed for the currently selected capture, black otherwise.
        """
        free_disk_space = Disk.free_disk_space(self.lineEdit_dump_file.text())
        if free_disk_space is None:
            self.label_disk_space.setText("")
            self.label_file_overwrite.setText("not writable")
            self.label_file_overwrite.setStyleSheet('color: red')
            return

        self.label_file_overwrite.setText("")
        self.label_disk_space.setText("Free disk space:\n" +
                                      Disk.sizeof_fmt(free_disk_space))
        if free_disk_space < self._calculate_needed_disk_space():
            self.label_disk_space.setStyleSheet('color: red')
        else:
            self.label_disk_space.setStyleSheet('color: black')
    def check_nf(self):
        """
        Validate the threshold entered in the noise floor line edit.

        If the text cannot be converted to a float, an error dialog is
        shown and the line edit is reset to "0.006". Valid input is left
        untouched.
        """
        text = self.lineEdit_nf.text()
        try:
            float(text)
        except (TypeError, ValueError):
            # only conversion failures are handled here; anything else
            # is a real error and must not be hidden
            QtWidgets.QMessageBox.about(
                self, "Error",
                "Threshold needs to be from type int or float.")
            self.lineEdit_nf.setText("0.006")


    def configure_env(self):
        """
        Configure environment with selected home automation system.

        Index | Name
        0     | (empty entry, no environment)
        1     | EnOcean
        2     | HomeMatic

        With index 0 all inputs are enabled. For every other index the
        frequency buttons are disabled and, for index 1 and 2, the
        matching preset is applied.
        """
        index = self.comboBox_env.currentIndex()
        if index == 0:
            self._set_setEnabled(True)
            return

        self._set_setEnabled(False, freq_button_only=True)

        presets = {
            1: self._configure_enocean,
            2: self._configure_homematic,
        }
        apply_preset = presets.get(index)
        if apply_preset is not None:
            apply_preset()

    def _check_samp_rate(self):
        if self.rb_868.isChecked():
            self.rb_1mhz.setEnabled(False)

            if self.rb_1mhz.isChecked():
                self.rb_1mhz.setChecked(False)
                self.rb_2mhz.setChecked(True)
        else:
            self.rb_1mhz.setEnabled(True)

    def _configure_enocean(self):
        self.rb_868.setChecked(True)
        self.rb_433.setChecked(False)


    def _configure_homematic(self):
        self.rb_868.setChecked(True)
        self.rb_433.setChecked(False)

    def _set_setEnabled(self, status=True, freq_button_only=False):
        """
        Enable or disable buttons the user can interact with.

        param: status: boolean
        """
        self.rb_433.setEnabled(status)
        self.rb_868.setEnabled(status)
        if freq_button_only == False:
            self.rb_1mhz.setEnabled(status)
            self.rb_2mhz.setEnabled(status)
            self.rb_3mhz.setEnabled(status)
            self.rb_1m.setEnabled(status)
            self.rb_5m.setEnabled(status)
            self.rb_10m.setEnabled(status)
            self.rb_1h.setEnabled(status)
            self.rb_24h.setEnabled(status)
            self.lineEdit_dump_file.setEnabled(status)
            self.comboBox_hardware.setEnabled(status)
            self.comboBox_env.setEnabled(status)
            self.button_hw_refresh.setEnabled(status)
            self.button_nf_detect.setEnabled(status)
    def _get_default_dump_file_path(self):
        """
        Generate path for dump file. The default file path is defined as
        dump_file with current timestamp located in the home of current user.

        Example:
            /home/user/su/dump_file-1942_05_23-13_37_23.iq
        """
        dt = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
        path = os.path.expanduser("~") + "/su/dump_file-" + dt + ".iq"

        return path

    def _extract_selected_hardware_device(self):
        """
        Extract device key from combo box.
        """
        device_id = re.search(r"\w{4}:\w{4}", self.comboBox_hardware. \
                                                currentText()).group()

        device_key = ''
        for key, value in Hardware.SUPPORTED_HARDWARE.items():
            if value == device_id:
                device_key = key

        return device_key

    def _check_dependencies_before_start_capture(self):
        """
        Private method to check needed dependencies before starting capturing
        data. The following states need to be passed:
            1. A dump file path needs to be defined
            2. Supported RF hardware needs to be connected
            3. RF hardware needs to be selected

        For the first failing check a message box is shown. For check 2
        and 3 the hardware combo box is refreshed as well.

        It returns True or False.
        """
        if not self.lineEdit_dump_file.text():
            QtWidgets.QMessageBox.about(self, "Dumpfile path missing…",
                                        "Please add dump file path.")
            return False

        if not Hardware.get_connected_supported_devices():
            QtWidgets.QMessageBox.about(self,
                                        "No supported RF hardware connected…",
                                        "Please connect RF hardware.")
            self.refresh_hw_button()
            return False

        if not self.comboBox_hardware.currentText():
            QtWidgets.QMessageBox.about(self,
                                        "No supported RF hardware selected…",
                                        "Select RF hardware.")
            self.refresh_hw_button()
            return False

        return True

    def _update_status_label(self):
        """
        Update status label.

        Shows the disk space needed for the currently selected sample rate
        and capture time.
        """
        needed_disk_space = Disk.sizeof_fmt(self._calculate_needed_disk_space())
        self.label_status.setText("Needed disk space:\n" + needed_disk_space)

    def _calculate_needed_disk_space(self):
        """
        Calculate needed disk space based on current check box status. Every
        sample needs 8 byte of disk space (32 bit I, 32 bit Q).

        Returned integer value.
        """
        time, samp_rate = self._get_checked_time_and_samp_rate_radio_buttons().values()
    def _get_checked_time_and_samp_rate_radio_buttons(self):
        """
        Return a list of all checked radio buttons.
        """
        checked_buttons = {}
        for radio_button, value in \
                self.radio_buttons_samp_rate_and_time.iteritems():
            if radio_button.isChecked():
                checked_buttons[radio_button] = value

        return checked_buttons
    def _get_checked_freq_button(self):
        """
        Get checked freq button.
        """
        for radio_button, value in self.radio_buttons_freq.iteritems():
            if radio_button.isChecked():
                return {radio_button: value}

    def _set_default_dump_file_path(self):
        """
        Set default dump file path.
        """
        if "SU_DUMP_FILE_PATH" in os.environ:
            self.lineEdit_dump_file.setText(os.environ['SU_DUMP_FILE_PATH'])
        else:
            self.lineEdit_dump_file.setText(self._get_default_dump_file_path())

    def _extract_options(self):
        """
        Extract selected options from gui.
        """
        freq_button = self._get_checked_freq_button()
        time_and_samp_rate_buttons = self._get_checked_time_and_samp_rate_radio_buttons()
        # radio buttons
        options = {}
        #
        for button, value in time_and_samp_rate_buttons.iteritems():
            if re.search(r"MHz", button.text()):
                options['samp_rate'] = value
            else:
                options['capture_time'] = value
        options['freq'] = freq_button.values()[0]
        # line inputs
        options['dump_file_path'] = self.lineEdit_dump_file.text()
        # dropdown boxes
        options['device'] = self._extract_selected_hardware_device()

        return options