Building Energy Boot Camp 2018 - Day 9
Today we worked on our projects, and I finished mine, as promised. To kill my threads, I decided to give the request threads their own class: RequestThread
. To make them killable, I gave them two methods - stop()
and stopped()
. Using these methods, I could get the thread to stop itself by adding some if
statements into all of its code to detect a stoppage. I also wrote a variation of get_air_values_df()
: get_air_value_df()
. The main difference is that get_air_value_df()
only gets one room at a time, as opposed to an entire wing at a time. This allows me to add a delay between every ping, helping to decrease the chances of a request timeout. To understand how my project works, please refer to the explanation below (on mobile devices, it is best experienced in horizontal orientation):
While running this code, I noticed that I would occasionally get errors from the server, which would send a response saying I’d hit the maximum number of retries. To fix this, I rewrote some of code for get_value_and_units()
, telling it to wait and retry the request if it encounters this error.
# -*- coding: utf-8 -*-
from tkinter import *
import pandas as pd
import numpy
import os
import threading
import time
pd.options.mode.chained_assignment = None # Stop chained assignment warnings - I know what I'm doing
DEFAULT_DATA_PATH = os.path.join('CSVs', 'default_data.csv')
ROOM_SENSOR_PATH = os.path.join('CSVs', 'ahs_air.csv')
SAVED_DATA_PATH = os.path.join('CSVs', 'ahs_air_data.csv')
HOSTNAME = '10.12.4.98'
PORT = '8000'
request_thread = None
air_values = None
background_data_update = True
def save_data():
if air_values is not None:
print('Saving session data... please wait')
air_values.to_csv(SAVED_DATA_PATH)
def stop():
import sys
save_data()
sys.exit()
def get_air_value_df(hostname, port, selected_room, req_thread):
df_dictionary = {
'Date / Time': [],
'Room': [],
'Temperature': [],
'Temperature Units': [],
'CO2 Level': [],
'CO2 Units': [],
'Floor': [],
'Wing': []
}
if req_thread.stopped():
return None
try:
import argparse
from bacnet_gateway_requests import get_value_and_units
import datetime as dt
# Read spreadsheet into a DataFrame.
# Each row contains the following:
# - Location
# - Instance ID of CO2 sensor
# - Instance ID of temperature sensor
if not os.path.isfile(ROOM_SENSOR_PATH):
print('Error:\nCouldn\'t find ' + ROOM_SENSOR_PATH + '!\nShutting down program...')
root.destroy()
df = pd.read_csv(ROOM_SENSOR_PATH, na_filter=False, comment='#')
if req_thread.stopped():
return None
chosen_room = df['Label'] == selected_room
if req_thread.stopped():
return None
filtered_room = df[chosen_room]
if req_thread.stopped():
return None
try:
for row_index, row in filtered_room.iterrows():
# Retrieve data
temp_value, temp_units = get_value_and_units(row['Facility'], row['Temperature'], hostname, port)
if req_thread.stopped():
return None
co2_value, co2_units = get_value_and_units(row['Facility'], row['CO2'], hostname, port)
if req_thread.stopped():
return None
# Prepare to print
temp_value = round(int(temp_value)) if temp_value else ''
temp_units = temp_units.replace('deg ', '°') if temp_units else ''
co2_value = round(int(co2_value)) if co2_value else ''
co2_units = co2_units if co2_units else ''
if req_thread.stopped():
return None
# Update dictionary
df_dictionary['Date / Time'].append(dt.datetime.now().strftime("%m/%d/%Y %H:%M"))
if req_thread.stopped():
return None
df_dictionary['Room'].append(row['Label'])
if req_thread.stopped():
return None
df_dictionary['Temperature'].append(temp_value)
if req_thread.stopped():
return None
df_dictionary['Temperature Units'].append(temp_units)
if req_thread.stopped():
return None
df_dictionary['CO2 Level'].append(co2_value)
if req_thread.stopped():
return None
df_dictionary['CO2 Units'].append(co2_units)
if req_thread.stopped():
return None
df_dictionary['Floor'].append(row['Floor'])
if req_thread.stopped():
return None
df_dictionary['Wing'].append(row['Wing'])
if req_thread.stopped():
return None
break
if req_thread.stopped():
return None
return pd.DataFrame.from_dict(df_dictionary)
except KeyboardInterrupt:
stop()
except KeyboardInterrupt:
stop()
def get_air_values_df(hostname, port, selected_floor, selected_wing, background_updater):
df_dictionary = {
'Date / Time': [],
'Room': [],
'Temperature': [],
'Temperature Units': [],
'CO2 Level': [],
'CO2 Units': [],
'Floor': [],
'Wing': []
}
try:
import argparse
from bacnet_gateway_requests import get_value_and_units
import datetime as dt
# Read spreadsheet into a DataFrame.
# Each row contains the following:
# - Location
# - Instance ID of CO2 sensor
# - Instance ID of temperature sensor
if not os.path.isfile(ROOM_SENSOR_PATH):
print('Error:\nCouldn\'t find ' + ROOM_SENSOR_PATH + '!\nShutting down program...')
root.destroy()
df = pd.read_csv(ROOM_SENSOR_PATH, na_filter=False, comment='#')
matching_floor = df['Floor'] == str(selected_floor)
matching_wing = df['Wing'] == selected_wing
filtered_rooms = df[matching_floor & matching_wing]
# Iterate over the rows of the DataFrame, getting temperature and CO2 values for each location
for row_index, row in filtered_rooms.iterrows():
while True:
if background_updater and not background_data_update:
time.sleep(5)
continue
else:
try:
# Retrieve data
temp_value, temp_units = get_value_and_units(row['Facility'], row['Temperature'], hostname,
port)
co2_value, co2_units = get_value_and_units(row['Facility'], row['CO2'], hostname, port)
# Prepare to print
temp_value = round(int(temp_value)) if temp_value else ''
temp_units = temp_units.replace('deg ', '°') if temp_units else ''
co2_value = round(int(co2_value)) if co2_value else ''
co2_units = co2_units if co2_units else ''
# Update dictionary
df_dictionary['Date / Time'].append(dt.datetime.now().strftime("%m/%d/%Y %H:%M"))
df_dictionary['Room'].append(row['Label'])
df_dictionary['Temperature'].append(temp_value)
df_dictionary['Temperature Units'].append(temp_units)
df_dictionary['CO2 Level'].append(co2_value)
df_dictionary['CO2 Units'].append(co2_units)
df_dictionary['Floor'].append(row['Floor'])
df_dictionary['Wing'].append(row['Wing'])
break
except KeyboardInterrupt:
stop()
return pd.DataFrame.from_dict(df_dictionary)
except KeyboardInterrupt:
stop()
def update_loaded_data(updated_df):
global air_values
air_values = updated_df
if air_values is not None:
air_values['Wing'] = air_values['Wing'].to_string()
fill_fields(floor.get(), str(wing.get()), measurement.get())
def add_to_cache(new_data_df):
global air_values
if air_values is None:
air_values = new_data_df
else:
air_values = pd.concat([air_values, new_data_df], ignore_index=True)
if air_values is not None:
if air_values['Floor'].dtype != numpy.float64:
air_values['Floor'] = air_values['Floor'].astype(str).astype(int)
fill_fields(floor.get(), str(wing.get()), measurement.get())
class RequestThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, selected_floor, selected_wing):
super(RequestThread, self).__init__()
self._stop_event = threading.Event()
self.selected_floor = selected_floor
self.selected_wing = selected_wing
req_thread = threading.Thread(target=self.request_data, args=())
req_thread.daemon = True # Daemonize thread
req_thread.start() # Start the execution
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def request_data(self):
global background_data_update
background_data_update = False
if not os.path.isfile(ROOM_SENSOR_PATH):
print('Error:\nCouldn\'t find ' + ROOM_SENSOR_PATH + '!\nShutting down program...')
root.destroy()
rooms = pd.read_csv(ROOM_SENSOR_PATH, na_filter=False, comment='#')
if self.stopped():
background_data_update = True
return
matching_floor = rooms['Floor'] == str(self.selected_floor)
if self.stopped():
background_data_update = True
return
matching_wing = rooms['Wing'] == self.selected_wing
if self.stopped():
background_data_update = True
return
rooms = rooms[matching_floor & matching_wing]
if self.stopped():
background_data_update = True
return
if self.stopped():
background_data_update = True
return
data_df = None
if self.stopped():
background_data_update = True
return
for row_index, row in rooms.iterrows():
if not self.stopped():
df = get_air_value_df(HOSTNAME, PORT, row['Label'], self)
if df is None:
self.stop()
continue
data_df = pd.concat([data_df, df], ignore_index=True) if data_df is not None else df
time.sleep(1)
else:
background_data_update = True
return
add_to_cache(data_df)
if self.stopped():
background_data_update = True
return
background_data_update = True
class BACnetThread(object):
"""
The run() method will be started and it will run in the background
until the application exits.
"""
def __init__(self, interval=10):
""" Constructor
:type interval: int
:param interval: Check interval, in seconds
"""
self.interval = interval
self.used_combos = None
self.updated_values = None
thread = threading.Thread(target=self.run, args=())
thread.daemon = True # Daemonize thread
def run(self):
""" Method that runs forever """
while True:
# Updates the already-requested rooms
if air_values is not None:
# Find which floor-wing combinations have been used so far
self.used_combos = air_values.groupby(
['Wing', 'Floor']).size().reset_index()
for row_index, row in self.used_combos.iterrows():
if row['Floor'] == '' and row['Wing'] == '':
self.used_combos.drop(row_index)
continue
df = get_air_values_df(HOSTNAME, PORT, row['Floor'], row['Wing'], True)
self.updated_values = pd.concat([self.updated_values, df],
ignore_index=True) if self.updated_values is not None else df
time.sleep(1)
update_loaded_data(self.updated_values)
self.updated_values = None
time.sleep(self.interval)
class BACnetThread(object):
"""
The run() method will be started and it will run in the background
until the application exits.
"""
def __init__(self, interval=10):
""" Constructor
:type interval: int
:param interval: Check interval, in seconds
"""
self.interval = interval
self.used_combos = None
self.updated_values = None
thread = threading.Thread(target=self.run, args=())
thread.daemon = True # Daemonize thread
thread.start()
def run(self):
""" Method that runs forever """
while True:
# Updates the already-requested rooms
if air_values is not None:
# Find which floor-wing combinations have been used so far
self.used_combos = air_values.groupby(
['Wing', 'Floor']).size().reset_index()
for row_index, row in self.used_combos.iterrows():
if row['Floor'] == '' and row['Wing'] == '':
self.used_combos.drop(row_index)
continue
df = get_air_values_df(HOSTNAME, PORT, row['Floor'], row['Wing'], True)
self.updated_values = pd.concat([self.updated_values, df],
ignore_index=True) if self.updated_values is not None else df
update_loaded_data(self.updated_values)
self.updated_values = None
time.sleep(self.interval)
thread = BACnetThread()
def update_labels(avg_measure, max_measure, max_measure_room, unit, data_timestamp):
row_labels[0].config(text="Data last updated at: {0} EST".format(data_timestamp))
row_labels[1].config(text=(str(round(avg_measure, 2)) + ' ' + str(unit)))
row_labels[2].config(text=(str(round(max_measure, 2)) + ' ' + str(unit)))
row_labels[3].config(text=str(max_measure_room))
def fill_fields(selected_floor, selected_wing, selected_measurement):
enough_info = False
measurement_column = 'CO2 Level' if selected_measurement == 0 else 'Temperature'
unit_column = 'CO2 Units' if selected_measurement == 0 else 'Temperature Units'
selected_df = None
global request_thread
# Check if the session cache has data, request the data otherwise
if air_values is not None:
matching_floor = air_values['Floor'] == selected_floor
matching_wing = air_values['Wing'] == selected_wing
filtered_rooms = air_values[matching_floor & matching_wing]
if len(filtered_rooms) >= 1 and len(filtered_rooms[measurement_column]) != 0:
# The session cache has non-empty data for the wing
enough_info = True
selected_df = air_values
else:
if request_thread is None:
request_thread = RequestThread(selected_floor, selected_wing)
request_thread.start()
else:
request_thread.stop()
request_thread.join()
request_thread = RequestThread(selected_floor, selected_wing)
request_thread.start()
else:
if request_thread is None:
request_thread = RequestThread(selected_floor, selected_wing)
request_thread.start()
else:
request_thread.stop()
request_thread.join()
request_thread = RequestThread(selected_floor, selected_wing)
request_thread.start()
# Check if the output file from the last session has data
if not enough_info:
if os.path.isfile(SAVED_DATA_PATH):
df = pd.read_csv(SAVED_DATA_PATH, index_col=0)
if len(df[(df['Floor'] == selected_floor) & (df['Wing'] == selected_wing)]) >= 1 and len(
df[measurement_column]) != 0:
# The session cache has non-empty data for the wing
enough_info = True
selected_df = df
# Fallback to an emergency file
if not enough_info:
if os.path.isfile(DEFAULT_DATA_PATH):
df = pd.read_csv(DEFAULT_DATA_PATH, index_col=0)
if len(df[(df['Floor'] == selected_floor) & (df['Wing'] == selected_wing)]) >= 1 and len(
df[measurement_column]) != 0:
enough_info = True
selected_df = df
if enough_info:
matching_floor = selected_df['Floor'] == selected_floor
matching_wing = selected_df['Wing'] == selected_wing
filtered_rooms = selected_df[matching_floor & matching_wing]
if not filtered_rooms.empty:
filtered_rooms[measurement_column] = pd.to_numeric(filtered_rooms[measurement_column], errors='coerce')
filtered_rooms[unit_column] = filtered_rooms[unit_column].astype(str)
for row_index, row in filtered_rooms[measurement_column].iteritems():
if filtered_rooms[unit_column].get(row_index) == '' or filtered_rooms[unit_column].get(row_index) == 'nan':
continue
unit = filtered_rooms[unit_column].get(row_index)
break
avg_measure = filtered_rooms[measurement_column].mean()
max_measure = 0
max_measure_room = 'None'
for row_index, row in filtered_rooms.iterrows():
if row[measurement_column] > max_measure:
max_measure = row[measurement_column]
max_measure_room = row['Room']
data_timestamp = filtered_rooms['Date / Time'].get(filtered_rooms['Date / Time'].first_valid_index())
update_labels(avg_measure, max_measure, max_measure_room, unit, data_timestamp)
root = Tk()
root.title("AHS Air Data")
root.configure(background='white')
root.resizable(False, False)
wing = StringVar(root, value='A') # The selected wing
floor = IntVar(root, value=1) # The selected floor
measurement = IntVar(root, value=1) # The selected measurement
def set_wing():
fill_fields(floor.get(), str(wing.get()), measurement.get())
def set_floor():
for radio_index in range(1, len(wing_radios)):
if floor.get() == 1:
wing_radios[radio_index].grid_remove()
else:
wing_radios[radio_index].grid()
fill_fields(floor.get(), str(wing.get()), measurement.get())
def set_measurement():
fill_fields(floor.get(), str(wing.get()), measurement.get())
# Setup table layout
COLUMN_TITLES = ['Floor', 'Wing', 'Measurement', 'Average', 'Maximum', 'Room Number With Maximum']
col_number = 0
row_labels = []
wing_radios = []
for col in COLUMN_TITLES:
label = Label(text=col, fg="Blue", bg="White", width="30")
label.grid(row=0, column=col_number, pady=(10, 0), sticky='we', ipady="2")
# Add floor options
if col_number == 0:
FLOOR_NAMES = ['1st', '2nd', '3rd']
current_row = 1
for floor_name in FLOOR_NAMES:
Radiobutton(text=floor_name, fg="Black", bg="White", variable=floor, value=int(floor_name[0]),
command=set_floor).grid(row=current_row, column=col_number, sticky='we')
current_row += 1
# Add wing options
elif col_number == 1:
WING_LETTERS = ['A', 'B', 'C', 'D']
current_row = 1
for index, wing_letter in enumerate(WING_LETTERS):
if index == len(WING_LETTERS) - 1:
btn = Radiobutton(text=wing_letter, fg="Black", bg="White", variable=wing, value=wing_letter,
command=set_wing)
wing_radios.append(btn)
btn.grid(row=current_row, column=col_number, sticky='we', pady=(0, 10))
else:
btn = Radiobutton(text=wing_letter, fg="Black", bg="White", variable=wing, value=wing_letter,
command=set_wing)
wing_radios.append(btn)
btn.grid(row=current_row, column=col_number, sticky='we')
current_row += 1
row_label = Label(bg="White", fg="Blue", relief=FLAT, text="Data last updated at: 1/1/1970 00:00")
row_labels.append(row_label)
row_label.grid(row=current_row, column=0, columnspan=len(COLUMN_TITLES), sticky='we', ipady="2", padx=10,
pady=(0, 20))
# Add measurement options
elif col_number == 2:
MEASUREMENTS = ['CO2', 'Temperature']
current_row = 1
for index, measure in enumerate(MEASUREMENTS):
Radiobutton(text=measure, fg="Black", bg="White", variable=measurement, value=index,
command=set_measurement).grid(row=current_row, column=col_number, sticky='we')
current_row += 1
else:
# Create empty cell for value
row_label = Label(bg="White", fg="Black", relief=RIDGE, width="30")
row_labels.append(row_label)
row_label.grid(row=1, column=col_number, sticky='we', ipady="2", padx=5)
col_number += 1
root.grid_columnconfigure(0, weight=1)
fill_fields(floor.get(), str(wing.get()), measurement.get())
try:
for radio_index in range(1, len(wing_radios)):
wing_radios[radio_index].grid_remove() # Hide radios by default
root.protocol("WM_DELETE_WINDOW", stop)
root.mainloop()
except KeyboardInterrupt:
stop()
Once again, if you would like to see, download, or edit the code, check it out on GitHub here.