Newer
Older
# -*- coding: utf-8 -*-
import sys
import os
import logging
import datetime
from lib.hardware import Hardware
from lib.su import Su
from lib.disk import Disk
from lib.gr.file_sink import FileSink
from lib.gr.noise_detect import NoiseFloorDetect
from lib.gui.update_progress_bar import UpdateProgressBar
from lib.gui.capture import Capture
from lib.gui.noise_floor_plot import NoiseFloorPlot
class SuGui(QtWidgets.QMainWindow):
"""
GUI class used to realise the main window.
"""
"""
Instantiate GUI class and open window.
GUI elements are directly loaded and converted from UI file.
"""
uic.loadUi("lib/gui/su_mainwindow.ui", self)
# setting some defaults
self.setWindowTitle(Su.name_and_version())
self._set_default_dump_file_path()
self.lineEdit_dump_file.setCursorPosition(0)
self.refresh_hw_button()
self.button_start.clicked.connect(self.start_button)
self.button_stop.clicked.connect(self.stop_button)
self.button_close.clicked.connect(self.close_button)
self.button_nf_detect.clicked.connect(self.nf_button)
self.button_hw_refresh.clicked.connect(self.refresh_hw_button)
self.comboBox_env.currentIndexChanged.connect(self.configure_env)
self.rb_868.toggled.connect(self._check_samp_rate)
self.rb_1mhz.toggled.connect(self._check_samp_rate)
# menubar
self.actionQuit.triggered.connect(self.close_button)
self.actionAbout.triggered.connect(self.about_button)
# edit lines
self.lineEdit_dump_file.editingFinished. \
connect(self.check_dump_file_path)
self.lineEdit_nf.editingFinished. \
connect(self.check_nf)
# radio buttons
self.radio_buttons_samp_rate_and_time = {
# samp_rate
self.rb_1mhz: 1e6,
self.rb_2mhz: 2e6,
self.rb_3mhz: 3e6,
# time to measure in seconds
self.rb_1m: 60,
self.rb_5m: 300,
self.rb_10m: 600,
self.rb_1h: 3600,
self.rb_24h: 86400
}
self.radio_buttons_freq = {
self.rb_433: 433e6,
self.rb_868: 868e6
}
self.comboBox_env.addItem("")
self.comboBox_env.addItem("EnOcean v1")
self.comboBox_env.addItem("HomeMatic")
# add released actions to radio buttons
for button in self.radio_buttons_samp_rate_and_time.keys():
button.released.connect(self._update_status_label)
self._check_samp_rate()
# labels
self._update_status_label()
self.label_file_overwrite.setText("change path")
# create thread class for progressbar update
self.progress_bar_thread = UpdateProgressBar()
self.capture_thread = Capture()
def start_button(self):
"""
Start button action in main window.
"""
if self._check_dependencies_before_start_capture():
options = self._extract_options()
print(options)
file_sink = FileSink(options['samp_rate'],
options['freq'],
options['capture_time'],
options['dump_file_path'])
self.button_stop.setEnabled(True)
self.button_start.setEnabled(False)
# progress_bar thread
self.progress_bar_thread.set_measure_time(options['capture_time'])
self.progress_bar_thread.progress.connect(self.update_progress_bar, \
QtCore.Qt.QueuedConnection)
self.progress_bar_thread.run_time.connect(self.update_run_time, \
QtCore.Qt.QueuedConnection)
self.progress_bar_thread.start()
self._set_setEnabled(False)
# capture thread
self.capture_thread.exit_code.connect(self.update_progress_bar, \
QtCore.Qt.QueuedConnection)
self.capture_thread.set_file_sink(file_sink)
self.capture_thread.start()
def update_progress_bar(self, text):
"""
Add the text that's given to this function to the
list_submissions QListWidget we have in our GUI and
increase the current value of progress bar by 1
:param text: text to add to the process bar
:type text: str
"""
self.progressBar.setValue(text)
def update_run_time(self, run_time):
capture_time = self._extract_options()['capture_time']
self.label_status. \
setText("Time: %ss of %ss" % (run_time, capture_time))
def stop_button(self):
"""
Stop button action in main window.
"""
# kill capture thread
self.capture_thread.terminate()
# kill progressbar thread
self.progress_bar_thread.terminate()
self.update_progress_bar(0)
# enable buttons again
self._set_setEnabled(True)
# disbale freq buttons if env is selected
if self.comboBox_env.currentIndex() != 0:
self._set_setEnabled(False, freq_button_only = True)
# disable stop button again
self.button_stop.setEnabled(False)
self.button_start.setEnabled(True)
def close_button(self):
"""
Close button action in main window.
"""
logging.info("Close application")
QtCore.QCoreApplication.instance().quit()
def refresh_hw_button(self):
"""
Refresh combo box with current connected hardware.
"""
self.comboBox_hardware.clear()
for device in Hardware().get_connected_supported_devices():
text = device['name'] + " (" + device['id'] + ")"
self.comboBox_hardware.addItem(self.tr(text))
def about_button(self):
"""
Show about widget.
"""
QtWidgets.QMessageBox.about(self, "About", \
Su.name_and_version() + "\n\n" \
"Contact: daniel.meissner@smail.inf.h-brs.de\n" \
"Source: https://git.fslab.de/dmeiss2s/su")
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def nf_button(self):
if len(Hardware.get_connected_supported_devices()) != 0:
options = self._extract_options()
measure_time = 60
nf_detect = NoiseFloorDetect(options['samp_rate'],
options['freq'],
measure_time)
QtWidgets.QMessageBox.about(self, "Noise floor detection started", \
"You have to wait " + str(measure_time) + "s after clicking 'Ok'.")
result = nf_detect.detect()
# plot results
self.lineEdit_nf.setText(str(result['threshold']))
self.nf_plot = NoiseFloorPlot(result['plot_path'].name + ".png")
string = ""
for key,value in result.iteritems():
if key == "plot_path":
continue
string += str(key) + ": " + str(value) + "\n"
self.nf_plot.label_data.setText(string)
self.nf_plot.show()
def nf_plot_result(self, result):
print("Returned nf detect thread.")
self.nf_result = result
def check_dump_file_path(self):
"""
Update labels for writable state and free disk space in path.
"""
free_disk_space = Disk.free_disk_space(self.lineEdit_dump_file.text())
if free_disk_space == None:
self.label_disk_space.setText("")
self.label_file_overwrite.setText("not writable")
self.label_file_overwrite.setStyleSheet('color: red')
else:
self.label_file_overwrite.setText("")
self.label_disk_space.setText("Free disk space:\n" \
"" + Disk.sizeof_fmt(free_disk_space))
if free_disk_space - self._calculate_needed_disk_space() < 0:
self.label_disk_space.setStyleSheet('color: red')
else:
self.label_disk_space.setStyleSheet('color: black')
def check_nf(self):
"""
"""
try:
nf = float(self.lineEdit_nf.text())
except:
QtWidgets.QMessageBox.about(self, "Error", \
"Threshold needs to be from type int or float.")
self.lineEdit_nf.setText("0.006")
def configure_env(self):
"""
Configure envirenment with selected home automatation system.
Name | Index
| 0
EnOcean | 1
HomeMatic | 2
"""
current_index = self.comboBox_env.currentIndex()
if current_index != 0:
self._set_setEnabled(False, freq_button_only = True)
# https://stackoverflow.com/questions/11479816/what-is-the-python-equivalent-for-a-case-switch-statement
if current_index == 1:
self._configure_enocean()
elif current_index == 2:
self._configure_homematic()
else:
self._set_setEnabled(True)
def _check_samp_rate(self):
if self.rb_868.isChecked():
self.rb_1mhz.setEnabled(False)
if self.rb_1mhz.isChecked():
self.rb_1mhz.setChecked(False)
self.rb_2mhz.setChecked(True)
else:
self.rb_1mhz.setEnabled(True)
def _configure_enocean(self):
self.rb_868.setChecked(True)
self.rb_433.setChecked(False)
def _configure_homematic(self):
self.rb_868.setChecked(True)
self.rb_433.setChecked(False)
def _set_setEnabled(self, status=True, freq_button_only=False):
"""
Enable or disable buttons the user can interact with.
param: status: boolean
"""
self.rb_433.setEnabled(status)
self.rb_868.setEnabled(status)
if freq_button_only == False:
self.rb_1mhz.setEnabled(status)
self.rb_2mhz.setEnabled(status)
self.rb_3mhz.setEnabled(status)
self.rb_1m.setEnabled(status)
self.rb_5m.setEnabled(status)
self.rb_10m.setEnabled(status)
self.rb_1h.setEnabled(status)
self.rb_24h.setEnabled(status)
self.lineEdit_dump_file.setEnabled(status)
self.comboBox_hardware.setEnabled(status)
self.comboBox_env.setEnabled(status)
self.button_hw_refresh.setEnabled(status)
self.button_nf_detect.setEnabled(status)
def _get_default_dump_file_path(self):
"""
Generate path for dump file. The default file path is defined as
dump_file with current timestamp located in the home of current user.
Example:
/home/user/su/dump_file-1942_05_23-13_37_23.iq
"""
dt = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
path = os.path.expanduser("~") + "/su/dump_file-" + dt + ".iq"
return path
def _extract_selected_hardware_device(self):
"""
Extract device key from combo box.
"""
device_id = re.search(r"\w{4}:\w{4}", self.comboBox_hardware. \
currentText()).group()
device_key = ''
for key, value in Hardware.SUPPORTED_HARDWARE.items():
if value == device_id:
device_key = key
return device_key
def _check_dependencies_before_start_capture(self):
"""
Private method to check needed dependencies before starting capturing
data. The following states needs to be passed:
1. A dump file path need to be befined
2. Supported RF hardware need to be connected
3. RF hardware need to be selected
It returns True or False.
"""
if len(self.lineEdit_dump_file.text()) == 0:
QtWidgets.QMessageBox.about(self, "Dumpfile path missing…", \
"Please add dump file path.")
return False
elif len(Hardware.get_connected_supported_devices()) == 0:
QtWidgets.QMessageBox.about(self, "No supported RF hardware " \
"connected…", \
"Please connect RF hardware.")
self.refresh_hw_button()
return False
elif len(self.comboBox_hardware.currentText()) == 0:
QtWidgets.QMessageBox.about(self, "No supported RF hardware " \
"selected…", \
"Select RF hardware.")
self.refresh_hw_button()
return False
else:
return True
def _update_status_label(self):
"""
Update status label.
"""
needed_disk_space = Disk.sizeof_fmt(self._calculate_needed_disk_space())
self.label_status. \
setText("Needed disk space:\n" \
"" + needed_disk_space)
self.check_dump_file_path()
def _calculate_needed_disk_space(self):
"""
Calculate needed disk space based on current check box status. Every
sample needs 8 byte of disk space (32 bit I, 32 bit Q).
Returned integer value.
"""
time, samp_rate = self._get_checked_time_and_samp_rate_radio_buttons().values()
return int((time*samp_rate) * 8)
def _get_checked_time_and_samp_rate_radio_buttons(self):
"""
Return a list of all checked radio buttons.
"""
checked_buttons = {}
for radio_button, value in \
self.radio_buttons_samp_rate_and_time.iteritems():
if radio_button.isChecked():
checked_buttons[radio_button] = value
return checked_buttons
def _get_checked_freq_button(self):
"""
Get checked freq button.
"""
for radio_button, value in self.radio_buttons_freq.iteritems():
if radio_button.isChecked():
return {radio_button: value}
def _set_default_dump_file_path(self):
"""
Set default dump file path.
"""
if "SU_DUMP_FILE_PATH" in os.environ:
self.lineEdit_dump_file.setText(os.environ['SU_DUMP_FILE_PATH'])
else:
self.lineEdit_dump_file.setText(self._get_default_dump_file_path())
def _extract_options(self):
"""
Extract selected options from gui.
"""
freq_button = self._get_checked_freq_button()
time_and_samp_rate_buttons = self._get_checked_time_and_samp_rate_radio_buttons()
# radio buttons
options = {}
#
for button, value in time_and_samp_rate_buttons.iteritems():
if re.search(r"MHz", button.text()):
options['samp_rate'] = value
else:
options['capture_time'] = value
options['freq'] = freq_button.values()[0]
# line inputs
options['dump_file_path'] = self.lineEdit_dump_file.text()
# dropdown boxes
options['device'] = self._extract_selected_hardware_device()
return options